OXIESEC PANEL
- Current Dir:
/
/
usr
/
lib
/
python3
/
dist-packages
/
twisted
/
test
Server IP: 139.59.38.164
Upload:
Create Dir:
Name
Size
Modified
Perms
📁
..
-
03/31/2022 06:22:38 AM
rwxr-xr-x
📄
__init__.py
103 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📁
__pycache__
-
03/31/2022 06:22:39 AM
rwxr-xr-x
📄
cert.pem.no_trailing_newline
1.38 KB
09/08/2017 10:38:35 AM
rw-r--r--
📄
crash_test_dummy.py
543 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
iosim.py
17.3 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
key.pem.no_trailing_newline
1.67 KB
09/08/2017 10:38:35 AM
rw-r--r--
📄
mock_win32process.py
1.46 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
myrebuilder1.py
158 bytes
09/08/2017 10:38:35 AM
rw-r--r--
📄
myrebuilder2.py
158 bytes
09/08/2017 10:38:35 AM
rw-r--r--
📄
plugin_basic.py
943 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
plugin_extra1.py
407 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
plugin_extra2.py
579 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
process_cmdline.py
162 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
process_echoer.py
214 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
process_fds.py
945 bytes
09/08/2017 10:38:35 AM
rw-r--r--
📄
process_getargv.py
283 bytes
09/08/2017 10:38:35 AM
rw-r--r--
📄
process_getenv.py
268 bytes
09/08/2017 10:38:35 AM
rw-r--r--
📄
process_linger.py
286 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
process_reader.py
188 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
process_signal.py
214 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
process_stdinreader.py
857 bytes
09/08/2017 10:38:35 AM
rw-r--r--
📄
process_tester.py
1.01 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
process_tty.py
130 bytes
09/08/2017 10:38:35 AM
rw-r--r--
📄
process_twisted.py
1.18 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
proto_helpers.py
26.33 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
raiser.c
93.05 KB
09/08/2017 10:38:35 AM
rw-r--r--
📄
raiser.cpython-36m-x86_64-linux-gnu.so
19.16 KB
03/22/2022 11:03:56 AM
rw-r--r--
📄
raiser.pyx
466 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
reflect_helper_IE.py
61 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
reflect_helper_VE.py
82 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
reflect_helper_ZDE.py
47 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
server.pem
4.34 KB
09/08/2017 10:38:35 AM
rw-r--r--
📄
ssl_helpers.py
1.01 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
stdio_test_consumer.py
1.19 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
stdio_test_halfclose.py
1.89 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
stdio_test_hostpeer.py
1021 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
stdio_test_lastwrite.py
1.18 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
stdio_test_loseconn.py
1.51 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
stdio_test_producer.py
1.47 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
stdio_test_write.py
923 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
stdio_test_writeseq.py
915 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_abstract.py
3.42 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_adbapi.py
25.53 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_amp.py
107.96 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_application.py
32.05 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_compat.py
27.32 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_context.py
1.48 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_cooperator.py
20.96 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_defer.py
100.93 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_defgen.py
10.45 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_dict.py
1.41 KB
09/23/2017 05:52:22 AM
rw-r--r--
📄
test_dirdbm.py
6.76 KB
09/08/2017 10:38:35 AM
rw-r--r--
📄
test_error.py
8.39 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_factories.py
4.53 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_failure.py
29.92 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_fdesc.py
7.2 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_finger.py
1.95 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_formmethod.py
3.56 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_ftp.py
127.27 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_ftp_options.py
2.62 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_htb.py
3.12 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_ident.py
6.85 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_internet.py
45.33 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_iosim.py
8.49 KB
09/08/2017 10:38:35 AM
rw-r--r--
📄
test_iutils.py
13.13 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_lockfile.py
15.14 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_log.py
35.48 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_logfile.py
17.8 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_loopback.py
14.15 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_main.py
2.44 KB
09/08/2017 10:38:35 AM
rw-r--r--
📄
test_memcache.py
24.55 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_modules.py
17.47 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_monkey.py
5.5 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_nooldstyle.py
5.82 KB
09/08/2017 10:38:35 AM
rw-r--r--
📄
test_paths.py
72.61 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_pcp.py
12.26 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_persisted.py
14.28 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_plugin.py
25.5 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_policies.py
32.04 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_postfix.py
3.53 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_process.py
84.1 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_protocols.py
7.28 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_randbytes.py
3.28 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_rebuild.py
8.3 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_reflect.py
25.47 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_roots.py
1.77 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_shortcut.py
1.89 KB
09/08/2017 10:38:35 AM
rw-r--r--
📄
test_sip.py
24.69 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_sob.py
5.5 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_socks.py
17.32 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_ssl.py
23.29 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_sslverify.py
104.28 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_stateful.py
1.97 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_stdio.py
12.85 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_strerror.py
5.06 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_stringtransport.py
12.95 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_strports.py
1.75 KB
09/08/2017 10:38:35 AM
rw-r--r--
📄
test_task.py
38.4 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_tcp.py
64.07 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_tcp_internals.py
8.54 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_text.py
6.3 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_threadable.py
3.65 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_threadpool.py
22.47 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_threads.py
12.96 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_tpfile.py
1.56 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_twistd.py
61.05 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_twisted.py
18.42 KB
09/09/2017 05:11:54 AM
rw-r--r--
📄
test_udp.py
24.1 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_unix.py
14.8 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_usage.py
23.09 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
testutils.py
5.19 KB
09/08/2017 10:38:36 AM
rw-r--r--
Editing: test_threadpool.py
Close
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.python.threadpool}
"""

from __future__ import division, absolute_import

import gc
import pickle
import threading
import time
import weakref

from twisted.python.compat import range

from twisted.trial import unittest
from twisted.python import threadpool, threadable, failure, context
from twisted._threads import Team, createMemoryWorker

#
# See the end of this module for the remainder of the imports.
#


class Synchronization(object):
    """
    A helper whose C{run} method is wrapped by L{threadable.synchronize} and
    which counts how many times C{run} observes itself being re-entered.

    @ivar failures: the number of times C{run} found its lock already held.
    @ivar N: the number of completed runs after which C{waiting} is released.
    @ivar waiting: a lock, released once C{N} runs have been recorded.
    @ivar lock: the lock used by C{run} to detect concurrent entry.
    @ivar runs: a list with one entry appended per completed run.
    """
    failures = 0

    def __init__(self, N, waiting):
        self.N = N
        self.waiting = waiting
        self.lock = threading.Lock()
        self.runs = []

    def run(self):
        # This is the testy part: this is supposed to be invoked
        # serially from multiple threads.  If that is actually the
        # case, we will never fail to acquire this lock.  If it is
        # *not* the case, we might get here while someone else is
        # holding the lock.
        if self.lock.acquire(False):
            if not len(self.runs) % 5:
                # Constant selected based on empirical data to maximize
                # the chance of a quick failure if this code is broken.
                time.sleep(0.0002)
            self.lock.release()
        else:
            self.failures += 1

        # This is just the only way I can think of to wake up the test
        # method.  It doesn't actually have anything to do with the
        # test.
        self.lock.acquire()
        self.runs.append(None)
        if len(self.runs) == self.N:
            self.waiting.release()
        self.lock.release()

    synchronized = ["run"]


threadable.synchronize(Synchronization)


class ThreadPoolTests(unittest.SynchronousTestCase):
    """
    Test threadpools.
    """

    def getTimeout(self):
        """
        Return number of seconds to wait before giving up.
        """
        return 5  # Really should be order of magnitude less

    def _waitForLock(self, lock):
        """
        Poll C{lock} without blocking until it can be acquired, sleeping
        briefly between attempts, and fail the test if it is still held after
        1000000 attempts.

        @param lock: the lock to acquire.
        """
        items = range(1000000)
        for i in items:
            if lock.acquire(False):
                break
            time.sleep(1e-5)
        else:
            self.fail("A long time passed without succeeding")

    def test_attributes(self):
        """
        L{ThreadPool.min} and L{ThreadPool.max} are set to the values passed to
        L{ThreadPool.__init__}.
        """
        pool = threadpool.ThreadPool(12, 22)
        self.assertEqual(pool.min, 12)
        self.assertEqual(pool.max, 22)

    def test_start(self):
        """
        L{ThreadPool.start} creates the minimum number of threads specified.
        """
        pool = threadpool.ThreadPool(0, 5)
        pool.start()
        self.addCleanup(pool.stop)
        self.assertEqual(len(pool.threads), 0)

        pool = threadpool.ThreadPool(3, 10)
        self.assertEqual(len(pool.threads), 0)
        pool.start()
        self.addCleanup(pool.stop)
        self.assertEqual(len(pool.threads), 3)

    def test_adjustingWhenPoolStopped(self):
        """
        L{ThreadPool.adjustPoolsize} only modifies the pool size and does not
        start new workers while the pool is not running.
        """
        pool = threadpool.ThreadPool(0, 5)
        pool.start()
        pool.stop()
        pool.adjustPoolsize(2)
        self.assertEqual(len(pool.threads), 0)

    def test_threadCreationArguments(self):
        """
        Test that creating threads in the threadpool with application-level
        objects as arguments doesn't result in those objects never being
        freed, with the thread maintaining a reference to them as long as it
        exists.
        """
        tp = threadpool.ThreadPool(0, 1)
        tp.start()
        self.addCleanup(tp.stop)

        # Sanity check - no threads should have been started yet.
        self.assertEqual(tp.threads, [])

        # Here's our function
        def worker(arg):
            pass

        # weakref needs an object subclass
        class Dumb(object):
            pass

        # And here's the unique object
        unique = Dumb()

        workerRef = weakref.ref(worker)
        uniqueRef = weakref.ref(unique)

        # Put some work in
        tp.callInThread(worker, unique)

        # Add an event to wait completion
        event = threading.Event()
        tp.callInThread(event.set)
        event.wait(self.getTimeout())

        del worker
        del unique
        gc.collect()
        self.assertIsNone(uniqueRef())
        self.assertIsNone(workerRef())

    def test_threadCreationArgumentsCallInThreadWithCallback(self):
        """
        As C{test_threadCreationArguments} above, but for
        callInThreadWithCallback.
        """
        tp = threadpool.ThreadPool(0, 1)
        tp.start()
        self.addCleanup(tp.stop)

        # Sanity check - no threads should have been started yet.
        self.assertEqual(tp.threads, [])

        # this holds references obtained in onResult
        refdict = {}  # name -> ref value

        onResultWait = threading.Event()
        onResultDone = threading.Event()

        resultRef = []

        # result callback
        def onResult(success, result):
            onResultWait.wait(self.getTimeout())
            refdict['workerRef'] = workerRef()
            refdict['uniqueRef'] = uniqueRef()
            onResultDone.set()
            resultRef.append(weakref.ref(result))

        # Here's our function
        def worker(arg, test):
            return Dumb()

        # weakref needs an object subclass
        class Dumb(object):
            pass

        # And here's the unique object
        unique = Dumb()

        onResultRef = weakref.ref(onResult)
        workerRef = weakref.ref(worker)
        uniqueRef = weakref.ref(unique)

        # Put some work in
        tp.callInThreadWithCallback(onResult, worker, unique, test=unique)

        del worker
        del unique

        # let onResult collect the refs
        onResultWait.set()
        # wait for onResult
        onResultDone.wait(self.getTimeout())
        gc.collect()

        self.assertIsNone(uniqueRef())
        self.assertIsNone(workerRef())

        # XXX There's a race right here - has onResult in the worker thread
        # returned and the locals in _worker holding it and the result been
        # deleted yet?

        del onResult
        gc.collect()
        self.assertIsNone(onResultRef())
        self.assertIsNone(resultRef[0]())

    def test_persistence(self):
        """
        Threadpools can be pickled and unpickled, which should preserve the
        number of threads and other parameters.
        """
        pool = threadpool.ThreadPool(7, 20)

        self.assertEqual(pool.min, 7)
        self.assertEqual(pool.max, 20)

        # check that unpickled threadpool has same number of threads
        copy = pickle.loads(pickle.dumps(pool))

        self.assertEqual(copy.min, 7)
        self.assertEqual(copy.max, 20)

    def _threadpoolTest(self, method):
        """
        Test synchronization of calls made with C{method}, which should be
        one of the mechanisms of the threadpool to execute work in threads.

        @param method: a 2-argument callable taking the thread pool and a
            L{Synchronization} and scheduling one call of its C{run} method.
        """
        # This test is doing two jobs at once: it seems to be trying to test
        # both the callInThread()/dispatch() behavior of the ThreadPool as
        # well as the serialization behavior of threadable.synchronize().  It
        # would probably make more sense as two much simpler tests.
        N = 10

        tp = threadpool.ThreadPool()
        tp.start()
        self.addCleanup(tp.stop)

        waiting = threading.Lock()
        waiting.acquire()
        actor = Synchronization(N, waiting)

        for i in range(N):
            method(tp, actor)

        self._waitForLock(waiting)

        self.assertFalse(
            actor.failures,
            "run() re-entered %d times" % (actor.failures,))

    def test_callInThread(self):
        """
        Call C{_threadpoolTest} with C{callInThread}.
        """
        return self._threadpoolTest(
            lambda tp, actor: tp.callInThread(actor.run))

    def test_callInThreadException(self):
        """
        L{ThreadPool.callInThread} logs exceptions raised by the callable it
        is passed.
        """
        class NewError(Exception):
            pass

        def raiseError():
            raise NewError()

        tp = threadpool.ThreadPool(0, 1)
        tp.callInThread(raiseError)
        tp.start()
        tp.stop()

        errors = self.flushLoggedErrors(NewError)
        self.assertEqual(len(errors), 1)

    def test_callInThreadWithCallback(self):
        """
        L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a
        two-tuple of C{(True, result)} where C{result} is the value returned
        by the callable supplied.
        """
        waiter = threading.Lock()
        waiter.acquire()

        results = []

        def onResult(success, result):
            waiter.release()
            results.append(success)
            results.append(result)

        tp = threadpool.ThreadPool(0, 1)
        tp.callInThreadWithCallback(onResult, lambda: "test")
        tp.start()

        try:
            self._waitForLock(waiter)
        finally:
            tp.stop()

        self.assertTrue(results[0])
        self.assertEqual(results[1], "test")

    def test_callInThreadWithCallbackExceptionInCallback(self):
        """
        L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a
        two-tuple of C{(False, failure)} where C{failure} represents the
        exception raised by the callable supplied.
        """
        class NewError(Exception):
            pass

        def raiseError():
            raise NewError()

        waiter = threading.Lock()
        waiter.acquire()

        results = []

        def onResult(success, result):
            waiter.release()
            results.append(success)
            results.append(result)

        tp = threadpool.ThreadPool(0, 1)
        tp.callInThreadWithCallback(onResult, raiseError)
        tp.start()

        try:
            self._waitForLock(waiter)
        finally:
            tp.stop()

        self.assertFalse(results[0])
        self.assertIsInstance(results[1], failure.Failure)
        self.assertTrue(issubclass(results[1].type, NewError))

    def test_callInThreadWithCallbackExceptionInOnResult(self):
        """
        L{ThreadPool.callInThreadWithCallback} logs the exception raised by
        C{onResult}.
        """
        class NewError(Exception):
            pass

        waiter = threading.Lock()
        waiter.acquire()

        results = []

        def onResult(success, result):
            results.append(success)
            results.append(result)
            raise NewError()

        tp = threadpool.ThreadPool(0, 1)
        tp.callInThreadWithCallback(onResult, lambda: None)
        # The pool has at most one thread, so this runs after onResult.
        tp.callInThread(waiter.release)
        tp.start()

        try:
            self._waitForLock(waiter)
        finally:
            tp.stop()

        errors = self.flushLoggedErrors(NewError)
        self.assertEqual(len(errors), 1)

        self.assertTrue(results[0])
        self.assertIsNone(results[1])

    def test_callbackThread(self):
        """
        L{ThreadPool.callInThreadWithCallback} calls the function it is
        given and the C{onResult} callback in the same thread.
        """
        threadIds = []

        event = threading.Event()

        def onResult(success, result):
            threadIds.append(threading.current_thread().ident)
            event.set()

        def func():
            threadIds.append(threading.current_thread().ident)

        tp = threadpool.ThreadPool(0, 1)
        tp.callInThreadWithCallback(onResult, func)
        tp.start()
        self.addCleanup(tp.stop)

        event.wait(self.getTimeout())
        self.assertEqual(len(threadIds), 2)
        self.assertEqual(threadIds[0], threadIds[1])

    def test_callbackContext(self):
        """
        The context L{ThreadPool.callInThreadWithCallback} is invoked in is
        shared by the context the callable and C{onResult} callback are
        invoked in.
        """
        myctx = context.theContextTracker.currentContext().contexts[-1]
        myctx['testing'] = 'this must be present'

        contexts = []

        event = threading.Event()

        def onResult(success, result):
            ctx = context.theContextTracker.currentContext().contexts[-1]
            contexts.append(ctx)
            event.set()

        def func():
            ctx = context.theContextTracker.currentContext().contexts[-1]
            contexts.append(ctx)

        tp = threadpool.ThreadPool(0, 1)
        tp.callInThreadWithCallback(onResult, func)
        tp.start()
        self.addCleanup(tp.stop)

        event.wait(self.getTimeout())

        self.assertEqual(len(contexts), 2)
        self.assertEqual(myctx, contexts[0])
        self.assertEqual(myctx, contexts[1])

    def test_existingWork(self):
        """
        Work added to the threadpool before its start should be executed once
        the threadpool is started: this is ensured by trying to release a lock
        previously acquired.
        """
        waiter = threading.Lock()
        waiter.acquire()

        tp = threadpool.ThreadPool(0, 1)
        tp.callInThread(waiter.release)  # before start()
        tp.start()

        try:
            self._waitForLock(waiter)
        finally:
            tp.stop()

    def test_workerStateTransition(self):
        """
        As the worker receives and completes work, it transitions between
        the working and waiting states.
        """
        pool = threadpool.ThreadPool(0, 1)
        pool.start()
        self.addCleanup(pool.stop)

        # sanity check
        self.assertEqual(pool.workers, 0)
        self.assertEqual(len(pool.waiters), 0)
        self.assertEqual(len(pool.working), 0)

        # fire up a worker and give it some 'work'
        threadWorking = threading.Event()
        threadFinish = threading.Event()

        def _thread():
            threadWorking.set()
            threadFinish.wait(10)

        pool.callInThread(_thread)
        threadWorking.wait(10)
        self.assertEqual(pool.workers, 1)
        self.assertEqual(len(pool.waiters), 0)
        self.assertEqual(len(pool.working), 1)

        # finish work, and spin until state changes; give up after the
        # timeout so a broken pool fails the test instead of hanging it.
        threadFinish.set()
        deadline = time.time() + self.getTimeout()
        while not len(pool.waiters):
            if time.time() > deadline:
                self.fail("Worker did not return to the waiting state")
            time.sleep(0.0005)

        # make sure state changed correctly
        self.assertEqual(len(pool.waiters), 1)
        self.assertEqual(len(pool.working), 0)


class RaceConditionTests(unittest.SynchronousTestCase):
    """
    Tests which exercise a started L{threadpool.ThreadPool} of up to 10
    threads to check how work is scheduled onto its threads.

    @ivar threadpool: the running pool created by C{setUp}.
    @ivar event: a L{threading.Event} shared with the scheduled work.
    """

    def setUp(self):
        """
        Create and start a thread pool, and arrange for it to be stopped and
        discarded when the test finishes.
        """
        self.threadpool = threadpool.ThreadPool(0, 10)
        self.event = threading.Event()
        self.threadpool.start()

        def done():
            self.threadpool.stop()
            del self.threadpool

        self.addCleanup(done)

    def getTimeout(self):
        """
        A reasonable number of seconds to time out.
        """
        return 5

    def test_synchronization(self):
        """
        If multiple threads are waiting on an event (via blocking on something
        in a callable passed to L{threadpool.ThreadPool.callInThread}), and
        there is spare capacity in the threadpool, sending another callable
        which will cause those to un-block to
        L{threadpool.ThreadPool.callInThread} will reliably run that callable
        and un-block the blocked threads promptly.

        @note: This is not really a unit test, it is a stress-test.  You may
            need to run it with C{trial -u} to fail reliably if there is a
            problem.  It is very hard to regression-test for this particular
            bug - one where the thread pool may consider itself as having
            "enough capacity" when it really needs to spin up a new thread if
            it possibly can - in a deterministic way, since the bug can only be
            provoked by subtle race conditions.
        """
        timeout = self.getTimeout()
        self.threadpool.callInThread(self.event.set)
        self.event.wait(timeout)
        self.event.clear()
        for i in range(3):
            self.threadpool.callInThread(self.event.wait)
        self.threadpool.callInThread(self.event.set)
        self.event.wait(timeout)
        if not self.event.is_set():
            # Release the blocked workers so the pool can be stopped.
            self.event.set()
            self.fail(
                "'set' did not run in thread; timed out waiting on 'wait'."
            )

    def test_singleThread(self):
        """
        The submission of a new job to a thread pool in response to the
        C{onResult} callback does not cause a new thread to be added to the
        thread pool.

        This requires that the thread which calls C{onResult} to have first
        marked itself as available so that when the new job is queued, that
        thread may be considered to run it.  This is desirable so that when
        only N jobs are ever being executed in the thread pool at once only
        N threads will ever be created.
        """
        # Ensure no threads running
        self.assertEqual(self.threadpool.workers, 0)

        event = threading.Event()
        event.clear()

        def onResult(success, counter):
            event.set()

        for i in range(10):
            self.threadpool.callInThreadWithCallback(
                onResult, lambda: None)
            event.wait(10)
            event.clear()

        self.assertEqual(self.threadpool.workers, 1)


class MemoryPool(threadpool.ThreadPool):
    """
    A deterministic threadpool that uses in-memory data structures to queue
    work rather than threads to execute work.
    """

    def __init__(self, coordinator, failTest, newWorker, *args, **kwargs):
        """
        Initialize this L{MemoryPool} with a test case.

        @param coordinator: a worker used to coordinate work in the L{Team}
            underlying this threadpool.
        @type coordinator: L{twisted._threads.IExclusiveWorker}

        @param failTest: A 1-argument callable taking an exception and raising
            a test-failure exception.
        @type failTest: 1-argument callable taking (L{Failure}) and raising
            L{unittest.FailTest}.

        @param newWorker: a 0-argument callable that produces a new
            L{twisted._threads.IWorker} provider on each invocation.
        @type newWorker: 0-argument callable returning
            L{twisted._threads.IWorker}.
        """
        self._coordinator = coordinator
        self._failTest = failTest
        self._newWorker = newWorker
        threadpool.ThreadPool.__init__(self, *args, **kwargs)

    def _pool(self, currentLimit, threadFactory):
        """
        Override testing hook to create a deterministic threadpool.

        @param currentLimit: A 1-argument callable which returns the current
            threadpool size limit.

        @param threadFactory: ignored in this invocation; a 0-argument callable
            that would produce a thread.

        @return: a L{Team} backed by the coordinator and worker passed to
            L{MemoryPool.__init__}.
        """
        def respectLimit():
            # The expression in this method copied and pasted from
            # twisted.threads._pool, which is unfortunately bound up
            # with lots of actual-threading stuff.
            stats = team.statistics()
            if (stats.busyWorkerCount + stats.idleWorkerCount
                    >= currentLimit()):
                return None
            return self._newWorker()

        team = Team(coordinator=self._coordinator,
                    createWorker=respectLimit,
                    logException=self._failTest)
        return team


class PoolHelper(object):
    """
    A L{PoolHelper} constructs a L{threadpool.ThreadPool} that doesn't actually
    use threads, by using the internal interfaces in L{twisted._threads}.

    @ivar performCoordination: a 0-argument callable that will perform one unit
        of "coordination" - work involved in delegating work to other threads -
        and return L{True} if it did any work, L{False} otherwise.

    @ivar workers: the workers which represent the threads within the pool -
        the workers other than the coordinator.
    @type workers: L{list} of 2-tuple of (L{IWorker}, C{workPerformer}) where
        C{workPerformer} is a 0-argument callable like C{performCoordination}.

    @ivar threadpool: a modified L{threadpool.ThreadPool} to test.
    @type threadpool: L{MemoryPool}
    """

    def __init__(self, testCase, *args, **kwargs):
        """
        Create a L{PoolHelper}.

        @param testCase: a test case attached to this helper.

        @type args: The arguments passed to a L{threadpool.ThreadPool}.

        @type kwargs: The arguments passed to a L{threadpool.ThreadPool}
        """
        coordinator, self.performCoordination = createMemoryWorker()
        self.workers = []

        def newWorker():
            self.workers.append(createMemoryWorker())
            return self.workers[-1][0]

        self.threadpool = MemoryPool(coordinator, testCase.fail, newWorker,
                                     *args, **kwargs)

    def performAllCoordination(self):
        """
        Perform all currently scheduled "coordination", which is the work
        involved in delegating work to other threads.
        """
        while self.performCoordination():
            pass


class MemoryBackedTests(unittest.SynchronousTestCase):
    """
    Tests using L{PoolHelper} to deterministically test properties of the
    threadpool implementation.
    """

    def test_workBeforeStarting(self):
        """
        If a threadpool is told to do work before starting, then upon starting
        up, it will start enough workers to handle all of the enqueued work
        that it's been given.
        """
        helper = PoolHelper(self, 0, 10)
        n = 5
        for x in range(n):
            helper.threadpool.callInThread(lambda: None)
        helper.performAllCoordination()
        self.assertEqual(helper.workers, [])
        helper.threadpool.start()
        helper.performAllCoordination()
        self.assertEqual(len(helper.workers), n)

    def test_tooMuchWorkBeforeStarting(self):
        """
        If the amount of work before starting exceeds the maximum number of
        threads allowed to the threadpool, only the maximum count will be
        started.
        """
        helper = PoolHelper(self, 0, 10)
        n = 50
        for x in range(n):
            helper.threadpool.callInThread(lambda: None)
        helper.performAllCoordination()
        self.assertEqual(helper.workers, [])
        helper.threadpool.start()
        helper.performAllCoordination()
        self.assertEqual(len(helper.workers), helper.threadpool.max)