[tor-commits] [flashproxy/master] Do WebSocket linking of locals and remotes.
dcf at torproject.org
dcf at torproject.org
Mon Apr 9 04:08:42 UTC 2012
commit aae557431a0b1bbdb2d60d92aeaf98727317a35c
Author: David Fifield <david at bamsoftware.com>
Date: Thu Mar 29 18:40:13 2012 -0700
Do WebSocket linking of locals and remotes.
---
connector.py | 144 ++++++++++++++++++++++++++++++++++++---------------------
1 files changed, 91 insertions(+), 53 deletions(-)
diff --git a/connector.py b/connector.py
index ec30401..07f0f3e 100755
--- a/connector.py
+++ b/connector.py
@@ -111,20 +111,6 @@ def format_addr(addr):
return u"%s:%d" % (host, port)
-class BufferSocket(object):
- """A socket containing a time of creation and a buffer of data received. The
- buffer stores data to make the socket selectable again."""
- def __init__(self, fd):
- self.fd = fd
- self.birthday = time.time()
- self.buf = ""
-
- def __getattr__(self, name):
- return getattr(self.fd, name)
-
- def is_expired(self, timeout):
- return time.time() - self.birthday > timeout
-
def apply_mask(payload, mask_key):
result = []
@@ -545,21 +531,36 @@ def report_pending():
log(u"locals (%d): %s" % (len(locals), [format_peername(x) for x in locals]))
log(u"remotes (%d): %s" % (len(remotes), [format_peername(x) for x in remotes]))
+def proxy_chunk_local_to_remote(local, remote):
+ try:
+ data = local.recv(65536)
+ except socket.error, e: # Can be "Connection reset by peer".
+ log(u"Socket error from local: %s" % repr(str(e)))
+ remote.close()
+ return False
+ if not data:
+ log(u"EOF from local %s." % format_peername(local))
+ local.close()
+ remote.close()
+ return False
+ else:
+ remote.send_chunk(data)
+ return True
-def proxy_chunk(fd_r, fd_w, label):
+def proxy_chunk_remote_to_local(remote, local):
try:
- data = fd_r.recv(65536)
+ data = remote.recv(65536)
except socket.error, e: # Can be "Connection reset by peer".
- log(u"Socket error from %s: %s" % (label, repr(str(e))))
- fd_w.close()
+ log(u"Socket error from remote: %s" % repr(str(e)))
+ local.close()
return False
if not data:
- log(u"EOF from %s %s." % (label, format_peername(fd_r)))
- fd_r.close()
- fd_w.close()
+ log(u"EOF from remote %s." % format_peername(remote))
+ remote.close()
+ local.close()
return False
else:
- fd_w.sendall(data)
+ local.send_chunk(data)
return True
def receive_unlinked(fd, label):
@@ -587,28 +588,59 @@ def receive_unlinked(fd, label):
return True
def match_proxies():
- while locals and remotes:
- remote = remotes.pop(0)
- local = locals.pop(0)
+ while unlinked_remotes and unlinked_locals:
+ remote = unlinked_remotes.pop(0)
+ local = unlinked_locals.pop(0)
remote_addr, remote_port = remote.getpeername()
local_addr, local_port = local.getpeername()
log(u"Linking %s and %s." % (format_peername(local), format_peername(remote)))
+ remote.partner = local
+ local.partner = remote
if local.buf:
- remote.sendall(local.buf)
+ remote.send_chunk(local.buf)
if remote.buf:
- local.sendall(remote.buf)
- remote_for[local.fd] = remote.fd
- local_for[remote.fd] = local.fd
+ local.send_chunk(remote.buf)
+
+class RemoteSocket(object):
+ def __init__(self, fd, protocols):
+ self.fd = fd
+ self.buf = ""
+ self.partner = None
+ self.dec = WebSocketBinaryDecoder(protocols, use_mask = True)
+ self.enc = WebSocketBinaryEncoder(protocols, use_mask = False)
+
+ def send_chunk(self, data):
+ self.sendall(self.enc.encode(data))
+
+ def __getattr__(self, name):
+ return getattr(self.fd, name)
+
+class LocalSocket(object):
+ def __init__(self, fd):
+ self.fd = fd
+ self.buf = ""
+ self.partner = None
+
+ def send_chunk(self, data):
+ self.partner.dec.feed(data)
+ while True:
+ data = self.partner.dec.read()
+ if not data:
+ break
+ self.sendall(data)
+
+ def __getattr__(self, name):
+ return getattr(self.fd, name)
def main():
while True:
- rset = [remote_s, local_s] + websocket_pending + socks_pending + remote_for.keys() + local_for.keys() + locals + remotes
+ rset = [remote_s, local_s] + websocket_pending + socks_pending + locals + remotes
rset, _, _ = select.select(rset, [], [])
for fd in rset:
if fd == remote_s:
remote_c, addr = fd.accept()
log(u"Remote connection from %s." % format_addr(addr))
- websocket_pending.append(BufferSocket(remote_c))
+ websocket_pending.append(remote_c)
elif fd == local_s:
local_c, addr = fd.accept()
log(u"Local connection from %s." % format_addr(addr))
@@ -617,7 +649,9 @@ def main():
log(u"Data from WebSocket-pending %s." % format_addr(addr))
protocols = handle_websocket_request(fd)
if protocols is not None:
- remotes.append(fd)
+ wrapped = RemoteSocket(fd, protocols)
+ remotes.append(wrapped)
+ unlinked_remotes.append(wrapped)
else:
fd.close()
websocket_pending.remove(fd)
@@ -625,29 +659,33 @@ def main():
elif fd in socks_pending:
log(u"SOCKS request from %s." % format_addr(addr))
if handle_socks_request(fd):
- locals.append(BufferSocket(fd))
+ wrapped = LocalSocket(fd)
+ locals.append(wrapped)
+ unlinked_locals.append(wrapped)
else:
fd.close()
socks_pending.remove(fd)
report_pending()
- elif fd in local_for:
- local = local_for[fd]
- if not proxy_chunk(fd, local, "remote"):
- del local_for[fd]
- del remote_for[local]
- elif fd in remote_for:
- remote = remote_for[fd]
- if not proxy_chunk(fd, remote, "local"):
- del remote_for[fd]
- del local_for[remote]
- elif fd in locals:
- if not receive_unlinked(fd, "local"):
- locals.remove(fd)
- report_pending()
elif fd in remotes:
- if not receive_unlinked(fd, "remote"):
- remotes.remove(fd)
+ local = fd.partner
+ if local:
+ if not proxy_chunk_remote_to_local(fd, local):
+ remotes.remove(fd)
+ locals.remove(local)
+ else:
+ if not receive_unlinked(fd, "remote"):
+ remotes.remove(fd)
report_pending()
+ elif fd in locals:
+ remote = fd.partner
+ if remote:
+ if not proxy_chunk_local_to_remote(fd, remote):
+ remotes.remove(remote)
+ locals.remove(fd)
+ else:
+ if not receive_unlinked(fd, "local"):
+ locals.remove(fd)
+ report_pending()
match_proxies()
if __name__ == "__main__":
@@ -692,14 +730,14 @@ if __name__ == "__main__":
websocket_pending = []
# Remote connection sockets.
remotes = []
+ # Remotes not yet linked with a local. This is a subset of remotes.
+ unlinked_remotes = []
# New local sockets waiting to finish their SOCKS negotiation.
socks_pending = []
# Local Tor sockets, after SOCKS negotiation.
locals = []
-
- # Bidirectional mapping between local sockets and remote sockets.
- local_for = {}
- remote_for = {}
+ # Locals not yet linked with a remote. This is a subset of locals.
+ unlinked_locals = []
if options.daemonize:
log(u"Daemonizing.")
More information about the tor-commits
mailing list