[tor-commits] [ooni-probe/master] Add test state execution tracking and resume support
art at torproject.org
art at torproject.org
Sun Nov 25 23:05:01 UTC 2012
commit 659dda6b388ecc658f8060b7bef2aa7b4015dce2
Author: Arturo Filastò <art at fuffa.org>
Date: Sun Nov 25 20:05:04 2012 +0100
Add test state execution tracking and resume support
* Clean up some debug messages
* Handle DNS lookup problems
* Handle TCP Timeouts properly
* Better error handling in HTTP tests
* Make sure that requests are written to the report even if the response is not received
---
ooni/config.py | 31 ++++++++--
ooni/oonicli.py | 54 +++++-----------
ooni/reporter.py | 5 +-
ooni/runner.py | 161 +++++++++++++++++++++++++++++++++++++++++++---
ooni/templates/httpt.py | 60 +++++++++++++----
ooni/templates/scapyt.py | 1 -
ooniprobe.conf | 2 +-
7 files changed, 243 insertions(+), 71 deletions(-)
diff --git a/ooni/config.py b/ooni/config.py
index b017e87..d86f4d7 100644
--- a/ooni/config.py
+++ b/ooni/config.py
@@ -6,15 +6,28 @@
import os
import yaml
-from twisted.internet import reactor, threads
+from twisted.internet import reactor, threads, defer
from ooni.utils import otime
from ooni.utils import Storage
reports = Storage()
+scapyFactory = None
+stateDict = None
+
+# XXX refactor this to use a database
+resume_lock = defer.DeferredLock()
+
basic = None
cmd_line_options = None
-scapyFactory = None
+resume_filename = None
+
+# XXX-Twisted this is used to check if we have started the reactor or not. It
+# is necessary because if the tests are already concluded because we have
+# resumed a test session then it will call reactor.run() even though there is
+# no condition that will ever stop it.
+# There should be a more twisted way of doing this.
+start_reactor = True
def get_root_path():
this_directory = os.path.dirname(__file__)
@@ -58,19 +71,27 @@ class TestFilenameNotSet(Exception):
def generateReportFilenames():
try:
- test_file_name = os.path.basename(cmd_line_options['test'])
+ test_filename = os.path.basename(cmd_line_options['test'])
except IndexError:
raise TestFilenameNotSet
- test_name = '.'.join(test_file_name.split(".")[:-1])
+ test_name = '.'.join(test_filename.split(".")[:-1])
base_filename = "%s_%s_"+otime.timestamp()+".%s"
- print "Setting yamloo to %s" % base_filename
reports.yamloo = base_filename % (test_name, "report", "yamloo")
+ print "Setting yamloo to %s" % reports.yamloo
reports.pcap = base_filename % (test_name, "packets", "pcap")
+ print "Setting pcap to %s" % reports.pcap
if not basic:
# Here we make sure that we instance the config file attributes only once
basic, privacy, advanced = loadConfigFile()
+if not resume_filename:
+ resume_filename = os.path.join(get_root_path(), 'ooniprobe.resume')
+ try:
+ with open(resume_filename) as f: pass
+ except IOError as e:
+ with open(resume_filename, 'w+') as f: pass
+
# This is used to keep track of the state of the sniffer
sniffer_running = None
diff --git a/ooni/oonicli.py b/ooni/oonicli.py
index 1a316b3..3a8b3df 100644
--- a/ooni/oonicli.py
+++ b/ooni/oonicli.py
@@ -36,7 +36,8 @@ class Options(usage.Options):
" network tests. These are loaded from modules, packages and"
" files listed on the command line")
- optFlags = [["help", "h"]]
+ optFlags = [["help", "h"],
+ ["resume", "r"]]
optParameters = [["reportfile", "o", None, "report file name"],
["testdeck", "i", None,
@@ -82,39 +83,9 @@ def testsEnded(*arg, **kw):
You can place here all the post shutdown tasks.
"""
log.debug("testsEnded: Finished running all tests")
- reactor.stop()
-
-def runTest(cmd_line_options):
- config.cmd_line_options = cmd_line_options
- config.generateReportFilenames()
-
- if cmd_line_options['reportfile']:
- config.reports.yamloo = cmd_line_options['reportfile']
- config.reports.pcap = config.reports.yamloo+".pcap"
-
- if os.path.exists(config.reports.pcap):
- print "Report PCAP already exists with filename %s" % config.reports.pcap
- print "Renaming it to %s" % config.reports.pcap+'.old'
- os.rename(config.reports.pcap, config.reports.pcap+'.old')
-
- classes = runner.findTestClassesFromFile(cmd_line_options['test'])
- test_cases, options = runner.loadTestsAndOptions(classes, cmd_line_options)
- if config.privacy.includepcap:
- from ooni.utils.txscapy import ScapyFactory, ScapySniffer
- try:
- checkForRoot()
- except NotRootError:
- print "[!] Includepcap options requires root priviledges to run"
- print " you should run ooniprobe as root or disable the options in ooniprobe.conf"
- sys.exit(1)
-
- print "Starting sniffer"
- config.scapyFactory = ScapyFactory(config.advanced.interface)
-
- sniffer = ScapySniffer(config.reports.pcap)
- config.scapyFactory.registerProtocol(sniffer)
-
- return runner.runTestCases(test_cases, options, cmd_line_options)
+ config.start_reactor = False
+ try: reactor.stop()
+ except: pass
def run():
"""
@@ -129,6 +100,7 @@ def run():
raise SystemExit, "%s: %s" % (sys.argv[0], ue)
deck_dl = []
+ resume = cmd_line_options['resume']
log.start(cmd_line_options['logfile'])
if cmd_line_options['testdeck']:
@@ -136,15 +108,21 @@ def run():
for test in test_deck:
del cmd_line_options
cmd_line_options = test['options']
- d1 = runTest(cmd_line_options)
+ if resume:
+ cmd_line_options['resume'] = True
+ else:
+ cmd_line_options['resume'] = False
+ d1 = runner.runTest(cmd_line_options)
deck_dl.append(d1)
else:
log.msg("No test deck detected")
del cmd_line_options['testdeck']
- d1 = runTest(cmd_line_options)
+ d1 = runner.runTest(cmd_line_options)
deck_dl.append(d1)
d2 = defer.DeferredList(deck_dl)
- d2.addCallback(testsEnded)
+ d2.addBoth(testsEnded)
- reactor.run()
+ if config.start_reactor:
+ log.debug("Starting reactor")
+ reactor.run()
diff --git a/ooni/reporter.py b/ooni/reporter.py
index 63f501e..3cc77d7 100644
--- a/ooni/reporter.py
+++ b/ooni/reporter.py
@@ -152,7 +152,7 @@ def getTestDetails(options):
'test_version': options['version'],
'software_name': 'ooniprobe',
'software_version': software_version
- }
+ }
defer.returnValue(test_details)
class OReporter(object):
@@ -191,7 +191,7 @@ class OReporter(object):
'test_started': test_started,
'test_runtime': test_runtime,
'report': test_report}
- return self.writeReportEntry(report)
+ return defer.maybeDeferred(self.writeReportEntry, report)
class YAMLReporter(OReporter):
"""
@@ -224,6 +224,7 @@ class YAMLReporter(OReporter):
self._write('---\n')
self._write(safe_dump(entry))
self._write('...\n')
+ return
@defer.inlineCallbacks
def createReport(self, options):
diff --git a/ooni/runner.py b/ooni/runner.py
index d7856e6..e7a40fd 100644
--- a/ooni/runner.py
+++ b/ooni/runner.py
@@ -14,6 +14,7 @@ import time
import inspect
import traceback
import itertools
+import yaml
from twisted.python import reflect, usage
from twisted.internet import defer
@@ -155,7 +156,9 @@ def runTestCasesWithInput(test_cases, test_input, oreporter):
return oreporter.testDone(test_instance, test_name)
def test_error(failure, test_instance, test_name):
+ log.err("run Test Cases With Input problem")
log.exception(failure)
+ return
def tests_done(result, test_class):
test_instance = test_class()
@@ -168,6 +171,7 @@ def runTestCasesWithInput(test_cases, test_input, oreporter):
return oreporter.testDone(test_instance, 'summary')
except NoPostProcessor:
log.debug("No post processor configured")
+ return
dl = []
for test_case in test_cases:
@@ -191,7 +195,6 @@ def runTestCasesWithInput(test_cases, test_input, oreporter):
d = defer.maybeDeferred(test)
d.addCallback(test_done, test_instance, test_method)
d.addErrback(test_error, test_instance, test_method)
- log.debug("returning %s input" % test_method)
dl.append(d)
test_methods_d = defer.DeferredList(dl)
@@ -221,6 +224,104 @@ def runTestCasesWithInputUnit(test_cases, input_unit, oreporter):
dl.append(d)
return defer.DeferredList(dl)
+class InvalidResumeFile(Exception):
+ pass
+
+class noResumeSession(Exception):
+ pass
+
+def loadResumeFile():
+ """
+ Sets the singleton stateDict object to the content of the resume file.
+ If the file is empty then an empty dict is written to it and used as the state.
+
+ Raises:
+
+ :class:ooni.runner.InvalidResumeFile if the resume file is not valid
+
+ """
+ if not config.stateDict:
+ try:
+ config.stateDict = yaml.safe_load(open(config.resume_filename))
+ except:
+ log.err("Error loading YAML file")
+ raise InvalidResumeFile
+
+ if not config.stateDict:
+ yaml.safe_dump(dict(), open(config.resume_filename, 'w+'))
+ config.stateDict = dict()
+
+ elif isinstance(config.stateDict, dict):
+ return
+ else:
+ log.err("The resume file is of the wrong format")
+ raise InvalidResumeFile
+
+def resumeTest(test_filename, input_unit_factory):
+ """
+ Returns an input_unit_factory that is at the index of the previous run of the test
+ for the specified test_filename.
+
+ Args:
+
+ test_filename (str): the filename of the test that is being run
+ including the .py extension.
+
+ input_unit_factory (:class:ooni.inputunit.InputUnitFactory): with the
+ same input of the past run.
+
+ Returns:
+
+ :class:ooni.inputunit.InputUnitFactory that is at the index of the
+ previous test run.
+
+ """
+ try:
+ idx = config.stateDict[test_filename]
+ for x in range(idx):
+ try:
+ input_unit_factory.next()
+ except StopIteration:
+ log.msg("Previous run was complete")
+ return input_unit_factory
+
+ return input_unit_factory
+
+ except KeyError:
+ log.debug("No resume key found for selected test name. It is therefore 0")
+ config.stateDict[test_filename] = 0
+ return input_unit_factory
+
+@defer.inlineCallbacks
+def updateResumeFile(test_filename):
+ """
+ update the resume file with the current stateDict state.
+ """
+ log.debug("Acquiring lock for %s" % test_filename)
+ yield config.resume_lock.acquire()
+
+ current_resume_state = yaml.safe_load(open(config.resume_filename))
+ current_resume_state = config.stateDict
+ yaml.safe_dump(current_resume_state, open(config.resume_filename, 'w+'))
+
+ log.debug("Releasing lock for %s" % test_filename)
+ config.resume_lock.release()
+ defer.returnValue(config.stateDict[test_filename])
+
+@defer.inlineCallbacks
+def increaseInputUnitIdx(test_filename):
+ """
+ Args:
+
+ test_filename (str): the filename of the test that is being run
+ including the .py extension.
+
+ input_unit_idx (int): the current input unit index for the test.
+
+ """
+ config.stateDict[test_filename] += 1
+ yield updateResumeFile(test_filename)
+
@defer.inlineCallbacks
def runTestCases(test_cases, options, cmd_line_options):
log.debug("Running %s" % test_cases)
@@ -245,10 +346,10 @@ def runTestCases(test_cases, options, cmd_line_options):
test_inputs = [None]
if cmd_line_options['collector']:
- log.debug("Using remote collector")
+ log.msg("Using remote collector, please be patient while we create the report.")
oreporter = reporter.OONIBReporter(cmd_line_options)
else:
- log.debug("Reporting to file %s" % config.reports.yamloo)
+ log.msg("Reporting to file %s" % config.reports.yamloo)
oreporter = reporter.YAMLReporter(cmd_line_options)
try:
@@ -256,8 +357,6 @@ def runTestCases(test_cases, options, cmd_line_options):
except Exception, e:
log.exception(e)
- log.debug("Creating report")
-
try:
yield oreporter.createReport(options)
except reporter.OONIBReportCreationFailed:
@@ -266,17 +365,59 @@ def runTestCases(test_cases, options, cmd_line_options):
except Exception, e:
log.exception(e)
- # This deferred list is a deferred list of deferred lists
- # it is used to store all the deferreds of the tests that
- # are run
- input_unit_idx = 0
+ try:
+ loadResumeFile()
+ except InvalidResumeFile:
+ log.err("Error in loading resume file %s" % config.resume_filename)
+ log.err("Try deleting the resume file")
+ raise InvalidResumeFile
+
+ test_filename = os.path.basename(cmd_line_options['test'])
+
+ if cmd_line_options['resume']:
+ resumeTest(test_filename, input_unit_factory)
+ else:
+ config.stateDict[test_filename] = 0
+
try:
for input_unit in input_unit_factory:
log.debug("Running this input unit %s" % input_unit)
+
yield runTestCasesWithInputUnit(test_cases, input_unit,
oreporter)
- input_unit_idx += 1
+ yield increaseInputUnitIdx(test_filename)
except Exception:
log.exception("Problem in running test")
+def runTest(cmd_line_options):
+ config.cmd_line_options = cmd_line_options
+ config.generateReportFilenames()
+
+ if cmd_line_options['reportfile']:
+ config.reports.yamloo = cmd_line_options['reportfile']
+ config.reports.pcap = config.reports.yamloo+".pcap"
+
+ if os.path.exists(config.reports.pcap):
+ print "Report PCAP already exists with filename %s" % config.reports.pcap
+ print "Renaming it to %s" % config.reports.pcap+".old"
+ os.rename(config.reports.pcap, config.reports.pcap+".old")
+
+ classes = findTestClassesFromFile(cmd_line_options['test'])
+ test_cases, options = loadTestsAndOptions(classes, cmd_line_options)
+ if config.privacy.includepcap:
+ from ooni.utils.txscapy import ScapyFactory, ScapySniffer
+ try:
+ checkForRoot()
+ except NotRootError:
+ print "[!] Includepcap options requires root priviledges to run"
+ print " you should run ooniprobe as root or disable the options in ooniprobe.conf"
+ sys.exit(1)
+
+ print "Starting sniffer"
+ config.scapyFactory = ScapyFactory(config.advanced.interface)
+
+ sniffer = ScapySniffer(config.reports.pcap)
+ config.scapyFactory.registerProtocol(sniffer)
+
+ return runTestCases(test_cases, options, cmd_line_options)
diff --git a/ooni/templates/httpt.py b/ooni/templates/httpt.py
index e36c049..0d53ebe 100644
--- a/ooni/templates/httpt.py
+++ b/ooni/templates/httpt.py
@@ -12,9 +12,9 @@ from twisted.internet import protocol, defer
from twisted.internet.ssl import ClientContextFactory
from twisted.internet import reactor
-from twisted.internet.error import ConnectionRefusedError
+from twisted.internet.error import ConnectionRefusedError, DNSLookupError, TCPTimedOutError
-from twisted.web._newclient import Request
+from twisted.web._newclient import Request, Response
from ooni.nettest import NetTestCase
from ooni.utils import log
@@ -53,6 +53,9 @@ class HTTPTest(NetTestCase):
request = {}
response = {}
+ requests = []
+ responses = []
+
def _setUp(self):
try:
import OpenSSL
@@ -97,21 +100,37 @@ class HTTPTest(NetTestCase):
def processInputs(self):
pass
- def _processResponseBody(self, response_body, request, response, body_processor):
- log.debug("Processing response body")
- self.report['requests'].append({
+ def addToReport(self, request, response=None):
+ """
+ Adds to the report the specified request and response.
+
+ Args:
+ request (dict): A dict describing the request that was made
+
+ response (instance): An instance of
+ :class:twisted.web.client.Response.
+ Note: headers is our modified True Headers version.
+ """
+ log.debug("Adding %s to report" % request)
+ request_response = {
'request': {
'headers': request['headers'],
'body': request['body'],
'url': request['url'],
'method': request['method']
- },
- 'response': {
+ }
+ }
+ if response:
+ request_response['response'] = {
'headers': list(response.headers.getAllRawHeaders()),
'body': response_body,
'code': response.code
}
- })
+ self.report['requests'].append(request_response)
+
+ def _processResponseBody(self, response_body, request, response, body_processor):
+ log.debug("Processing response body")
+ self.addToReport(request, response)
if body_processor:
body_processor(response_body)
else:
@@ -184,6 +203,7 @@ class HTTPTest(NetTestCase):
return
else:
log.debug("Got response %s" % response)
+ return
if str(response.code).startswith('3'):
self.processRedirect(response.headers.getRawHeaders('Location')[0])
@@ -272,20 +292,32 @@ class HTTPTest(NetTestCase):
headers = TrueHeaders(request['headers'])
- def errback(failure):
- failure.trap(ConnectionRefusedError, SOCKSError)
+ def errback(failure, request):
+ failure.trap(ConnectionRefusedError, SOCKSError, DNSLookupError, TCPTimedOutError)
+ log.err("Error performing %s" % request)
+ self.addToReport(request)
if isinstance(failure.value, ConnectionRefusedError):
log.err("Connection refused. The backend may be down")
self.report['failure'] = 'connection_refused_error'
elif isinstance(failure.value, SOCKSError):
- log.err("Sock error. The SOCK proxy may be down")
- self.report['failure'] = 'sockserror'
+ log.err("Sock error. The SOCKS proxy may be down")
+ self.report['failure'] = 'socks_error'
+
+ elif isinstance(failure.value, DNSLookupError):
+ log.err("DNS lookup failure")
+ self.report['failure'] = 'dns_lookup_error'
+
+ elif isinstance(failure.value, TCPTimedOutError):
+ log.err("DNS lookup failure")
+ self.report['failure'] = 'tcp_timed_out_error'
+ return
d = agent.request(request['method'], request['url'], headers,
body_producer)
- d.addCallback(self._cbResponse, request, headers_processor, body_processor)
- d.addErrback(errback)
+ d.addCallback(self._cbResponse, request, headers_processor,
+ body_processor)
+ d.addErrback(errback, request)
return d
diff --git a/ooni/templates/scapyt.py b/ooni/templates/scapyt.py
index a1eade4..a787115 100644
--- a/ooni/templates/scapyt.py
+++ b/ooni/templates/scapyt.py
@@ -140,7 +140,6 @@ class BaseScapyTest(NetTestCase):
scapySender = ScapySender()
config.scapyFactory.registerProtocol(scapySender)
-
scapySender.sendPackets(packets)
scapySender.stopSending()
diff --git a/ooniprobe.conf b/ooniprobe.conf
index e9f208f..66ab017 100644
--- a/ooniprobe.conf
+++ b/ooniprobe.conf
@@ -26,7 +26,7 @@ advanced:
threadpool_size: 10
tor_socksport: 9050
# For auto detection
- interface: auto
+ interface: auto
# Or specify a specific interface
#interface: wlan0
More information about the tor-commits
mailing list