[or-cvs] r19413: {torctl} Fix lots of SQLAlchemy weirdness (including some rather annoying 0.4 to 0.5 breakage) and a couple of stream handling bugs. (torctl/trunk/python/TorCtl)
mikeperry at seul.org
mikeperry at seul.org
Mon May 4 06:57:13 UTC 2009
Author: mikeperry
Date: 2009-05-04 02:57:13 -0400 (Mon, 04 May 2009)
New Revision: 19413
Modified:
torctl/trunk/python/TorCtl/SQLSupport.py
Log:
Fix lots of SQLAlchemy weirdness (including some rather
annoying 0.4 to 0.5 breakage) and a couple of stream handling
bugs.
Modified: torctl/trunk/python/TorCtl/SQLSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/SQLSupport.py 2009-05-03 15:17:59 UTC (rev 19412)
+++ torctl/trunk/python/TorCtl/SQLSupport.py 2009-05-04 06:57:13 UTC (rev 19413)
@@ -19,8 +19,10 @@
from TorUtil import meta_port, meta_host, control_port, control_host, control_pass
from TorCtl import EVENT_TYPE, EVENT_STATE, TorCtlError
+import sqlalchemy
from sqlalchemy.orm import scoped_session, sessionmaker, eagerload, lazyload, eagerload_all
-from sqlalchemy import create_engine, and_, or_, not_
+from sqlalchemy import create_engine, and_, or_, not_, func
+from sqlalchemy.sql import func,select
from sqlalchemy.schema import ThreadLocalMetaData,MetaData
from elixir import *
@@ -28,7 +30,7 @@
# for higher-valued nodes
MIN_RATIO=0.5
-NO_FPE=2**-20
+NO_FPE=2**-50
#################### Model #######################
@@ -40,12 +42,13 @@
tc_metadata.echo=False
tc_session = scoped_session(sessionmaker(autoflush=True))
-def setup_db(db_uri):
+def setup_db(db_uri, drop=False):
tc_engine = create_engine(db_uri, echo=False)
tc_metadata.bind = tc_engine
tc_metadata.echo = False
setup_all()
+ if drop: drop_all()
create_all()
class Router(Entity):
@@ -54,9 +57,9 @@
idhex = Field(CHAR(40), primary_key=True, index=True)
orhash = Field(CHAR(27))
published = Field(Time)
- nick = Field(Text)
+ nickname = Field(Text)
- OS = Field(Text)
+ os = Field(Text)
rate_limited = Field(Boolean)
guard = Field(Boolean)
exit = Field(Boolean)
@@ -80,8 +83,8 @@
self.bw = router.bw
self.idhex = router.idhex
self.orhash = router.orhash
- self.nick = router.nickname
- self.OS = router.os
+ self.nickname = router.nickname
+ self.os = router.os
self.rate_limited = router.rate_limited
self.guard = "Guard" in router.flags
self.exit = "Exit" in router.flags
@@ -158,13 +161,14 @@
using_mapper_options(save_on_init=False)
tgt_host = Field(Text)
tgt_port = Field(Integer)
- circuit = ManyToOne('Circuit')
- detached_circuits = ManyToMany('Circuit')
+ circuit = ManyToOne('Circuit', inverse='streams')
+ detached_circuits = ManyToMany('Circuit', inverse='detatched_streams')
ignored = Field(Boolean) # Directory streams
strm_id = Field(Integer, index=True)
start_time = Field(Float)
tot_read_bytes = Field(Integer)
tot_write_bytes = Field(Integer)
+ init_status = Field(Text)
close_reason = Field(Text) # Shared by Failed and Closed. Unused here.
class FailedStream(Stream):
@@ -259,10 +263,12 @@
if rs.circ_try_to+rs.circ_try_from > 0:
rs.circ_bi_rate = (1.0*rs.circ_fail_to+rs.circ_fail_from)/(rs.circ_try_to+rs.circ_try_from)
- tc_session.update(rs)
+ tc_session.add(rs)
+ tc_session.commit()
_compute_stats_relation = Callable(_compute_stats_relation)
def _compute_stats_query(stats_clause):
+ tc_session.clear()
# http://www.sqlalchemy.org/docs/04/sqlexpression.html#sql_update
to_s = select([func.count(Extension.id)],
and_(stats_clause, Extension.table.c.to_node_idhex
@@ -289,21 +295,21 @@
RouterStats.table.c.circ_fail_from:f_from_s,
RouterStats.table.c.avg_first_ext:avg_ext}).execute()
- RouterStats.table.update(values=
- {RouterStats.table.c.circ_from_rate :
+ RouterStats.table.update(stats_clause, values=
+ {RouterStats.table.c.circ_from_rate:
RouterStats.table.c.circ_fail_from/RouterStats.table.c.circ_try_from,
- RouterStats.table.c.circ_to_rate :
- RouterStats.table.c.circ_fail_to/RouterStats.table.c.circ_try_to,
- RouterStats.table.c.circ_bi_rate :
+ RouterStats.table.c.circ_to_rate:
+ RouterStats.table.c.circ_fail_to/RouterStats.table.c.circ_try_to,
+ RouterStats.table.c.circ_bi_rate:
(RouterStats.table.c.circ_fail_to+RouterStats.table.c.circ_fail_from)
/
- (RouterStats.table.c.circ_try_to+RouterStats.table.c.circ_try_from)}).execute()
+ (RouterStats.table.c.circ_try_to+RouterStats.table.c.circ_try_from)}).execute()
- tc_session.clear()
# TODO: Give the streams relation table a sane name and reduce this too
- for rs in RouterStats.query.options(eagerload('router'),
- eagerload('router.streams')).all():
+ for rs in RouterStats.query.filter(stats_clause).\
+ options(eagerload('router'),
+ eagerload('router.streams')).all():
tot_bw = 0.0
s_cnt = 0
for s in rs.router.streams:
@@ -311,7 +317,9 @@
tot_bw += s.read_bandwidth
s_cnt += 1
if s_cnt > 0: rs.sbw = tot_bw/s_cnt
- tc_session.update(rs)
+ else: rs.sbw = None
+ tc_session.add(rs)
+ tc_session.commit()
_compute_stats_query = Callable(_compute_stats_query)
def _compute_stats(stats_clause):
@@ -320,6 +328,7 @@
_compute_stats = Callable(_compute_stats)
def _compute_ranks():
+ tc_session.clear()
min_r = select([func.min(BwHistory.rank)],
BwHistory.table.c.router_idhex
== RouterStats.table.c.router_idhex).as_scalar()
@@ -339,20 +348,17 @@
RouterStats.table.c.max_rank:max_r,
RouterStats.table.c.avg_bw:avg_bw}).execute()
- min_avg_rank = select([func.min(RouterStats.avg_rank)]).as_scalar()
+ #min_avg_rank = select([func.min(RouterStats.avg_rank)]).as_scalar()
max_avg_rank = select([func.max(RouterStats.avg_rank)]).as_scalar()
- RouterStats.query.filter('1=1').min(RouterStats.avg_rank)
- max_avg_rank = RouterStats.query.filter('1=1').max(RouterStats.avg_rank)
-
RouterStats.table.update(values=
{RouterStats.table.c.percentile:
- (100.0*rs.avg_rank)/max_avg_rank}).execute()
-
- tc_session.clear()
+ (100.0*RouterStats.table.c.avg_rank)/max_avg_rank}).execute()
+ tc_session.commit()
_compute_ranks = Callable(_compute_ranks)
def _compute_ratios(stats_clause):
+ tc_session.clear()
avg_from_rate = select([func.avg(RouterStats.circ_from_rate)],
stats_clause).as_scalar()
avg_to_rate = select([func.avg(RouterStats.circ_to_rate)],
@@ -364,18 +370,18 @@
avg_sbw = select([func.avg(RouterStats.sbw)],
stats_clause).as_scalar()
- RouterStats.update(stats_clause, values=
+ RouterStats.table.update(stats_clause, values=
{RouterStats.table.c.circ_from_ratio:
(1-RouterStats.table.c.circ_from_rate)/(1-avg_from_rate),
RouterStats.table.c.circ_to_ratio:
(1-RouterStats.table.c.circ_to_rate)/(1-avg_to_rate),
RouterStats.table.c.circ_bi_ratio:
(1-RouterStats.table.c.circ_bi_rate)/(1-avg_bi_rate),
- RouterStats.table.c.avg_first_ext:
+ RouterStats.table.c.ext_ratio:
(RouterStats.table.c.avg_first_ext)/(avg_ext),
RouterStats.table.c.sbw_ratio:
(RouterStats.table.c.sbw)/(avg_sbw)})
- tc_session.clear()
+ tc_session.commit()
_compute_ratios = Callable(_compute_ratios)
def _compute_filtered_query(min_ratio): # broken.. don't use.
@@ -393,6 +399,7 @@
avg_sbw = RouterStats.query.filter('1=1').avg(RouterStats.filt_sbw)
for rs in RouterStats.query.all():
rs.filt_sbw_ratio = rs.filt_sbw/avg_sbw
+ tc_session.commit()
_compute_filtered_query = Callable(_compute_filtered_query)
def _compute_filtered_relational(min_ratio, stats_clause, filter_clause):
@@ -401,8 +408,7 @@
# TODO: Turn this into a single query....
for rs in RouterStats.query.filter(stats_clause).\
- options(eagerload_all('router.streams.circuit.routers')).\
- all():
+ options(eagerload_all('router.streams.circuit.routers')).all():
tot_sbw = 0
sbw_cnt = 0
for s in rs.router.streams:
@@ -415,12 +421,20 @@
if not skip:
tot_sbw += s.read_bandwidth
sbw_cnt += 1
- rs.filt_sbw = tot_sbw/sbw_cnt
- tc_session.update(rs)
- avg_sbw = RouterStats.query.filter(stats_clause).avg(RouterStats.filt_sbw)
+ if sbw_cnt: rs.filt_sbw = tot_sbw/sbw_cnt
+ else: rs.filt_sbw = None
+ tc_session.add(rs)
+ if sqlalchemy.__version__ < "0.5.0":
+ avg_sbw = RouterStats.query.filter(stats_clause).avg(RouterStats.filt_sbw)
+ else:
+ avg_sbw = tc_session.query(func.avg(RouterStats.filt_sbw)).filter(stats_clause).scalar()
for rs in RouterStats.query.filter(stats_clause).all():
- rs.filt_sbw_ratio = rs.filt_sbw/avg_sbw
- tc_session.update(rs)
+ if type(rs.filt_sbw) == float and avg_sbw:
+ rs.filt_sbw_ratio = rs.filt_sbw/avg_sbw
+ else:
+ rs.filt_sbw_ratio = None
+ tc_session.add(rs)
+ tc_session.commit()
_compute_filtered_relational = Callable(_compute_filtered_relational)
def _compute_filtered_ratios(min_ratio, stats_clause, filter_clause):
@@ -436,7 +450,8 @@
rs = RouterStats()
rs.router = r
r.stats = rs
- tc_session.update(r)
+ tc_session.add(r)
+ tc_session.clear()
tc_session.commit()
reset = Callable(reset)
@@ -457,12 +472,7 @@
compute = Callable(compute)
def write_stats(f, pct_low=0, pct_high=100, order_by=None, recompute=False, stat_clause=None, filter_clause=None):
- ratio_key = """SQLSupport Statistics:
- SR=Stream avg ratio AR=Advertised bw ratio BRR=Adv. bw avg ratio
- CSR=Circ suspect ratio CFR=Circ Fail Ratio SSR=Stream suspect ratio
- SFR=Stream fail ratio CC=Circuit Count SC=Stream Count
- P=Percentile Rank U=Uptime (h)\n"""
-
+
if not order_by:
order_by=RouterStats.avg_first_ext
@@ -472,34 +482,57 @@
pct_clause = and_(RouterStats.percentile >= pct_low,
RouterStats.percentile < pct_high)
- circ_from_rate = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.circ_from_rate)
- circ_to_rate = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.circ_to_rate)
- circ_bi_rate = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.circ_bi_rate)
+ # This is Fail City and sqlalchemy is running for mayor.
+ if sqlalchemy.__version__ < "0.5.0":
+ circ_from_rate = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.circ_from_rate)
+ circ_to_rate = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.circ_to_rate)
+ circ_bi_rate = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.circ_bi_rate)
- avg_first_ext = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.avg_first_ext)
- sbw = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.sbw)
- filt_sbw = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.filt_sbw)
- percentile = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.percentile)
+ avg_first_ext = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.avg_first_ext)
+ sbw = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.sbw)
+ filt_sbw = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.filt_sbw)
+ percentile = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.percentile)
+ else:
+ circ_from_rate = tc_session.query(func.avg(RouterStats.circ_from_rate)).filter(pct_clause).filter(stat_clause).scalar()
+ circ_to_rate = tc_session.query(func.avg(RouterStats.circ_to_rate)).filter(pct_clause).filter(stat_clause).scalar()
+ circ_bi_rate = tc_session.query(func.avg(RouterStats.circ_bi_rate)).filter(pct_clause).filter(stat_clause).scalar()
+
+ avg_first_ext = tc_session.query(func.avg(RouterStats.avg_first_ext)).filter(pct_clause).filter(stat_clause).scalar()
+ sbw = tc_session.query(func.avg(RouterStats.sbw)).filter(pct_clause).filter(stat_clause).scalar()
+ filt_sbw = tc_session.query(func.avg(RouterStats.filt_sbw)).filter(pct_clause).filter(stat_clause).scalar()
+ percentile = tc_session.query(func.avg(RouterStats.percentile)).filter(pct_clause).filter(stat_clause).scalar()
+ def cvt(a,b):
+ if type(a) == float: return round(a,b)
+ elif type(a) == type(None): return "None"
+ else: return type(a)
+
+ sql_key = """SQLSupport Statistics:
+ CFR=Circ From Rate CTR=Circ To Rate CBR=Circ To/From Rate
+ CFE=Avg 1st Ext time (s) SBW=Avg Stream BW FBW=Filtered stream bw
+ PR=Percentile Rank\n\n"""
+
+ f.write(sql_key)
f.write("Average Statistics:\n")
- f.write(" CFR="+str(round(circ_from_rate,2))+"\n")
- f.write(" CTR="+str(round(circ_to_rate,2))+"\n")
- f.write(" CBR="+str(round(circ_bi_rate,2))+"\n")
- f.write(" CFE="+str(round(avg_first_ext,2))+"\n")
- f.write(" SBW="+str(round(sbw,2))+"\n")
- f.write(" FBW="+str(round(filt_sbw,2))+"\n")
- f.write(" PR="+str(round(percentile,2))+"\n\n")
+ f.write(" CFR="+str(cvt(circ_from_rate,2))+"\n")
+ f.write(" CTR="+str(cvt(circ_to_rate,2))+"\n")
+ f.write(" CBR="+str(cvt(circ_bi_rate,2))+"\n")
+ f.write(" CFE="+str(cvt(avg_first_ext,2))+"\n")
+ f.write(" SBW="+str(cvt(sbw,2))+"\n")
+ f.write(" FBW="+str(cvt(filt_sbw,2))+"\n")
+ f.write(" PR="+str(cvt(percentile,2))+"\n\n")
for s in RouterStats.query.filter(pct_clause).filter(stat_clause).\
order_by(order_by).all():
f.write(s.router.idhex+"="+s.router.nickname+"\n")
- f.write(" CFR="+str(round(s.circ_from_rate,2))+" ")
- f.write(" CTR="+str(round(s.circ_to_rate,2))+" ")
- f.write(" CBR="+str(round(s.circ_bi_rate,2))+" ")
- f.write(" CFE="+str(round(s.avg_first_ext,2))+" ")
- f.write(" SBW="+str(round(s.sbw,2))+" ")
- f.write(" FBW="+str(round(s.filt_sbw,2))+" ")
- f.write(" PR="+str(round(s.percentile,1))+"\n")
+ f.write(" CFR="+str(cvt(s.circ_from_rate,2))+" ")
+ f.write(" CTR="+str(cvt(s.circ_to_rate,2))+" ")
+ f.write(" CBR="+str(cvt(s.circ_bi_rate,2))+" ")
+ f.write(" CFE="+str(cvt(s.avg_first_ext,2))+" ")
+ f.write(" SBW="+str(cvt(s.sbw,2))+" ")
+ f.write(" FBW="+str(cvt(s.filt_sbw,2))+" ")
+ f.write(" PR="+str(cvt(s.percentile,1))+"\n")
+ f.flush()
write_stats = Callable(write_stats)
@@ -513,7 +546,7 @@
r.circuits = []
r.streams = []
r.stats = None
- tc_session.update(r)
+ tc_session.add(r)
BwHistory.table.drop() # Will drop subclasses
Extension.table.drop()
@@ -534,7 +567,7 @@
class ConsensusTrackerListener(TorCtl.DualEventListener):
def __init__(self):
TorCtl.DualEventListener.__init__(self)
- self.last_desc_at = time.time()
+ self.last_desc_at = time.time()-10.0
self.consensus = None
# TODO: What about non-running routers and uptime information?
@@ -547,8 +580,8 @@
bwh = BwHistory(router=r, rank=rc.list_rank, bw=rc.bw,
pub_time=r.published)
r.bw_history.append(bwh)
- tc_session.save_or_update(bwh)
- tc_session.update(r)
+ tc_session.add(bwh)
+ tc_session.add(r)
tc_session.commit()
def _update_db(self, idlist):
@@ -564,7 +597,7 @@
if not r: r = Router()
r.from_router(rc)
- tc_session.save_or_update(r)
+ tc_session.add(r)
tc_session.commit()
def update_consensus(self):
@@ -587,12 +620,13 @@
if not OP:
OP = Router(idhex="0000000000000000000000000000000000000000",
orhash="000000000000000000000000000",
- nick="!!TorClient", published=datetime.datetime.utcnow())
- tc_session.save_or_update(OP)
+ nickname="!!TorClient",
+ published=datetime.datetime.utcnow())
+ tc_session.add(OP)
tc_session.commit()
self.update_consensus()
# So ghetto
- if e.arrived_at - self.last_desc_at > 20.0:
+ if e.arrived_at - self.last_desc_at > 30.0:
plog("INFO", "Newdesc timer is up. Assuming we have full consensus now")
self.last_desc_at = 0x7fffffff
self._update_rank_history(self.consensus.ns_map.iterkeys())
@@ -624,7 +658,7 @@
self.track_parent = False
def circ_status_event(self, c):
- if self.track_parent and c.cird_id not in self.parent_handler.circuits:
+ if self.track_parent and c.circ_id not in self.parent_handler.circuits:
return # Ignore circuits that aren't ours
# TODO: Hrmm, consider making this sane in TorCtl.
if c.reason: lreason = c.reason
@@ -646,17 +680,17 @@
for r in self.parent_handler.circuits[c.circ_id].path:
rq = Router.query.options(eagerload('circuits')).filter_by(
idhex=r.idhex).one()
- circ.routers.append(rq)
- rq.circuits.append(circ)
- tc_session.update(rq)
- tc_session.save_or_update(circ)
+ circ.routers.append(rq)
+ #rq.circuits.append(circ) # done automagically?
+ #tc_session.add(rq)
+ tc_session.add(circ)
tc_session.commit()
elif c.status == "EXTENDED":
circ = Circuit.query.options(eagerload('extensions')).filter_by(
circ_id = c.circ_id).first()
if not circ: return # Skip circuits from before we came online
- e = Extension(circ=circ, hop=len(c.path), time=c.arrived_at)
+ e = Extension(circ=circ, hop=len(c.path)-1, time=c.arrived_at)
if len(c.path) == 1:
e.from_node = OP
@@ -673,13 +707,13 @@
# FIXME: Eager load here?
circ.routers.append(e.to_node)
e.to_node.circuits.append(circ)
- tc_session.update(e.to_node)
+ tc_session.add(e.to_node)
e.delta = c.arrived_at - circ.last_extend
circ.last_extend = c.arrived_at
circ.extensions.append(e)
- tc_session.save_or_update(e)
- tc_session.update(circ)
+ tc_session.add(e)
+ tc_session.add(circ)
tc_session.commit()
elif c.status == "FAILED":
circ = Circuit.query.filter_by(circ_id = c.circ_id).first()
@@ -701,7 +735,7 @@
eagerload('extensions')).filter_by(id=circ.id).one()
circ.fail_reason = reason
circ.fail_time = c.arrived_at
- e = FailedExtension(circ=circ, hop=len(c.path)+1, time=c.arrived_at)
+ e = FailedExtension(circ=circ, hop=len(c.path), time=c.arrived_at)
if len(c.path) == 0:
e.from_node = OP
@@ -712,7 +746,7 @@
e.from_node = Router.query.filter_by(idhex=r_ext[1:]).one()
if self.track_parent:
- r=self.parent_handler.circuits[c.circ_id].path[len(c.path)+1]
+ r=self.parent_handler.circuits[c.circ_id].path[len(c.path)]
e.to_node = Router.query.filter_by(idhex=r.idhex).one()
else:
e.to_node = None # We have no idea..
@@ -721,9 +755,9 @@
e.reason = reason
circ.extensions.append(e)
circ.fail_time = c.arrived_at
- tc_session.save_or_update(e)
+ tc_session.add(e)
- tc_session.save_or_update(circ)
+ tc_session.add(circ)
tc_session.commit()
elif c.status == "BUILT":
circ = Circuit.query.filter_by(
@@ -738,7 +772,7 @@
circ.built_time = c.arrived_at
circ.tot_delta = c.arrived_at - circ.launch_time
- tc_session.save_or_update(circ)
+ tc_session.add(circ)
tc_session.commit()
elif c.status == "CLOSED":
circ = BuiltCircuit.query.filter_by(circ_id = c.circ_id).first()
@@ -757,16 +791,17 @@
circ = DestroyedCircuit.query.filter_by(id=circ.id).one()
circ.destroy_reason = reason
circ.destroy_time = c.arrived_at
- tc_session.save_or_update(circ)
+ tc_session.add(circ)
tc_session.commit()
class StreamListener(CircuitListener):
def stream_bw_event(self, s):
strm = Stream.query.filter_by(strm_id = s.strm_id).first()
if strm:
+ plog("DEBUG", "Got stream bw: "+str(s.strm_id))
strm.tot_read_bytes += s.bytes_read
strm.tot_write_bytes += s.bytes_written
- tc_session.update(strm)
+ tc_session.add(strm)
tc_session.commit()
def stream_status_event(self, s):
@@ -774,22 +809,29 @@
else: lreason = "NONE"
if s.remote_reason: rreason = s.remote_reason
else: rreason = "NONE"
- reason = s.event_name+":"+s.status+":"+lreason+":"+rreason+":"+self.streams[s.strm_id].kind
if s.status in ("NEW", "NEWRESOLVE"):
strm = Stream(strm_id=s.strm_id, tgt_host=s.target_host,
tgt_port=s.target_port, init_status=s.status,
tot_read_bytes=0, tot_write_bytes=0)
- tc_session.save(strm)
+ tc_session.add(strm)
tc_session.commit()
return
strm = Stream.query.filter_by(strm_id = s.strm_id).first()
+ if self.track_parent and s.strm_id not in self.parent_handler.streams:
+ if strm:
+ tc_session.delete(strm)
+ tc_session.commit()
+ return # Ignore streams that aren't ours
+
if not strm:
plog("NOTICE", "Ignoring prior stream "+str(s.strm_id))
return # Ignore prior streams
- if s.statis == "SENTCONNECT":
+ reason = s.event_name+":"+s.status+":"+lreason+":"+rreason+":"+strm.init_status
+
+ if s.status == "SENTCONNECT":
# New circuit
strm.circuit = Circuit.query.filter_by(circ_id=s.circ_id).first()
if not strm.circuit:
@@ -811,7 +853,7 @@
plog("WARN", "No circuit for "+str(s.strm_id)+" circ: "+str(s.circ_id))
if not strm.circuit:
- plog("WARN", "No stream circuit for "+str(s.strm_id)+" circ: "+str(s.circ_id))
+ plog("INFO", "No stream circuit for "+str(s.strm_id)+" circ: "+str(s.circ_id))
strm.circuit = circ
# XXX: Verify circ id matches stream.circ
@@ -819,18 +861,20 @@
if s.status == "SUCCEEDED":
strm.start_time = s.arrived_at
for r in strm.circuit.routers:
- r.streams.add(strm)
- tc_session.update(r)
- tc_session.update(strm)
+ plog("DEBUG", "Added router "+r.idhex+" to stream "+str(s.strm_id))
+ r.streams.append(strm)
+ tc_session.add(r)
+ tc_session.add(strm)
tc_session.commit()
elif s.status == "DETACHED":
- strm.detached_circuits.append(circ)
+ for r in strm.circuit.routers:
+ r.detached_streams.append(strm)
+ tc_session.add(r)
+ #strm.detached_circuits.append(strm.circuit)
strm.circuit.detached_streams.append(strm)
- for r in strm.circuit.routers:
- r.detached_streams.add(strm)
- tc_session.update(r)
- tc_session.update(circ)
- tc_session.update(strm)
+ strm.circuit.streams.remove(strm)
+ strm.circuit = None
+ tc_session.add(strm)
tc_session.commit()
elif s.status == "FAILED":
strm.expunge()
@@ -840,7 +884,7 @@
strm = FailedStream.query.filter_by(id=strm.id).one()
strm.fail_time = s.arrived_at
strm.fail_reason = reason
- tc_session.update(strm)
+ tc_session.add(strm)
tc_session.commit()
elif s.status == "CLOSED":
if isinstance(strm, FailedStream):
@@ -854,7 +898,6 @@
strm = FailedStream.query.filter_by(id=strm.id).one()
strm.fail_time = s.arrived_at
else:
- strm.expunge()
# Convert to destroyed circuit
Stream.table.update(Stream.id ==
strm.id).execute(row_type='closedstream')
@@ -863,7 +906,7 @@
strm.write_bandwidth = strm.tot_write_bytes/(s.arrived_at-strm.start_time)
strm.end_time = s.arrived_at
strm.close_reason = reason
- tc_session.update(strm)
+ tc_session.add(strm)
tc_session.commit()
def run_example(host, port):
More information about the tor-commits mailing list