[tor-commits] [ooni-probe/master] Implement basic collector for ooniprobe reports

art at torproject.org art at torproject.org
Sun Nov 11 17:18:31 UTC 2012


commit 5544ee71c001d64aecc5a729724af53c8ec2f246
Author: Arturo Filastò <art at fuffa.org>
Date:   Sun Nov 11 18:15:54 2012 +0100

    Implement basic collector for ooniprobe reports
    * Reports can be submitted over the network via HTTP to a remote collector
    * Implement the backend component of the collector that writes submitted reports
      to flat files, following the report_id naming convention.
    * XXX add support for connecting to the collector via Tor Hidden Services
---
 nettests/core/http_host.py |    2 +-
 ooni/oonicli.py            |    2 +
 ooni/reporter.py           |  176 ++++++++++++++++++++------------------------
 ooni/runner.py             |   19 +++--
 ooni/utils/net.py          |    2 +-
 oonib/config.py            |    6 +-
 oonib/report/api.py        |   44 ++++++++++-
 7 files changed, 138 insertions(+), 113 deletions(-)

diff --git a/nettests/core/http_host.py b/nettests/core/http_host.py
index 0e73f82..662cc40 100644
--- a/nettests/core/http_host.py
+++ b/nettests/core/http_host.py
@@ -16,7 +16,7 @@ from ooni.utils import log
 from ooni.templates import httpt
 
 class UsageOptions(usage.Options):
-    optParameters = [['backend', 'b', 'http://127.0.0.1:1234', 
+    optParameters = [['backend', 'b', 'http://127.0.0.1:57001', 
                         'URL of the test backend to use'],
                      ['content', 'c', None, 
                         'The file to read from containing the content of a block page']]
diff --git a/ooni/oonicli.py b/ooni/oonicli.py
index 9287174..9473ad8 100644
--- a/ooni/oonicli.py
+++ b/ooni/oonicli.py
@@ -40,6 +40,8 @@ class Options(usage.Options, app.ReactorSelectionMixin):
                     'Report deferred creation and callback stack traces'],]
 
     optParameters = [["reportfile", "o", None, "report file name"],
+                     ["collector", "c", None, 
+                         "Address of the collector of test results. (example: http://127.0.0.1:8888)"],
                      ["logfile", "l", None, "log file name"],
                      ["pcapfile", "p", None, "pcap file name"]]
 
diff --git a/ooni/reporter.py b/ooni/reporter.py
index 7147b35..05ea94a 100644
--- a/ooni/reporter.py
+++ b/ooni/reporter.py
@@ -106,6 +106,8 @@ class OReporter(object):
         pass
 
     def testDone(self, test, test_name):
+        log.debug("Finished running %s" % test_name)
+        log.debug("Writing report")
         test_report = dict(test.report)
 
         if isinstance(test.input, packet.Packet):
@@ -120,14 +122,15 @@ class OReporter(object):
                 'test_name': test_name,
                 'test_started': test_started,
                 'report': test_report}
-        self.writeReportEntry(report)
+        return self.writeReportEntry(report)
 
     def allDone(self):
         log.debug("allDone: Finished running all tests")
-        self.finish()
         try:
+            log.debug("Stopping the reactor")
             reactor.stop()
         except:
+            log.debug("Unable to stop the reactor")
             pass
         return None
 
@@ -151,6 +154,7 @@ class YAMLReporter(OReporter):
         untilConcludes(self._stream.flush)
 
     def writeReportEntry(self, entry):
+        log.debug("Writing report with YAML reporter")
         self._write('---\n')
         self._write(safe_dump(entry))
         self._write('...\n')
@@ -169,88 +173,60 @@ class YAMLReporter(OReporter):
     def finish(self):
         self._stream.close()
 
-class OONIBReporter(object):
+
+class OONIBReportUpdateFailed(Exception):
+    pass
+
+class OONIBReportCreationFailed(Exception):
+    pass
+
+class OONIBTestDetailsLookupFailed(Exception):
+    pass
+
+class OONIBReporter(OReporter):
     def __init__(self, backend_url):
         from twisted.web.client import Agent
         from twisted.internet import reactor
         self.agent = Agent(reactor)
         self.backend_url = backend_url
 
-    def _newReportCreated(self, data):
-        log.debug("newReportCreated %s" % data)
-        return data
-
-    def _processResponseBody(self, response, body_cb):
-        log.debug("processResponseBody %s" % response)
-        done = defer.Deferred()
-        response.deliverBody(BodyReceiver(done))
-        done.addCallback(body_cb)
-        return done
-
-    def createReport(self, test_name,
-            test_version, report_header):
-        url = self.backend_url + '/new'
-        software_version = '0.0.1'
-
-        request = {'software_name': 'ooni-probe',
-                'software_version': software_version,
-                'test_name': test_name,
-                'test_version': test_version,
-                'progress': 0,
-                'content': report_header
-        }
-        def gotDetails(test_details):
-            log.debug("Creating report via url %s" % url)
-
-            bodyProducer = StringProducer(json.dumps(request))
-            d = self.agent.request("POST", url, 
-                    bodyProducer=bodyProducer)
-            d.addCallback(self._processResponseBody, 
-                    self._newReportCreated)
-            return d
-
-        d = getTestDetails(options)
-        d.addCallback(gotDetails)
-        return d
-
-    def writeReportEntry(self, entry, test_id=None):
-        if not test_id:
-            log.err("Write report entry on OONIB requires test id")
-            raise NoTestIDSpecified
+    @defer.inlineCallbacks
+    def writeReportEntry(self, entry):
+        log.debug("Writing report with OONIB reporter")
+        content = '---\n'
+        content += safe_dump(entry)
+        content += '...\n'
 
