[tor-commits] [ooni-probe/master] Add ICMP traceroute
art at torproject.org
art at torproject.org
Thu May 31 03:01:42 UTC 2012
commit e452b755b750781b14b372780966e67a7af2f0fc
Author: Arturo Filastò <hellais at torproject.org>
Date: Thu May 3 00:29:59 2012 +0200
Add ICMP traceroute
---
lib/traceroute.py | 32 +++++
lib/txtraceroute.py | 314 +++++++++++++++++++++++++++++++++++++++++++++++++++
oonicli.py | 16 ++-
3 files changed, 357 insertions(+), 5 deletions(-)
diff --git a/lib/traceroute.py b/lib/traceroute.py
new file mode 100644
index 0000000..98015ea
--- /dev/null
+++ b/lib/traceroute.py
@@ -0,0 +1,32 @@
+import sys
+import socket
+from twisted.internet import defer
+from twisted.internet import reactor
+from twisted.internet import threads
+
+from txtraceroute import traceroute
+
+def run(target, src_port, dst_port):
+ res = []
+ @defer.inlineCallbacks
+ def start_trace(target, **settings):
+ hops = yield traceroute(target, **settings)
+ for hop in hops:
+ res.append(hop.get())
+ reactor.stop()
+
+ settings = dict(hop_callback=None,
+ timeout=2,
+ max_tries=3,
+ max_hops=30)
+ try:
+ target = socket.gethostbyname(target)
+ except Exception, e:
+ print("could not resolve '%s': %s" % (target, str(e)))
+ sys.exit(1)
+
+ reactor.callWhenRunning(start_trace, target, **settings)
+ reactor.run()
+ return res
+
+print run("google.com")
diff --git a/lib/txtraceroute.py b/lib/txtraceroute.py
new file mode 100644
index 0000000..f74dfb1
--- /dev/null
+++ b/lib/txtraceroute.py
@@ -0,0 +1,314 @@
+#!/usr/bin/env python
+# coding: utf-8
+#
+# Copyright (c) 2012 Alexandre Fiori
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+import json
+import operator
+import os
+import socket
+import struct
+import sys
+import time
+
+from twisted.internet import defer
+from twisted.internet import reactor
+from twisted.internet import threads
+from twisted.python import usage
+from twisted.web.client import getPage
+
+
+class iphdr(object):
+ def __init__(self, proto=socket.IPPROTO_ICMP, src="0.0.0.0", dst=None):
+ self.version = 4
+ self.hlen = 5
+ self.tos = 0
+ self.length = 20
+ self.id = os.getpid()
+ self.frag = 0
+ self.ttl = 255
+ self.proto = proto
+ self.cksum = 0
+ self.src = src
+ self.saddr = socket.inet_aton(src)
+ self.dst = dst or "0.0.0.0"
+ self.daddr = socket.inet_aton(self.dst)
+ self.data = ""
+
+ def assemble(self):
+ header = struct.pack('BBHHHBB',
+ (self.version & 0x0f) << 4 | (self.hlen & 0x0f),
+ self.tos, self.length + len(self.data),
+ socket.htons(self.id), self.frag,
+ self.ttl, self.proto)
+ return header + "\000\000" + self.saddr + self.daddr + self.data
+
+ @classmethod
+ def disassemble(self, data):
+ ip = iphdr()
+ pkt = struct.unpack('!BBHHHBBH', data[:12])
+ ip.version = (pkt[0] >> 4 & 0x0f)
+ ip.hlen = (pkt[0] & 0x0f)
+ ip.tos, ip.length, ip.id, ip.frag, ip.ttl, ip.proto, ip.cksum = pkt[1:]
+ ip.saddr = data[12:16]
+ ip.daddr = data[16:20]
+ ip.src = socket.inet_ntoa(ip.saddr)
+ ip.dst = socket.inet_ntoa(ip.daddr)
+ return ip
+
+ def __repr__(self):
+ return "IP (tos %s, ttl %s, id %s, frag %s, proto %s, length %s) " \
+ "%s -> %s" % \
+ (self.tos, self.ttl, self.id, self.frag, self.proto,
+ self.length, self.src, self.dst)
+
+class icmphdr(object):
+ def __init__(self, data=""):
+ self.type = 8
+ self.code = 0
+ self.cksum = 0
+ self.id = os.getpid()
+ self.sequence = 0
+ self.data = data
+
+ def assemble(self):
+ part1 = struct.pack("BB", self.type, self.code)
+ part2 = struct.pack("!HH", self.id, self.sequence)
+ cksum = self.checksum(part1 + "\000\000" + part2 + self.data)
+ cksum = struct.pack("!H", cksum)
+ return part1 + cksum + part2 + self.data
+
+ @classmethod
+ def checksum(self, data):
+ if len(data) & 1:
+ data += "\0"
+ cksum = reduce(operator.add,
+ struct.unpack('!%dH' % (len(data) >> 1), data))
+ cksum = (cksum >> 16) + (cksum & 0xffff)
+ cksum += (cksum >> 16)
+ cksum = (cksum & 0xffff) ^ 0xffff
+ return cksum
+
+ @classmethod
+ def disassemble(self, data):
+ icmp = icmphdr()
+ pkt = struct.unpack("!BBHHH", data)
+ icmp.type, icmp.code, icmp.cksum, icmp.id, icmp.sequence = pkt
+ return icmp
+
+ def __repr__(self):
+ return "ICMP (type %s, code %s, id %s, sequence %s)" % \
+ (self.type, self.code, self.id, self.sequence)
+
+
+@defer.inlineCallbacks
+def geoip_lookup(ip):
+ try:
+ r = yield getPage("http://freegeoip.net/json/%s" % ip)
+ d = json.loads(r)
+ items = [d["country_name"], d["region_name"], d["city"]]
+ text = ", ".join([s for s in items if s])
+ defer.returnValue(text.encode("utf-8"))
+ except Exception:
+ defer.returnValue("Unknown location")
+
+
+@defer.inlineCallbacks
+def reverse_lookup(ip):
+ try:
+ r = yield threads.deferToThread(socket.gethostbyaddr, ip)
+ defer.returnValue(r[0])
+ except Exception:
+ defer.returnValue(None)
+
+
+class Hop(object):
+ def __init__(self, target, ttl):
+ self.found = False
+ self.tries = 0
+ self.last_try = 0
+ self.remote_ip = None
+ self.remote_icmp = None
+ self.remote_host = None
+ self.location = ""
+
+ self.ttl = ttl
+ self.ip = iphdr(dst=target)
+ self.ip.ttl = ttl
+ self.ip.id += ttl
+
+ self.icmp = icmphdr("traceroute")
+ self.icmp.id = self.ip.id
+ self.ip.data = self.icmp.assemble()
+
+ self._pkt = self.ip.assemble()
+
+ @property
+ def pkt(self):
+ self.tries += 1
+ self.last_try = time.time()
+ return self._pkt
+
+ def get(self):
+ if self.found:
+ if self.remote_host:
+ ip = self.remote_host
+ else:
+ ip = self.remote_ip.src
+ ping = self.found - self.last_try
+ else:
+ ip = None
+ ping = None
+
+ location = self.location if self.location else None
+ return {'ttl': self.ttl, 'ping': ping, 'ip': ip, 'location': location}
+
+ def __repr__(self):
+ if self.found:
+ if self.remote_host:
+ ip = ":: %s" % self.remote_host
+ else:
+ ip = ":: %s" % self.remote_ip.src
+ ping = "%0.3fs" % (self.found - self.last_try)
+ else:
+ ip = "??"
+ ping = "-"
+
+ location = ":: %s" % self.location if self.location else ""
+ return "%02d. %s %s %s" % (self.ttl, ping, ip, location)
+
+
+class TracerouteProtocol(object):
+ def __init__(self, target, **settings):
+ self.target = target
+ self.settings = settings
+ self.fd = socket.socket(socket.AF_INET, socket.SOCK_RAW,
+ socket.IPPROTO_ICMP)
+ self.fd.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
+
+ self.hops = []
+ self.out_queue = []
+ self.waiting = True
+ self.deferred = defer.Deferred()
+
+ reactor.addReader(self)
+ reactor.addWriter(self)
+
+ # send 1st probe packet
+ self.out_queue.append(Hop(self.target, 1))
+
+ def logPrefix(self):
+ return "TracerouteProtocol(%s)" % self.target
+
+ def fileno(self):
+ return self.fd.fileno()
+
+ @defer.inlineCallbacks
+ def hopFound(self, hop, ip, icmp):
+ hop.remote_ip = ip
+ hop.remote_icmp = icmp
+
+ if (ip and icmp):
+ hop.found = time.time()
+ if self.settings.get("geoip_lookup") is True:
+ hop.location = yield geoip_lookup(ip.src)
+
+ if self.settings.get("reverse_lookup") is True:
+ hop.remote_host = yield reverse_lookup(ip.src)
+
+ ttl = hop.ttl + 1
+ last = self.hops[-2:]
+ if len(last) == 2 and last[0].remote_ip == ip or \
+ (ttl > (self.settings.get("max_hops", 30) + 1)):
+ done = True
+ else:
+ done = False
+
+ if not done:
+ cb = self.settings.get("hop_callback")
+ if callable(cb):
+ yield defer.maybeDeferred(cb, hop)
+
+ if not self.waiting:
+ if self.deferred:
+ self.deferred.callback(self.hops)
+ self.deferred = None
+ else:
+ self.out_queue.append(Hop(self.target, ttl))
+
+ def doRead(self):
+ if not self.waiting or not self.hops:
+ return
+
+ pkt = self.fd.recv(4096)
+
+ # disassemble ip header
+ ip = iphdr.disassemble(pkt[:20])
+ if ip.proto != socket.IPPROTO_ICMP:
+ return
+
+ found = False
+
+ # disassemble icmp header
+ icmp = icmphdr.disassemble(pkt[20:28])
+ if icmp.type == 0 and icmp.id == self.hops[-1].icmp.id:
+ found = True
+ elif icmp.type == 11:
+ # disassemble referenced ip header
+ ref = iphdr.disassemble(pkt[28:48])
+ if ref.dst == self.target:
+ found = True
+
+ if ip.src == self.target:
+ self.waiting = False
+
+ if found:
+ self.hopFound(self.hops[-1], ip, icmp)
+
+ def hopTimeout(self, *ign):
+ hop = self.hops[-1]
+ if not hop.found:
+ if hop.tries < self.settings.get("max_tries", 3):
+ # retry
+ self.out_queue.append(hop)
+ else:
+ # give up and move forward
+ self.hopFound(hop, None, None)
+
+ def doWrite(self):
+ if self.waiting and self.out_queue:
+ hop = self.out_queue.pop(0)
+ pkt = hop.pkt
+ if not self.hops or (self.hops and hop.ttl != self.hops[-1].ttl):
+ self.hops.append(hop)
+ self.fd.sendto(pkt, (hop.ip.dst, 0))
+
+ timeout = self.settings.get("timeout", 1)
+ reactor.callLater(timeout, self.hopTimeout)
+
+ def connectionLost(self, why):
+ pass
+
+
+def traceroute(target, **settings):
+ tr = TracerouteProtocol(target, **settings)
+ return tr.deferred
diff --git a/oonicli.py b/oonicli.py
index c2d01d2..68384f8 100755
--- a/oonicli.py
+++ b/oonicli.py
@@ -63,7 +63,7 @@ class StupidAsset(object):
return self.idx
-def runTest(test, options):
+def runTest(test, options, global_options):
asset = None
if options['asset']:
print options['asset']
@@ -73,7 +73,10 @@ def runTest(test, options):
wgen = work.WorkGenerator(asset, plugoo[test].__class__,
dict(options), start=options['resume'])
- worker = work.Worker()
+ if global_options['parallelism']:
+ wgen.size = int(global_options['parallelism'])
+ worker = work.Worker(wgen.size)
+
for x in wgen:
worker.push(x)
@@ -92,7 +95,8 @@ class Options(usage.Options):
]
optParameters = [
- ['node', 'n', 'localhost:31415', 'Select target node'],
+ ['parallelism', 'n', 10, "Specify the number of parallel tests to run"],
+ ['target-node', 't', 'localhost:31415', 'Select target node'],
['ooninet', 'o', 'localhost:4242', "Select OONI-net address for reporting"],
['password', 'p', 'opennetwork', "Specify the password for authentication"],
]
@@ -129,8 +133,10 @@ if not config.subCommand:
sys.exit(1)
if config['local']:
- runTest(config.subCommand, config.subOptions)
+ runTest(config.subCommand, config.subOptions, config)
else:
- print "The test will be run on the node %s" % config['node']
+ print "This feature is currently not supported. :("
+ print "Use -l to run the test locally."
+ sys.exit(0)
More information about the tor-commits
mailing list