[or-cvs] r19413: {torctl} Fix lots of SQLAlchemy weirdness (including some rather annoying 0.4 to 0.5 breakage) and a couple of stream handling bugs. (torctl/trunk/python/TorCtl)

mikeperry at seul.org mikeperry at seul.org
Mon May 4 06:57:13 UTC 2009


Author: mikeperry
Date: 2009-05-04 02:57:13 -0400 (Mon, 04 May 2009)
New Revision: 19413

Modified:
   torctl/trunk/python/TorCtl/SQLSupport.py
Log:

Fix lots of SQLAlchemy weirdness (including some rather
annoying 0.4 to 0.5 breakage) and a couple of stream handling
bugs.



Modified: torctl/trunk/python/TorCtl/SQLSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/SQLSupport.py	2009-05-03 15:17:59 UTC (rev 19412)
+++ torctl/trunk/python/TorCtl/SQLSupport.py	2009-05-04 06:57:13 UTC (rev 19413)
@@ -19,8 +19,10 @@
 from TorUtil import meta_port, meta_host, control_port, control_host, control_pass
 from TorCtl import EVENT_TYPE, EVENT_STATE, TorCtlError
 
+import sqlalchemy
 from sqlalchemy.orm import scoped_session, sessionmaker, eagerload, lazyload, eagerload_all
-from sqlalchemy import create_engine, and_, or_, not_
+from sqlalchemy import create_engine, and_, or_, not_, func
+from sqlalchemy.sql import func,select
 from sqlalchemy.schema import ThreadLocalMetaData,MetaData
 from elixir import *
 
@@ -28,7 +30,7 @@
 # for higher-valued nodes
 MIN_RATIO=0.5
 
-NO_FPE=2**-20
+NO_FPE=2**-50
 
 #################### Model #######################
 
@@ -40,12 +42,13 @@
 tc_metadata.echo=False
 tc_session = scoped_session(sessionmaker(autoflush=True))
 
-def setup_db(db_uri):
+def setup_db(db_uri, drop=False):
   tc_engine = create_engine(db_uri, echo=False)
   tc_metadata.bind = tc_engine
   tc_metadata.echo = False
 
   setup_all()
+  if drop: drop_all()
   create_all()
 
 class Router(Entity):
@@ -54,9 +57,9 @@
   idhex = Field(CHAR(40), primary_key=True, index=True)
   orhash = Field(CHAR(27))
   published = Field(Time)
-  nick = Field(Text)
+  nickname = Field(Text)
 
-  OS = Field(Text)
+  os = Field(Text)
   rate_limited = Field(Boolean)
   guard = Field(Boolean)
   exit = Field(Boolean)
@@ -80,8 +83,8 @@
     self.bw = router.bw
     self.idhex = router.idhex
     self.orhash = router.orhash
-    self.nick = router.nickname
-    self.OS = router.os
+    self.nickname = router.nickname
+    self.os = router.os
     self.rate_limited = router.rate_limited
     self.guard = "Guard" in router.flags
     self.exit = "Exit" in router.flags
@@ -158,13 +161,14 @@
   using_mapper_options(save_on_init=False)
   tgt_host = Field(Text)
   tgt_port = Field(Integer)
-  circuit = ManyToOne('Circuit')
-  detached_circuits = ManyToMany('Circuit')
+  circuit = ManyToOne('Circuit', inverse='streams')
+  detached_circuits = ManyToMany('Circuit', inverse='detatched_streams')
   ignored = Field(Boolean) # Directory streams
   strm_id = Field(Integer, index=True)
   start_time = Field(Float)
   tot_read_bytes = Field(Integer)
   tot_write_bytes = Field(Integer)
+  init_status = Field(Text)
   close_reason = Field(Text) # Shared by Failed and Closed. Unused here.
 
 class FailedStream(Stream):
@@ -259,10 +263,12 @@
       if rs.circ_try_to+rs.circ_try_from > 0:
         rs.circ_bi_rate = (1.0*rs.circ_fail_to+rs.circ_fail_from)/(rs.circ_try_to+rs.circ_try_from)
 