-        report = '---\n'
-        report += safe_dump(entry)
-        report += '...\n'
+        url = self.backend_url + '/report/new'
 
-        url = self.backend_url + '/new'
+        request = {'report_id': self.report_id,
+                'content': content}
 
-        request = {'test_id': test_id,
-                'content': report}
+        log.debug("Updating report with id %s" % self.report_id)
+        request_json = json.dumps(request)
+        log.debug("Sending %s" % request_json)
 
         bodyProducer = StringProducer(json.dumps(request))
-        d = self.agent.request("PUT", url,
-                bodyProducer=bodyProducer)
-
-        d.addCallback(self._processResponseBody,
-                    self._newReportCreated)
-        return d
-
+        log.debug("Creating report via url %s" % url)
 
+        try:
+            response = yield self.agent.request("PUT", url, 
+                                bodyProducer=bodyProducer)
+        except:
+            # XXX we must trap this in the runner and make sure to report the data later.
+            raise OONIBReportUpdateFailed
 
-class OONIBReporter(OReporter):
-    def __init__(self, backend_url):
-        from twisted.web.client import Agent
-        from twisted.internet import reactor
-        self.agent = Agent(reactor)
-        self.backend_url = backend_url
+        #parsed_response = json.loads(backend_response)
+        #self.report_id = parsed_response['report_id']
+        #self.backend_version = parsed_response['backend_version']
+        #log.debug("Created report with id %s" % parsed_response['report_id'])
 
-    def _processResponseBody(self, *arg, **kw):
-        #done = defer.Deferred()
-        #response.deliverBody(BodyReceiver(done))
-        #done.addCallback(self._newReportCreated)
-        #return done
 
+    @defer.inlineCallbacks
     def createReport(self, options):
+        """
+        Creates a report on the oonib collector.
+        """
         test_name = options['name']
         test_version = options['version']
 
@@ -258,33 +234,41 @@ class OONIBReporter(OReporter):
         url = self.backend_url + '/report/new'
         software_version = '0.0.1'
 
-        def gotDetails(test_details):
-            content = '---\n'
-            content += safe_dump(test_details)
-            content += '...\n'
+        test_details = yield getTestDetails(options)
+
+        content = '---\n'
+        content += safe_dump(test_details)
+        content += '...\n'
 
-            request = {'software_name': 'ooniprobe',
-                'software_version': software_version,
-                'test_name': test_name,
-                'test_version': test_version,
-                'progress': 0,
-                'content': content
-            }
-            log.debug("Creating report via url %s" % url)
-            request_json = json.dumps(request)
-            log.debug("Sending %s" % request_json)
-
-            def bothCalls(*arg, **kw):
-                print arg, kw
-
-            body_producer = StringProducer(request_json)
-            d = self.agent.request("POST", url, None,
-                    body_producer)
-            d.addBoth(self._processResponseBody)
-            return d
-
-        d = getTestDetails(options)
-        d.addCallback(gotDetails)
-        # XXX handle errors
-        return d
+        request = {'software_name': 'ooniprobe',
+            'software_version': software_version,
+            'test_name': test_name,
+            'test_version': test_version,
+            'progress': 0,
+            'content': content
+        }
+        log.debug("Creating report via url %s" % url)
+        request_json = json.dumps(request)
+        log.debug("Sending %s" % request_json)
+
+        bodyProducer = StringProducer(json.dumps(request))
+        log.debug("Creating report via url %s" % url)
+
+        try:
+            response = yield self.agent.request("POST", url, 
+                                bodyProducer=bodyProducer)
+        except:
+            raise OONIBReportCreationFailed
+
+        # This is a little trix to allow us to unspool the response. We create
+        # a deferred and call yield on it.
+        response_body = defer.Deferred()
+        response.deliverBody(BodyReceiver(response_body))
+
+        backend_response = yield response_body
+
+        parsed_response = json.loads(backend_response)
+        self.report_id = parsed_response['report_id']
+        self.backend_version = parsed_response['backend_version']
+        log.debug("Created report with id %s" % parsed_response['report_id'])
 
diff --git a/ooni/runner.py b/ooni/runner.py
index b92ad7e..1aac145 100644
--- a/ooni/runner.py
+++ b/ooni/runner.py
@@ -153,8 +153,10 @@ def loadTestsAndOptions(classes, cmd_line_options):
 
 def runTestWithInput(test_class, test_method, test_input, oreporter):
     log.debug("Running %s with %s" % (test_method, test_input))
