[tor-commits] [ooni-probe/master] Make reporting more robust
art at torproject.org
art at torproject.org
Tue Apr 30 13:01:44 UTC 2013
commit f7bfd1198d5b13ad571838372f7316488a6fb3c4
Author: Arturo Filastò <art at fuffa.org>
Date: Thu Apr 11 11:59:46 2013 +0200
Make reporting more robust
Get rid of the ridiculous logic that uses a deferred list; instead, keep track of
the report task via a single deferred.
---
ooni/director.py | 6 +-
ooni/reporter.py | 166 +++++++++++++++++++++++++++++-------------------------
2 files changed, 93 insertions(+), 79 deletions(-)
diff --git a/ooni/director.py b/ooni/director.py
index ec4e461..e963049 100644
--- a/ooni/director.py
+++ b/ooni/director.py
@@ -189,14 +189,16 @@ class Director(object):
net_test = NetTest(net_test_loader, report)
net_test.director = self
- net_test.report.open()
+
+ yield net_test.report.open()
self.measurementManager.schedule(net_test.generateMeasurements())
self.activeNetTests.append(net_test)
net_test.done.addBoth(self.netTestDone, net_test)
net_test.done.addBoth(report.close)
- return net_test.done
+
+ yield net_test.done
def startSniffing(self):
""" Start sniffing with Scapy. Exits if required privileges (root) are not
diff --git a/ooni/reporter.py b/ooni/reporter.py
index a3811b7..5338f3a 100644
--- a/ooni/reporter.py
+++ b/ooni/reporter.py
@@ -26,11 +26,10 @@ except ImportError:
log.err("Scapy is not installed.")
-from ooni.errors import InvalidOONIBCollectorAddress
-from ooni.errors import ReportNotCreated, ReportAlreadyClosed
+from ooni import errors
from ooni import otime
-from ooni.utils import pushFilenameStack
+from ooni.utils import geodata, pushFilenameStack
from ooni.utils.net import BodyReceiver, StringProducer, userAgents
from ooni import config
@@ -119,7 +118,6 @@ def safe_dump(data, stream=None, **kw):
class OReporter(object):
def __init__(self, test_details):
- self.created = defer.Deferred()
self.testDetails = test_details
def createReport(self):
@@ -190,9 +188,9 @@ class YAMLReporter(OReporter):
def _write(self, format_string, *args):
if not self._stream:
- raise ReportNotCreated
+ raise errors.ReportNotCreated
if self._stream.closed:
- raise ReportAlreadyClosed
+ raise errors.ReportAlreadyClosed
s = str(format_string)
assert isinstance(s, type(''))
if args:
@@ -225,8 +223,6 @@ class YAMLReporter(OReporter):
self._writeln("###########################################")
self.writeReportEntry(self.testDetails)
- self.created.callback(self)
- return defer.succeed(None)
def finish(self):
self._stream.close()
@@ -260,7 +256,7 @@ class OONIBReporter(OReporter):
regexp = '^(http|httpo):\/\/[a-zA-Z0-9\-\.]+(:\d+)?$'
if not re.match(regexp, self.collectorAddress) or \
len(self.collectorAddress) < 30:
- raise InvalidOONIBCollectorAddress
+ raise errors.InvalidOONIBCollectorAddress
@defer.inlineCallbacks
def writeReportEntry(self, entry):
@@ -306,7 +302,7 @@ class OONIBReporter(OReporter):
self.agent = Agent(reactor, sockshost="127.0.0.1",
socksport=int(config.tor.socks_port))
except Exception, e:
- yield defer.fail(e)
+ log.exception(e)
url = self.collectorAddress + '/report'
@@ -339,11 +335,16 @@ class OONIBReporter(OReporter):
bodyProducer=bodyProducer)
except ConnectionRefusedError:
log.err("Connection to reporting backend failed (ConnectionRefusedError)")
- self.created.errback(defer.fail(OONIBReportCreationError))
+ raise OONIBReportCreationError
+
+ except errors.HostUnreachable:
+ log.err("Host is not reachable (HostUnreachable error")
+ raise OONIBReportCreationError
except Exception, e:
+ log.err("Failed to connect to reporter backend")
log.exception(e)
- yield defer.fail(OONIBReportCreationError)
+ raise OONIBReportCreationError
# This is a little trix to allow us to unspool the response. We create
# a deferred and call yield on it.
@@ -355,13 +356,13 @@ class OONIBReporter(OReporter):
try:
parsed_response = json.loads(backend_response)
except Exception, e:
+ log.err("Failed to parse collector response")
log.exception(e)
- yield defer.fail(e)
+ raise OONIBReportCreationError
self.reportID = parsed_response['report_id']
self.backendVersion = parsed_response['backend_version']
log.debug("Created report with id %s" % parsed_response['report_id'])
- self.created.callback(self)
class ReportClosed(Exception):
pass
@@ -386,42 +387,39 @@ class Report(object):
self.done = defer.Deferred()
self.reportEntryManager = reportEntryManager
+ self._reporters_openned = 0
+ self._reporters_written = 0
+ self._reporters_closed = 0
+
def open(self):
"""
This will create all the reports that need to be created and fires the
created callback of the reporter whose report got created.
"""
- l = []
+ all_openned = defer.Deferred()
+
for reporter in self.reporters[:]:
- d = reporter.createReport()
- d.addErrback(self.failedOpeningReport, reporter)
- reporter.created.addErrback(self.failedOpeningReport, reporter)
- l.append(reporter.created)
- log.debug("Reporters created: %s" % l)
- # Should we consume errors silently?
- dl = defer.DeferredList(l)
- return dl
- def failedOpeningReport(self, failure, reporter):
- """
- This errback get's called every time we fail to create a report.
- By fail we mean that the number of retries has exceeded.
- Once a report has failed to be created with a reporter we give up and
- remove the reporter from the list of reporters to write to.
- """
- log.err("Failed to open %s reporter, giving up..." % reporter)
- log.err("Reporter %s failed, removing from report..." % reporter)
- self.reporters.remove(reporter)
- # Don't forward the exception unless there are no more reporters
- if len(self.reporters) == 0:
- log.err("Removed last reporter %s" % reporter)
- raise NoMoreReporters
+ def report_created(result):
+ self._reporters_openned += 1
+ if len(self.reporters) == self._reporters_openned:
+ all_openned.callback(self._reporters_openned)
+
+ def report_failed(failure):
+ print "WE HAVE FAILED!"
+ try:
+ self.failedOpeningReport(failure, reporter)
+ except errors.NoMoreReporters, e:
+ all_openned.errback(defer.fail(e))
+
+ d = defer.maybeDeferred(reporter.createReport)
+ d.addErrback(report_failed)
+ d.addCallback(report_created)
+
+ return all_openned
def write(self, measurement):
"""
- This is a lazy call that will write to all the reporters by waiting on
- them to be created.
-
Will return a deferred that will fire once the report for the specified
measurement have been written to all the reporters.
@@ -431,41 +429,45 @@ class Report(object):
an instance of :class:ooni.tasks.Measurement
Returns:
- a deferred list that will fire once all the report entries have
- been written.
+ a deferred that will fire once all the report entries have
+ been written or errbacks when no more reporters
"""
- l = []
+ all_written = defer.Deferred()
+
for reporter in self.reporters[:]:
- report_write_task = ReportEntry(reporter, measurement)
- def scheduleWriteReportEntry(result):
- self.reportEntryManager.schedule(report_write_task)
-
- # delay scheduling the task until after the report is created
- log.debug("Adding report entry task %s" % report_write_task)
- reporter.created.addCallback(scheduleWriteReportEntry)
-
- # if the write task fails n times, kill the reporter
- report_write_task.done.addErrback(self.failedOpeningReport, reporter)
- l.append(report_write_task.done)
-
- # XXX: This seems a bit fragile.
- # failedOpeningReport will forward the errback if the remaining
- # reporter has failed. If we fireOnOneErrback, this means that
- # the caller of report.write is responsible for attaching an
- # errback to the returned deferred and handle this case. That
- # probably means stopping the net test.
-
- # Here, fireOnOneErrback means to call the deferredlists errback
- # as soon as any of the deferreds return a failure. consumeErrors
- # is used to prevent any uncaught failures from raising an
- # exception. Alternately we could attach a logger to the errback
- # of each deferred and it would have the same effect
-
- # Probably the better thing to do here would be to add a callback
- # to the deferredlist that checks to see if any reporters are left
- # and raise an exception if there are no remaining reporters
- dl = defer.DeferredList(l,fireOnOneErrback=True, consumeErrors=True)
- return dl
+ def report_written(result):
+ self._reporters_written += 1
+ if len(self.reporters) == self._reporters_written:
+ all_written.callback(self._reporters_written)
+
+ def report_failed(failure):
+ log.err("Failed writing report entry")
+ log.exception(failure)
+
+ report_entry_task = ReportEntry(reporter, measurement)
+ self.reportEntryManager.schedule(report_entry_task)
+
+ report_entry_task.done.addCallback(report_written)
+ report_entry_task.done.addErrback(report_failed)
+
+ return all_written
+
+ def failedOpeningReport(self, failure, reporter):
+ """
+ This errback get's called every time we fail to create a report.
+ By fail we mean that the number of retries has exceeded.
+ Once a report has failed to be created with a reporter we give up and
+ remove the reporter from the list of reporters to write to.
+ """
+ log.err("Failed to open %s reporter, giving up..." % reporter)
+ log.err("Reporter %s failed, removing from report..." % reporter)
+ log.exception(failure)
+ self.reporters.remove(reporter)
+ # Don't forward the exception unless there are no more reporters
+ if len(self.reporters) == 0:
+ log.err("Removed last reporter %s" % reporter)
+ raise errors.NoMoreReporters
+ return
def close(self, _):
"""
@@ -476,10 +478,20 @@ class Report(object):
all the reports have been closed.
"""
- l = []
+ all_closed = defer.Deferred()
+
for reporter in self.reporters[:]:
+ def report_closed(result):
+ self._reporters_closed += 1
+ if len(self.reporters) == self._reporters_closed:
+ all_closed.callback(self._reporters_closed)
+
+ def report_failed(failure):
+ log.err("Failed closing report")
+ log.exception(failure)
+
d = defer.maybeDeferred(reporter.finish)
- l.append(d)
- dl = defer.DeferredList(l)
- return dl
+ d.addCallback(report_closed)
+ d.addErrback(report_failed)
+ return all_closed
More information about the tor-commits
mailing list