OXIESEC PANEL
- Current Dir:
/
/
usr
/
lib
/
python3
/
dist-packages
/
twisted
/
conch
Server IP: 139.59.38.164
Upload:
Create Dir:
Name
Size
Modified
Perms
📁
..
-
03/31/2022 06:22:38 AM
rwxr-xr-x
📄
__init__.py
515 bytes
09/08/2017 10:38:35 AM
rw-r--r--
📁
__pycache__
-
03/31/2022 06:22:39 AM
rwxr-xr-x
📄
avatar.py
1.4 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
checkers.py
19.28 KB
09/08/2017 10:38:36 AM
rw-r--r--
📁
client
-
03/31/2022 06:22:38 AM
rwxr-xr-x
📄
endpoints.py
28.33 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
error.py
2.65 KB
09/08/2017 10:38:36 AM
rw-r--r--
📁
insults
-
03/31/2022 06:22:38 AM
rwxr-xr-x
📄
interfaces.py
12.76 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
ls.py
2.49 KB
09/08/2017 10:38:35 AM
rw-r--r--
📄
manhole.py
11.3 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
manhole_ssh.py
3.9 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
manhole_tap.py
5.24 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
mixin.py
1.34 KB
09/08/2017 10:38:36 AM
rw-r--r--
📁
openssh_compat
-
03/31/2022 06:22:38 AM
rwxr-xr-x
📄
recvline.py
11.25 KB
09/08/2017 10:38:36 AM
rw-r--r--
📁
scripts
-
03/31/2022 06:22:38 AM
rwxr-xr-x
📁
ssh
-
03/31/2022 06:22:38 AM
rwxr-xr-x
📄
stdio.py
2.71 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
tap.py
3.11 KB
09/08/2017 10:38:36 AM
rw-r--r--
📄
telnet.py
37.64 KB
09/08/2017 10:38:36 AM
rw-r--r--
📁
test
-
03/31/2022 06:22:38 AM
rwxr-xr-x
📄
ttymodes.py
2.19 KB
09/08/2017 10:38:36 AM
rw-r--r--
📁
ui
-
03/31/2022 06:22:38 AM
rwxr-xr-x
📄
unix.py
15.91 KB
09/23/2017 05:51:46 AM
rw-r--r--
Editing: mixin.py
Close
# -*- test-case-name: twisted.conch.test.test_mixin -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Experimental optimization

This module provides a single mixin class which allows protocols to
collapse numerous small writes into a single larger one.

@author: Jp Calderone
"""

from twisted.internet import reactor


class BufferingMixin:
    """
    Mixin which adds write buffering.

    The class this is mixed into must provide a C{transport} attribute with a
    C{writeSequence} method; L{flush} hands the collected chunks to it.

    @ivar _delayedWriteCall: The token returned by L{schedule} for the pending
        flush, or L{None} when no flush is pending.
    @ivar data: The list of chunks collected since the last flush, or L{None}
        when nothing is buffered.
    @cvar DELAY: Delay, in seconds, passed to the reactor when scheduling or
        rescheduling the flush.
    """
    _delayedWriteCall = None
    data = None

    DELAY = 0.0

    def schedule(self):
        """
        Arrange for L{flush} to be called after C{self.DELAY}.

        @return: Whatever C{reactor.callLater} returns; L{write} stores it and
            later passes it to L{reschedule}.
        """
        return reactor.callLater(self.DELAY, self.flush)

    def reschedule(self, token):
        """
        Push back a pending flush by resetting its delay to C{self.DELAY}.

        @param token: A value previously returned by L{schedule}.
        """
        token.reset(self.DELAY)

    def write(self, data):
        """
        Buffer some bytes to be written soon.

        Every call to this function delays the real write by C{self.DELAY}
        seconds.  When the delay expires, all collected bytes are written to
        the underlying transport using L{ITransport.writeSequence}.

        @param data: The chunk to append to the buffer.
        """
        if self._delayedWriteCall is None:
            # First write since the last flush: start a fresh buffer and
            # schedule the flush that will drain it.
            self.data = []
            self._delayedWriteCall = self.schedule()
        else:
            # A flush is already pending: postpone it instead of adding one.
            self.reschedule(self._delayedWriteCall)
        self.data.append(data)

    def flush(self):
        """
        Flush the buffer immediately.

        Does nothing when no data is buffered.  That covers both a call made
        before any L{write} and a previously scheduled call which fires after
        the buffer was already flushed by hand (this method does not cancel
        the pending call, since the token is opaque to everything but
        L{schedule} and L{reschedule}).
        """
        self._delayedWriteCall = None
        pending = self.data
        if pending is None:
            # Nothing buffered; passing None to writeSequence would fail.
            return
        # Detach the buffer before writing so that it is never handed to the
        # transport twice, even if writeSequence raises.
        self.data = None
        self.transport.writeSequence(pending)