-      tc_session.update(rs)
+      tc_session.add(rs)
+    tc_session.commit()
   _compute_stats_relation = Callable(_compute_stats_relation)
 
   def _compute_stats_query(stats_clause):
+    tc_session.clear()
     # http://www.sqlalchemy.org/docs/04/sqlexpression.html#sql_update
     to_s = select([func.count(Extension.id)], 
         and_(stats_clause, Extension.table.c.to_node_idhex
@@ -289,21 +295,21 @@
        RouterStats.table.c.circ_fail_from:f_from_s,
        RouterStats.table.c.avg_first_ext:avg_ext}).execute()
 
-    RouterStats.table.update(values=
-      {RouterStats.table.c.circ_from_rate :
+    RouterStats.table.update(stats_clause, values=
+      {RouterStats.table.c.circ_from_rate:
          RouterStats.table.c.circ_fail_from/RouterStats.table.c.circ_try_from,
-       RouterStats.table.c.circ_to_rate :
-         RouterStats.table.c.circ_fail_to/RouterStats.table.c.circ_try_to,
-       RouterStats.table.c.circ_bi_rate :
+       RouterStats.table.c.circ_to_rate:
+          RouterStats.table.c.circ_fail_to/RouterStats.table.c.circ_try_to,
+       RouterStats.table.c.circ_bi_rate:
          (RouterStats.table.c.circ_fail_to+RouterStats.table.c.circ_fail_from)
                           /
-         (RouterStats.table.c.circ_try_to+RouterStats.table.c.circ_try_from)}).execute()
+      (RouterStats.table.c.circ_try_to+RouterStats.table.c.circ_try_from)}).execute()
 
-    tc_session.clear()
 
     # TODO: Give the streams relation table a sane name and reduce this too
-    for rs in RouterStats.query.options(eagerload('router'), 
-                        eagerload('router.streams')).all():
+    for rs in RouterStats.query.filter(stats_clause).\
+                        options(eagerload('router'), 
+                                eagerload('router.streams')).all():
       tot_bw = 0.0
       s_cnt = 0
       for s in rs.router.streams:
@@ -311,7 +317,9 @@
           tot_bw += s.read_bandwidth
           s_cnt += 1
       if s_cnt > 0: rs.sbw = tot_bw/s_cnt
-      tc_session.update(rs)
+      else: rs.sbw = None
+      tc_session.add(rs)
+    tc_session.commit()
   _compute_stats_query = Callable(_compute_stats_query)
 
   def _compute_stats(stats_clause):
@@ -320,6 +328,7 @@
   _compute_stats = Callable(_compute_stats)
 
   def _compute_ranks():
+    tc_session.clear()
     min_r = select([func.min(BwHistory.rank)], 
         BwHistory.table.c.router_idhex
             == RouterStats.table.c.router_idhex).as_scalar()
@@ -339,20 +348,17 @@
         RouterStats.table.c.max_rank:max_r,
         RouterStats.table.c.avg_bw:avg_bw}).execute()
 
-    min_avg_rank = select([func.min(RouterStats.avg_rank)]).as_scalar()
+    #min_avg_rank = select([func.min(RouterStats.avg_rank)]).as_scalar()
     max_avg_rank = select([func.max(RouterStats.avg_rank)]).as_scalar()
 
-    RouterStats.query.filter('1=1').min(RouterStats.avg_rank)
-    max_avg_rank = RouterStats.query.filter('1=1').max(RouterStats.avg_rank)
-
     RouterStats.table.update(values=
        {RouterStats.table.c.percentile:
-            (100.0*rs.avg_rank)/max_avg_rank}).execute()
-    
-    tc_session.clear()
+            (100.0*RouterStats.table.c.avg_rank)/max_avg_rank}).execute()
+    tc_session.commit()
   _compute_ranks = Callable(_compute_ranks)
 
   def _compute_ratios(stats_clause):
