OXIESEC PANEL
- Current Dir:
/
/
usr
/
lib
/
python3
/
dist-packages
/
twisted
/
test
Server IP: 139.59.38.164
Upload:
Create Dir:
Name
Size
Modified
Perms
📁
..
-
03/31/2022 06:22:38 AM
rwxr-xr-x
📄
__init__.py
103 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📁
__pycache__
-
03/31/2022 06:22:39 AM
rwxr-xr-x
📄
cert.pem.no_trailing_newline
1.38 KB
09/08/2017 10:38:35 AM
rw-r--r--
📄
crash_test_dummy.py
543 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
iosim.py
17.3 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
key.pem.no_trailing_newline
1.67 KB
09/08/2017 10:38:35 AM
rw-r--r--
📄
mock_win32process.py
1.46 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
myrebuilder1.py
158 bytes
09/08/2017 10:38:35 AM
rw-r--r--
📄
myrebuilder2.py
158 bytes
09/08/2017 10:38:35 AM
rw-r--r--
📄
plugin_basic.py
943 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
plugin_extra1.py
407 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
plugin_extra2.py
579 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
process_cmdline.py
162 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
process_echoer.py
214 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
process_fds.py
945 bytes
09/08/2017 10:38:35 AM
rw-r--r--
📄
process_getargv.py
283 bytes
09/08/2017 10:38:35 AM
rw-r--r--
📄
process_getenv.py
268 bytes
09/08/2017 10:38:35 AM
rw-r--r--
📄
process_linger.py
286 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
process_reader.py
188 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
process_signal.py
214 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
process_stdinreader.py
857 bytes
09/08/2017 10:38:35 AM
rw-r--r--
📄
process_tester.py
1.01 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
process_tty.py
130 bytes
09/08/2017 10:38:35 AM
rw-r--r--
📄
process_twisted.py
1.18 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
proto_helpers.py
26.33 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
raiser.c
93.05 KB
09/08/2017 10:38:35 AM
rw-r--r--
📄
raiser.cpython-36m-x86_64-linux-gnu.so
19.16 KB
03/22/2022 11:03:56 AM
rw-r--r--
📄
raiser.pyx
466 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
reflect_helper_IE.py
61 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
reflect_helper_VE.py
82 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
reflect_helper_ZDE.py
47 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
server.pem
4.34 KB
09/08/2017 10:38:35 AM
rw-r--r--
📄
ssl_helpers.py
1.01 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
stdio_test_consumer.py
1.19 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
stdio_test_halfclose.py
1.89 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
stdio_test_hostpeer.py
1021 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
stdio_test_lastwrite.py
1.18 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
stdio_test_loseconn.py
1.51 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
stdio_test_producer.py
1.47 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
stdio_test_write.py
923 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
stdio_test_writeseq.py
915 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_abstract.py
3.42 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_adbapi.py
25.53 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_amp.py
107.96 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_application.py
32.05 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_compat.py
27.32 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_context.py
1.48 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_cooperator.py
20.96 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_defer.py
100.93 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_defgen.py
10.45 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_dict.py
1.41 KB
09/23/2017 05:52:22 AM
rw-r--r--
📄
test_dirdbm.py
6.76 KB
09/08/2017 10:38:35 AM
rw-r--r--
📄
test_error.py
8.39 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_factories.py
4.53 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_failure.py
29.92 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_fdesc.py
7.2 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_finger.py
1.95 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_formmethod.py
3.56 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_ftp.py
127.27 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_ftp_options.py
2.62 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_htb.py
3.12 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_ident.py
6.85 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_internet.py
45.33 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_iosim.py
8.49 KB
09/08/2017 10:38:35 AM
rw-r--r--
📄
test_iutils.py
13.13 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_lockfile.py
15.14 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_log.py
35.48 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_logfile.py
17.8 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_loopback.py
14.15 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_main.py
2.44 KB
09/08/2017 10:38:35 AM
rw-r--r--
📄
test_memcache.py
24.55 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_modules.py
17.47 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_monkey.py
5.5 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_nooldstyle.py
5.82 KB
09/08/2017 10:38:35 AM
rw-r--r--
📄
test_paths.py
72.61 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_pcp.py
12.26 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_persisted.py
14.28 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_plugin.py
25.5 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_policies.py
32.04 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_postfix.py
3.53 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_process.py
84.1 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_protocols.py
7.28 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_randbytes.py
3.28 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_rebuild.py
8.3 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_reflect.py
25.47 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_roots.py
1.77 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_shortcut.py
1.89 KB
09/08/2017 10:38:35 AM
rw-r--r--
📄
test_sip.py
24.69 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_sob.py
5.5 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_socks.py
17.32 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_ssl.py
23.29 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_sslverify.py
104.28 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_stateful.py
1.97 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_stdio.py
12.85 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_strerror.py
5.06 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_stringtransport.py
12.95 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_strports.py
1.75 KB
09/08/2017 10:38:35 AM
rw-r--r--
📄
test_task.py
38.4 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_tcp.py
64.07 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_tcp_internals.py
8.54 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_text.py
6.3 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_threadable.py
3.65 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_threadpool.py
22.47 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_threads.py
12.96 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_tpfile.py
1.56 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_twistd.py
61.05 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_twisted.py
18.42 KB
09/09/2017 05:11:54 AM
rw-r--r--
📄
test_udp.py
24.1 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_unix.py
14.8 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_usage.py
23.09 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
testutils.py
5.19 KB
09/08/2017 10:38:36 AM
rw-r--r--
Editing: test_stringtransport.py
Close
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.test.proto_helpers}.
"""

from zope.interface.verify import verifyObject

from twisted.internet.interfaces import (
    ITransport, IPushProducer, IConsumer, IReactorTCP, IReactorSSL,
    IReactorUNIX, IAddress, IListeningPort, IConnector)
from twisted.internet.address import IPv4Address
from twisted.trial.unittest import TestCase
from twisted.test.proto_helpers import (
    StringTransport, MemoryReactor, RaisingMemoryReactor,
    NonStreamingProducer)
from twisted.internet.protocol import ClientFactory, Factory



class StringTransportTests(TestCase):
    """
    Checks of the producer, consumer and address behaviour of
    L{twisted.test.proto_helpers.StringTransport}.
    """

    def setUp(self):
        """
        Give every test a fresh L{StringTransport} as C{self.transport}.
        """
        self.transport = StringTransport()


    def test_interfaces(self):
        """
        A L{StringTransport} instance passes L{verifyObject} for
        L{ITransport}, L{IPushProducer} and L{IConsumer}.
        """
        for interface in (ITransport, IPushProducer, IConsumer):
            self.assertTrue(verifyObject(interface, self.transport))


    def test_registerProducer(self):
        """
        The two arguments given to L{StringTransport.registerProducer} are
        stored, unchanged, on the C{producer} and C{streaming} attributes.
        """
        producerMarker = object()
        streamingMarker = object()
        transport = self.transport

        transport.registerProducer(producerMarker, streamingMarker)

        self.assertIs(transport.producer, producerMarker)
        self.assertIs(transport.streaming, streamingMarker)


    def test_disallowedRegisterProducer(self):
        """
        Registering a second producer while one is still registered raises
        L{RuntimeError}, and the first registration is left in place.
        """
        firstProducer = object()
        transport = self.transport
        transport.registerProducer(firstProducer, True)

        self.assertRaises(
            RuntimeError, transport.registerProducer, object(), False)

        self.assertIs(transport.producer, firstProducer)
        self.assertTrue(transport.streaming)


    def test_unregisterProducer(self):
        """
        After L{StringTransport.unregisterProducer} the transport has no
        producer, and a different producer can then be registered.
        """
        formerProducer = object()
        replacementProducer = object()
        transport = self.transport

        transport.registerProducer(formerProducer, False)
        transport.unregisterProducer()
        self.assertIsNone(transport.producer)

        transport.registerProducer(replacementProducer, True)
        self.assertIs(transport.producer, replacementProducer)
        self.assertTrue(transport.streaming)


    def test_invalidUnregisterProducer(self):
        """
        Calling L{StringTransport.unregisterProducer} with nothing
        registered raises L{RuntimeError}.
        """
        unregister = self.transport.unregisterProducer
        self.assertRaises(RuntimeError, unregister)


    def test_initialProducerState(self):
        """
        A new L{StringTransport} starts with C{producerState} equal to
        C{'producing'}.
        """
        state = self.transport.producerState
        self.assertEqual(state, 'producing')


    def test_pauseProducing(self):
        """
        L{StringTransport.pauseProducing} sets C{producerState} to
        C{'paused'}.
        """
        transport = self.transport
        transport.pauseProducing()
        self.assertEqual(transport.producerState, 'paused')


    def test_resumeProducing(self):
        """
        L{StringTransport.resumeProducing}, called after a pause, sets
        C{producerState} back to C{'producing'}.
        """
        transport = self.transport
        transport.pauseProducing()
        transport.resumeProducing()
        self.assertEqual(transport.producerState, 'producing')


    def test_stopProducing(self):
        """
        L{StringTransport.stopProducing} sets C{producerState} to
        C{'stopped'}.
        """
        transport = self.transport
        transport.stopProducing()
        self.assertEqual(transport.producerState, 'stopped')


    def test_stoppedTransportCannotPause(self):
        """
        Once the transport has been stopped,
        L{StringTransport.pauseProducing} raises L{RuntimeError}.
        """
        transport = self.transport
        transport.stopProducing()
        self.assertRaises(RuntimeError, transport.pauseProducing)


    def test_stoppedTransportCannotResume(self):
        """
        Once the transport has been stopped,
        L{StringTransport.resumeProducing} raises L{RuntimeError}.
        """
        transport = self.transport
        transport.stopProducing()
        self.assertRaises(RuntimeError, transport.resumeProducing)


    def test_disconnectingTransportCannotPause(self):
        """
        After L{StringTransport.loseConnection} has been called,
        L{StringTransport.pauseProducing} raises L{RuntimeError}.
        """
        transport = self.transport
        transport.loseConnection()
        self.assertRaises(RuntimeError, transport.pauseProducing)


    def test_disconnectingTransportCannotResume(self):
        """
        After L{StringTransport.loseConnection} has been called,
        L{StringTransport.resumeProducing} raises L{RuntimeError}.
        """
        transport = self.transport
        transport.loseConnection()
        self.assertRaises(RuntimeError, transport.resumeProducing)


    def test_loseConnectionSetsDisconnecting(self):
        """
        The C{disconnecting} attribute is false on a new transport and true
        once L{StringTransport.loseConnection} has been called.
        """
        transport = self.transport
        self.assertFalse(transport.disconnecting)
        transport.loseConnection()
        self.assertTrue(transport.disconnecting)


    def test_specifiedHostAddress(self):
        """
        The object passed as the first argument of
        L{StringTransport.__init__} is what L{StringTransport.getHost}
        returns.
        """
        hostMarker = object()
        transport = StringTransport(hostMarker)
        self.assertIs(transport.getHost(), hostMarker)


    def test_specifiedPeerAddress(self):
        """
        The object passed as C{peerAddress} to L{StringTransport.__init__}
        is what L{StringTransport.getPeer} returns.
        """
        peerMarker = object()
        transport = StringTransport(peerAddress=peerMarker)
        self.assertIs(transport.getPeer(), peerMarker)


    def test_defaultHostAddress(self):
        """
        With no host address given, L{StringTransport.getHost} returns an
        L{IPv4Address}.
        """
        host = StringTransport().getHost()
        self.assertIsInstance(host, IPv4Address)


    def test_defaultPeerAddress(self):
        """
        With no peer address given, L{StringTransport.getPeer} returns an
        L{IPv4Address}.
        """
        peer = StringTransport().getPeer()
        self.assertIsInstance(peer, IPv4Address)



class ReactorTests(TestCase):
    """
    Checks of the L{MemoryReactor} and L{RaisingMemoryReactor} test doubles.
    """

    def test_memoryReactorProvides(self):
        """
        A L{MemoryReactor} passes L{verifyObject} for L{IReactorTCP},
        L{IReactorSSL} and L{IReactorUNIX}.
        """
        reactor = MemoryReactor()
        for interface in (IReactorTCP, IReactorSSL, IReactorUNIX):
            verifyObject(interface, reactor)


    def test_raisingReactorProvides(self):
        """
        A L{RaisingMemoryReactor} passes L{verifyObject} for L{IReactorTCP},
        L{IReactorSSL} and L{IReactorUNIX}.
        """
        reactor = RaisingMemoryReactor()
        for interface in (IReactorTCP, IReactorSSL, IReactorUNIX):
            verifyObject(interface, reactor)


    def test_connectDestination(self):
        """
        The connectors returned by L{MemoryReactor.connectTCP},
        L{MemoryReactor.connectSSL} and L{MemoryReactor.connectUNIX} provide
        L{IConnector}, and their C{getDestination} results provide
        L{IAddress} and carry the host and port (or the path) that was
        passed in.
        """
        reactor = MemoryReactor()

        # Both connections are made before either one is inspected.
        tcpConnector = reactor.connectTCP(
            "test.example.com", 8321, ClientFactory())
        sslConnector = reactor.connectSSL(
            "test.example.com", 8321, ClientFactory(), None)

        for connector in (tcpConnector, sslConnector):
            verifyObject(IConnector, connector)
            destination = connector.getDestination()
            verifyObject(IAddress, destination)
            self.assertEqual(destination.host, "test.example.com")
            self.assertEqual(destination.port, 8321)

        unixConnector = reactor.connectUNIX(b"/fake/path", ClientFactory())
        verifyObject(IConnector, unixConnector)
        destination = unixConnector.getDestination()
        verifyObject(IAddress, destination)
        self.assertEqual(destination.name, b"/fake/path")


    def test_listenDefaultHost(self):
        """
        The ports returned by L{MemoryReactor.listenTCP},
        L{MemoryReactor.listenSSL} and L{MemoryReactor.listenUNIX} provide
        L{IListeningPort}, and their C{getHost} results provide L{IAddress}.
        For TCP and SSL the host is C{'0.0.0.0'} and the port is the number
        passed in; for UNIX the name is the path passed in.
        """
        reactor = MemoryReactor()

        # Both listeners are created before either one is inspected.
        tcpPort = reactor.listenTCP(8242, Factory())
        sslPort = reactor.listenSSL(8242, Factory(), None)

        for listener in (tcpPort, sslPort):
            verifyObject(IListeningPort, listener)
            host = listener.getHost()
            verifyObject(IAddress, host)
            self.assertEqual(host.host, '0.0.0.0')
            self.assertEqual(host.port, 8242)

        unixPort = reactor.listenUNIX(b"/path/to/socket", Factory())
        verifyObject(IListeningPort, unixPort)
        host = unixPort.getHost()
        verifyObject(IAddress, host)
        self.assertEqual(host.name, b"/path/to/socket")


    def test_readers(self):
        """
        A reader added twice is listed once by C{getReaders}, and is no
        longer listed after C{removeReader}.
        """
        marker = object()
        memoryReactor = MemoryReactor()

        memoryReactor.addReader(marker)
        memoryReactor.addReader(marker)
        self.assertEqual(memoryReactor.getReaders(), [marker])

        memoryReactor.removeReader(marker)
        self.assertEqual(memoryReactor.getReaders(), [])


    def test_writers(self):
        """
        A writer added twice is listed once by C{getWriters}, and is no
        longer listed after C{removeWriter}.
        """
        marker = object()
        memoryReactor = MemoryReactor()

        memoryReactor.addWriter(marker)
        memoryReactor.addWriter(marker)
        self.assertEqual(memoryReactor.getWriters(), [marker])

        memoryReactor.removeWriter(marker)
        self.assertEqual(memoryReactor.getWriters(), [])



class TestConsumer(object):
    """
    A minimal consumer stand-in for L{NonStreamingProducerTests}: it
    remembers the registered producer and collects everything written.

    @ivar writes: Every value passed to L{write}, in call order.
    @ivar producer: The currently registered producer, or L{None}.
    @ivar producerStreaming: The C{streaming} flag given at registration, or
        L{None} when no producer is registered.
    """

    def __init__(self):
        self.producer = None
        self.producerStreaming = None
        self.writes = []


    def registerProducer(self, producer, streaming):
        """
        Remember C{producer} and its C{streaming} flag; no checks are made.

        @param producer: The producer being registered.

        @param streaming: Whether that producer is a streaming one.
        """
        self.producerStreaming = streaming
        self.producer = producer


    def unregisterProducer(self):
        """
        Drop the remembered producer and its streaming flag.
        """
        self.producerStreaming = None
        self.producer = None


    def write(self, data):
        """
        Append C{data} to C{self.writes} so a test can inspect it later.

        @param data: The data being written.
        """
        self.writes.append(data)



class NonStreamingProducerTests(TestCase):
    """
    Checks of the behaviour of L{NonStreamingProducer}.
    """

    def test_producesOnly10Times(self):
        """
        Ten C{resumeProducing} calls write C{b'0'} through C{b'9'} to the
        consumer and leave the producer unregistered; an eleventh call
        raises L{RuntimeError}.
        """
        sink = TestConsumer()
        source = NonStreamingProducer(sink)
        sink.registerProducer(source, False)

        self.assertIs(sink.producer, source)
        self.assertIs(source.consumer, sink)
        self.assertFalse(sink.producerStreaming)

        for _ in range(10):
            source.resumeProducing()

        # By now the producer must have unregistered itself and the
        # consumer must hold all ten counter values.
        expectedWrites = [
            b'0', b'1', b'2', b'3', b'4',
            b'5', b'6', b'7', b'8', b'9',
        ]
        self.assertIsNone(sink.producer)
        self.assertIsNone(sink.producerStreaming)
        self.assertIsNone(source.consumer)
        self.assertEqual(sink.writes, expectedWrites)

        # Asking for more once it is exhausted is an error.
        self.assertRaises(RuntimeError, source.resumeProducing)


    def test_cannotPauseProduction(self):
        """
        C{pauseProducing} on a L{NonStreamingProducer} raises
        L{RuntimeError}.
        """
        sink = TestConsumer()
        source = NonStreamingProducer(sink)
        sink.registerProducer(source, False)

        # Produce once first, just to be safe.
        source.resumeProducing()

        self.assertRaises(RuntimeError, source.pauseProducing)