+
     def test_done(result, test_instance, test_name):
-        oreporter.testDone(test_instance, test_name)
+        log.debug("runTestWithInput: concluded %s" % test_name)
+        return oreporter.testDone(test_instance, test_name)
 
     def test_error(error, test_instance, test_name):
         log.err("%s\n" % error)
@@ -221,22 +223,21 @@ def runTestCases(test_cases, options,
             log.msg("Could not find inputs!")
             log.msg("options[0] = %s" % first)
             test_inputs = [None]
-    
+
     reportFile = open(yamloo_filename, 'w+')
 
-    #oreporter = reporter.YAMLReporter(reportFile)
-    oreporter = reporter.OONIBReporter('http://127.0.0.1:8888')
+
+    if cmd_line_options['collector']:
+        oreporter = reporter.OONIBReporter(cmd_line_options['collector'])
+    else:
+        oreporter = reporter.YAMLReporter(reportFile)
 
     input_unit_factory = InputUnitFactory(test_inputs)
 
     log.debug("Creating report")
-    yield oreporter.createReport(options)
-
-    oreporter = reporter.YAMLReporter(reportFile)
-
-    input_unit_factory = InputUnitFactory(test_inputs)
 
     yield oreporter.createReport(options)
+
     # This deferred list is a deferred list of deferred lists
     # it is used to store all the deferreds of the tests that 
     # are run
diff --git a/ooni/utils/net.py b/ooni/utils/net.py
index d43261a..3ddba61 100644
--- a/ooni/utils/net.py
+++ b/ooni/utils/net.py
@@ -7,7 +7,7 @@
 import sys
 from zope.interface import implements
 
-from twisted.internet import protocol
+from twisted.internet import protocol, defer
 from twisted.internet import threads, reactor
 from twisted.web.iweb import IBodyProducer
 
diff --git a/oonib/config.py b/oonib/config.py
index dc2be2f..b34a8ae 100644
--- a/oonib/config.py
+++ b/oonib/config.py
@@ -18,11 +18,13 @@ main.db_threadpool_size = 10
 helpers = Storage()
 
 helpers.http_return_request = Storage()
-helpers.http_return_request.port = 57001 
+helpers.http_return_request.port = 57001
+# XXX this actually needs to be the advertised Server HTTP header of our web
+# server
 helpers.http_return_request.server_version = "Apache"
 
 helpers.tcp_echo = Storage()
-helpers.tcp_echo.port = 57002 
+helpers.tcp_echo.port = 57002
 
 helpers.daphn3 = Storage()
 helpers.daphn3.yaml_file = "/path/to/data/oonib/daphn3.yaml"
diff --git a/oonib/report/api.py b/oonib/report/api.py
index 489cc25..bd409b0 100644
--- a/oonib/report/api.py
+++ b/oonib/report/api.py
@@ -59,6 +59,28 @@ def parseNewReportRequest(request):
             raise InvalidRequestField(k)
     return parsed_request
 
+def parseUpdateReportRequest(request):
+    # XXX this and the function above can probably be refactored into something
+    # more compact. There is quite a bit of code duplication going on here.
+
+    report_id_regexp = re.compile("[a-zA-Z0-9]+$")
+
+    # XXX here we are actually parsing a json object that could be quite big.
+    # If we want this to scale properly we only want to look at the report_id
+    # field.
+    # We are also keeping in memory multiple copies of the same object. A lot
+    # of optimization can be done.
+    parsed_request = json.loads(request)
+    try:
+        report_id = parsed_request['report_id']
+    except KeyError:
+        raise MissingField('report_id')
+    
+    if not re.match(report_id_regexp, report_id):
+        raise InvalidRequestField('report_id')
+
+    return parsed_request
+
 class NewReportHandlerFile(web.RequestHandler):
     """
     Responsible for creating and updating reports by writing to flat file.
@@ -129,7 +151,20 @@ class NewReportHandlerFile(web.RequestHandler):
            'content': 'XXX'
           }
         """
-        pass
+        parsed_request = parseUpdateReportRequest(self.request.body)
+        report_id = parsed_request['report_id']
+        print "Got this request %s" % parsed_request
+
+        report_filename = report_id
+        report_filename += '.yamloo'
+        try:
+            with open(report_filename, 'a+') as f: 
+                # XXX this could be quite big. We should probably use the
+                # twisted.internet.fdesc module
+                print parsed_request['content']
+                f.write(parsed_request['content'])
+        except IOError as e:
+            web.HTTPError(404, "Report not found")
 
 class NewReportHandlerDB(web.RequestHandler):
     """
@@ -188,7 +223,8 @@ class PCAPReportHandler(web.RequestHandler):
     def post(self):
         pass
 
-spec = [(r"/report/new", NewReportHandlerFile),
-        (r"/report/pcap", PCAPReportHandler)]
+reportingBackendAPI = [(r"/report/new", NewReportHandlerFile),
+    (r"/report/pcap", PCAPReportHandler)
+]
 
-reportingBackend = web.Application(spec, debug=True)
+reportingBackend = web.Application(reportingBackendAPI, debug=True)



More information about the tor-commits mailing list