+    tc_session.clear()
     avg_from_rate = select([func.avg(RouterStats.circ_from_rate)],
                            stats_clause).as_scalar()
     avg_to_rate = select([func.avg(RouterStats.circ_to_rate)],
@@ -364,18 +370,18 @@
     avg_sbw = select([func.avg(RouterStats.sbw)],
                            stats_clause).as_scalar()
 
-    RouterStats.update(stats_clause, values=
+    RouterStats.table.update(stats_clause, values=
        {RouterStats.table.c.circ_from_ratio:
          (1-RouterStats.table.c.circ_from_rate)/(1-avg_from_rate),
         RouterStats.table.c.circ_to_ratio:
          (1-RouterStats.table.c.circ_to_rate)/(1-avg_to_rate),
         RouterStats.table.c.circ_bi_ratio:
          (1-RouterStats.table.c.circ_bi_rate)/(1-avg_bi_rate),
-        RouterStats.table.c.avg_first_ext:
+        RouterStats.table.c.ext_ratio:
          (RouterStats.table.c.avg_first_ext)/(avg_ext),
         RouterStats.table.c.sbw_ratio:
          (RouterStats.table.c.sbw)/(avg_sbw)})
-    tc_session.clear()
+    tc_session.commit()
   _compute_ratios = Callable(_compute_ratios)
 
   def _compute_filtered_query(min_ratio): # broken.. don't use.
@@ -393,6 +399,7 @@
     avg_sbw = RouterStats.query.filter('1=1').avg(RouterStats.filt_sbw)
     for rs in RouterStats.query.all():
       rs.filt_sbw_ratio = rs.filt_sbw/avg_sbw
+    tc_session.commit()
   _compute_filtered_query = Callable(_compute_filtered_query)
 
   def _compute_filtered_relational(min_ratio, stats_clause, filter_clause):
@@ -401,8 +408,7 @@
 
     # TODO: Turn this into a single query....
     for rs in RouterStats.query.filter(stats_clause).\
-          options(eagerload_all('router.streams.circuit.routers')).\
-             all():
+          options(eagerload_all('router.streams.circuit.routers')).all():
       tot_sbw = 0
       sbw_cnt = 0
       for s in rs.router.streams:
@@ -415,12 +421,20 @@
           if not skip:
             tot_sbw += s.read_bandwidth   
             sbw_cnt += 1
-      rs.filt_sbw = tot_sbw/sbw_cnt
-      tc_session.update(rs)
-    avg_sbw = RouterStats.query.filter(stats_clause).avg(RouterStats.filt_sbw)
+      if sbw_cnt: rs.filt_sbw = tot_sbw/sbw_cnt
+      else: rs.filt_sbw = None
+      tc_session.add(rs)
+    if sqlalchemy.__version__ < "0.5.0":
+      avg_sbw = RouterStats.query.filter(stats_clause).avg(RouterStats.filt_sbw)
+    else:
+      avg_sbw = tc_session.query(func.avg(RouterStats.filt_sbw)).filter(stats_clause).scalar()
     for rs in RouterStats.query.filter(stats_clause).all():
-      rs.filt_sbw_ratio = rs.filt_sbw/avg_sbw
-      tc_session.update(rs)
+      if type(rs.filt_sbw) == float and avg_sbw:
+        rs.filt_sbw_ratio = rs.filt_sbw/avg_sbw
+      else:
+        rs.filt_sbw_ratio = None
+      tc_session.add(rs)
+    tc_session.commit()
   _compute_filtered_relational = Callable(_compute_filtered_relational)
 
   def _compute_filtered_ratios(min_ratio, stats_clause, filter_clause):
@@ -436,7 +450,8 @@
       rs = RouterStats()
       rs.router = r
       r.stats = rs
-      tc_session.update(r)
+      tc_session.add(r)
+    tc_session.clear()
     tc_session.commit()
   reset = Callable(reset)
 
@@ -457,12 +472,7 @@
   compute = Callable(compute)  
 
   def write_stats(f, pct_low=0, pct_high=100, order_by=None, recompute=False, stat_clause=None, filter_clause=None):
-    ratio_key = """SQLSupport Statistics:
-    SR=Stream avg ratio     AR=Advertised bw ratio    BRR=Adv. bw avg ratio
-    CSR=Circ suspect ratio  CFR=Circ Fail Ratio       SSR=Stream suspect ratio
-    SFR=Stream fail ratio   CC=Circuit Count          SC=Stream Count
-    P=Percentile Rank       U=Uptime (h)\n"""
- 
+
     if not order_by:
       order_by=RouterStats.avg_first_ext
 
@@ -472,34 +482,57 @@
     pct_clause = and_(RouterStats.percentile >= pct_low, 
                          RouterStats.percentile < pct_high)
 
-    circ_from_rate = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.circ_from_rate)
-    circ_to_rate = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.circ_to_rate)
-    circ_bi_rate = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.circ_bi_rate)
+    # This is Fail City and sqlalchemy is running for mayor.
+    if sqlalchemy.__version__ < "0.5.0":
+      circ_from_rate = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.circ_from_rate)
+      circ_to_rate = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.circ_to_rate)
+      circ_bi_rate = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.circ_bi_rate)
 
