[tor-commits] [ooni-probe/master] Merge remote-tracking branch 'thetorproject/master' into feature/move_nettests
art at torproject.org
art at torproject.org
Sat Sep 28 20:14:37 UTC 2013
commit 40f5789e926d9af0479bd1f879ad9ab6053f36c9
Merge: b9bc4f0 c027888
Author: Arturo Filastò <art at fuffa.org>
Date: Wed Sep 11 18:20:01 2013 +0200
Merge remote-tracking branch 'thetorproject/master' into feature/move_nettests
* thetorproject/master:
Add encoding format
Add a version and author to dnsspoof
Write unittest for testing https://github.com/TheTorProject/ooni-backend/pull/16
URL must be utf-8
ooni/deck.py | 2 +-
ooni/nettests/manipulation/dnsspoof.py | 9 +++++++--
ooni/tests/test_oonibclient.py | 9 +++++++++
3 files changed, 17 insertions(+), 3 deletions(-)
diff --cc ooni/nettests/manipulation/dnsspoof.py
index c9120a4,0000000..598c41f
mode 100644,000000..100644
--- a/ooni/nettests/manipulation/dnsspoof.py
+++ b/ooni/nettests/manipulation/dnsspoof.py
@@@ -1,70 -1,0 +1,75 @@@
++# -*- encoding: utf-8 -*-
++#
++# :authors: Arturo Filastò
++# :licence: see LICENSE
++
+from twisted.internet import defer
+from twisted.python import usage
+
+from scapy.all import IP, UDP, DNS, DNSQR
+
+from ooni.templates import scapyt
+from ooni.utils import log
+
+class UsageOptions(usage.Options):
+ optParameters = [['resolver', 'r', None,
+ 'Specify the resolver that should be used for DNS queries (ip:port)'],
+ ['hostname', 'h', None,
+ 'Specify the hostname of a censored site'],
+ ['backend', 'b', '8.8.8.8:53',
+ 'Specify the IP address of a good DNS resolver (ip:port)']
+ ]
+
+
+class DNSSpoof(scapyt.ScapyTest):
+ name = "DNS Spoof"
++ author = "Arturo Filastò"
++ version = "0.0.1"
+ timeout = 2
+
+ usageOptions = UsageOptions
+
+ requiredTestHelpers = {'backend': 'dns'}
+ requiredOptions = ['hostname', 'resolver']
+
+ def setUp(self):
+ self.resolverAddr, self.resolverPort = self.localOptions['resolver'].split(':')
+ self.resolverPort = int(self.resolverPort)
+
+ self.controlResolverAddr, self.controlResolverPort = self.localOptions['backend'].split(':')
+ self.controlResolverPort = int(self.controlResolverPort)
+
+ self.hostname = self.localOptions['hostname']
+
+ def postProcessor(self, report):
+ """
+ This is not tested, but the concept is that if the two responses
+ match up then spoofing is occuring.
+ """
+ try:
+ test_answer = report['test_a_lookup']['answered_packets'][0][1]
+ control_answer = report['test_control_a_lookup']['answered_packets'][0][1]
+ except IndexError:
+ self.report['spoofing'] = 'no_answer'
+ return
+
+ if test_answer[UDP] == control_answer[UDP]:
+ self.report['spoofing'] = True
+ else:
+ self.report['spoofing'] = False
+ return
+
+ @defer.inlineCallbacks
+ def test_a_lookup(self):
+ question = IP(dst=self.resolverAddr)/UDP()/DNS(rd=1,
+ qd=DNSQR(qtype="A", qclass="IN", qname=self.hostname))
+ log.msg("Performing query to %s with %s:%s" % (self.hostname, self.resolverAddr, self.resolverPort))
+ yield self.sr1(question)
+
+ @defer.inlineCallbacks
+ def test_control_a_lookup(self):
+ question = IP(dst=self.controlResolverAddr)/UDP()/DNS(rd=1,
+ qd=DNSQR(qtype="A", qclass="IN", qname=self.hostname))
+ log.msg("Performing query to %s with %s:%s" % (self.hostname,
+ self.controlResolverAddr, self.controlResolverPort))
+ yield self.sr1(question)
-
-
More information about the tor-commits
mailing list