OXIESEC PANEL
- Current Dir:
/
/
usr
/
lib
/
python3
/
dist-packages
/
twisted
/
names
/
test
Server IP: 139.59.38.164
Upload:
Create Dir:
Name
Size
Modified
Perms
📁
..
-
03/31/2022 06:22:38 AM
rwxr-xr-x
📄
__init__.py
26 bytes
09/08/2017 10:38:36 AM
rw-r--r--
📁
__pycache__
-
03/31/2022 06:22:39 AM
rwxr-xr-x
📄
test_cache.py
4.8 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_client.py
39.31 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_common.py
4.11 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_dns.py
150.45 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_examples.py
5.22 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_hosts.py
8.35 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_names.py
42.09 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_resolve.py
1.05 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_rfc1982.py
13.66 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_rootresolve.py
25.04 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_server.py
40.27 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_srvconnect.py
9.89 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_tap.py
4.84 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
test_util.py
3.82 KB
09/08/2017 10:38:36 AM
rw-r--r--
Editing: test_util.py
Close
# Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details. """ Utilities for Twisted.names tests. """ from __future__ import division, absolute_import from random import randrange from zope.interface import implementer from zope.interface.verify import verifyClass from twisted.internet.address import IPv4Address from twisted.internet.defer import succeed from twisted.internet.task import Clock from twisted.internet.interfaces import IReactorUDP, IUDPTransport @implementer(IUDPTransport) class MemoryDatagramTransport(object): """ This L{IUDPTransport} implementation enforces the usual connection rules and captures sent traffic in a list for later inspection. @ivar _host: The host address to which this transport is bound. @ivar _protocol: The protocol connected to this transport. @ivar _sentPackets: A C{list} of two-tuples of the datagrams passed to C{write} and the addresses to which they are destined. @ivar _connectedTo: L{None} if this transport is unconnected, otherwise an address to which all traffic is supposedly sent. @ivar _maxPacketSize: An C{int} giving the maximum length of a datagram which will be successfully handled by C{write}. """ def __init__(self, host, protocol, maxPacketSize): self._host = host self._protocol = protocol self._sentPackets = [] self._connectedTo = None self._maxPacketSize = maxPacketSize def getHost(self): """ Return the address which this transport is pretending to be bound to. """ return IPv4Address('UDP', *self._host) def connect(self, host, port): """ Connect this transport to the given address. """ if self._connectedTo is not None: raise ValueError("Already connected") self._connectedTo = (host, port) def write(self, datagram, addr=None): """ Send the given datagram. """ if addr is None: addr = self._connectedTo if addr is None: raise ValueError("Need an address") if len(datagram) > self._maxPacketSize: raise ValueError("Packet too big") self._sentPackets.append((datagram, addr)) def stopListening(self): """ Shut down this transport. """ self._protocol.stopProtocol() return succeed(None) def setBroadcastAllowed(self, enabled): """ Dummy implementation to satisfy L{IUDPTransport}. """ pass def getBroadcastAllowed(self): """ Dummy implementation to satisfy L{IUDPTransport}. """ pass verifyClass(IUDPTransport, MemoryDatagramTransport) @implementer(IReactorUDP) class MemoryReactor(Clock): """ An L{IReactorTime} and L{IReactorUDP} provider. Time is controlled deterministically via the base class, L{Clock}. UDP is handled in-memory by connecting protocols to instances of L{MemoryDatagramTransport}. @ivar udpPorts: A C{dict} mapping port numbers to instances of L{MemoryDatagramTransport}. """ def __init__(self): Clock.__init__(self) self.udpPorts = {} def listenUDP(self, port, protocol, interface='', maxPacketSize=8192): """ Pretend to bind a UDP port and connect the given protocol to it. """ if port == 0: while True: port = randrange(1, 2 ** 16) if port not in self.udpPorts: break if port in self.udpPorts: raise ValueError("Address in use") transport = MemoryDatagramTransport( (interface, port), protocol, maxPacketSize) self.udpPorts[port] = transport protocol.makeConnection(transport) return transport verifyClass(IReactorUDP, MemoryReactor)