[tor-commits] [ooni-probe/master] Link TaskManagers by Least Available Slots

art at torproject.org art at torproject.org
Tue Aug 27 10:03:52 UTC 2013


commit 0467a6ea8c33a35beb315e56de3b2f5bdae605d6
Author: aagbsn <aagbsn at extc.org>
Date:   Thu Aug 22 13:00:45 2013 +0200

    Link TaskManagers by Least Available Slots
    
    A LinkedTaskManager only has availableSlots if its child TaskManager
    also has availableSlots. Children LinkedTaskManagers must notify the
    parent LinkedTaskManager when a task is complete because the task
    queue is event-driven.
---
 ooni/director.py |    4 ++++
 ooni/managers.py |   28 ++++++++++++++++++++++++++--
 2 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/ooni/director.py b/ooni/director.py
index 2c48bb0..1e3f062 100644
--- a/ooni/director.py
+++ b/ooni/director.py
@@ -71,6 +71,10 @@ class Director(object):
 
         self.reportEntryManager = ReportEntryManager()
         self.reportEntryManager.director = self
+        # Link the TaskManager's by least available slots.
+        self.measurementManager.child = self.reportEntryManager
+        # Notify the parent when tasks complete # XXX deadlock!?
+        self.reportEntryManager.parent = self.measurementManager
 
         self.successfulMeasurements = 0
         self.failedMeasurements = 0
diff --git a/ooni/managers.py b/ooni/managers.py
index ff7c2f2..cc2d067 100644
--- a/ooni/managers.py
+++ b/ooni/managers.py
@@ -129,7 +129,31 @@ class TaskManager(object):
         """
         raise NotImplemented
 
-class MeasurementManager(TaskManager):
+class LinkedTaskManager(TaskManager):
+    def __init__(self):
+        super(LinkedTaskManager, self).__init__()
+        self.child = None
+        self.parent = None
+
+    @property
+    def availableSlots(self):
+        mySlots = self.concurrency - len(self._active_tasks)
+        if self.child:
+            s = self.child.availableSlots
+            return min(s, mySlots)
+        return mySlots
+
+    def _succeeded(self, result, task):
+        super(LinkedTaskManager, self)._succeeded(result, task)
+        if self.parent:
+            self.parent._fillSlots()
+
+    def _failed(self, result, task):
+        super(LinkedTaskManager, self)._failed(result, task)
+        if self.parent:
+            self.parent._fillSlots()
+
+class MeasurementManager(LinkedTaskManager):
     """
     This is the Measurement Tracker. In here we keep track of active measurements
     and issue new measurements once the active ones have been completed.
@@ -155,7 +179,7 @@ class MeasurementManager(TaskManager):
     def failed(self, failure, measurement):
         pass
 
-class ReportEntryManager(TaskManager):
+class ReportEntryManager(LinkedTaskManager):
     def __init__(self):
         if config.advanced.reporting_retries:
             self.retries = config.advanced.reporting_retries





More information about the tor-commits mailing list