[tor-commits] [ooni-probe/master] Refactoring of reporting managers
isis at torproject.org
isis at torproject.org
Sun Mar 10 01:57:01 UTC 2013
commit 055993061eb1a69a9e9876091d9b07f894deda32
Author: Arturo Filastò <art at fuffa.org>
Date: Fri Jan 11 20:56:45 2013 +0100
Refactoring of reporting managers
Make the dependencies between modules much clearer
Sketch out a graph that illustrates such depedencies
---
ooni/managers.py | 281 ++++++++++++++++++++++++++++++++++++++++--------------
1 files changed, 209 insertions(+), 72 deletions(-)
diff --git a/ooni/managers.py b/ooni/managers.py
index 3383840..ec47ca7 100644
--- a/ooni/managers.py
+++ b/ooni/managers.py
@@ -3,7 +3,108 @@ import itertools
from .ratelimiting import StaticRateLimiter
from .measurements import Measurement, NetTest
-class MeasurementsManager(object):
+class TaskManager(object):
+ retries = 2
+
+ failures = []
+ concurrency = 10
+
+ _tasks = iter()
+ _active_tasks = []
+
+ def _failed(self, failure, task):
+ """
+ The has failed to complete, we append it to the end of the task chain
+ to be re-run once all the currently scheduled tasks have run.
+ """
+ self._active_tasks.remove(task)
+ self.failures.append((failure, task))
+
+ if task.failures < self.retries:
+ self._tasks = itertools.chain(self._tasks,
+ iter(task))
+
+ self.fillSlots()
+
+ self.failed(failure, task)
+
+ def _fillSlots(self):
+ """
+ Called on test completion and schedules measurements to be run for the
+ available slots.
+ """
+ for _ in range(self.availableSlots()):
+ try:
+ task = self._tasks.next()
+ self._run(task)
+ except StopIteration:
+ break
+
+ def _suceeded(self, result, task):
+ """
+ We have successfully completed a measurement.
+ """
+ self._active_tasks.remove(task)
+ self.completedTasks += 1
+
+ self.fillSlots()
+
+ self.suceeded(result, task)
+
+ def _run(self, task):
+ """
+ This gets called to add a task to the list of currently active and
+ running tasks.
+ """
+ self._active_tasks.append(task)
+
+ d = task.run()
+ d.addCallback(self.succeeded)
+ d.addCallback(self.failed)
+
+ @property
+ def failedMeasurements(self):
+ return len(self.failures)
+
+ def availableSlots(self):
+ """
+ Returns the number of available slots for running tests.
+ """
+ return self.concurrency - len(self._active_tasks)
+
+ def schedule(self, task_or_task_iterator):
+ """
+ Takes as argument a single task or a task iterable and appends it to the task
+ generator queue.
+ """
+ self._tasks = itertools.chain(self._tasks, task_or_task_iterator)
+
+ def start(self):
+ self.initializeTaskList()
+ self._fillSlots()
+
+ def initializeTaskList(self):
+ """
+ This should contain all the logic that gets run at first start to
+ pre-populate the list of tasks to be run and the tasks currently
+ running.
+ """
+ raise NotImplemented
+
+ def failed(self, failure, task):
+ """
+ This method should be overriden by the subclass and should contains
+ logic for dealing with a failure that is subclass specific.
+
+ The default failure handling logic is to reschedule the task up until
+ we reach the maximum number of retries.
+ """
+ raise NotImplemented
+
+ def succeeded(self, result, task):
+ raise NotImplemented
+
+class MeasurementsManager(TaskManager):
"""
This is the Measurement Tracker. In here we keep track of active measurements
and issue new measurements once the active ones have been completed.
@@ -20,113 +121,149 @@ class MeasurementsManager(object):
failures = []
concurrency = 10
- _measurements = iter()
- _active_measurements = []
+ director = None
- def __init__(self, manager, netTests=None):
+ def __init__(self, netTests=None):
self.netTests = netTests if netTests else []
- self.manager = manager
- @property
- def failedMeasurements(self):
- return len(self.failures)
+ def initializeTaskList(self):
+ for net_test in self.netTests:
+ self.schedule(net_test.generateMeasurements())
- def start(self):
+ def suceeded(self, result, measurement):
+ pass
+
+ def failed(self, failure, measurement):
+ pass
+
+class Report(object):
+ def __init__(self, reporters, net_test):
"""
- Start running the measurements.
+ This will instantiate all the reporters and add them to the list of
+ available reporters.
+
+ net_test:
+ is a reference to the net_test to which the report object belongs to.
"""
- self.populateMeasurements()
- self.runMoreMeasurements()
+ self.netTest = net_test
+ self.reporters = []
+ for r in reporters:
+ reporter = r()
+ self.reporters.append(reporter)
+
+ self.createReports()
- def populateMeasurements(self):
+ def createReports(self):
"""
- Take all the setup netTests and create the measurements iterator from
- them.
+ This will create all the reports that need to be created.
"""
- for net_test in self.netTests:
- self._measurements = itertools.chain(self._measurements,
- net_test.generateMeasurements())
+ for reporter in self.reporters:
+ reporter.createReport()
- def availableSlots(self):
+ def write(self, measurement):
"""
- Returns the number of available slots for running tests.
+ This will write to all the reporters, by waiting on the created
+ callback to fire.
"""
- return self.concurrency - len(self._active_measurements)
+ for reporter in self.reporters:
+ @reporter.created.addCallback
+ def cb(result):
+ report_write_task = ReportWrite(reporter, measurement)
- def schedule(self, measurement):
- self._active_measurements.append(measurement)
+class ReportEntryManager(object):
- d = measurement.run()
- d.addCallback(self.done)
- d.addCallback(self.failed)
- return d
+ director = None
- def fillSlots(self):
- """
- Called on test completion and schedules measurements to be run for the
- available slots.
- """
- for _ in range(self.availableSlots()):
- try:
- measurement = self._measurements.next()
- self.schedule(measurement)
- except StopIteration:
- break
+ def __init__(self, manager, netTests=None):
+ self.netTests = netTests if netTests else []
+ self.manager = manager
- def done(self, result, measurement):
- """
- We have successfully completed a measurement.
- """
- self._active_measurements.remove(measurement)
- self.completedMeasurements += 1
+ def addNetTest(self, netTest):
+ self.netTests.append(netTest)
- self.fillSlots()
+ def initializeTaskList(self):
+ pass
+
+ def suceeded(self, result, measurement):
+ pass
def failed(self, failure, measurement):
- """
- The measurement has failed to complete.
- """
- self._active_measurements.remove(measurement)
- self.failures.append((failure, measurement))
+ pass
- if measurement.failures < self.retries:
- self._measurements = itertools.chain(self._measurements,
- iter(measurement))
+class Director(object):
+ """
+ Singleton object responsible for coordinating the Measurements Manager and the
+ Reporting Manager.
- self.fillSlots()
+ How this all looks like is as follows:
-class OManager(object):
- """
- Singleton object responsible for managing the Measurements Tracker and the
- Reporting Tracker.
- """
+ +------------------------------------------------+
+ | Director |<--+
+ +------------------------------------------------+ |
+ ^ ^ |
+ | Measurement | |
+ +---------+ [---------] +--------------------+ |
+ | | | MeasurementManager | |
+ | NetTest | [---------] +--------------------+ |
+ | | | [----------------] | |
+ +---------+ [---------] | [----------------] | |
+ | | [----------------] | |
+ | +--------------------+ |
+ v |
+ +---------+ ReportEntry |
+ | | [---------] +--------------------+ |
+ | Report | | ReportEntryManager | |
+ | | [---------] +--------------------+ |
+ +---------+ | [----------------] | |
+ [---------] | [----------------] |--
+ | [----------------] |
+ +--------------------+
+ [------------] are Tasks
+
+ +------+
+ | | are TaskManagers
+ +------+
+ | |
+ +------+
+
+ +------+
+ | | are general purpose objects
+ +------+
+
+ """
_scheduledTests = 0
- def __init__(self, reporters=[]):
+ def __init__(self):
self.reporters = reporters
self.netTests = []
self.measurementsManager = MeasurementsManager(manager=self,
netTests=self.netTests)
- self.measurementsManager.manager = self
+ self.measurementsManager.director = self
- self.reportingManager
+ self.reportingManager = ReportingEntryManager()
+ self.reportingManager.director = self
- def writeReport(self, measurement):
+ def startTest(self, net_test_file, inputs, options):
"""
- Write to all the configured reporters.
+ Create the Report for the NetTest and start the report NetTest.
"""
- for reporter in self.reporters:
- reporter.write(measurement)
-
- def writeFailure(self, measurement, failure):
- pass
+ report = Report()
+ net_test = NetTest(net_test_file, inputs, options, report)
+ net_test.director = self
- def addNetTest(self, net_test):
+ def measurementTimedOut(self, measurement):
"""
- This is called to add a NetTest to the list of running network tests.
+ This gets called every time a measurement times out independenty from
+ the fact that it gets re-scheduled or not.
"""
- self.netTests.append(net_test)
+ pass
+
+ def measurementFailed(self, measurement, failure):
+ pass
+
+ def writeFailure(self, measurement, failure):
+ pass
More information about the tor-commits
mailing list