[or-cvs] r19576: {torctl} Add support for parsing w line in NS dox, create RankRestric (torctl/trunk/python/TorCtl)

mikeperry at seul.org mikeperry at seul.org
Thu May 28 11:50:55 UTC 2009


Author: mikeperry
Date: 2009-05-28 07:50:55 -0400 (Thu, 28 May 2009)
New Revision: 19576

Modified:
   torctl/trunk/python/TorCtl/PathSupport.py
   torctl/trunk/python/TorCtl/SQLSupport.py
   torctl/trunk/python/TorCtl/TorCtl.py
Log:

Add support for parsing w line in NS dox, create
RankRestriction (currently unused), add average router bw to
bws stats.



Modified: torctl/trunk/python/TorCtl/PathSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/PathSupport.py	2009-05-28 11:49:06 UTC (rev 19575)
+++ torctl/trunk/python/TorCtl/PathSupport.py	2009-05-28 11:50:55 UTC (rev 19576)
@@ -222,7 +222,23 @@
 
   def __str__(self):
     return self.__class__.__name__+"("+str(self.pct_skip)+","+str(self.pct_fast)+")"
+
+class RankRestriction(NodeRestriction):
+  """Restriction admitting only routers with rank_skip <= list_rank <= rank_stop."""
+  def __init__(self, rank_skip, rank_stop):
+    self.rank_skip = rank_skip  # lowest list_rank that r_is_ok accepts
+    self.rank_stop = rank_stop  # highest list_rank that r_is_ok accepts
+
+  def r_is_ok(self, r):
+    "Returns True iff r.list_rank is within [rank_skip, rank_stop], inclusive"
+    if r.list_rank < self.rank_skip: return False
+    elif r.list_rank > self.rank_stop: return False
     
+    return True
+
+  def __str__(self):
+    return self.__class__.__name__+"("+str(self.rank_skip)+","+str(self.rank_stop)+")"
+    
 class OSRestriction(NodeRestriction):
   "Restriction based on operating system"
   def __init__(self, ok, bad=[]):
@@ -1281,6 +1297,18 @@
       for g in xrange(0, len(r._generated)):
         r._generated[g] = 0
 
