[tor-commits] [ooni-probe/master] Rewrite DNS Tamper test to properly use Twisted DNS client.

art at torproject.org art at torproject.org
Mon Oct 8 02:25:40 UTC 2012


commit c904e684ac60f9c25c09d52251cb27966aaab065
Author: Arturo Filastò <arturo at filasto.net>
Date:   Mon Oct 8 02:25:13 2012 +0000

    Rewrite DNS Tamper test to properly use Twisted DNS client.
    * Should be ready for usage
---
 nettests/core/dnstamper.py |  182 ++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 182 insertions(+), 0 deletions(-)

diff --git a/nettests/core/dnstamper.py b/nettests/core/dnstamper.py
new file mode 100644
index 0000000..92948cd
--- /dev/null
+++ b/nettests/core/dnstamper.py
@@ -0,0 +1,182 @@
+# -*- encoding: utf-8 -*-
+#
+#
+#  dnstamper
+#  *********
+#
+#  The test reports censorship if the cardinality of the intersection of
+#  the query result set from the control server and the query result set
+#  from the experimental server is zero, which is to say, if the two sets
+#  have no matching results whatsoever.
+#
+#  NOTE: This test frequently results in false positives due to GeoIP-based
+#  load balancing on major global sites such as Google, Facebook, and
+#  YouTube.
+#
+# :authors: Arturo Filastò, Isis Lovecruft
+# :licence: see LICENSE
+
+from ooni import nettest
+from twisted.internet import defer
+from twisted.names import client
+
+class DNSTamperTest(nettest.TestCase):
+
+    name = "DNS tamper"
+
+    description = "DNS censorship detection test"
+
+    requirements = None
+    inputFile = ['file', 'f', None,
+                 'Input file of list of hostnames to attempt to resolve']
+
+    optParameters = [['controlresolver', 'c', '8.8.8.8',
+                      'Known good DNS server'],
+                     ['testresolvers', 't', None,
+                      'file containing list of DNS resolvers to test against']
+                     ]
+
+    def setUp(self):
+        self.report['test_lookups'] = {}
+        self.report['test_reverse'] = {}
+
+        self.report['control_lookup'] = []
+
+        self.report['a_lookups'] = {}
+
+        self.test_a_lookups = {}
+        self.control_a_lookups = []
+
+        self.test_reverse = {}
+
+        if not self.localOptions['testresolvers']:
+            self.test_resolvers = ['8.8.8.8']
+            return
+        try:
+            fp = open(self.localOptions['testresolvers'])
+        except:
+            raise usage.UsageError("Invalid test resolvers file")
+
+        self.test_resolvers = [x.strip() for x in fp.readlines]
+        fp.close()
+
+    def process_a_answers(self, answers, resolver):
+        all_a = []
+        a_a = []
+        for answer in answers[0]:
+            if answer.type is 1:
+                # A type query
+                r = answer.payload.dottedQuad()
+                self.report['a_lookups'][resolver] = r
+                a_a.append(r)
+            lookup = str(answer.payload)
+            all_a.append(lookup)
+
+        if resolver == 'control':
+            self.report['control_server'] = self.localOptions['controlresolver']
+            self.report['control_lookup'] = all_a
+            self.control_a_lookups = a_a
+        else:
+            self.test_a_lookups[resolver] = a_a
+            self.report['test_lookups'][resolver] = all_a
+
+    def process_ptr_answers(self, answers, resolver):
+        print "Processing PTR ANSWER for %s" % resolver
+        name = None
+        for answer in answers[0]:
+            if answer.type is 12:
+                # PTR type
+                name = str(answer.payload.name)
+
+        if resolver == 'control':
+            self.control_reverse = name
+            self.report['control_reverse'] = name
+        else:
+            self.test_reverse[resolver] = name
+            self.report['test_reverse'][resolver] = name
+
+
+    def ptr_lookup_error(self, failure, resolver):
+        if resolver == 'control':
+            self.report['control_reverse'] = None
+        else:
+            self.report['test_reverse'][resolver] = None
+
+    def a_lookup_error(self, failure, resolver):
+        if resolver == 'control':
+            self.report['control_lookup'] = None
+        else:
+            self.report['test_lookups'][resolver] = None
+
+    def test_lookup(self):
+        list_of_ds = []
+        hostname = self.input
+
+        resolver = [(self.localOptions['controlresolver'], 53)]
+        res = client.createResolver(servers=resolver)
+
+        control_r = res.lookupAllRecords(hostname)
+        control_r.addCallback(self.process_a_answers, 'control')
+        control_r.addErrback(self.a_lookup_error, 'control')
+
+        list_of_ds.append(control_r)
+
+        for test_resolver in self.test_resolvers:
+            resolver = [(test_resolver,53)]
+            res = client.createResolver(servers=resolver)
+
+            d = res.lookupAddress(hostname)
+            d.addCallback(self.process_a_answers, test_resolver)
+            d.addErrback(self.a_lookup_error, test_resolver)
+            list_of_ds.append(d)
+
+        dl = defer.DeferredList(list_of_ds)
+        dl.addCallback(self.do_reverse_lookups)
+        dl.addCallback(self.compare_results)
+        return dl
+
+    def reverse_lookup(self, address, resolver):
+        ptr = '.'.join(address.split('.')[::-1]) + '.in-addr.arpa'
+        r = resolver.lookupPointer(ptr)
+        return r
+
+    def do_reverse_lookups(self, result):
+
+
+        list_of_ds = []
+
+        resolver = [(self.localOptions['controlresolver'], 53)]
+        res = client.createResolver(servers=resolver)
+
+        test_reverse = self.reverse_lookup(self.control_a_lookups[0], res)
+        test_reverse.addCallback(self.process_ptr_answers, 'control')
+        test_reverse.addErrback(self.ptr_lookup_error, 'control')
+
+        list_of_ds.append(test_reverse)
+
+        for test_resolver in self.test_resolvers:
+            d = self.reverse_lookup(self.test_a_lookups[test_resolver][0], res)
+            d.addCallback(self.process_ptr_answers, test_resolver)
+            d.addErrback(self.ptr_lookup_error, test_resolver)
+            list_of_ds.append(d)
+
+        dl = defer.DeferredList(list_of_ds)
+        return dl
+
+    def compare_results(self, *arg, **kw):
+        for test, test_a_lookups in self.test_a_lookups.items():
+            self.report['tampering'] = {}
+            self.report['tampering'][test] = 'unknown'
+
+            if len(set(test_a_lookups) & set(self.control_a_lookups)) > 0:
+                # Address has not tampered with on DNS server
+                self.report['tampering'][test] = False
+
+            elif len(set(self.control_reverse) & set(self.test_reverse[test])) > 0:
+                # Further testing has eliminated false positives
+                self.report['tampering'][test] = 'reverse-match'
+            else:
+                # Reverse DNS on the results returned by returned
+                # which does not match the expected domainname
+                self.report['tampering'][test] = True
+



More information about the tor-commits mailing list