[tor-commits] [oonib/master] Feature/web connectivity (#63)
art at torproject.org
Mon May 30 13:44:56 UTC 2016
commit 5178fc299d361c9ab57002e619e40b05061f5a7d
Author: Arturo Filastò <arturo at filasto.net>
Date: Sat May 28 19:05:42 2016 +0200
Feature/web connectivity (#63)
* Add web connectivity test helper
* Forward test helper ports to the host machine
* Align the request headers used by the web_connectivity test helper to those of the probe
* Add monkey patch for bug in twisted RedirectAgent:
https://twistedmatrix.com/trac/ticket/8265
* Add monkey patch for HTTPClientParser.statusReceived
* Use TrueHeaders in control request
* Add support for specifying endpoints in web_connectivity test helper
* Add endpoint for checking the status of the WebConnectivity test helper
* Add support for parsing test-helpers-alternate
* Fix key for web_connectivity test in example configuration file
* Implement on-disk web_connectivity cache
* Add support for Gzip content decoding
* Also record CNAME resolution.
* Rename ips to addrs
* Add support for retries in the http_request
* Add support for extracting title
* Encode the responses as well when in debug mode
* Handle partial downloads
* Ignore errors when encoding headers
* Cast title to unicode and ignore errors
* Improvements based on feedback and comments by @bassosimone
* Move twisted related patches into txextra module
* Add support for returning the responses based on a key sent by the
client.
* Inherit from OONIBHandler so we can get the error message format
* Stylistic improvements
* Set defaults so that oonib can start directly from the example oonib.conf
* Avoid calling join on a NoneType
* Address comments by @bassosimone
* Properly set the body also when only a partial body is downloaded
* Move more code into shared oonib.common module
* Fully sync the common module with ooni-probe (pulling in also other
shared functionality so the Agents match entirely)
* Fix location of comment for the patched HTTPClient
* Add unittests and integration tests for web_connectivity
---
Vagrantfile | 25 +++
oonib.conf.example | 18 +-
oonib/bouncer/handlers.py | 14 +-
oonib/common/__init__.py | 10 ++
oonib/common/http_utils.py | 54 ++++++
oonib/common/tcp_utils.py | 10 ++
oonib/common/txextra.py | 202 +++++++++++++++++++++
oonib/oonibackend.py | 29 ++-
oonib/test/test_web_connectivity.py | 71 ++++++++
oonib/testhelpers/http_helpers.py | 344 +++++++++++++++++++++++++++++++++++-
10 files changed, 758 insertions(+), 19 deletions(-)
diff --git a/Vagrantfile b/Vagrantfile
index b71e4e7..7aa5f71 100644
--- a/Vagrantfile
+++ b/Vagrantfile
@@ -4,6 +4,31 @@
Vagrant.configure("2") do |config|
config.vm.box = "precise32"
config.vm.box_url = "http://files.vagrantup.com/precise32.box"
+
+  # Create forwarded port mappings which allow access to specific ports
+  # within the machine from the same ports on the host machine. These
+  # expose the test helper ports to the host.
+ config.vm.network :forwarded_port, guest: 57001, host: 57001
+  config.vm.network :forwarded_port, guest: 57002, host: 57002
+  config.vm.network :forwarded_port, guest: 57003, host: 57003
+ config.vm.network :forwarded_port, guest: 57004, host: 57004
+ config.vm.network :forwarded_port, guest: 57005, host: 57005
+ config.vm.network :forwarded_port, guest: 57006, host: 57006
+ config.vm.network :forwarded_port, guest: 57007, host: 57007
+
+ # Create a private network, which allows host-only access to the machine
+ # using a specific IP.
+ # config.vm.network :private_network, ip: "192.168.33.10"
+
+  # Create a public network, which generally maps to a bridged network.
+ # Bridged networks make the machine appear as another physical device on
+ # your network.
+ # config.vm.network :public_network
+
+ # Share an additional folder to the guest VM. The first argument is
+ # the path on the host to the actual folder. The second argument is
+ # the path on the guest to mount the folder. And the optional third
+ # argument is a set of non-required options.
config.vm.synced_folder ".", "/data/oonib"
end
diff --git a/oonib.conf.example b/oonib.conf.example
index 1925d81..6f4c7f3 100644
--- a/oonib.conf.example
+++ b/oonib.conf.example
@@ -30,12 +30,13 @@ main:
tor_datadir: null
bouncer_endpoints:
- - {type: tls, port: 10443, cert: "private/ssl-key-and-cert.pem"}
+ #- {type: tls, port: 10443, cert: "private/ssl-key-and-cert.pem"}
- {type: tcp, port: 10080}
- - {type: onion, hsdir: bouncer}
+ - {type: onion, hsdir: /tmp/bouncer}
collector_endpoints:
- - {type: tls, port: 11443, cert: "private/ssl-key-and-cert.pem"}
+ #- {type: tls, port: 11443, cert: "private/ssl-key-and-cert.pem"}
+ - {type: onion, hsdir: /tmp/collector}
report_file_template: '{iso8601_timestamp}-{test_name}-{report_id}-{probe_asn}-{probe_cc}-probe-0.2.0.{ext}'
helpers:
@@ -62,12 +63,17 @@ helpers:
dns_discovery:
address: null
- udp_port: 53
- tcp_port: 53
+ udp_port: null
+ tcp_port: null
resolver_address: null
ssl:
address: null
private_key: 'private.key'
certificate: 'certificate.crt'
- port: 57006
+ #port: 57006
+ port: null
+
+ web_connectivity:
+ endpoints:
+ - {type: tcp, port: 57007}
diff --git a/oonib/bouncer/handlers.py b/oonib/bouncer/handlers.py
index 1c7627c..a9370c0 100644
--- a/oonib/bouncer/handlers.py
+++ b/oonib/bouncer/handlers.py
@@ -159,9 +159,12 @@ class Bouncer(object):
requested_nettest['input-hashes'],
requested_nettest['test-helpers'])
test_helpers = {}
+ test_helpers_alternate = {}
for test_helper in requested_nettest['test-helpers']:
+ collector_info = self.bouncerFile['collector'][collector]
try:
- test_helpers[test_helper] = self.bouncerFile['collector'][collector]['test-helper'][test_helper]
+ test_helpers[test_helper] = \
+ collector_info['test-helper'][test_helper]
except KeyError:
helpers = self.knownHelpers.get(test_helper)
if not helpers:
@@ -169,12 +172,19 @@ class Bouncer(object):
helper = random.choice(helpers)
test_helpers[test_helper] = helper['helper-address']
+ try:
+ test_helpers_alternate[test_helper] = \
+ collector_info['test-helper-alternate'][test_helper]
+ except KeyError:
+ pass
+
nettest = {
'name': requested_nettest['name'],
'version': requested_nettest['version'],
'input-hashes': requested_nettest['input-hashes'],
'test-helpers': test_helpers,
- 'collector': collector,
+ 'test-helpers-alternate': test_helpers_alternate,
+ 'collector': collector
}
nettests.append(nettest)
return {'net-tests': nettests}
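For context, with this change a bouncer reply can carry both helper maps. A
hypothetical net-tests entry (all addresses invented, and the shape of the
alternate helper entries is an assumption) could look like:

    {
      "net-tests": [{
        "name": "web_connectivity",
        "version": "0.1.0",
        "input-hashes": [],
        "test-helpers": {"web-connectivity": "httpo://aaaaaaaaaaaaaaaa.onion"},
        "test-helpers-alternate": {
          "web-connectivity": [{"type": "https",
                                "address": "https://wcth.example.net"}]
        },
        "collector": "httpo://bbbbbbbbbbbbbbbb.onion"
      }]
    }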
diff --git a/oonib/common/__init__.py b/oonib/common/__init__.py
new file mode 100644
index 0000000..7f6cf73
--- /dev/null
+++ b/oonib/common/__init__.py
@@ -0,0 +1,10 @@
+"""
+This module contains functionality that is shared between ooni-probe and
+ooni-backend. If the code in here starts growing too much I think it would
+make sense to either:
+
+ * Make the code in here into its own package that is imported by
+ ooni-probe and ooni-backend.
+
+ * Merge ooniprobe with oonibackend.
+"""
diff --git a/oonib/common/http_utils.py b/oonib/common/http_utils.py
new file mode 100644
index 0000000..6f9db2d
--- /dev/null
+++ b/oonib/common/http_utils.py
@@ -0,0 +1,54 @@
+import re
+import codecs
+from base64 import b64encode
+
+# Note: flags must be passed at compile time; a compiled pattern's
+# search() takes a start position as its second argument, not flags.
+META_CHARSET_REGEXP = re.compile('<meta(?!\s*(?:name|value)\s*=)[^>]*?charset\s*=[\s"\']*([^\s"\'/>]*)', re.IGNORECASE)
+
+def representBody(body):
+ if not body:
+ return body
+ body = body.replace('\0', '')
+ decoded = False
+ charsets = ['ascii', 'utf-8']
+
+ # If we are able to detect the charset of body from the meta tag
+ # try to decode using that one first
+    charset = META_CHARSET_REGEXP.search(body)
+ if charset:
+ try:
+ encoding = charset.group(1).lower()
+ codecs.lookup(encoding)
+ charsets.insert(0, encoding)
+ except (LookupError, IndexError):
+ # Skip invalid codecs and partial regexp match
+ pass
+
+ for encoding in charsets:
+ try:
+ body = unicode(body, encoding)
+ decoded = True
+ break
+ except UnicodeDecodeError:
+ pass
+ if not decoded:
+ body = {
+ 'data': b64encode(body),
+ 'format': 'base64'
+ }
+ return body
+
+TITLE_REGEXP = re.compile("<title>(.*?)</title>", re.IGNORECASE | re.DOTALL)
+
+def extractTitle(body):
+    m = TITLE_REGEXP.search(body)
+ if m:
+ return unicode(m.group(1), errors='ignore')
+ return ''
+
+REQUEST_HEADERS = {
+ 'User-Agent': ['Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, '
+ 'like Gecko) Chrome/47.0.2526.106 Safari/537.36'],
+ 'Accept-Language': ['en-US;q=0.8,en;q=0.5'],
+ 'Accept': ['text/html,application/xhtml+xml,application/xml;q=0.9,'
+ '*/*;q=0.8']
+}
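A quick sketch of how these helpers behave (Python 2, like the rest of the
tree; the sample page is invented):

    from oonib.common.http_utils import representBody, extractTitle

    page = ('<html><head><meta charset="utf-8">'
            '<title>Pagina di prova</title></head></html>')

    print extractTitle(page)   # -> u'Pagina di prova'
    print representBody(page)  # decodes cleanly, returned as unicode

    # Bytes that decode under none of the candidate charsets fall back
    # to a base64 envelope:
    print representBody('\xff\xfe\xba\xad')
    # -> {'data': '//66rQ==', 'format': 'base64'}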
diff --git a/oonib/common/tcp_utils.py b/oonib/common/tcp_utils.py
new file mode 100644
index 0000000..7b7a8a4
--- /dev/null
+++ b/oonib/common/tcp_utils.py
@@ -0,0 +1,10 @@
+from twisted.internet.protocol import Factory, Protocol
+
+class TCPConnectProtocol(Protocol):
+ def connectionMade(self):
+ self.transport.loseConnection()
+
+class TCPConnectFactory(Factory):
+ noisy = False
+ def buildProtocol(self, addr):
+ return TCPConnectProtocol()
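A minimal usage sketch (this mirrors what the tcp_connect helper below does;
the address and port are invented):

    from twisted.internet import reactor
    from twisted.internet.endpoints import TCP4ClientEndpoint

    from oonib.common.tcp_utils import TCPConnectFactory

    def tcp_connect(ip_address, port):
        # The deferred fires once the TCP handshake completes; the
        # protocol then drops the connection immediately.
        endpoint = TCP4ClientEndpoint(reactor, ip_address, port)
        return endpoint.connect(TCPConnectFactory())

    d = tcp_connect('127.0.0.1', 57007)
    d.addCallbacks(lambda proto: 'reachable', lambda failure: 'unreachable')
    # Driving the reactor (reactor.run()) is left out of this sketch.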
diff --git a/oonib/common/txextra.py b/oonib/common/txextra.py
new file mode 100644
index 0000000..7a84592
--- /dev/null
+++ b/oonib/common/txextra.py
@@ -0,0 +1,202 @@
+import itertools
+from copy import copy
+
+from twisted.web.http_headers import Headers
+from twisted.web import error
+
+from twisted.web.client import BrowserLikeRedirectAgent
+from twisted.web._newclient import ResponseFailed
+from twisted.web._newclient import HTTPClientParser, ParseError
+from twisted.python.failure import Failure
+
+from twisted.web import client, _newclient
+
+from twisted.web._newclient import RequestNotSent, RequestGenerationFailed
+from twisted.web._newclient import TransportProxyProducer, STATUS
+
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred, fail, maybeDeferred, failure
+
+from twisted.python import log
+
+class TrueHeaders(Headers):
+ def __init__(self, rawHeaders=None):
+ self._rawHeaders = dict()
+ if rawHeaders is not None:
+ for name, values in rawHeaders.iteritems():
+ if type(values) is list:
+ self.setRawHeaders(name, values[:])
+ elif type(values) is str:
+ self.setRawHeaders(name, values)
+
+ def setRawHeaders(self, name, values):
+ if name.lower() not in self._rawHeaders:
+ self._rawHeaders[name.lower()] = dict()
+ self._rawHeaders[name.lower()]['name'] = name
+ self._rawHeaders[name.lower()]['values'] = values
+
+ def getAllRawHeaders(self):
+ for _, v in self._rawHeaders.iteritems():
+ yield v['name'], v['values']
+
+ def getRawHeaders(self, name, default=None):
+ if name.lower() in self._rawHeaders:
+ return self._rawHeaders[name.lower()]['values']
+ return default
+
+
+ def getDiff(self, headers, ignore=[]):
+        """
+        Compute the difference between these headers and another set.
+
+        Args:
+
+            headers: a TrueHeaders object
+
+            ignore: a list of header names to ignore
+
+        Returns:
+
+            a list of the header names that are not present in both
+            `headers` and `self`.
+        """
+ diff = set()
+ field_names = []
+
+ headers_a = copy(self)
+ headers_b = copy(headers)
+ for name in ignore:
+ try:
+ del headers_a._rawHeaders[name.lower()]
+ except KeyError:
+ pass
+ try:
+ del headers_b._rawHeaders[name.lower()]
+ except KeyError:
+ pass
+
+ for k, v in itertools.chain(headers_a.getAllRawHeaders(),
+ headers_b.getAllRawHeaders()):
+ field_names.append(k)
+
+ for name in field_names:
+ if self.getRawHeaders(name) and headers.getRawHeaders(name):
+ pass
+ else:
+ diff.add(name)
+ return list(diff)
+
+class HTTPClientParser(_newclient.HTTPClientParser):
+ def logPrefix(self):
+ return 'HTTPClientParser'
+
+ def connectionMade(self):
+ self.headers = TrueHeaders()
+ self.connHeaders = TrueHeaders()
+ self.state = STATUS
+ self._partialHeader = None
+
+ def headerReceived(self, name, value):
+ if self.isConnectionControlHeader(name.lower()):
+ headers = self.connHeaders
+ else:
+ headers = self.headers
+ headers.addRawHeader(name, value)
+
+ def statusReceived(self, status):
+ # This is a fix for invalid number of parts
+ try:
+ return _newclient.HTTPClientParser.statusReceived(self, status)
+ except ParseError as exc:
+ if exc.args[0] == 'wrong number of parts':
+ return _newclient.HTTPClientParser.statusReceived(self,
+ status + " XXX")
+ raise
+
+class HTTP11ClientProtocol(_newclient.HTTP11ClientProtocol):
+ def request(self, request):
+ if self._state != 'QUIESCENT':
+ return fail(RequestNotSent())
+
+ self._state = 'TRANSMITTING'
+ _requestDeferred = maybeDeferred(request.writeTo, self.transport)
+ self._finishedRequest = Deferred()
+
+ self._currentRequest = request
+
+ self._transportProxy = TransportProxyProducer(self.transport)
+ self._parser = HTTPClientParser(request, self._finishResponse)
+ self._parser.makeConnection(self._transportProxy)
+ self._responseDeferred = self._parser._responseDeferred
+
+ def cbRequestWrotten(ignored):
+ if self._state == 'TRANSMITTING':
+ self._state = 'WAITING'
+ self._responseDeferred.chainDeferred(self._finishedRequest)
+
+ def ebRequestWriting(err):
+ if self._state == 'TRANSMITTING':
+ self._state = 'GENERATION_FAILED'
+ self.transport.loseConnection()
+ self._finishedRequest.errback(
+ failure.Failure(RequestGenerationFailed([err])))
+ else:
+ log.err(err, 'Error writing request, but not in valid state '
+ 'to finalize request: %s' % self._state)
+
+ _requestDeferred.addCallbacks(cbRequestWrotten, ebRequestWriting)
+
+ return self._finishedRequest
+
+
+class _HTTP11ClientFactory(client._HTTP11ClientFactory):
+ noisy = False
+
+ def buildProtocol(self, addr):
+ return HTTP11ClientProtocol(self._quiescentCallback)
+
+
+class HTTPConnectionPool(client.HTTPConnectionPool):
+ _factory = _HTTP11ClientFactory
+
+class TrueHeadersAgent(client.Agent):
+ def __init__(self, *args, **kw):
+ super(TrueHeadersAgent, self).__init__(*args, **kw)
+ self._pool = HTTPConnectionPool(reactor, False)
+
+class FixedRedirectAgent(BrowserLikeRedirectAgent):
+ """
+ This is a redirect agent with this patch manually applied:
+ https://twistedmatrix.com/trac/ticket/8265
+ """
+ def _handleRedirect(self, response, method, uri, headers, redirectCount):
+ """
+ Handle a redirect response, checking the number of redirects already
+ followed, and extracting the location header fields.
+
+        This is patched to resolve the Location header against the URI of
+        the request that produced the response (see the ticket above).
+ """
+ if redirectCount >= self._redirectLimit:
+ err = error.InfiniteRedirection(
+ response.code,
+ b'Infinite redirection detected',
+ location=uri)
+ raise ResponseFailed([Failure(err)], response)
+ locationHeaders = response.headers.getRawHeaders(b'location', [])
+ if not locationHeaders:
+ err = error.RedirectWithNoLocation(
+ response.code, b'No location header field', uri)
+ raise ResponseFailed([Failure(err)], response)
+ location = self._resolveLocation(
+ # This is the fix to properly handle redirects
+ response.request.absoluteURI,
+ locationHeaders[0]
+ )
+ deferred = self._agent.request(method, location, headers)
+
+ def _chainResponse(newResponse):
+ newResponse.setPreviousResponse(response)
+ return newResponse
+
+ deferred.addCallback(_chainResponse)
+ return deferred.addCallback(
+ self._handleResponse, method, uri, headers, redirectCount + 1)
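Tying the pieces together, a request through the patched stack looks roughly
like this (a sketch outside the commit; the URL and header values are
illustrative):

    from twisted.internet import reactor
    from twisted.web.client import readBody

    from oonib.common.txextra import FixedRedirectAgent, TrueHeadersAgent
    from oonib.common.txextra import TrueHeaders

    # TrueHeadersAgent parses responses with TrueHeaders, preserving the
    # original header casing; FixedRedirectAgent follows redirects with
    # the ticket 8265 fix applied.
    agent = FixedRedirectAgent(TrueHeadersAgent(reactor))
    d = agent.request('GET', 'http://example.com/',
                      TrueHeaders({'User-Agent': ['example-agent']}))
    d.addCallback(readBody)

    def printBody(body):
        print len(body)
        reactor.stop()

    d.addCallbacks(printBody, lambda failure: reactor.stop())
    reactor.run()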
diff --git a/oonib/oonibackend.py b/oonib/oonibackend.py
index 601a972..ebf3c52 100644
--- a/oonib/oonibackend.py
+++ b/oonib/oonibackend.py
@@ -118,15 +118,22 @@ def getHSEndpoint(endpoint_config):
data_dir=hsdir)
def getTCPEndpoint(endpoint_config):
- return endpoints.TCP4ServerEndpoint(reactor, endpoint_config['port'])
+ return endpoints.TCP4ServerEndpoint(
+ reactor=reactor,
+ port=endpoint_config['port'],
+ interface=endpoint_config.get('address', '')
+ )
def getTLSEndpoint(endpoint_config):
with open(endpoint_config['cert'], 'r') as f:
cert_data = f.read()
certificate = ssl.PrivateCertificate.loadPEM(cert_data)
- return endpoints.SSL4ServerEndpoint(reactor,
- endpoint_config['port'],
- certificate.options())
+ return endpoints.SSL4ServerEndpoint(
+ reactor=reactor,
+ port=endpoint_config['port'],
+ sslContextFactory=certificate.options(),
+ interface=endpoint_config.get('address', '')
+ )
def getEndpoint(endpoint_config):
if endpoint_config['type'] == 'onion':
@@ -143,6 +150,8 @@ def createService(endpoint, role, endpoint_config):
factory = ooniBouncer
elif role == 'collector':
factory = ooniBackend
+ elif role == 'web_connectivity':
+ factory = http_helpers.WebConnectivityHelper
else:
raise Exception("unknown service type")
@@ -157,8 +166,11 @@ def createService(endpoint, role, endpoint_config):
if config.main.tor_hidden_service and \
config.main.bouncer_endpoints is None and \
config.main.collector_endpoints is None:
- bouncer_hsdir = os.path.join(config.main.tor_datadir, 'bouncer')
- collector_hsdir = os.path.join(config.main.tor_datadir, 'collector')
+ base_dir = '.'
+ if config.main.tor_datadir is not None:
+ base_dir = config.main.tor_datadir
+ bouncer_hsdir = os.path.join(base_dir, 'bouncer')
+ collector_hsdir = os.path.join(base_dir, 'collector')
config.main.bouncer_endpoints = [ {'type': 'onion', 'hsdir': bouncer_hsdir} ]
config.main.collector_endpoints = [ {'type': 'onion', 'hsdir': collector_hsdir} ]
@@ -174,3 +186,8 @@ for endpoint_config in config.main.get('collector_endpoints', []):
print "Starting collector with config %s" % endpoint_config
endpoint = getEndpoint(endpoint_config)
createService(endpoint, 'collector', endpoint_config)
+
+for endpoint_config in config.helpers.web_connectivity.get('endpoints', []):
+ print "Starting web_connectivity helper with config %s" % endpoint_config
+ endpoint = getEndpoint(endpoint_config)
+ createService(endpoint, 'web_connectivity', endpoint_config)
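For reference, these are the three endpoint shapes getEndpoint() dispatches
on (the values here are illustrative, not defaults):

    # Illustrative endpoint_config dicts understood by getEndpoint():
    tcp_endpoint = {'type': 'tcp', 'port': 57007, 'address': '127.0.0.1'}
    tls_endpoint = {'type': 'tls', 'port': 57008,
                    'cert': 'private/ssl-key-and-cert.pem'}
    onion_endpoint = {'type': 'onion', 'hsdir': '/tmp/web_connectivity'}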
diff --git a/oonib/test/test_web_connectivity.py b/oonib/test/test_web_connectivity.py
new file mode 100644
index 0000000..24e2be0
--- /dev/null
+++ b/oonib/test/test_web_connectivity.py
@@ -0,0 +1,71 @@
+from hashlib import sha256
+from twisted.internet import defer
+from twisted.trial import unittest
+
+from oonib.testhelpers.http_helpers import WebConnectivityCache
+
+class WebConnectivityCacheTestCase(unittest.TestCase):
+ def setUp(self):
+ self.web_connectivity_cache = WebConnectivityCache()
+
+ def tearDown(self):
+ return self.web_connectivity_cache.expire_all()
+
+ @defer.inlineCallbacks
+ def test_http_request(self):
+ value = yield self.web_connectivity_cache.http_request(
+ 'https://www.google.com/humans.txt')
+ self.assertEqual(
+ value['body_length'], 286
+ )
+ self.assertEqual(
+ value['status_code'], 200
+ )
+ self.assertIsInstance(value['headers'],
+ dict)
+
+ @defer.inlineCallbacks
+ def test_dns_consistency(self):
+        # The twisted.names resolver sets up a reactor.callLater() when
+        # parsing resolv.conf and this leaves the reactor dirty. Look into
+        # a clean way to solve this and reactivate this integration test.
+ self.skipTest("Skipping to avoid dirty reactor")
+ value = yield self.web_connectivity_cache.dns_consistency(
+ 'www.torproject.org')
+ self.assertIsInstance(
+ value['addrs'],
+ list
+ )
+ self.assertIn(
+ 'failure',
+ value.keys()
+ )
+
+ @defer.inlineCallbacks
+ def test_tcp_connect(self):
+ value = yield self.web_connectivity_cache.tcp_connect(
+ '216.58.213.14:80')
+ self.assertIsInstance(
+ value['status'],
+ bool
+ )
+ self.assertIn(
+ 'failure',
+ value.keys()
+ )
+
+ @defer.inlineCallbacks
+ def test_cache_lifecycle(self):
+ key = 'http://example.com'
+ key_hash = sha256(key).hexdigest()
+ value = {'spam': 'ham'}
+
+ miss = yield self.web_connectivity_cache.lookup('http_request', key)
+ self.assertEqual(miss, None)
+
+ yield self.web_connectivity_cache.cache_value('http_request', key,
+ value)
+ hit = yield self.web_connectivity_cache.lookup('http_request', key)
+ self.assertEqual(hit, value)
+
+ yield self.web_connectivity_cache.expire('http_request', key_hash)
diff --git a/oonib/testhelpers/http_helpers.py b/oonib/testhelpers/http_helpers.py
index a28cbad..1cddf24 100644
--- a/oonib/testhelpers/http_helpers.py
+++ b/oonib/testhelpers/http_helpers.py
@@ -1,16 +1,37 @@
import json
+import os
import random
+import re
import string
-
-from twisted.internet import protocol, defer
-
-from cyclone.web import RequestHandler, Application
-
+import tempfile
+from hashlib import sha256
+from urlparse import urlparse
+
+from cyclone.web import RequestHandler, Application, HTTPError
+from cyclone.web import asynchronous
+from twisted.internet import protocol, defer, reactor
+from twisted.internet.endpoints import TCP4ClientEndpoint
+from twisted.internet.error import ConnectionRefusedError
+from twisted.internet.error import DNSLookupError, TimeoutError
+from twisted.names import client as dns_client
+from twisted.names import dns
+from twisted.names.error import DNSNameError, DNSServerError
from twisted.protocols import policies, basic
+from twisted.web.client import readBody
+from twisted.web.client import ContentDecoderAgent, GzipDecoder
+from twisted.web.client import PartialDownloadError
from twisted.web.http import Request
from oonib import log, randomStr
+from oonib.common.txextra import FixedRedirectAgent, TrueHeaders
+from oonib.common.txextra import TrueHeadersAgent
+from oonib.common.http_utils import representBody, extractTitle
+from oonib.common.http_utils import REQUEST_HEADERS
+from oonib.common.tcp_utils import TCPConnectFactory
+
+from oonib.handlers import OONIBHandler
+
class SimpleHTTPChannel(basic.LineReceiver, policies.TimeoutMixin):
"""
@@ -168,7 +189,320 @@ class HTTPRandomPage(HTTPTrapAll):
length = 100000
self.write(self.genRandomPage(length, keyword))
+
+def encodeResponse(response):
+ body = None
+ body_length = 0
+ if (hasattr(response, 'body') and
+ response.body is not None):
+ body = response.body
+ body_length = len(response.body)
+ headers = {}
+ for k, v in response.headers.getAllRawHeaders():
+ headers[k.lower()] = unicode(v[0], errors='ignore')
+ return {
+ 'headers': headers,
+ 'code': response.code,
+ 'body_length': body_length,
+ 'body': representBody(body)
+ }
+
+def encodeResponses(response):
+ responses = []
+ responses += [encodeResponse(response)]
+ if response.previousResponse:
+ responses += encodeResponses(response.previousResponse)
+ return responses
+
+
+class WebConnectivityCache(object):
+ expiration_time = 200
+ enable_caching = True
+ http_retries = 2
+
+ def __init__(self):
+ self._response_types = (
+ 'http_request',
+ 'tcp_connect',
+ 'dns_consistency'
+ )
+ self._cache_lifecycle = {}
+ self._cache_dir = tempfile.mkdtemp()
+ for response_type in self._response_types:
+ os.mkdir(os.path.join(self._cache_dir, response_type))
+ self._cache_lifecycle[response_type] = {}
+
+ @defer.inlineCallbacks
+ def expire_all(self):
+ for response_type in self._cache_lifecycle.keys():
+ for key_hash in self._cache_lifecycle[response_type].keys():
+ yield self.expire(response_type, key_hash)
+
+ @defer.inlineCallbacks
+ def cache_value(self, response_type, key, value):
+ if response_type not in self._response_types:
+ raise Exception("Invalid response type")
+ if self.enable_caching:
+ key_hash = sha256(key).hexdigest()
+ cache_file = os.path.join(self._cache_dir, response_type, key_hash)
+
+ if key_hash in self._cache_lifecycle[response_type]:
+ yield self.expire(response_type, key_hash)
+
+ self._cache_lifecycle[response_type][key_hash] = {
+ 'expiration': reactor.callLater(self.expiration_time,
+ self.expire,
+ response_type, key_hash),
+ 'lock': defer.DeferredLock()
+ }
+ lock = self._cache_lifecycle[response_type][key_hash]['lock']
+ yield lock.acquire()
+ with open(cache_file, 'w+') as fw:
+ json.dump(value, fw)
+ lock.release()
+
+ @defer.inlineCallbacks
+ def expire(self, response_type, key_hash):
+ if response_type not in self._response_types:
+ raise Exception("Invalid response type")
+ lifecycle = self._cache_lifecycle[response_type][key_hash]
+ if lifecycle['expiration'].active():
+ lifecycle['expiration'].cancel()
+
+ yield lifecycle['lock'].acquire()
+ try:
+ os.remove(os.path.join(self._cache_dir, response_type, key_hash))
+ except OSError:
+ pass
+ lifecycle['lock'].release()
+ del self._cache_lifecycle[response_type][key_hash]
+
+ @defer.inlineCallbacks
+ def lookup(self, response_type, key):
+ if not self.enable_caching:
+ defer.returnValue(None)
+
+ key_hash = sha256(key).hexdigest()
+ cache_file = os.path.join(self._cache_dir, response_type, key_hash)
+
+ if key_hash not in self._cache_lifecycle[response_type]:
+ defer.returnValue(None)
+
+ lock = self._cache_lifecycle[response_type][key_hash]['lock']
+ expiration = \
+ self._cache_lifecycle[response_type][key_hash]['expiration']
+
+ yield lock.acquire()
+
+ if not os.path.exists(cache_file):
+ lock.release()
+ defer.returnValue(None)
+
+ with open(cache_file, 'r') as fh:
+ value = json.load(fh)
+
+ expiration.reset(self.expiration_time)
+ lock.release()
+ defer.returnValue(value)
+
+ @defer.inlineCallbacks
+ def http_request(self, url, include_http_responses=False):
+ cached_value = yield self.lookup('http_request', url)
+ if cached_value is not None:
+ if include_http_responses is not True:
+ cached_value.pop('responses', None)
+ defer.returnValue(cached_value)
+
+ page_info = {
+ 'body_length': -1,
+ 'status_code': -1,
+ 'headers': {},
+ 'failure': None
+ }
+
+ agent = ContentDecoderAgent(
+ FixedRedirectAgent(TrueHeadersAgent(reactor)),
+ [('gzip', GzipDecoder)]
+ )
+ try:
+ retries = 0
+ while True:
+ try:
+ response = yield agent.request('GET', url,
+ TrueHeaders(REQUEST_HEADERS))
+ headers = {}
+ for name, value in response.headers.getAllRawHeaders():
+ headers[name] = unicode(value[0], errors='ignore')
+ body_length = -1
+ body = None
+ try:
+ body = yield readBody(response)
+ body_length = len(body)
+ except PartialDownloadError as pde:
+ if pde.response:
+ body_length = len(pde.response)
+ body = pde.response
+ page_info['body_length'] = body_length
+ page_info['status_code'] = response.code
+ page_info['headers'] = headers
+ page_info['title'] = extractTitle(body)
+ response.body = body
+ page_info['responses'] = encodeResponses(response)
+ break
+ except:
+ if retries > self.http_retries:
+ raise
+ retries += 1
+ except DNSLookupError:
+ page_info['failure'] = 'dns_lookup_error'
+ except TimeoutError:
+ page_info['failure'] = 'generic_timeout_error'
+ except ConnectionRefusedError:
+ page_info['failure'] = 'connection_refused_error'
+ except:
+ # XXX map more failures
+ page_info['failure'] = 'unknown_error'
+
+ yield self.cache_value('http_request', url, page_info)
+ if include_http_responses is not True:
+ page_info.pop('responses', None)
+ defer.returnValue(page_info)
+
+ @defer.inlineCallbacks
+ def tcp_connect(self, socket):
+ cached_value = yield self.lookup('tcp_connect', socket)
+ if cached_value is not None:
+ defer.returnValue(cached_value)
+
+ socket_info = {
+ 'status': None,
+ 'failure': None
+ }
+
+ ip_address, port = socket.split(":")
+ try:
+ point = TCP4ClientEndpoint(reactor, ip_address, int(port))
+ yield point.connect(TCPConnectFactory())
+ socket_info['status'] = True
+ except TimeoutError:
+ socket_info['status'] = False
+ socket_info['failure'] = 'generic_timeout_error'
+ except ConnectionRefusedError:
+ socket_info['status'] = False
+ socket_info['failure'] = 'connection_refused_error'
+ except:
+ socket_info['status'] = False
+ socket_info['failure'] = 'unknown_error'
+ yield self.cache_value('tcp_connect', socket, socket_info)
+ defer.returnValue(socket_info)
+
+ @defer.inlineCallbacks
+ def dns_consistency(self, hostname):
+ cached_value = yield self.lookup('dns_consistency', hostname)
+ if cached_value is not None:
+ defer.returnValue(cached_value)
+
+ dns_info = {
+ 'addrs': [],
+ 'failure': None
+ }
+
+ try:
+ records = yield dns_client.lookupAddress(hostname)
+ answers = records[0]
+ for answer in answers:
+ if answer.type is dns.A:
+ dns_info['addrs'].append(answer.payload.dottedQuad())
+ elif answer.type is dns.CNAME:
+ dns_info['addrs'].append(answer.payload.name.name)
+ except DNSNameError:
+ dns_info['failure'] = 'dns_name_error'
+ except DNSServerError:
+ dns_info['failure'] = 'dns_server_failure'
+ except:
+ dns_info['failure'] = 'unknown_error'
+
+ yield self.cache_value('dns_consistency', hostname, dns_info)
+ defer.returnValue(dns_info)
+
+
+# Taken from
+# http://stackoverflow.com/questions/7160737/python-how-to-validate-a-url-in-python-malformed-or-not
+HTTP_REQUEST_REGEXP = re.compile(
+ r'^(?:http)s?://' # http:// or https://
+ r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain...
+ r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip
+ r'(?::\d+)?' # optional port
+ r'(?:/?|[/?]\S+)$', re.IGNORECASE)
+
+SOCKET_REGEXP = re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d+$')
+
+web_connectivity_cache = WebConnectivityCache()
+
+class WebConnectivity(OONIBHandler):
+ @defer.inlineCallbacks
+ def control_measurement(self, http_url, socket_list,
+ include_http_responses):
+ hostname = urlparse(http_url).netloc
+ dl = [
+ web_connectivity_cache.http_request(http_url, include_http_responses),
+ web_connectivity_cache.dns_consistency(hostname)
+ ]
+ for socket in socket_list:
+ dl.append(web_connectivity_cache.tcp_connect(socket))
+ responses = yield defer.DeferredList(dl)
+ http_request = responses[0][1]
+ dns = responses[1][1]
+ tcp_connect = {}
+ for idx, response in enumerate(responses[2:]):
+ tcp_connect[socket_list[idx]] = response[1]
+ self.finish({
+ 'http_request': http_request,
+ 'tcp_connect': tcp_connect,
+ 'dns': dns
+ })
+
+ def validate_request(self, request):
+ required_keys = ['http_request', 'tcp_connect']
+ for rk in required_keys:
+ if rk not in request.keys():
+ raise HTTPError(400, "Missing %s" % rk)
+ if not HTTP_REQUEST_REGEXP.match(request['http_request']):
+ raise HTTPError(400, "Invalid http_request URL")
+ if any([not SOCKET_REGEXP.match(socket)
+ for socket in request['tcp_connect']]):
+            raise HTTPError(400, "Invalid tcp_connect socket")
+
+ @asynchronous
+ def post(self):
+ try:
+ request = json.loads(self.request.body)
+ self.validate_request(request)
+ include_http_responses = request.get("include_http_responses",
+ False)
+ self.control_measurement(
+ str(request['http_request']),
+ request['tcp_connect'],
+ include_http_responses
+ )
+ except HTTPError:
+ raise
+ except Exception as exc:
+ log.msg("Got invalid request")
+ log.exception(exc)
+ raise HTTPError(400, 'invalid request')
+
+class WebConnectivityStatus(RequestHandler):
+ def get(self):
+ self.write({"status": "ok"})
+
+
HTTPRandomPageHelper = Application([
# XXX add regexps here
(r"/(.*)/(.*)", HTTPRandomPage)
])
+
+WebConnectivityHelper = Application([
+ (r"/status", WebConnectivityStatus),
+ (r"/", WebConnectivity)
+])
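For reference, the helper speaks a small JSON protocol: GET /status is a
health check and a control request is POSTed to /. A client exchange might
look like this (a sketch; the port matches the example config above, while
the target URL and IP are invented):

    import json
    import urllib2

    helper = 'http://127.0.0.1:57007'

    # Health check against the new status endpoint.
    print urllib2.urlopen(helper + '/status').read()  # {"status": "ok"}

    # Control measurement: send the measured URL plus the ip:port pairs
    # the probe attempted; the helper replies with its own vantage point.
    control_request = {
        'http_request': 'http://example.com/',
        'tcp_connect': ['93.184.216.34:80'],
        'include_http_responses': False,
    }
    reply = urllib2.urlopen(helper + '/', json.dumps(control_request))
    control = json.loads(reply.read())
    # control carries the 'http_request', 'tcp_connect' and 'dns' keys.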