+  def is_urgent_event(event):
+    # Urgent means stream NEW/NEWRESOLVE/DETACHED or circ BUILT/FAILED/CLOSED:
+    # such events are high priority and require immediate action.
+    if isinstance(event, TorCtl.CircuitEvent):
+      if event.status in ("BUILT", "FAILED", "CLOSED"):
+        return True
+    elif isinstance(event, TorCtl.StreamEvent):
+      if event.status in ("NEW", "NEWRESOLVE", "DETACHED"):
+        return True
+    return False  # any other event type or status is not urgent
+  is_urgent_event = Callable(is_urgent_event)
+
   def schedule_selmgr(self, job):
     """
     Schedules an immediate job to be run before the next event is
@@ -1311,16 +1339,11 @@
         imm_job = self.low_prio_jobs.get_nowait()
         imm_job(self)
       return
-    
+
     # If event is stream:NEW*/DETACHED or circ BUILT/FAILED, 
     # don't run low prio jobs.. No need to delay streams for them.
-    if isinstance(event, TorCtl.CircuitEvent):
-      if event.status in ("BUILT", "FAILED"):
-        return
-    elif isinstance(event, TorCtl.StreamEvent):
-      if event.status in ("NEW", "NEWRESOLVE", "DETACHED"):
-        return
-    
+    if PathBuilder.is_urgent_event(event): return
+   
     # Do the low prio jobs one at a time in case a 
     # higher priority event is queued   
     if not self.low_prio_jobs.empty():

Modified: torctl/trunk/python/TorCtl/SQLSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/SQLSupport.py	2009-05-28 11:49:06 UTC (rev 19575)
+++ torctl/trunk/python/TorCtl/SQLSupport.py	2009-05-28 11:50:55 UTC (rev 19576)
@@ -462,7 +462,7 @@
   def reset():
     RouterStats.table.drop()
     RouterStats.table.create()
-    for r in Router.query.all(): # Is this needed?
+    for r in Router.query.all():
       rs = RouterStats()
       rs.router = r
       r.stats = rs
@@ -587,13 +587,18 @@
 
     f.write(str(int(time.time()))+"\n")
 
-    # XXX: print out avg consensus bw too.. Hrmm, but only
-    # during active scan period..
+    def cvt(a,b,c=1):
+      if type(a) == float: return round(a/c,b)
+      elif type(a) == int: return a
+      elif type(a) == type(None): return "None"
+      else: return type(a)
+
     for s in RouterStats.query.filter(pct_clause).filter(stat_clause).\
            order_by(order_by).all():
       f.write("node_id=$"+s.router.idhex+" nick="+s.router.nickname)
-      f.write(" strm_bw="+str(s.sbw))
-      f.write(" filt_bw="+str(s.filt_sbw)+"\n")
+      f.write(" strm_bw="+str(int(cvt(s.sbw,0))))
+      f.write(" filt_bw="+str(int(cvt(s.filt_sbw,0))))
+      f.write(" ns_bw="+str(int(cvt(s.avg_bw,0)))+"\n")
 
     f.flush()
   write_bws = Callable(write_bws)  
@@ -604,12 +609,12 @@
 #################### Model Support ################
 def reset_all():
   # Need to keep routers around.. 
-  for r in Router.query.all():
-    r.bw_history = [] # XXX: Is this sufficient/correct/necessary?
-    r.circuits = []
-    r.streams = []
-    r.stats = None
-    tc_session.add(r)
+  #for r in Router.query.all():
+  #  r.bw_history = [] # XXX: Is this sufficient/correct/necessary?
+  #  r.circuits = []
+  #  r.streams = []
+  #  r.stats = None
+  #  tc_session.add(r)
 
   BwHistory.table.drop() # Will drop subclasses
   Extension.table.drop()
@@ -623,6 +628,7 @@
   Stream.table.create() 
   Circuit.table.create()
 
+  tc_session.clear()
   tc_session.commit()
 
 ##################### End Model Support ####################
@@ -689,11 +695,26 @@
           tc_session.add(OP)
           tc_session.commit()
         self.update_consensus()
-      # So ghetto
+      # XXX: This hack exists because update_rank_history is expensive.
+      # However, even if we delay it till the end of the consensus update, 
+      # it still delays event processing for up to 30 seconds on a fast 
+      # machine.
+      # 
+      # The correct way to do this is give SQL processing
+      # to a dedicated worker thread that pulls events off of a secondary
+      # queue, that way we don't block stream handling on this processing.
+      # The problem is we are pretty heavily burdened with the need to 
+      # stay in sync with our parent event handler. A queue will break this 
+      # coupling (even if we could get all the locking right).
+      #
+      # A lighterweight hack might be to just make the scanners pause
+      # on a condition used to signal we are doing this (and other) heavy 
+      # lifting. We could have them possibly check self.last_desc_at..
       if e.arrived_at - self.last_desc_at > 30.0:
-        plog("INFO", "Newdesc timer is up. Assuming we have full consensus now")
-        self.last_desc_at = 0x7fffffff
-        self._update_rank_history(self.consensus.ns_map.iterkeys())
+        if not PathSupport.PathBuilder.is_urgent_event(e):
+          plog("INFO", "Newdesc timer is up. Assuming we have full consensus")
+          self.last_desc_at = 0x7fffffff
+          self._update_rank_history(self.consensus.ns_map.iterkeys())
 
   def new_consensus_event(self, n):
     if n.state == EVENT_STATE.POSTLISTEN:

Modified: torctl/trunk/python/TorCtl/TorCtl.py
===================================================================
--- torctl/trunk/python/TorCtl/TorCtl.py	2009-05-28 11:49:06 UTC (rev 19575)
+++ torctl/trunk/python/TorCtl/TorCtl.py	2009-05-28 11:50:55 UTC (rev 19576)
@@ -89,7 +89,7 @@
 
 class NetworkStatus:
   "Filled in during NS events"
-  def __init__(self, nickname, idhash, orhash, updated, ip, orport, dirport, flags):
+  def __init__(self, nickname, idhash, orhash, updated, ip, orport, dirport, flags, bandwidth=None):
     self.nickname = nickname
     self.idhash = idhash
     self.orhash = orhash
@@ -98,6 +98,7 @@
     self.dirport = int(dirport)
     self.flags = flags
     self.idhex = (self.idhash + "=").decode("base64").encode("hex").upper()
+    self.bandwidth = bandwidth
     m = re.search(r"(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)", updated)
     self.updated = datetime.datetime(*map(int, m.groups()))
 
@@ -275,10 +276,13 @@
         self.__dict__[i] =  copy.deepcopy(args[0].__dict__[i])
       return
     else:
-      (idhex, name, bw, down, exitpolicy, flags, ip, version, os, uptime, published, contact, rate_limited, orhash) = args
+      (idhex, name, bw, down, exitpolicy, flags, ip, version, os, uptime, published, contact, rate_limited, orhash, ns_bandwidth) = args
     self.idhex = idhex
     self.nickname = name
-    self.bw = bw
+    if ns_bandwidth != None:
+      self.bw = ns_bandwidth
+    else:
+      self.bw = bw
     self.exitpolicy = exitpolicy
     self.flags = flags # Technicaly from NS doc
     self.down = down
@@ -366,7 +370,7 @@
       plog("INFO", "No version and/or OS for router " + ns.nickname)
     return Router(ns.idhex, ns.nickname, bw_observed, dead, exitpolicy,
         ns.flags, ip, version, os, uptime, published, contact, rate_limited,
-        ns.orhash)
+        ns.orhash, ns.bandwidth)
   build_from_desc = Callable(build_from_desc)
 
   def update_to(self, new):
@@ -917,8 +921,12 @@
     m = re.search(r"^s((?:\s\S*)+)", nsline, re.M)
     flags = m.groups()
     flags = flags[0].strip().split(" ")
-    m = re.match(r"(\S+)\s(\S+)\s(\S+)\s(\S+\s\S+)\s(\S+)\s(\d+)\s(\d+)", nsline)
-    nslist.append(NetworkStatus(*(m.groups() + (flags,))))
+    m = re.match(r"(\S+)\s(\S+)\s(\S+)\s(\S+\s\S+)\s(\S+)\s(\d+)\s(\d+)", nsline)    
+    w = re.search(r"^w Bandwidth=(\d+)", nsline, re.M)  # optional "w" line
+    if w:  # group(1) is the digits; group(0) would be the whole "w ..." text
+      nslist.append(NetworkStatus(*(m.groups() + (flags, int(w.group(1))))))
+    else:  # no "w" line: NetworkStatus falls back to bandwidth=None
+      nslist.append(NetworkStatus(*(m.groups() + (flags,))))
   return nslist
 
 class EventSink:



More information about the tor-commits mailing list