-    avg_first_ext = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.avg_first_ext)
-    sbw = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.sbw)
-    filt_sbw = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.filt_sbw)
-    percentile = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.percentile) 
+      avg_first_ext = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.avg_first_ext)
+      sbw = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.sbw)
+      filt_sbw = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.filt_sbw)
+      percentile = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.percentile)
+    else:
+      circ_from_rate = tc_session.query(func.avg(RouterStats.circ_from_rate)).filter(pct_clause).filter(stat_clause).scalar()
+      circ_to_rate = tc_session.query(func.avg(RouterStats.circ_to_rate)).filter(pct_clause).filter(stat_clause).scalar()
+      circ_bi_rate = tc_session.query(func.avg(RouterStats.circ_bi_rate)).filter(pct_clause).filter(stat_clause).scalar()
+      
+      avg_first_ext = tc_session.query(func.avg(RouterStats.avg_first_ext)).filter(pct_clause).filter(stat_clause).scalar()
+      sbw = tc_session.query(func.avg(RouterStats.sbw)).filter(pct_clause).filter(stat_clause).scalar()
+      filt_sbw = tc_session.query(func.avg(RouterStats.filt_sbw)).filter(pct_clause).filter(stat_clause).scalar()
+      percentile = tc_session.query(func.avg(RouterStats.percentile)).filter(pct_clause).filter(stat_clause).scalar()
 
+    def cvt(a,b):
+      if type(a) == float: return round(a,b)
+      elif type(a) == type(None): return "None"
+      else: return type(a) 
+
+    sql_key = """SQLSupport Statistics:
+    CFR=Circ From Rate         CTR=Circ To Rate     CBR=Circ To/From Rate
+    CFE=Avg 1st Ext time (s)   SBW=Avg Stream BW    FBW=Filtered stream bw
+    PR=Percentile Rank\n\n"""
+ 
+    f.write(sql_key)
     f.write("Average Statistics:\n")
-    f.write(" CFR="+str(round(circ_from_rate,2))+"\n")
-    f.write(" CTR="+str(round(circ_to_rate,2))+"\n")
-    f.write(" CBR="+str(round(circ_bi_rate,2))+"\n")
-    f.write(" CFE="+str(round(avg_first_ext,2))+"\n")
-    f.write(" SBW="+str(round(sbw,2))+"\n")
-    f.write(" FBW="+str(round(filt_sbw,2))+"\n")
-    f.write(" PR="+str(round(percentile,2))+"\n\n")
+    f.write(" CFR="+str(cvt(circ_from_rate,2))+"\n")
+    f.write(" CTR="+str(cvt(circ_to_rate,2))+"\n")
+    f.write(" CBR="+str(cvt(circ_bi_rate,2))+"\n")
+    f.write(" CFE="+str(cvt(avg_first_ext,2))+"\n")
+    f.write(" SBW="+str(cvt(sbw,2))+"\n")
+    f.write(" FBW="+str(cvt(filt_sbw,2))+"\n")
+    f.write(" PR="+str(cvt(percentile,2))+"\n\n")
 
     for s in RouterStats.query.filter(pct_clause).filter(stat_clause).\
            order_by(order_by).all():
       f.write(s.router.idhex+"="+s.router.nickname+"\n")
