[or-cvs] r19411: {torctl} Add stream tracking code and attempt to reduce stats calcula (torctl/trunk/python/TorCtl)
mikeperry at seul.org
mikeperry at seul.org
Sun May 3 11:31:38 UTC 2009
Author: mikeperry
Date: 2009-05-03 07:31:38 -0400 (Sun, 03 May 2009)
New Revision: 19411
Modified:
torctl/trunk/python/TorCtl/SQLSupport.py
Log:
Add stream tracking code and attempt to reduce stats
calculations to single SQL update statements where possible.
Modified: torctl/trunk/python/TorCtl/SQLSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/SQLSupport.py 2009-05-03 09:58:03 UTC (rev 19410)
+++ torctl/trunk/python/TorCtl/SQLSupport.py 2009-05-03 11:31:38 UTC (rev 19411)
@@ -20,10 +20,16 @@
from TorCtl import EVENT_TYPE, EVENT_STATE, TorCtlError
from sqlalchemy.orm import scoped_session, sessionmaker, eagerload, lazyload, eagerload_all
-from sqlalchemy import create_engine
+from sqlalchemy import create_engine, and_, or_, not_
from sqlalchemy.schema import ThreadLocalMetaData,MetaData
from elixir import *
+# Nodes with a ratio below this value will be removed from consideration
+# for higher-valued nodes
+MIN_RATIO=0.5
+
+NO_FPE=2**-20
+
#################### Model #######################
# In elixir, the session (DB connection) is a property of the model..
@@ -65,6 +71,7 @@
router = Field(PickleType(mutable=False))
circuits = ManyToMany('Circuit')
streams = ManyToMany('Stream')
+ detached_streams = ManyToMany('Stream')
bw_history = OneToMany('BwHistory')
stats = OneToOne('RouterStats', inverse="router")
@@ -98,7 +105,8 @@
using_options(order_by='-launch_time', session=tc_session, metadata=tc_metadata)
using_mapper_options(save_on_init=False)
routers = ManyToMany('Router')
- streams = OneToMany('Stream')
+ streams = OneToMany('Stream', inverse='circuit')
+ detached_streams = ManyToMany('Stream', inverse='detached_circuits')
extensions = OneToMany('Extension', inverse='circ')
circ_id = Field(Integer, index=True)
launch_time = Field(Float)
@@ -148,22 +156,29 @@
using_options(session=tc_session, metadata=tc_metadata)
using_options(order_by='-start_time')
using_mapper_options(save_on_init=False)
+ tgt_host = Field(Text)
+ tgt_port = Field(Integer)
circuit = ManyToOne('Circuit')
+ detached_circuits = ManyToMany('Circuit')
+ ignored = Field(Boolean) # Directory streams
strm_id = Field(Integer, index=True)
start_time = Field(Float)
- tot_bytes = Field(Integer)
+ tot_read_bytes = Field(Integer)
+ tot_write_bytes = Field(Integer)
+ close_reason = Field(Text) # Shared by Failed and Closed. Unused here.
class FailedStream(Stream):
using_options(session=tc_session, metadata=tc_metadata)
using_mapper_options(save_on_init=False)
- reason = Field(Text)
+ fail_reason = Field(Text)
fail_time = Field(Float)
class ClosedStream(Stream):
using_options(session=tc_session, metadata=tc_metadata)
using_mapper_options(save_on_init=False)
end_time = Field(Float)
- bandwidth = Field(Float)
+ read_bandwidth = Field(Float)
+ write_bandwidth = Field(Float)
class RouterStats(Entity):
using_options(session=tc_session, metadata=tc_metadata)
@@ -200,64 +215,43 @@
avg_first_ext = Field(Float)
ext_ratio = Field(Float)
- avg_sbw = Field(Float)
+ sbw = Field(Float)
sbw_ratio = Field(Float)
-
- # FIXME: Figure out how to efficiently compute these..
- filt_to_ratio = Field(Float)
- filt_from_ratio = Field(Float)
- filt_bi_ratio = Field(Float)
+ filt_sbw = Field(Float)
filt_sbw_ratio = Field(Float)
- def _compute_stats_relation(r):
- rs = r.stats
- rs.circ_fail_to = 0
- rs.circ_try_to = 0
- rs.circ_fail_from = 0
- rs.circ_try_from = 0
- tot_extend_time = 0
- tot_extends = 0
- for c in r.circuits:
- for e in c.extensions:
- if e.to_node == r:
- rs.circ_try_to += 1
- if isinstance(e, FailedExtension):
- rs.circ_fail_to += 1
- elif e.hop == 0:
- tot_extend_time += e.delta
- tot_extends += 1
- elif e.from_node == r:
- rs.circ_try_from += 1
- if isinstance(e, FailedExtension):
- rs.circ_fail_from += 1
-
- if isinstance(c, FailedCircuit):
- pass # TODO: Also count timeouts against earlier nodes?
- elif isinstance(c, DestroyedCircuit):
- pass # TODO: Count these somehow..
+ def _compute_stats_relation(stats_clause):
+ for rs in RouterStats.query.\
+ filter(stats_clause).\
+ options(eagerload_all('router.circuits.extensions')).\
+ all():
+ rs.circ_fail_to = 0
+ rs.circ_try_to = 0
+ rs.circ_fail_from = 0
+ rs.circ_try_from = 0
+ tot_extend_time = 0
+ tot_extends = 0
+ for c in rs.router.circuits:
+ for e in c.extensions:
+ if e.to_node == r:
+ rs.circ_try_to += 1
+ if isinstance(e, FailedExtension):
+ rs.circ_fail_to += 1
+ elif e.hop == 0:
+ tot_extend_time += e.delta
+ tot_extends += 1
+ elif e.from_node == r:
+ rs.circ_try_from += 1
+ if isinstance(e, FailedExtension):
+ rs.circ_fail_from += 1
+
+ if isinstance(c, FailedCircuit):
+ pass # TODO: Also count timeouts against earlier nodes?
+ elif isinstance(c, DestroyedCircuit):
+ pass # TODO: Count these somehow..
- if tot_extends > 0: rs.avg_first_ext = (1.0*tot_extend_time)/tot_extends
- else: rs.avg_first_ext = 0
- _compute_stats_relation = Callable(_compute_stats_relation)
-
- def _compute_stats_query(r):
- rs = r.stats
- to_r = Extension.query.filter_by(to_node=r)
- rs.circ_try_to = to_r.count()
- rs.circ_try_from = Extension.query.filter_by(from_node=r).count()
- rs.circ_fail_to = FailedExtension.query.filter_by(to_node=r).count()
- rs.circ_fail_from = FailedExtension.query.filter_by(from_node=r).count()
- rs.avg_first_ext = to_r.filter_by(hop=0,row_type='extension').avg(Extension.delta)
- _compute_stats_query = Callable(_compute_stats_query)
-
-
- def _compute_stats():
- for r in Router.query.\
- options(eagerload_all('circuits.extensions')).\
- all():
- RouterStats._compute_stats_relation(r)
- #RouterStats._compute_stats_query(r) # Remove options if this is used
- rs = r.stats
+ if tot_extends > 0: rs.avg_first_ext = (1.0*tot_extend_time)/tot_extends
+ else: rs.avg_first_ext = 0
if rs.circ_try_from > 0:
rs.circ_from_rate = (1.0*rs.circ_fail_from/rs.circ_try_from)
if rs.circ_try_to > 0:
@@ -265,105 +259,257 @@
if rs.circ_try_to+rs.circ_try_from > 0:
rs.circ_bi_rate = (1.0*rs.circ_fail_to+rs.circ_fail_from)/(rs.circ_try_to+rs.circ_try_from)
- #for s in r.streams:
- # if isinstance(c, ClosedStream):
- # elif isinstance(c, FailedStream):
tc_session.update(rs)
+ _compute_stats_relation = Callable(_compute_stats_relation)
+
+ def _compute_stats_query(stats_clause):
+ # http://www.sqlalchemy.org/docs/04/sqlexpression.html#sql_update
+ to_s = select([func.count(Extension.id)],
+ and_(stats_clause, Extension.table.c.to_node_idhex
+ == RouterStats.table.c.router_idhex)).as_scalar()
+ from_s = select([func.count(Extension.id)],
+ and_(stats_clause, Extension.table.c.from_node_idhex
+ == RouterStats.table.c.router_idhex)).as_scalar()
+ f_to_s = select([func.count(FailedExtension.id)],
+ and_(stats_clause, FailedExtension.table.c.to_node_idhex
+ == RouterStats.table.c.router_idhex)).as_scalar()
+ f_from_s = select([func.count(FailedExtension.id)],
+ and_(stats_clause, FailedExtension.table.c.from_node_idhex
+ == RouterStats.table.c.router_idhex)).as_scalar()
+ avg_ext = select([func.avg(Extension.delta)],
+ and_(stats_clause,
+ Extension.table.c.to_node_idhex==RouterStats.table.c.router_idhex,
+ Extension.table.c.hop==0,
+ Extension.table.c.row_type=='extension')).as_scalar()
+
+ RouterStats.table.update(stats_clause, values=
+ {RouterStats.table.c.circ_try_to:to_s,
+ RouterStats.table.c.circ_try_from:from_s,
+ RouterStats.table.c.circ_fail_to:f_to_s,
+ RouterStats.table.c.circ_fail_from:f_from_s,
+ RouterStats.table.c.avg_first_ext:avg_ext}).execute()
+
+ RouterStats.table.update(values=
+ {RouterStats.table.c.circ_from_rate :
+ RouterStats.table.c.circ_fail_from/RouterStats.table.c.circ_try_from,
+ RouterStats.table.c.circ_to_rate :
+ RouterStats.table.c.circ_fail_to/RouterStats.table.c.circ_try_to,
+ RouterStats.table.c.circ_bi_rate :
+ (RouterStats.table.c.circ_fail_to+RouterStats.table.c.circ_fail_from)
+ /
+ (RouterStats.table.c.circ_try_to+RouterStats.table.c.circ_try_from)}).execute()
+
+ tc_session.clear()
+
+ # TODO: Give the streams relation table a sane name and reduce this too
+ for rs in RouterStats.query.options(eagerload('router'),
+ eagerload('router.streams')).all():
+ tot_bw = 0.0
+ s_cnt = 0
+ for s in rs.router.streams:
+ if isinstance(s, ClosedStream):
+ tot_bw += s.read_bandwidth
+ s_cnt += 1
+ if s_cnt > 0: rs.sbw = tot_bw/s_cnt
+ tc_session.update(rs)
+ _compute_stats_query = Callable(_compute_stats_query)
+
+ def _compute_stats(stats_clause):
+ RouterStats._compute_stats_query(stats_clause)
+ #RouterStats._compute_stats_relation(stats_clause)
_compute_stats = Callable(_compute_stats)
def _compute_ranks():
- min_avg_rank = 0x7fffffff
- max_avg_rank = 0
- # TODO: Can we optimize this further into one query/update?
- for r in Router.query.all():
- if r.stats: tc_session.delete(r.stats)
- rs = RouterStats()
- rs.router = r
- r.stats = rs
- rank_q = BwHistory.query.filter_by(router=r)
- rs.min_rank = rank_q.min(BwHistory.rank)
- rs.avg_rank = rank_q.avg(BwHistory.rank)
- rs.max_rank = rank_q.max(BwHistory.rank)
- rs.avg_bw = rank_q.avg(BwHistory.bw)
- min_avg_rank = RouterStats.query.filer('1=1').min(RouterStats.avg_rank)
- max_avg_rank = RouterStats.query.filer('1=1').max(RouterStats.avg_rank)
- for rs in RouterStats.query.all():
- rs.percentile = (100.0*rs.avg_rank)/(max_avg_rank - min_avg_rank)
+ min_r = select([func.min(BwHistory.rank)],
+ BwHistory.table.c.router_idhex
+ == RouterStats.table.c.router_idhex).as_scalar()
+ avg_r = select([func.avg(BwHistory.rank)],
+ BwHistory.table.c.router_idhex
+ == RouterStats.table.c.router_idhex).as_scalar()
+ max_r = select([func.max(BwHistory.rank)],
+ BwHistory.table.c.router_idhex
+ == RouterStats.table.c.router_idhex).as_scalar()
+ avg_bw = select([func.avg(BwHistory.bw)],
+ BwHistory.table.c.router_idhex
+ == RouterStats.table.c.router_idhex).as_scalar()
- tc_session.update(rs)
- tc_session.update(r)
+ RouterStats.table.update(values=
+ {RouterStats.table.c.min_rank:min_r,
+ RouterStats.table.c.avg_rank:avg_r,
+ RouterStats.table.c.max_rank:max_r,
+ RouterStats.table.c.avg_bw:avg_bw}).execute()
+
+ min_avg_rank = select([func.min(RouterStats.avg_rank)]).as_scalar()
+ max_avg_rank = select([func.max(RouterStats.avg_rank)]).as_scalar()
+
+ RouterStats.query.filter('1=1').min(RouterStats.avg_rank)
+ max_avg_rank = RouterStats.query.filter('1=1').max(RouterStats.avg_rank)
+
+ RouterStats.table.update(values=
+ {RouterStats.table.c.percentile:
+ (100.0*rs.avg_rank)/max_avg_rank}).execute()
+
+ tc_session.clear()
_compute_ranks = Callable(_compute_ranks)
- def _compute_ratios(filter):
- sliceq = RouterStats.query.filter(filter)
- avg_circ_from_rate = sliceq.avg(RouterStats.circ_from_rate)
- avg_circ_to_rate = sliceq.avg(RouterStats.circ_to_rate)
- avg_circ_bi_rate = sliceq.avg(RouterStats.circ_bi_rate)
- avg_ext = sliceq.avg(RouterStats.avg_first_ext)
- for rs in sliceq.all():
- rs.circ_from_ratio = rs.circ_from_rate/avg_circ_from_rate
- rs.circ_to_ratio = rs.circ_to_rate/avg_circ_to_rate
- rs.circ_bi_ratio = rs.circ_bi_rate/avg_circ_bi_rate
- rs.ext_ratio = rs.avg_first_ext/avg_ext
- rs.filt_from_ratio = rs.circ_from_ratio
- rs.filt_to_ratio = rs.circ_to_ratio
- rs.filt_bi_ratio = rs.circ_bi_ratio
+ def _compute_ratios(stats_clause):
+ avg_from_rate = select([func.avg(RouterStats.circ_from_rate)],
+ stats_clause).as_scalar()
+ avg_to_rate = select([func.avg(RouterStats.circ_to_rate)],
+ stats_clause).as_scalar()
+ avg_bi_rate = select([func.avg(RouterStats.circ_bi_rate)],
+ stats_clause).as_scalar()
+ avg_ext = select([func.avg(RouterStats.avg_first_ext)],
+ stats_clause).as_scalar()
+ avg_sbw = select([func.avg(RouterStats.sbw)],
+ stats_clause).as_scalar()
+
+ RouterStats.update(stats_clause, values=
+ {RouterStats.table.c.circ_from_ratio:
+ (1-RouterStats.table.c.circ_from_rate)/(1-avg_from_rate),
+ RouterStats.table.c.circ_to_ratio:
+ (1-RouterStats.table.c.circ_to_rate)/(1-avg_to_rate),
+ RouterStats.table.c.circ_bi_ratio:
+ (1-RouterStats.table.c.circ_bi_rate)/(1-avg_bi_rate),
+ RouterStats.table.c.avg_first_ext:
+ (RouterStats.table.c.avg_first_ext)/(avg_ext),
+ RouterStats.table.c.sbw_ratio:
+ (RouterStats.table.c.sbw)/(avg_sbw)})
+ tc_session.clear()
_compute_ratios = Callable(_compute_ratios)
- def _compute_filtered_ratios(filter, min_ratio):
- # XXX: Actually, we should start off simple and only
- # do filtering for stream ratios
+ def _compute_filtered_query(min_ratio): # broken.. don't use.
badrouters = RouterStats.query.filter(
- RouterStats.circ_from_rate < min_ratio).column(RouterStats.router).all()
+ RouterStats.sbw_ratio < min_ratio).column(RouterStats.router).all()
+
+ for r in Router.query.all():
+ rs = r.stats
+ # XXX: This is totally wrong:
+ strmq = Router.query.filter_by(idhex=r.idhex).add_column(Router.streams).filter_by(row_type='closedstream')
+ for br in badrouters:
+ if br != r:
+ strmq = strmq.filter(not_(ClosedStream.circuit.routers.contains(r)))
+ rs.filt_sbw = strmq.avg(ClosedStream.read_bandwidth)
+ avg_sbw = RouterStats.query.filter('1=1').avg(RouterStats.filt_sbw)
+ for rs in RouterStats.query.all():
+ rs.filt_sbw_ratio = rs.filt_sbw/avg_sbw
+ _compute_filtered_query = Callable(_compute_filtered_query)
- extnq = Circuit.query
- for r in badrouters:
- extnq.filter(not_(Circuit.routers.contains(r)))
- extnq = sliceq.column(Circuit.extensions)
-
- #sliceq = RouterStats.query.filter(filter)
- #for r in badrouters:
- # sliceq = sliceq.filter(not_(Circuits.routers.contains(r)))
+ def _compute_filtered_relational(min_ratio, stats_clause, filter_clause):
+ badrouters = RouterStats.query.filter(stats_clause).filter(filter_clause).\
+ filter(RouterStats.sbw_ratio < min_ratio).all()
- avg_rate = sliceq.avg(RouterStats.circ_from_rate)
+ # TODO: Turn this into a single query....
+ for rs in RouterStats.query.filter(stats_clause).\
+ options(eagerload_all('router.streams.circuit.routers')).\
+ all():
+ tot_sbw = 0
+ sbw_cnt = 0
+ for s in rs.router.streams:
+ if isinstance(s, ClosedStream):
+ skip = False
+ for br in badrouters:
+ if br != rs:
+ if br.router in s.circuit.routers:
+ skip = True
+ if not skip:
+ tot_sbw += s.read_bandwidth
+ sbw_cnt += 1
+ rs.filt_sbw = tot_sbw/sbw_cnt
+ tc_session.update(rs)
+ avg_sbw = RouterStats.query.filter(stats_clause).avg(RouterStats.filt_sbw)
+ for rs in RouterStats.query.filter(stats_clause).all():
+ rs.filt_sbw_ratio = rs.filt_sbw/avg_sbw
+ tc_session.update(rs)
+ _compute_filtered_relational = Callable(_compute_filtered_relational)
- for rs in sliceq.all():
- rs.filt_from_ratio = rs.circ_from_rate/avg_rate
+ def _compute_filtered_ratios(min_ratio, stats_clause, filter_clause):
+ RouterStats._compute_filtered_relational(min_ratio, stats_clause,
+ filter_clause)
+ #RouterStats._compute_filtered_query(filter,min_ratio)
_compute_filtered_ratios = Callable(_compute_filtered_ratios)
def reset():
- for r in Router.query.all():
- r.stats = None
- tc_session.update(r)
RouterStats.table.drop()
RouterStats.table.create()
+ for r in Router.query.all(): # Is this needed?
+ rs = RouterStats()
+ rs.router = r
+ r.stats = rs
+ tc_session.update(r)
tc_session.commit()
reset = Callable(reset)
- def compute(router_filter, stats_filter):
- RouterStats._compute_ranks()
- RouterStats._compute_stats()
- RouterStats._compute_ratios()
- RouterStats._compute_filtered_ratios()
+ def compute(pct_low=0, pct_high=100, stat_clause=None, filter_clause=None):
+ pct_clause = and_(RouterStats.percentile >= pct_low,
+ RouterStats.percentile < pct_high)
+ if stat_clause:
+ stat_clause = and_(pct_clause, stat_clause)
+ else:
+ stat_clause = pct_clause
+
+ RouterStats.reset()
+ RouterStats._compute_ranks() # No filters. Ranks are independent
+ RouterStats._compute_stats(stat_clause)
+ RouterStats._compute_ratios(stat_clause)
+ RouterStats._compute_filtered_ratios(MIN_RATIO, stat_clause, filter_clause)
tc_session.commit()
compute = Callable(compute)
-##################### End Model ####################
+ def write_stats(f, pct_low=0, pct_high=100, order_by=None, recompute=False, stat_clause=None, filter_clause=None):
+ ratio_key = """SQLSupport Statistics:
+ SR=Stream avg ratio AR=Advertised bw ratio BRR=Adv. bw avg ratio
+ CSR=Circ suspect ratio CFR=Circ Fail Ratio SSR=Stream suspect ratio
+ SFR=Stream fail ratio CC=Circuit Count SC=Stream Count
+ P=Percentile Rank U=Uptime (h)\n"""
+
+ if not order_by:
+ order_by=RouterStats.avg_first_ext
-class CircuitStatsBroker:
- pass
+ if recompute:
+ RouterStats.compute(pct_low, pct_high, stat_clause, filter_clause)
-class StreamStatsBroker:
- pass
+ pct_clause = and_(RouterStats.percentile >= pct_low,
+ RouterStats.percentile < pct_high)
-class RatioBroker:
- pass
+ circ_from_rate = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.circ_from_rate)
+ circ_to_rate = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.circ_to_rate)
+ circ_bi_rate = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.circ_bi_rate)
+ avg_first_ext = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.avg_first_ext)
+ sbw = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.sbw)
+ filt_sbw = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.filt_sbw)
+ percentile = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.percentile)
+
+ f.write("Average Statistics:\n")
+ f.write(" CFR="+str(round(circ_from_rate,2))+"\n")
+ f.write(" CTR="+str(round(circ_to_rate,2))+"\n")
+ f.write(" CBR="+str(round(circ_bi_rate,2))+"\n")
+ f.write(" CFE="+str(round(avg_first_ext,2))+"\n")
+ f.write(" SBW="+str(round(sbw,2))+"\n")
+ f.write(" FBW="+str(round(filt_sbw,2))+"\n")
+ f.write(" PR="+str(round(percentile,2))+"\n\n")
+
+ for s in RouterStats.query.filter(pct_clause).filter(stat_clause).\
+ order_by(order_by).all():
+ f.write(s.router.idhex+"="+s.router.nickname+"\n")
+ f.write(" CFR="+str(round(s.circ_from_rate,2))+" ")
+ f.write(" CTR="+str(round(s.circ_to_rate,2))+" ")
+ f.write(" CBR="+str(round(s.circ_bi_rate,2))+" ")
+ f.write(" CFE="+str(round(s.avg_first_ext,2))+" ")
+ f.write(" SBW="+str(round(s.sbw,2))+" ")
+ f.write(" FBW="+str(round(s.filt_sbw,2))+" ")
+ f.write(" PR="+str(round(s.percentile,1))+"\n")
+ write_stats = Callable(write_stats)
+
+
+##################### End Model ####################
+
#################### Model Support ################
-def reset_all_stats():
+def reset_all():
# Need to keep routers around..
for r in Router.query.all():
- r.bw_history = [] # XXX: Is this sufficient/correct?
+ r.bw_history = [] # XXX: Is this sufficient/correct/necessary?
r.circuits = []
r.streams = []
r.stats = None
@@ -391,7 +537,7 @@
self.last_desc_at = time.time()
self.consensus = None
- # XXX: What about non-running routers and uptime information?
+ # TODO: What about non-running routers and uptime information?
def _update_rank_history(self, idlist):
for idhex in idlist:
if idhex not in self.consensus.routers: continue
@@ -431,6 +577,8 @@
TorCtl.DualEventListener.set_parent(self, parent_handler)
def heartbeat_event(self, e):
+ # This sketchiness is to ensure we have an accurate history
+ # of each router's rank+bandwidth for the entire duration of the run..
if e.state == EVENT_STATE.PRELISTEN:
if not self.consensus:
global OP
@@ -522,7 +670,7 @@
e.to_node = Router.query.filter_by(idhex=r_ext[1:]).one()
if not self.track_parent:
- # XXX: Eager load here?
+ # FIXME: Eager load here?
circ.routers.append(e.to_node)
e.to_node.circuits.append(circ)
tc_session.update(e.to_node)
@@ -614,10 +762,110 @@
class StreamListener(CircuitListener):
def stream_bw_event(self, s):
- pass
+ strm = Stream.query.filter_by(strm_id = s.strm_id).first()
+ if strm:
+ strm.tot_read_bytes += s.bytes_read
+ strm.tot_write_bytes += s.bytes_written
+ tc_session.update(strm)
+ tc_session.commit()
+
def stream_status_event(self, s):
- pass
+ if s.reason: lreason = s.reason
+ else: lreason = "NONE"
+ if s.remote_reason: rreason = s.remote_reason
+ else: rreason = "NONE"
+ reason = s.event_name+":"+s.status+":"+lreason+":"+rreason+":"+self.streams[s.strm_id].kind
+ if s.status in ("NEW", "NEWRESOLVE"):
+ strm = Stream(strm_id=s.strm_id, tgt_host=s.target_host,
+ tgt_port=s.target_port, init_status=s.status,
+ tot_read_bytes=0, tot_write_bytes=0)
+ tc_session.save(strm)
+ tc_session.commit()
+ return
+
+ strm = Stream.query.filter_by(strm_id = s.strm_id).first()
+ if not strm:
+ plog("NOTICE", "Ignoring prior stream "+str(s.strm_id))
+ return # Ignore prior streams
+
+ if s.statis == "SENTCONNECT":
+ # New circuit
+ strm.circuit = Circuit.query.filter_by(circ_id=s.circ_id).first()
+ if not strm.circuit:
+ plog("NOTICE", "Ignoring prior stream "+str(strm.strm_id)+" with old circuit "+str(s.circ_id))
+ tc_session.delete(strm)
+ tc_session.commit()
+ return
+ else:
+ circ = None
+ if s.circ_id:
+ circ = Circuit.query.filter_by(circ_id=s.circ_id).first()
+ elif self.track_parent:
+ circ = self.parent_handler.streams[s.strm_id].circ
+ if not circ: circ = self.parent_handler.streams[s.strm_id].pending_circ
+ if circ:
+ circ = Circuit.query.filter_by(circ_id=circ.circ_id).first()
+
+ if not circ:
+ plog("WARN", "No circuit for "+str(s.strm_id)+" circ: "+str(s.circ_id))
+
+ if not strm.circuit:
+ plog("WARN", "No stream circuit for "+str(s.strm_id)+" circ: "+str(s.circ_id))
+ strm.circuit = circ
+
+ # XXX: Verify circ id matches stream.circ
+
+ if s.status == "SUCCEEDED":
+ strm.start_time = s.arrived_at
+ for r in strm.circuit.routers:
+ r.streams.add(strm)
+ tc_session.update(r)
+ tc_session.update(strm)
+ tc_session.commit()
+ elif s.status == "DETACHED":
+ strm.detached_circuits.append(circ)
+ strm.circuit.detached_streams.append(strm)
+ for r in strm.circuit.routers:
+ r.detached_streams.add(strm)
+ tc_session.update(r)
+ tc_session.update(circ)
+ tc_session.update(strm)
+ tc_session.commit()
+ elif s.status == "FAILED":
+ strm.expunge()
+ # Convert to destroyed circuit
+ Stream.table.update(Stream.id ==
+ strm.id).execute(row_type='failedstream')
+ strm = FailedStream.query.filter_by(id=strm.id).one()
+ strm.fail_time = s.arrived_at
+ strm.fail_reason = reason
+ tc_session.update(strm)
+ tc_session.commit()
+ elif s.status == "CLOSED":
+ if isinstance(strm, FailedStream):
+ strm.close_reason = reason
+ else:
+ strm.expunge()
+ if not (lreason == "DONE" or (lreason == "END" and rreason == "DONE")):
+ # Convert to destroyed circuit
+ Stream.table.update(Stream.id ==
+ strm.id).execute(row_type='failedstream')
+ strm = FailedStream.query.filter_by(id=strm.id).one()
+ strm.fail_time = s.arrived_at
+ else:
+ strm.expunge()
+ # Convert to destroyed circuit
+ Stream.table.update(Stream.id ==
+ strm.id).execute(row_type='closedstream')
+ strm = ClosedStream.query.filter_by(id=strm.id).one()
+ strm.read_bandwidth = strm.tot_read_bytes/(s.arrived_at-strm.start_time)
+ strm.write_bandwidth = strm.tot_write_bytes/(s.arrived_at-strm.start_time)
+ strm.end_time = s.arrived_at
+ strm.close_reason = reason
+ tc_session.update(strm)
+ tc_session.commit()
+
def run_example(host, port):
""" Example of basic TorCtl usage. See PathSupport for more advanced
usage.
More information about the tor-commits
mailing list