[or-cvs] r10531: Introduced a new method of measuring all circuits in a pool (in torflow/trunk: . TorCtl)
renner at seul.org
renner at seul.org
Fri Jun 8 10:48:14 UTC 2007
Author: renner
Date: 2007-06-08 06:48:14 -0400 (Fri, 08 Jun 2007)
New Revision: 10531
Modified:
torflow/trunk/TorCtl/PathSupport.py
torflow/trunk/op-addon.py
Log:
Introduced a new method of measuring all circuits in a pool by simply using one connection
and pass it on to the next circuit on a DETACHED + more refactorings.
Modified: torflow/trunk/TorCtl/PathSupport.py
===================================================================
--- torflow/trunk/TorCtl/PathSupport.py 2007-06-08 09:30:58 UTC (rev 10530)
+++ torflow/trunk/TorCtl/PathSupport.py 2007-06-08 10:48:14 UTC (rev 10531)
@@ -18,7 +18,8 @@
"UniqueRestriction", "UniformGenerator", "OrderedExitGenerator",
"PathSelector", "Connection", "NickRestriction", "IdHexRestriction",
"PathBuilder", "SelectionManager", "CountryCodeRestriction",
-"CountryRestriction", "UniqueCountryRestriction", "ContinentRestriction" ]
+"CountryRestriction", "UniqueCountryRestriction", "ContinentRestriction",
+"ContinentJumperRestriction"]
#################### Path Support Interfaces #####################
Modified: torflow/trunk/op-addon.py
===================================================================
--- torflow/trunk/op-addon.py 2007-06-08 09:30:58 UTC (rev 10530)
+++ torflow/trunk/op-addon.py 2007-06-08 10:48:14 UTC (rev 10531)
@@ -4,7 +4,6 @@
Copyright (C) 2007 Johannes Renner
Contact: renner at i4.informatik.rwth-aachen.de
"""
-
# Addon for Onion Proxies (prototype-v0.0-alpha):
# Shall eventually improve the performance of anonymous communications
# and browsing by measuring RTTs of circuits/links, receiving infos
@@ -17,6 +16,7 @@
import copy
import math
import time
+import random
import socket
import threading
import Queue
@@ -45,25 +45,27 @@
timeout_limit = 1
slowness_limit = 3
# Slow RTT := x seconds
-slow = 1.5
+slow = 0.7
# Note: Tor-internal lifetime of a circuit is 10 min --> 600/sleep_interval = max-age
# Sleep interval between working loads in sec
sleep_interval = 5
# No of idle circuits to build preemptively
# TODO: Also configure ports to use
-idle_circuits = 6
+idle_circuits = 5
# Measure complete circuits
measure_circs = True
# Set to True if we want to measure partial circuits
measure_partial_circs = False
-# Testing mode: Close circuits after num_tests measures
+# Testing mode: Close circuits after num_tests measures +
+# involves a FileHandler to write collected data to a file
testing_mode = False
# Number of tests per circuit
num_tests = 5
# Do configuration here TODO: use my_country for src
+# Set src_country below when setting up our location
path_config = GeoIPSupport.GeoIPConfig(unique_countries = True,
src_country = None,
crossings = 1,
@@ -84,6 +86,9 @@
use_guards=True,
geoip_config=path_config)
+# Signalize that a round has finished
+finished_event = threading.Event()
+
######################################### BEGIN: Connection #####################
class Connection(TorCtl.Connection):
@@ -106,9 +111,73 @@
def build_circuit_from_path(self, path):
""" Build circuit using a given path shall be used to build circs from NetworkModel """
circ = Circuit()
+ circ.rtt_created = True
+ # Set path to circuit
+ circ.path = path
+ # Set exit
+ circ.exit = path[len(path)-1]
if len(path) > 0:
- circ.circ_id = self.extend_circuit(0, path)
+ circ.circ_id = self.extend_circuit(0, circ.id_path())
+ return circ
+######################################### Stats #####################
+
+class Stats:
+ def __init__(self):
+ self.values = []
+ self.min = 0.0
+ self.max = 0.0
+ self.mean = 0.0
+ self.dev = 0.0
+ self.median = 0.0
+
+ def add_value(self, value):
+ # Append value
+ self.values.append(value)
+ # Set min & max
+ if self.min == 0: self.min = value
+ elif self.min > value: self.min = value
+ if self.max < value: self.max = value
+ # Refresh everything
+ self.mean = self.get_mean()
+ self.dev = self.get_dev()
+ self.median = self.get_median()
+
+ def get_mean(self):
+ """ Compute mean from the values """
+ if len(self.values) > 0:
+ sum = reduce(lambda x, y: x+y, self.values, 0.0)
+ return sum/len(self.values)
+ else:
+ return 0.0
+
+ def get_dev(self):
+ """ Return the stddev of the values """
+ if len(self.values) > 1:
+ mean = self.get_mean()
+ sum = reduce(lambda x, y: x + ((y-mean)**2.0), self.values, 0.0)
+ s = math.sqrt(sum/(len(self.values)-1))
+ return s
+ else:
+ return 0.0
+
+ def get_median(self):
+ """ Return the median of the values """
+ if len(self.values) > 0:
+ self.values.sort()
+ return self.values[(len(self.values)-1)/2]
+ else: return 0.0
+
+class FileHandler:
+ """ FileHandler for appending collected data to a file """
+ def __init__(self, filename):
+ self.filename = filename
+
+ def write(self, line):
+ self.filehandle = open(self.filename, 'a')
+ self.filehandle.write(line + "\n")
+ self.filehandle.close()
+
######################################### Circuit, Stream #####################
class Circuit(PathSupport.Circuit):
@@ -118,38 +187,21 @@
# RTT stuff
self.part_rtts = {} # dict of partial rtts, pathlen 3: 1-2-None
self.current_rtt = None # double (sec): current value
- self.rtt_history = [] # rtt history for computing stats:
- self.avg_rtt = 0 # avg rtt value
- self.dev_rtt = 0 # standard deviation
+ self.stats = Stats() # stats about total RTT contains history
# Counters and flags
self.age = 0 # age in rounds
self.timeout_counter = 0 # timeout limit
self.slowness_counter = 0 # slowness limit
self.closed = False # mark circuit closed
-
- def get_avg_rtt(self):
- """ Compute average from history """
- if len(self.rtt_history) > 0:
- sum = reduce(lambda x, y: x+y, self.rtt_history, 0.0)
- return sum/len(self.rtt_history)
- else:
- return 0.0
-
- def get_dev_rtt(self):
- """ Return the stddev of measured rtts """
- if len(self.rtt_history) > 0:
- avg = self.get_avg_rtt()
- sum = reduce(lambda x, y: x + ((y-avg)**2.0), self.rtt_history, 0.0)
- return math.sqrt(sum/len(self.rtt_history))
- else:
- return 0.0
-
+ self.rtt_created = False # if this was created from the model
+
def add_rtt(self, rtt):
""" Add a new value and refresh the stats """
+ # Set current
self.current_rtt = rtt
- self.rtt_history.append(rtt)
- self.avg_rtt = self.get_avg_rtt()
- self.dev_rtt = self.get_dev_rtt()
+ # Add to the stats
+ self.stats.add_value(rtt)
+ # Increase age
self.age += 1
def to_string(self):
@@ -158,14 +210,17 @@
for r in self.path: s += " " + r.nickname + "(" + str(r.country_code) + ")"
if not self.built: s += " (not yet built)"
else: s += " (age=" + str(self.age) + ")"
- if self.current_rtt: s += ": " "RTT last/avg/dev: " + str(self.current_rtt) + "/" + str(self.avg_rtt) + "/"+ str(self.dev_rtt)
+ if self.current_rtt:
+ s += ": " "RTT current/median/mean/dev: "
+ s += str(self.current_rtt) + "/" + str(self.stats.median) + "/"
+ s += str(self.stats.mean) + "/" + str(self.stats.dev)
+ if self.rtt_created: s += "*"
return s
class Stream(PathSupport.Stream):
- """ Stream class extended to isPing and hop """
+ """ Stream class extended to hop """
def __init__(self, sid, host, port, kind):
PathSupport.Stream.__init__(self, sid, host, port, kind)
- self.isPing = False # set this to mark as a ping-stream
self.hop = None # save hop if this is a ping, hop=None means complete circ
######################################### BEGIN: NetworkModel #####################
@@ -177,30 +232,32 @@
# Set src and dest
self.src = src
self.dest = dest
- # Setup the history and record RTT
- self.rtt_history = []
- self.set_rtt(rtt)
+ # The current value
+ self.current_rtt = 0.0
+ # Setup the stats and record the first RTT
+ self.stats = Stats()
+ self.add_rtt(rtt)
- def set_rtt(self, rtt):
- self.rtt = rtt
- self.rtt_history.append(rtt)
+ def add_rtt(self, rtt):
+ self.current_rtt = rtt
+ self.stats.add_value(rtt)
class PathProposal:
""" Instances of this class are path-proposals """
def __init__(self, links, path):
# This is a list of LinkInfo objects
self.links = links
- # Also save the path for passing to build_circuit
- self.path = path
- # Compute the expected RTT
- self.rtt = reduce(lambda x,y: x + y.rtt, self.links, 0.0)
+ # Also save the path for passing to build_circuit, cut off ROOT
+ self.path = path[1:len(path)]
+ # Compute the expected RTT (from current value?)
+ self.rtt = reduce(lambda x,y: x + y.current_rtt, self.links, 0.0)
def to_string(self):
""" Create a string for printing out information """
s = ""
for l in self.links:
# Get the single objects
- s += l.src.nickname + "--" + l.dest.nickname + " (" + str(l.rtt) + ") " + ", "
+ s += l.src.nickname + "--" + l.dest.nickname + " (" + str(l.current_rtt) + ") " + ", "
return "Route proposal: " + s + "--> " + str(self.rtt) + " sec"
class NetworkModel:
@@ -213,6 +270,7 @@
# Initially add THIS proxy to the model
self.root = rooter
self.graph.add_node(self.root)
+ self.proposals = []
plog("DEBUG", "NetworkModel initiated: added " + self.root.nickname)
def add_link(self, src, dest, rtt):
@@ -239,16 +297,14 @@
for p in self.proposals:
print(p.to_string())
- def check_proposals(self, m, n):
- """ Check if we have at least m proposals with rtt <= n seconds """
- i = 0
+ def check_proposals(self, n):
+ """ Return all proposals with rtt <= n seconds """
+ ret = []
for p in self.proposals:
if p.rtt <= n:
- i += 1
- if p.rtt > n:
- return False
- if i == m:
- return True
+ ret.append(p)
+ plog("DEBUG", "Found " + str(len(ret)) + " path proposals having RTT <= " + str(n) + " sec")
+ return ret
def visit(self, node, path, i=1):
""" Recursive Depth-First-Search: Maybe use some existing method? """
@@ -280,7 +336,7 @@
def __init__(self, c, selmgr):
# Init the PathBuilder
PathSupport.PathBuilder.__init__(self, c, selmgr, GeoIPSupport.GeoIPRouter)
- self.circs_sorted = [] # list of circs sorted by avg_rtt
+ self.circs_sorted = [] # list of circs sorted by mean RTT
self.check_circuit_pool() # bring up the pool of circs
def check_circuit_pool(self):
@@ -295,13 +351,38 @@
self.build_idle_circuit()
plog("DEBUG", "Scheduled circuit No. " + str(n+1))
n += 1
- self.print_circuits()
+ def check_path(self, path):
+ """ Check if we already have a circuit with this path """
+ for c in self.circuits.values():
+ if c.path == path:
+ return False
+ return True
+
def build_idle_circuit(self):
""" Build an idle circuit """
circ = None
while circ == None:
try:
+ if measure_partial_circs:
+ # Get the proposals RTT <= 0.5
+ proposals = self.model.check_proposals(slow)
+ # TODO: Ensure we also create new paths (check number of circs with rtt_created)
+ # TODO: Check if we have > m proposals
+ while len(proposals) > 0:
+ choice = random.choice(proposals)
+ # Check if we already have a circ with this path
+ if self.check_path(choice.path):
+ plog("INFO", "Chosen proposal: " + choice.to_string())
+ circ = self.c.build_circuit_from_path(choice.path)
+ self.circuits[circ.circ_id] = circ
+ return
+ else:
+ plog("DEBUG", "Proposed circuit already exists")
+ # Remove from the proposals
+ proposals.remove(choice)
+ plog("DEBUG", "Falling back to normal path selection")
+
# Build the circuit
self.selmgr.set_target("255.255.255.255", 80)
circ = self.c.build_circuit(self.selmgr.pathlen, self.selmgr.path_selector)
@@ -313,15 +394,14 @@
def print_circuits(self):
""" Print out our circuits plus some info """
- #circs = self.circs_sorted
circs = self.circuits.values()
plog("INFO", "We have " + str(len(circs)) + " circuits:")
for c in circs:
print("+ " + c.to_string())
def refresh_sorted_list(self):
- """ Sort the list for average RTTs """
- self.circs_sorted = sort_list(self.circuits.values(), lambda x: x.avg_rtt)
+ """ Sort the list for their mean RTTs """
+ self.circs_sorted = sort_list(self.circuits.values(), lambda x: x.stats.mean)
plog("DEBUG", "Refreshed sorted list of circuits")
def circ_status_event(self, c):
@@ -350,13 +430,8 @@
del self.circuits[c.circ_id]
# Give away pending streams
for stream in circ.pending_streams:
- if stream.isPing:
- #plog("DEBUG", "Finding new circ for ping stream " + str(stream.strm_id))
- # Close the stream?
- pass
- if not stream.isPing:
- plog("DEBUG", "Finding new circ for " + str(stream.strm_id))
- self.attach_stream_any(stream, stream.detached_from)
+ plog("DEBUG", "Finding new circ for " + str(stream.strm_id))
+ self.attach_stream_any(stream, stream.detached_from)
# Refresh the list
self.refresh_sorted_list()
# Check if there are enough circs
@@ -415,15 +490,18 @@
# FIXME: Consider actually closing circ if no streams.
self.circuits[key].dirty = True
- # Choose from the sorted list!
+ # Choose from the sorted list
+ # TODO: We don't have a sorted list if we don't measure!
for circ in self.circs_sorted:
# Only attach if we already measured
- if circ.built and circ.circ_id not in badcircs and not circ.closed and circ.avg_rtt:
+ if circ.built and not circ.closed and circ.circ_id not in badcircs and circ.current_rtt:
if circ.exit.will_exit_to(stream.host, stream.port):
try:
self.c.attach_stream(stream.strm_id, circ.circ_id)
stream.pending_circ = circ # Only one possible here
circ.pending_streams.append(stream)
+ # Clear cache after the attach?
+ self.clear_dns_cache()
except TorCtl.ErrorReply, e:
# No need to retry here. We should get the failed
# event for either the circ or stream next
@@ -450,8 +528,50 @@
self.circuits[circ.circ_id] = circ
self.last_exit = circ.exit
- # Catch user stream events
- def handle_other_events(self, s):
+ def stream_status_event(self, s):
+ """ Catch user stream events """
+ # Construct debugging output
+ output = [s.event_name, str(s.strm_id), s.status, str(s.circ_id), s.target_host, str(s.target_port)]
+ if s.reason: output.append("REASON=" + s.reason)
+ if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
+ plog("DEBUG", " ".join(output))
+
+ # If target_host is not an IP-address
+ if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
+ s.target_host = "255.255.255.255" # ignore DNS for exit policy check
+
+ # NEW or NEWRESOLVE
+ if s.status == "NEW" or s.status == "NEWRESOLVE":
+ if s.status == "NEWRESOLVE" and not s.target_port:
+ s.target_port = self.resolve_port
+ # Set up the new stream
+ stream = Stream(s.strm_id, s.target_host, s.target_port, s.status)
+ self.streams[s.strm_id] = stream
+ self.attach_stream_any(self.streams[s.strm_id], self.streams[s.strm_id].detached_from)
+
+ # DETACHED
+ elif s.status == "DETACHED":
+ # Stream not found
+ if s.strm_id not in self.streams:
+ plog("WARN", "Detached stream " + str(s.strm_id) + " not found")
+ self.streams[s.strm_id] = Stream(s.strm_id, s.target_host, s.target_port, "NEW")
+ # Circuit not found
+ if not s.circ_id:
+ plog("WARN", "Stream " + str(s.strm_id) + " detached from no circuit!")
+ else:
+ self.streams[s.strm_id].detached_from.append(s.circ_id)
+ # Detect timeouts on user streams
+ if s.reason == "TIMEOUT":
+ # Increase a timeout counter on the stream?
+ #self.circuits[s.circ_id].timeout_counter += 1
+ plog("DEBUG", "User stream timed out on circuit " + str(s.circ_id))
+ # Stream was pending
+ if self.streams[s.strm_id] in self.streams[s.strm_id].pending_circ.pending_streams:
+ self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
+ # Attach to another circ
+ self.streams[s.strm_id].pending_circ = None
+ self.attach_stream_any(self.streams[s.strm_id], self.streams[s.strm_id].detached_from)
+
# SUCCEEDED
if s.status == "SUCCEEDED":
if s.strm_id not in self.streams:
@@ -483,7 +603,7 @@
self.streams[s.strm_id].failed = True
if s.circ_id in self.circuits: self.circuits[s.circ_id].dirty = True
elif self.streams[s.strm_id].attached_at != 0:
- plog("WARN","Failed stream on unknown circuit " + str(s.circ_id))
+ plog("WARN", "Failed stream on unknown circuit " + str(s.circ_id))
return
# CLOSED
if self.streams[s.strm_id].pending_circ:
@@ -504,22 +624,29 @@
######################################### BEGIN: PingHandler #####################
-# This class extends the StreamHandler
class PingHandler(StreamHandler):
+ """ This class extends the general StreamHandler to handle ping-requests """
def __init__(self, c, selmgr, router):
- # Init the StreamHandler
- StreamHandler.__init__(self, c, selmgr)
# Anything ping-related
self.ping_queue = Queue.Queue() # (circ_id, hop)-pairs
self.start_times = {} # dict mapping (circ_id, hop):start_time TODO: cleanup
- # Start the Pinger that schedules the measurings
- self.ping_manager = Pinger(self, router)
- self.ping_manager.setDaemon(True)
- self.ping_manager.start()
+ # Start the Pinger that triggers the connections
+ self.pinger = Pinger(self)
+ self.pinger.setDaemon(True)
+ self.pinger.start()
+ # Additional stuff for partial measurings
+ if measure_partial_circs:
+ self.router = router # this object represents this OR
+ self.model = NetworkModel(self.router) # model for recording link-RTTs
+ # Handle testing_mode
+ if testing_mode:
+ self.filehandler = FileHandler("data/circuits")
+ # Init the StreamHandler
+ StreamHandler.__init__(self, c, selmgr)
def enqueue_pings(self):
- """ To be schedule_immediated by ping_manager before the first connection is triggered """
- # TODO: Empty the queue?
+ """ To be schedule_immediated by pinger before the first connection is triggered """
+ print("\n")
circs = self.circuits.values()
for c in circs:
if c.built:
@@ -534,14 +661,58 @@
# And for the whole circuit ...
self.ping_queue.put((id, None))
plog("DEBUG", "Enqueued circuit " + str(id) + " hop None")
-
- def attach_ping(self, stream, arrived_at):
+
+ def compute_link_RTTs(self):
+ """ Get the circs and check if we can compute RTTs of single links and store these in the model """
+ circs = self.circuits.values()
+ # Measure also the duration
+ start = time.time()
+ for c in circs:
+ # Get the length
+ path_len = len(c.path)
+ # Go through the path
+ for i in xrange(1,path_len):
+ if i in c.part_rtts:
+ # First hop --> add Link from Root to 1
+ if i == 1:
+ link_rtt = c.part_rtts[i]
+ self.model.add_link(self.router, c.path[i-1], link_rtt)
+ # Handle i -- (i+1)
+ if i+1 in c.part_rtts:
+ link_rtt = c.part_rtts[i+1] - c.part_rtts[i]
+ if link_rtt > 0:
+ plog("INFO", "Computed link-RTT: " + str(link_rtt))
+ # Save to NetworkModel
+ self.model.add_link(c.path[i-1], c.path[i], link_rtt)
+ else:
+ plog("WARN", "Negative link-RTT: " + str(link_rtt))
+ # Handle (n-1) -- n
+ elif None in c.part_rtts:
+ # We have a total value
+ link_rtt = c.part_rtts[None] - c.part_rtts[i]
+ if link_rtt > 0:
+ plog("INFO", "Computed link-RTT: " + str(link_rtt))
+ # Save to NetworkModel
+ self.model.add_link(c.path[i-1], c.path[i], link_rtt)
+ else:
+ plog("WARN", "Negative link-RTT: " + str(link_rtt))
+ plog("DEBUG", "Computation of link-RTTs took us " + str(time.time()-start) + " seconds")
+ # Print out the model
+ self.model.print_graph()
+ self.model.find_circuits()
+
+ def attach_ping(self, stream):
""" Attach a ping stream to its circuit """
- plog("DEBUG", "New ping request")
- # Check if there is something in the queue
if self.ping_queue.empty():
- plog("DEBUG", "Queue is empty --> discarding ping stream " + str(stream.strm_id))
+ # This round has finished
+ plog("INFO", "Queue is empty --> round finished, closing stream " + str(stream.strm_id))
self.c.close_stream(stream.strm_id, 5)
+ # Fire the event
+ finished_event.set()
+ # Call the rest from here?
+ self.print_circuits()
+ if measure_partial_circs:
+ self.compute_link_RTTs()
return
else:
# Get the info and extract
@@ -555,142 +726,111 @@
if circ_id in self.circuits:
circ = self.circuits[circ_id]
if circ.built and not circ.closed:
- self.c.attach_stream(stream.strm_id, circ.circ_id, hop)
- # Measure here or move to before attaching?
- self.start_times[(circ_id, hop)] = arrived_at
- stream.hop = hop
- stream.pending_circ = circ # Only one possible here
- circ.pending_streams.append(stream)
+ stream.hop = hop
+ self.c.attach_stream(stream.strm_id, circ.circ_id, hop)
+ # Don't use pending for pings
else:
- plog("WARN", "Circuit not built")
+ plog("WARN", "Circuit not built or closed")
+ self.attach_ping(stream)
else:
- # Close stream if circuit is gone
- plog("WARN", "Circuit does not exist anymore, closing stream " + str(stream.strm_id))
- self.c.close_stream(stream.strm_id, 5)
+ # Go to next test if circuit is gone
+ plog("WARN", "Circuit " + str(circ_id) + " does not exist anymore --> passing")
+ self.attach_ping(stream)
except TorCtl.ErrorReply, e:
plog("WARN", "Error attaching stream: " + str(e.args))
- # TODO: Separate pings from normal streams directly, to make StreamHandler usable even without this ..
def stream_status_event(self, s):
- """ Catch stream status events: Handle NEW and DETACHED here,
- pass other events to StreamHandler """
+ """ Separate Pings from regular streams directly """
+ if not (s.target_host == ping_dummy_host and s.target_port == ping_dummy_port):
+ # This is no ping, call the other method
+ return StreamHandler.stream_status_event(self, s)
+
# Construct debugging output
output = [s.event_name, str(s.strm_id), s.status, str(s.circ_id), s.target_host, str(s.target_port)]
if s.reason: output.append("REASON=" + s.reason)
if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
plog("DEBUG", " ".join(output))
-
- # If target_host is not an IP-address
- if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
- s.target_host = "255.255.255.255" # ignore DNS for exit policy check
-
+
# NEW or NEWRESOLVE
- if s.status == "NEW" or s.status == "NEWRESOLVE":
- if s.status == "NEWRESOLVE" and not s.target_port:
- s.target_port = self.resolve_port
- # Set up the new stream
+ if s.status == "NEW":
+ # Set up the stream object
stream = Stream(s.strm_id, s.target_host, s.target_port, s.status)
self.streams[s.strm_id] = stream
- # (Double-)Check if this is a ping stream
- if (stream.host == ping_dummy_host) & (stream.port == ping_dummy_port):
- # Set isPing
- stream.isPing = True
- self.attach_ping(stream, s.arrived_at)
+ self.attach_ping(stream)
+
+ # SENTCONNECT
+ elif s.status == "SENTCONNECT":
+ # Measure here, means save arrived_at in the dict
+ self.start_times[(s.circ_id, self.streams[s.strm_id].hop)] = s.arrived_at
+
+ # DETACHED (CLOSED + TORPROTOCOL is also ping, some routers send it when measuring 1-hop)
+ elif s.status == "DETACHED" or (s.status == "CLOSED" and s.remote_reason == "TORPROTOCOL"):
+ if (s.reason == "TIMEOUT"):
+ self.circuits[s.circ_id].timeout_counter += 1
+ self.circuits[s.circ_id].slowness_counter += 1
+ plog("DEBUG", str(self.circuits[s.circ_id].timeout_counter) + " timeout(s) on circuit " + str(s.circ_id))
+ if self.circuits[s.circ_id].timeout_counter >= timeout_limit and not self.circuits[s.circ_id].closed:
+ # Close the circuit
+ plog("DEBUG", "Reached limit on timeouts --> closing circuit " + str(s.circ_id))
+ self.circuits[s.circ_id].closed = True
+ try: self.c.close_circuit(s.circ_id)
+ except TorCtl.ErrorReply, e:
+ plog("ERROR", "Failed closing circuit " + str(s.circ_id) + ": " + str(e))
+ # Set RTT for circ to None
+ self.circuits[s.circ_id].current_rtt = None
+
else:
- self.attach_stream_any(self.streams[s.strm_id], self.streams[s.strm_id].detached_from)
-
- # DETACHED
- elif s.status == "DETACHED":
- # Stream not found
- if s.strm_id not in self.streams:
- plog("WARN", "Detached stream " + str(s.strm_id) + " not found")
- self.streams[s.strm_id] = Stream(s.strm_id, s.target_host, s.target_port, "NEW")
- # s.circ_id not found
- if not s.circ_id:
- plog("WARN", "Stream " + str(s.strm_id) + " detached from no circuit!")
- else:
- self.streams[s.strm_id].detached_from.append(s.circ_id)
+ # No timeout, this is a successful ping: measure here
+ hop = self.streams[s.strm_id].hop
+ # Compute RTT using arrived_at
+ rtt = s.arrived_at - self.start_times[(s.circ_id, hop)]
+ plog("INFO", "Measured RTT: " + str(rtt) + " sec")
+ # Save RTT to circuit
+ self.circuits[s.circ_id].part_rtts[hop] = rtt
- # If this is a ping
- if self.streams[s.strm_id].isPing:
- if (s.reason == "TIMEOUT"):
- self.circuits[s.circ_id].timeout_counter += 1
- self.circuits[s.circ_id].slowness_counter += 1
- plog("DEBUG", str(self.circuits[s.circ_id].timeout_counter) + " timeout(s) on circuit " + str(s.circ_id))
- if self.circuits[s.circ_id].timeout_counter >= timeout_limit and not self.circuits[s.circ_id].closed:
- # Close the circuit
- plog("DEBUG", "Reached limit on timeouts --> closing circuit " + str(s.circ_id))
- self.circuits[s.circ_id].closed = True
- try: self.c.close_circuit(s.circ_id)
- except TorCtl.ErrorReply, e:
- plog("ERROR", "Failed closing circuit " + str(s.circ_id) + ": " + str(e))
- # Set RTT for circ to None
- self.circuits[s.circ_id].current_rtt = None
- # Only close the stream
- self.c.close_stream(s.strm_id, 7)
- return
- # This is a successful ping: measure here
- hop = self.streams[s.strm_id].hop
- # Compute RTT using arrived_at
- rtt = s.arrived_at - self.start_times[(s.circ_id, hop)]
- plog("INFO", "Measured RTT: " + str(rtt) + " sec")
- # Save RTT to circuit
- self.circuits[s.circ_id].part_rtts[hop] = rtt
- if hop == None:
- # This is a total circuit measuring
+ if hop == None:
+ # This is a total circuit measuring
self.circuits[s.circ_id].add_rtt(rtt)
- plog("DEBUG", "Added RTT to history: " + str(self.circuits[s.circ_id].rtt_history))
-
+ plog("DEBUG", "Added RTT to history: " + str(self.circuits[s.circ_id].stats.values))
+
+ # Close if num_tests is reached
if testing_mode:
- # Close if num_tests is reached
if self.circuits[s.circ_id].age >= num_tests:
- self.print_circuits()
+ plog("DEBUG", "Closing circ " + str(s.circ_id) + ": num_tests is reached")
self.circuits[s.circ_id].closed = True
+ # Save stats to a file in for generating plots etc.
+ self.filehandler.write(str(self.circuits[s.circ_id].stats.mean) + "\t" + str(self.circuits[s.circ_id].stats.dev))
self.c.close_circuit(s.circ_id)
- # Close if slow-max is reached on avg_rtt
- if self.circuits[s.circ_id].avg_rtt >= slow:
+ # Close if slow-max is reached on mean RTT
+ if self.circuits[s.circ_id].stats.mean >= slow:
self.circuits[s.circ_id].slowness_counter += 1
- plog("DEBUG", "Slow circuit " + str(s.circ_id))
if self.circuits[s.circ_id].slowness_counter >= slowness_limit and not self.circuits[s.circ_id].closed:
plog("DEBUG", "Slow-max (" + str(slowness_limit) + ") is reached --> closing circuit " + str(s.circ_id))
self.circuits[s.circ_id].closed = True
self.c.close_circuit(s.circ_id)
- # Resort every time ??
- self.refresh_sorted_list()
- # Close the stream
- self.c.close_stream(s.strm_id, 6)
+ # Resort only if this is for the complete circ
+ self.refresh_sorted_list()
+
+ if s.status == "CLOSED":
+ # Stream is gone .. we have to create a new ping :(
+ t = threading.Thread(None, self.pinger.ping, "Ping")
+ t.setDaemon(True)
+ t.start()
return
-
- # Detect timeouts on user streams
- if s.reason == "TIMEOUT":
- # Don't increase counter, cause could be a machine that's not responding
- #self.circuits[s.circ_id].timeout_counter += 1
- plog("DEBUG", "User stream timed out on circuit " + str(s.circ_id))
- # Stream was pending
- if self.streams[s.strm_id] in self.streams[s.strm_id].pending_circ.pending_streams:
- self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
- # Attach to another circ
- self.streams[s.strm_id].pending_circ = None
- self.attach_stream_any(self.streams[s.strm_id], self.streams[s.strm_id].detached_from)
-
- else:
- self.handle_other_events(s)
+ # Call attach ping here and use only one stream for all tests
+ self.attach_ping(self.streams[s.strm_id])
+ return
######################################### BEGIN: Pinger #####################
class Pinger(threading.Thread):
""" Separate thread that triggers the Socks4-connections for pings """
- def __init__(self, ping_handler, router=None):
- self.handler = ping_handler # The PingHandler
- self.router = router # This router object represents us
- if measure_partial_circs:
- # Create the model for recording link-RTTs only if wished
- self.model = NetworkModel(self.router)
- # Call thread-constructor
- threading.Thread.__init__(self)
+ def __init__(self, ping_handler):
+ self.handler = ping_handler # the PingHandler
+ threading.Thread.__init__(self) # call the thread-constructor
def run(self):
""" The run()-method """
@@ -700,28 +840,16 @@
def do_work(self):
""" Do the work """
- # Measure RTTs of complete circuits
- if measure_circs:
- # Schedule to enqueue all built circs
- def notlambda(h): h.enqueue_pings()
- self.handler.schedule_immediate(notlambda)
- # Get number of _all_ circs (>= no. of circs built)
- i = len(self.handler.circuits)
- for x in xrange(1, i+1):
- # Maybe run in parallel (separate threads)?
- plog("DEBUG", "Triggered ping " + str(x) + "/" + str(i))
- self.ping()
-
- # Compute link-RTTs
- if measure_partial_circs:
- self.compute_link_RTTs()
- # Check if we have m proposals with rtt <= n
- if self.model.check_proposals(10, 0.5):
- plog("INFO", "We now have enough proposals!")
-
- # Print circuits for logging
- self.handler.schedule_immediate(lambda x: x.print_circuits())
-
+ # Event is only needed, because some routers close our connection if trying
+ # to use them as one-hop, so we need to create a new connection sometimes and
+ # cannot rely on the failing of our first connection
+ finished_event.clear()
+ # Let all circs to test be enqueued
+ self.handler.schedule_immediate(lambda x: x.enqueue_pings())
+ # Simply trigger only _one_ connection
+ self.ping()
+ finished_event.wait()
+
# No "try .. except .. finally .." in Python < 2.5 !
def ping(self):
""" Create a connection to dummy_host/_port using Socks4 """
@@ -739,44 +867,6 @@
# Close the socket if open
if s: s.close()
- def compute_link_RTTs(self):
- """ Get a copy of the circs and check if we can compute links
- and store these in the model """
- # TODO: Refactor: move to PingHandler or get a _copy_ of the circuits
- circs = self.handler.circuits.values()
- for c in circs:
- # Get the length
- path_len = len(c.path)
- # Go through the path
- for i in xrange(1,path_len):
- if i in c.part_rtts:
- # First hop --> add Link from Root to 1
- if i == 1:
- link_rtt = c.part_rtts[i]
- self.model.add_link(self.router, c.path[i-1], link_rtt)
- # Handle i -- (i+1)
- if i+1 in c.part_rtts:
- link_rtt = c.part_rtts[i+1] - c.part_rtts[i]
- if link_rtt > 0:
- plog("INFO", "Computed link RTT = " + str(link_rtt))
- # Save to NetworkModel
- self.model.add_link(c.path[i-1], c.path[i], link_rtt)
- else:
- plog("WARN", "Negative RTT: " + str(link_rtt))
- # Handle (n-1) -- n
- elif None in c.part_rtts:
- # We have a total value
- link_rtt = c.part_rtts[None] - c.part_rtts[i]
- if link_rtt > 0:
- plog("INFO", "Computed link RTT = " + str(link_rtt))
- # Save to NetworkModel
- self.model.add_link(c.path[i-1], c.path[i], link_rtt)
- else:
- plog("WARN", "Negative RTT: " + str(link_rtt))
- # Print out the model
- self.model.print_graph()
- self.model.find_circuits()
-
######################################### END: Pinger #####################
def connect(control_host, control_port):
@@ -787,18 +877,21 @@
def setup_location(conn):
""" Setup a router object representing this proxy """
+ global path_config
plog("INFO","Setting up our location")
+ ip = 0
+ # Try to get our IP from Tor
try:
- # Get our IP from Tor and set up a router object
info = conn.get_info("address")
ip = info["address"]
- router = GeoIPSupport.GeoIPRouter(TorCtl.Router(None,"ROOT",None,False,None,None,ip,None,None))
- #country = GeoIPSupport.geoip.country_code_by_addr(my_ip)
- #country = GeoIPSupport.get_country_from_record(my_ip)
- plog("INFO", "Our IP address is " + router.get_ip_dotted() + " [" + router.country_code + "]")
- return router
except:
plog("ERROR", "Could not get our IP")
+ # Set up a router object
+ router = GeoIPSupport.GeoIPRouter(TorCtl.Router(None,"ROOT",None,False,None,None,ip,None,None))
+ plog("INFO", "Our IP address is " + router.get_ip_dotted() + " [" + router.country_code + "]")
+ # To be configured
+ path_config.src_country = router.country_code
+ return router
def configure(conn):
""" Set events and options """
@@ -820,7 +913,10 @@
# Configure myself
configure(conn)
# Set Handler to the connection
- handler = PingHandler(conn, __selmgr, router)
+ if measure_circs:
+ handler = PingHandler(conn, __selmgr, router)
+ else:
+ handler = StreamHandler(conn, __selmgr)
conn.set_event_handler(handler)
# Go to sleep to be able to get killed from the commandline
try:
More information about the tor-commits
mailing list