[or-cvs] r20512: {torctl} Add the ability to filter out streams from other sources. (torctl/trunk/python/TorCtl)
mikeperry at seul.org
mikeperry at seul.org
Wed Sep 9 08:15:08 UTC 2009
Author: mikeperry
Date: 2009-09-09 04:15:07 -0400 (Wed, 09 Sep 2009)
New Revision: 20512
Modified:
torctl/trunk/python/TorCtl/PathSupport.py
torctl/trunk/python/TorCtl/ScanSupport.py
torctl/trunk/python/TorCtl/StatsSupport.py
Log:
Add the ability to filter out streams from other
sources.
Modified: torctl/trunk/python/TorCtl/PathSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/PathSupport.py 2009-09-09 08:00:16 UTC (rev 20511)
+++ torctl/trunk/python/TorCtl/PathSupport.py 2009-09-09 08:15:07 UTC (rev 20512)
@@ -57,6 +57,8 @@
import time
import TorUtil
import traceback
+import sets
+import threading
from TorUtil import *
__all__ = ["NodeRestrictionList", "PathRestrictionList",
@@ -73,7 +75,8 @@
"CountryCodeRestriction", "CountryRestriction",
"UniqueCountryRestriction", "SingleCountryRestriction",
"ContinentRestriction", "ContinentJumperRestriction",
-"UniqueContinentRestriction", "MetaPathRestriction", "RateLimitedRestriction"]
+"UniqueContinentRestriction", "MetaPathRestriction", "RateLimitedRestriction",
+"SmartSocket"]
#################### Path Support Interfaces #####################
@@ -1278,6 +1281,98 @@
"Returns the age of the stream"
return now-self.attached_at
+_origsocket = socket.socket
+class _SocketWrapper(socket.socket):
+ """ Ghetto wrapper to workaround python same_slots_added() and
+ socket __base__ braindamage """
+ pass
+
+class SmartSocket(_SocketWrapper):
+ """ A SmartSocket is a socket that tracks global socket creation
+ for local ports. It has a member StreamSelector that can
+ be used as a PathBuilder stream StreamSelector (see below).
+
+ Most users will want to reset the base class of SocksiPy to
+ use this class:
+ __oldsocket = socket.socket
+ socket.socket = PathSupport.SmartSocket
+ import SocksiPy
+ socket.socket = __oldsocket
+ """
+ port_table = sets.Set()
+ _table_lock = threading.Lock()
+
+ def connect(self, args):
+ ret = super(SmartSocket, self).connect(args)
+ myaddr = self.getsockname()
+ self.__local_addr = myaddr[0]+":"+str(myaddr[1])
+ SmartSocket._table_lock.acquire()
+ assert(self.__local_addr not in SmartSocket.port_table)
+ SmartSocket.port_table.add(myaddr[0]+":"+str(myaddr[1]))
+ SmartSocket._table_lock.release()
+ plog("DEBUG", "Added "+self.__local_addr+" to our local port list")
+ return ret
+
+ def connect_ex(self, args):
+ ret = super(SmartSocket, self).connect_ex(args)
+ myaddr = ret.getsockname()
+ self.__local_addr = myaddr[0]+":"+str(myaddr[1])
+ SmartSocket._table_lock.acquire()
+ assert(self.__local_addr not in SmartSocket.port_table)
+ SmartSocket.port_table.add(myaddr[0]+":"+str(myaddr[1]))
+ SmartSocket._table_lock.release()
+ plog("DEBUG", "Added "+self.__local_addr+" to our local port list")
+ return ret
+
+ def __del__(self):
+ SmartSocket._table_lock.acquire()
+ SmartSocket.port_table.remove(self.__local_addr)
+ SmartSocket._table_lock.release()
+ plog("DEBUG", "Removed "+self.__local_addr+" from our local port list")
+
+ def table_size():
+ SmartSocket._table_lock.acquire()
+ ret = len(SmartSocket.port_table)
+ SmartSocket._table_lock.release()
+ return ret
+ table_size = Callable(table_size)
+
+ def clear_port_table():
+ """ WARNING: Calling this periodically is a *really good idea*.
+ Relying on __del__ can expose you to race conditions on garbage
+ collection between your processes. """
+ SmartSocket._table_lock.acquire()
+ for i in list(SmartSocket.port_table):
+ plog("DEBUG", "Cleared "+i+" from our local port list")
+ SmartSocket.port_table.remove(i)
+ SmartSocket._table_lock.release()
+ clear_port_table = Callable(clear_port_table)
+
+ def StreamSelector(host, port):
+ to_test = host+":"+str(port)
+ SmartSocket._table_lock.acquire()
+ ret = (to_test in SmartSocket.port_table)
+ SmartSocket._table_lock.release()
+ return ret
+ StreamSelector = Callable(StreamSelector)
+
+
+def StreamSelector(host, port):
+ """ A StreamSelector is a function that takes a host and a port as
+ arguments (parsed from Tor's SOURCE_ADDR field in STREAM NEW
+ events) and decides if it is a stream from this process or not.
+
+ This StreamSelector is just a placeholder that always returns True.
+ When you define your own, be aware that you MUST DO YOUR OWN
+ LOCKING inside this function, as it is called from the Eventhandler
+ thread.
+
+ See PathSupport.SmartSocket.StreamSelctor for an actual
+ implementation.
+
+ """
+ return True
+
# TODO: Make passive "PathWatcher" so people can get aggregate
# node reliability stats for normal usage without us attaching streams
# Can use __metaclass__ and type
@@ -1291,7 +1386,8 @@
schedule_* functions to schedule work to be done in the thread
of the EventHandler.
"""
- def __init__(self, c, selmgr, RouterClass=TorCtl.Router):
+ def __init__(self, c, selmgr, RouterClass=TorCtl.Router,
+ strm_selector=StreamSelector):
"""Constructor. 'c' is a Connection, 'selmgr' is a SelectionManager,
and 'RouterClass' is a class that inherits from Router and is used
to create annotated Routers."""
@@ -1308,6 +1404,7 @@
self.low_prio_jobs = Queue.Queue()
self.run_all_jobs = False
self.do_reconfigure = False
+ self.strm_selector = strm_selector
plog("INFO", "Read "+str(len(self.sorted_r))+"/"+str(len(self.ns_map))+" routers")
def schedule_immediate(self, job):
@@ -1542,11 +1639,12 @@
if s.reason: output.append("REASON=" + s.reason)
if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
if s.purpose: output.append("PURPOSE=" + s.purpose)
+ if s.source_addr: output.append("SOURCE_ADDR="+s.source_addr)
plog("DEBUG", " ".join(output))
if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
s.target_host = "255.255.255.255" # ignore DNS for exit policy check
- # Hack to ignore Tor-handled streams (Currently only directory streams)
+ # Hack to ignore Tor-handled streams
if s.strm_id in self.streams and self.streams[s.strm_id].ignored:
if s.status == "CLOSED":
plog("DEBUG", "Deleting ignored stream: " + str(s.strm_id))
@@ -1563,11 +1661,19 @@
if s.circ_id == 0:
self.streams[s.strm_id] = Stream(s.strm_id, s.target_host, s.target_port, s.status)
# Remember Tor-handled streams (Currently only directory streams)
+
if s.purpose and s.purpose.find("DIR_") == 0:
self.streams[s.strm_id].ignored = True
plog("DEBUG", "Ignoring stream: " + str(s.strm_id))
return
- elif s.circ_id == 0:
+ elif s.source_addr:
+ src_addr = s.source_addr.split(":")
+ src_addr[1] = int(src_addr[1])
+ if not self.strm_selector(*src_addr):
+ self.streams[s.strm_id].ignored = True
+ plog("INFO", "Ignoring foreign stream: " + str(s.strm_id))
+ return
+ if s.circ_id == 0:
self.attach_stream_any(self.streams[s.strm_id],
self.streams[s.strm_id].detached_from)
elif s.status == "DETACHED":
@@ -1583,9 +1689,12 @@
plog("WARN", "Stream "+str(s.strm_id)+" detached from no circuit with reason: "+str(s.reason))
else:
self.streams[s.strm_id].detached_from.append(s.circ_id)
-
- if self.streams[s.strm_id] in self.streams[s.strm_id].pending_circ.pending_streams:
- self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
+
+ if self.streams[s.strm_id].pending_circ and \
+ self.streams[s.strm_id] in \
+ self.streams[s.strm_id].pending_circ.pending_streams:
+ self.streams[s.strm_id].pending_circ.pending_streams.remove(
+ self.streams[s.strm_id])
self.streams[s.strm_id].pending_circ = None
self.attach_stream_any(self.streams[s.strm_id],
self.streams[s.strm_id].detached_from)
Modified: torctl/trunk/python/TorCtl/ScanSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/ScanSupport.py 2009-09-09 08:00:16 UTC (rev 20511)
+++ torctl/trunk/python/TorCtl/ScanSupport.py 2009-09-09 08:15:07 UTC (rev 20512)
@@ -245,4 +245,3 @@
plog("INFO", "Consensus OK")
-
Modified: torctl/trunk/python/TorCtl/StatsSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/StatsSupport.py 2009-09-09 08:00:16 UTC (rev 20511)
+++ torctl/trunk/python/TorCtl/StatsSupport.py 2009-09-09 08:15:07 UTC (rev 20512)
@@ -192,14 +192,14 @@
def __init__(self, router): # Promotion constructor :)
"""'Promotion Constructor' that converts a Router directly into a
StatsRouter without a copy."""
- # TODO: Use __metaclass__ and type to do this instead?
+ # TODO: Use __bases__ to do this instead?
self.__dict__ = router.__dict__
self.reset()
# StatsRouters should not be destroyed when Tor forgets about them
# Give them an extra refcount:
self.refcount += 1
plog("DEBUG", "Stats refcount "+str(self.refcount)+" for "+self.idhex)
-
+
def reset(self):
"Reset all stats on this Router"
self.circ_uncounted = 0
More information about the tor-commits
mailing list