-      f.write(" CFR="+str(round(s.circ_from_rate,2))+" ")
-      f.write(" CTR="+str(round(s.circ_to_rate,2))+" ")
-      f.write(" CBR="+str(round(s.circ_bi_rate,2))+" ")
-      f.write(" CFE="+str(round(s.avg_first_ext,2))+" ")
-      f.write(" SBW="+str(round(s.sbw,2))+" ")
-      f.write(" FBW="+str(round(s.filt_sbw,2))+" ")
-      f.write(" PR="+str(round(s.percentile,1))+"\n")
+      f.write(" CFR="+str(cvt(s.circ_from_rate,2))+" ")
+      f.write(" CTR="+str(cvt(s.circ_to_rate,2))+" ")
+      f.write(" CBR="+str(cvt(s.circ_bi_rate,2))+" ")
+      f.write(" CFE="+str(cvt(s.avg_first_ext,2))+" ")
+      f.write(" SBW="+str(cvt(s.sbw,2))+" ")
+      f.write(" FBW="+str(cvt(s.filt_sbw,2))+" ")
+      f.write(" PR="+str(cvt(s.percentile,1))+"\n")
+    f.flush()
   write_stats = Callable(write_stats)  
     
 
@@ -513,7 +546,7 @@
     r.circuits = []
     r.streams = []
     r.stats = None
-    tc_session.update(r)
+    tc_session.add(r)
 
   BwHistory.table.drop() # Will drop subclasses
   Extension.table.drop()
@@ -534,7 +567,7 @@
 class ConsensusTrackerListener(TorCtl.DualEventListener):
   def __init__(self):
     TorCtl.DualEventListener.__init__(self)
-    self.last_desc_at = time.time()
+    self.last_desc_at = time.time()-10.0
     self.consensus = None
 
   # TODO: What about non-running routers and uptime information?
@@ -547,8 +580,8 @@
       bwh = BwHistory(router=r, rank=rc.list_rank, bw=rc.bw, 
                       pub_time=r.published)
       r.bw_history.append(bwh)
-      tc_session.save_or_update(bwh)
-      tc_session.update(r)
+      tc_session.add(bwh)
+      tc_session.add(r)
     tc_session.commit()
  
   def _update_db(self, idlist):
@@ -564,7 +597,7 @@
         if not r: r = Router()
  
         r.from_router(rc)
-        tc_session.save_or_update(r)
+        tc_session.add(r)
     tc_session.commit()
 
   def update_consensus(self):
@@ -587,12 +620,13 @@
         if not OP:
           OP = Router(idhex="0000000000000000000000000000000000000000", 
                     orhash="000000000000000000000000000", 
-                    nick="!!TorClient", published=datetime.datetime.utcnow())
-          tc_session.save_or_update(OP)
+                    nickname="!!TorClient", 
+                    published=datetime.datetime.utcnow())
+          tc_session.add(OP)
           tc_session.commit()
         self.update_consensus()
       # So ghetto
-      if e.arrived_at - self.last_desc_at > 20.0:
+      if e.arrived_at - self.last_desc_at > 30.0:
         plog("INFO", "Newdesc timer is up. Assuming we have full consensus now")
         self.last_desc_at = 0x7fffffff
         self._update_rank_history(self.consensus.ns_map.iterkeys())
@@ -624,7 +658,7 @@
       self.track_parent = False
 
   def circ_status_event(self, c):
-    if self.track_parent and c.cird_id not in self.parent_handler.circuits:
+    if self.track_parent and c.circ_id not in self.parent_handler.circuits:
       return # Ignore circuits that aren't ours
     # TODO: Hrmm, consider making this sane in TorCtl.
     if c.reason: lreason = c.reason
