[tor-commits] [ooni-probe/master] Link TaskManagers by Least Available Slots
art at torproject.org
art at torproject.org
Tue Aug 27 10:03:52 UTC 2013
commit 0467a6ea8c33a35beb315e56de3b2f5bdae605d6
Author: aagbsn <aagbsn at extc.org>
Date: Thu Aug 22 13:00:45 2013 +0200
Link TaskManagers by Least Available Slots
A LinkedTaskManager only has availableSlots if its child TaskManager
also has availableSlots. Child LinkedTaskManagers must notify the
parent LinkedTaskManager when a task completes, because the task
queue is event-driven.
---
ooni/director.py | 4 ++++
ooni/managers.py | 28 ++++++++++++++++++++++++++--
2 files changed, 30 insertions(+), 2 deletions(-)
diff --git a/ooni/director.py b/ooni/director.py
index 2c48bb0..1e3f062 100644
--- a/ooni/director.py
+++ b/ooni/director.py
@@ -71,6 +71,10 @@ class Director(object):
self.reportEntryManager = ReportEntryManager()
self.reportEntryManager.director = self
+ # Link the TaskManagers by least available slots.
+ self.measurementManager.child = self.reportEntryManager
+ # Notify the parent when tasks complete # XXX deadlock!?
+ self.reportEntryManager.parent = self.measurementManager
self.successfulMeasurements = 0
self.failedMeasurements = 0
diff --git a/ooni/managers.py b/ooni/managers.py
index ff7c2f2..cc2d067 100644
--- a/ooni/managers.py
+++ b/ooni/managers.py
@@ -129,7 +129,31 @@ class TaskManager(object):
"""
raise NotImplemented
-class MeasurementManager(TaskManager):
+class LinkedTaskManager(TaskManager):
+ def __init__(self):
+ super(LinkedTaskManager, self).__init__()
+ self.child = None
+ self.parent = None
+
+ @property
+ def availableSlots(self):
+ mySlots = self.concurrency - len(self._active_tasks)
+ if self.child:
+ s = self.child.availableSlots
+ return min(s, mySlots)
+ return mySlots
+
+ def _succeeded(self, result, task):
+ super(LinkedTaskManager, self)._succeeded(result, task)
+ if self.parent:
+ self.parent._fillSlots()
+
+ def _failed(self, result, task):
+ super(LinkedTaskManager, self)._failed(result, task)
+ if self.parent:
+ self.parent._fillSlots()
+
+class MeasurementManager(LinkedTaskManager):
"""
This is the Measurement Tracker. In here we keep track of active measurements
and issue new measurements once the active ones have been completed.
@@ -155,7 +179,7 @@ class MeasurementManager(TaskManager):
def failed(self, failure, measurement):
pass
-class ReportEntryManager(TaskManager):
+class ReportEntryManager(LinkedTaskManager):
def __init__(self):
if config.advanced.reporting_retries:
self.retries = config.advanced.reporting_retries
More information about the tor-commits mailing list