@@ -646,17 +680,17 @@
         for r in self.parent_handler.circuits[c.circ_id].path:
           rq = Router.query.options(eagerload('circuits')).filter_by(
                                 idhex=r.idhex).one()
-          circ.routers.append(rq)
-          rq.circuits.append(circ)
-          tc_session.update(rq)
-      tc_session.save_or_update(circ)
+          circ.routers.append(rq) 
+          #rq.circuits.append(circ) # done automagically?
+          #tc_session.add(rq)
+      tc_session.add(circ)
       tc_session.commit()
     elif c.status == "EXTENDED":
       circ = Circuit.query.options(eagerload('extensions')).filter_by(
                        circ_id = c.circ_id).first()
       if not circ: return # Skip circuits from before we came online
 
-      e = Extension(circ=circ, hop=len(c.path), time=c.arrived_at)
+      e = Extension(circ=circ, hop=len(c.path)-1, time=c.arrived_at)
 
       if len(c.path) == 1:
         e.from_node = OP
@@ -673,13 +707,13 @@
         # FIXME: Eager load here?
         circ.routers.append(e.to_node)
         e.to_node.circuits.append(circ)
-        tc_session.update(e.to_node)
+        tc_session.add(e.to_node)
  
       e.delta = c.arrived_at - circ.last_extend
       circ.last_extend = c.arrived_at
       circ.extensions.append(e)
-      tc_session.save_or_update(e)
-      tc_session.update(circ)
+      tc_session.add(e)
+      tc_session.add(circ)
       tc_session.commit()
     elif c.status == "FAILED":
       circ = Circuit.query.filter_by(circ_id = c.circ_id).first()
@@ -701,7 +735,7 @@
                   eagerload('extensions')).filter_by(id=circ.id).one()
         circ.fail_reason = reason
         circ.fail_time = c.arrived_at
-        e = FailedExtension(circ=circ, hop=len(c.path)+1, time=c.arrived_at)
+        e = FailedExtension(circ=circ, hop=len(c.path), time=c.arrived_at)
 
         if len(c.path) == 0:
           e.from_node = OP
@@ -712,7 +746,7 @@
           e.from_node = Router.query.filter_by(idhex=r_ext[1:]).one()
 
         if self.track_parent:
-          r=self.parent_handler.circuits[c.circ_id].path[len(c.path)+1]
+          r=self.parent_handler.circuits[c.circ_id].path[len(c.path)]
           e.to_node = Router.query.filter_by(idhex=r.idhex).one()
         else:
           e.to_node = None # We have no idea..
@@ -721,9 +755,9 @@
         e.reason = reason
         circ.extensions.append(e)
         circ.fail_time = c.arrived_at
-        tc_session.save_or_update(e)
+        tc_session.add(e)
 
-      tc_session.save_or_update(circ)
+      tc_session.add(circ)
       tc_session.commit()
     elif c.status == "BUILT":
       circ = Circuit.query.filter_by(
@@ -738,7 +772,7 @@
       
       circ.built_time = c.arrived_at
       circ.tot_delta = c.arrived_at - circ.launch_time
-      tc_session.save_or_update(circ)
+      tc_session.add(circ)
       tc_session.commit()
     elif c.status == "CLOSED":
       circ = BuiltCircuit.query.filter_by(circ_id = c.circ_id).first()
@@ -757,16 +791,17 @@
           circ = DestroyedCircuit.query.filter_by(id=circ.id).one()
           circ.destroy_reason = reason
           circ.destroy_time = c.arrived_at
-        tc_session.save_or_update(circ)
+        tc_session.add(circ)
         tc_session.commit()
 
 class StreamListener(CircuitListener):
   def stream_bw_event(self, s):
     strm = Stream.query.filter_by(strm_id = s.strm_id).first()
     if strm:
+      plog("DEBUG", "Got stream bw: "+str(s.strm_id))
       strm.tot_read_bytes += s.bytes_read
       strm.tot_write_bytes += s.bytes_written
-      tc_session.update(strm)
+      tc_session.add(strm)
       tc_session.commit()
  
   def stream_status_event(self, s):
@@ -774,22 +809,29 @@
     else: lreason = "NONE"
     if s.remote_reason: rreason = s.remote_reason
     else: rreason = "NONE"
-    reason = s.event_name+":"+s.status+":"+lreason+":"+rreason+":"+self.streams[s.strm_id].kind
 
     if s.status in ("NEW", "NEWRESOLVE"):
       strm = Stream(strm_id=s.strm_id, tgt_host=s.target_host, 
                     tgt_port=s.target_port, init_status=s.status,
                     tot_read_bytes=0, tot_write_bytes=0)
-      tc_session.save(strm)
+      tc_session.add(strm)
       tc_session.commit()
       return
 
     strm = Stream.query.filter_by(strm_id = s.strm_id).first()
+    if self.track_parent and s.strm_id not in self.parent_handler.streams:
+      if strm:
+        tc_session.delete(strm)
+        tc_session.commit()
+      return # Ignore streams that aren't ours
+
     if not strm: 
       plog("NOTICE", "Ignoring prior stream "+str(s.strm_id))
       return # Ignore prior streams
 
-    if s.statis == "SENTCONNECT":
+    reason = s.event_name+":"+s.status+":"+lreason+":"+rreason+":"+strm.init_status
+
+    if s.status == "SENTCONNECT":
       # New circuit
       strm.circuit = Circuit.query.filter_by(circ_id=s.circ_id).first()
       if not strm.circuit:
@@ -811,7 +853,7 @@
         plog("WARN", "No circuit for "+str(s.strm_id)+" circ: "+str(s.circ_id))
 
       if not strm.circuit:
-        plog("WARN", "No stream circuit for "+str(s.strm_id)+" circ: "+str(s.circ_id))
+        plog("INFO", "No stream circuit for "+str(s.strm_id)+" circ: "+str(s.circ_id))
         strm.circuit = circ
 
       # XXX: Verify circ id matches stream.circ
@@ -819,18 +861,20 @@
     if s.status == "SUCCEEDED":
       strm.start_time = s.arrived_at
       for r in strm.circuit.routers: 
-        r.streams.add(strm)
-        tc_session.update(r)
-      tc_session.update(strm)
+        plog("DEBUG", "Added router "+r.idhex+" to stream "+str(s.strm_id))
+        r.streams.append(strm)
+        tc_session.add(r)
+      tc_session.add(strm)
       tc_session.commit()
     elif s.status == "DETACHED":
-      strm.detached_circuits.append(circ)
+      for r in strm.circuit.routers:
+        r.detached_streams.append(strm)
+        tc_session.add(r)
+      #strm.detached_circuits.append(strm.circuit)
       strm.circuit.detached_streams.append(strm)
-      for r in strm.circuit.routers: 
-        r.detached_streams.add(strm)
-        tc_session.update(r)
-      tc_session.update(circ)
-      tc_session.update(strm)
+      strm.circuit.streams.remove(strm)
+      strm.circuit = None
+      tc_session.add(strm)
       tc_session.commit()
     elif s.status == "FAILED":
       strm.expunge()
@@ -840,7 +884,7 @@
       strm = FailedStream.query.filter_by(id=strm.id).one()
       strm.fail_time = s.arrived_at
       strm.fail_reason = reason
-      tc_session.update(strm)
+      tc_session.add(strm)
       tc_session.commit()
     elif s.status == "CLOSED":
       if isinstance(strm, FailedStream):
@@ -854,7 +898,6 @@
           strm = FailedStream.query.filter_by(id=strm.id).one()
           strm.fail_time = s.arrived_at
         else: 
-          strm.expunge()
           # Convert to destroyed circuit
           Stream.table.update(Stream.id ==
                     strm.id).execute(row_type='closedstream')
@@ -863,7 +906,7 @@
           strm.write_bandwidth = strm.tot_write_bytes/(s.arrived_at-strm.start_time)
           strm.end_time = s.arrived_at
         strm.close_reason = reason
-      tc_session.update(strm)
+      tc_session.add(strm)
       tc_session.commit()
 
 def run_example(host, port):



More information about the tor-commits mailing list