[tor-commits] [arm/master] Changing resource tracker to subclass Daemon
atagar at torproject.org
atagar at torproject.org
Mon Oct 21 21:10:15 UTC 2013
commit 7b22e543fdd24f2979a912f847aa56c54978d70b
Author: Damian Johnson <atagar at torproject.org>
Date: Fri Oct 18 14:11:53 2013 -0700
Changing resource tracker to subclass Daemon
Extending our Daemon new class so we can drop the custom pausing/stopping
funcionality.
---
arm/util/sysTools.py | 212 +++++++++++++++++++++-----------------------------
1 file changed, 89 insertions(+), 123 deletions(-)
diff --git a/arm/util/sysTools.py b/arm/util/sysTools.py
index cd5814d..e8a9e49 100644
--- a/arm/util/sysTools.py
+++ b/arm/util/sysTools.py
@@ -6,6 +6,8 @@ import os
import time
import threading
+import arm.util.tracker
+
from stem.util import conf, log, proc, str_tools, system
RESOURCE_TRACKERS = {} # mapping of pids to their resource tracker instances
@@ -60,7 +62,7 @@ def getResourceTracker(pid, noSpawn = False):
tracker.start()
return tracker
-class ResourceTracker(threading.Thread):
+class ResourceTracker(arm.util.tracker.Daemon):
"""
Periodically fetches the resource usage (cpu and memory usage) for a given
process.
@@ -77,8 +79,7 @@ class ResourceTracker(threading.Thread):
disabled if zero
"""
- threading.Thread.__init__(self)
- self.setDaemon(True)
+ arm.util.tracker.Daemon.__init__(self, resolveRate)
self.processPid = processPid
self.resolveRate = resolveRate
@@ -95,9 +96,7 @@ class ResourceTracker(threading.Thread):
self._lastCpuTotal = 0
self.lastLookup = -1
- self._halt = False # terminates thread if true
self._valLock = threading.RLock()
- self._cond = threading.Condition() # used for pausing the thread
# number of successful calls we've made
self._runCount = 0
@@ -133,124 +132,91 @@ class ResourceTracker(threading.Thread):
return self._failureCount != 0
- def run(self):
- while not self._halt:
- timeSinceReset = time.time() - self.lastLookup
-
- if self.resolveRate == 0:
- self._cond.acquire()
- if not self._halt: self._cond.wait(0.2)
- self._cond.release()
-
- continue
- elif timeSinceReset < self.resolveRate:
- sleepTime = max(0.2, self.resolveRate - timeSinceReset)
-
- self._cond.acquire()
- if not self._halt: self._cond.wait(sleepTime)
- self._cond.release()
-
- continue # done waiting, try again
-
+ def task(self):
+ timeSinceReset = time.time() - self.lastLookup
+
+ newValues = {}
+ try:
+ if self._useProc:
+ utime, stime, startTime = proc.get_stats(self.processPid, proc.Stat.CPU_UTIME, proc.Stat.CPU_STIME, proc.Stat.START_TIME)
+ totalCpuTime = float(utime) + float(stime)
+ cpuDelta = totalCpuTime - self._lastCpuTotal
+ newValues["cpuSampling"] = cpuDelta / timeSinceReset
+ newValues["cpuAvg"] = totalCpuTime / (time.time() - float(startTime))
+ newValues["_lastCpuTotal"] = totalCpuTime
+
+ memUsage = int(proc.get_memory_usage(self.processPid)[0])
+ totalMemory = proc.get_physical_memory()
+ newValues["memUsage"] = memUsage
+ newValues["memUsagePercentage"] = float(memUsage) / totalMemory
+ else:
+ # the ps call formats results as:
+ #
+ # TIME ELAPSED RSS %MEM
+ # 3-08:06:32 21-00:00:12 121844 23.5
+ #
+ # or if Tor has only recently been started:
+ #
+ # TIME ELAPSED RSS %MEM
+ # 0:04.40 37:57 18772 0.9
+
+ psCall = system.call("ps -p %s -o cputime,etime,rss,%%mem" % self.processPid)
+
+ isSuccessful = False
+ if psCall and len(psCall) >= 2:
+ stats = psCall[1].strip().split()
+
+ if len(stats) == 4:
+ try:
+ totalCpuTime = str_tools.parse_short_time_label(stats[0])
+ uptime = str_tools.parse_short_time_label(stats[1])
+ cpuDelta = totalCpuTime - self._lastCpuTotal
+ newValues["cpuSampling"] = cpuDelta / timeSinceReset
+ newValues["cpuAvg"] = totalCpuTime / uptime
+ newValues["_lastCpuTotal"] = totalCpuTime
+
+ newValues["memUsage"] = int(stats[2]) * 1024 # ps size is in kb
+ newValues["memUsagePercentage"] = float(stats[3]) / 100.0
+ isSuccessful = True
+ except ValueError, exc: pass
+
+ if not isSuccessful:
+ raise IOError("unrecognized output from ps: %s" % psCall)
+ except IOError, exc:
newValues = {}
- try:
- if self._useProc:
- utime, stime, startTime = proc.get_stats(self.processPid, proc.Stat.CPU_UTIME, proc.Stat.CPU_STIME, proc.Stat.START_TIME)
- totalCpuTime = float(utime) + float(stime)
- cpuDelta = totalCpuTime - self._lastCpuTotal
- newValues["cpuSampling"] = cpuDelta / timeSinceReset
- newValues["cpuAvg"] = totalCpuTime / (time.time() - float(startTime))
- newValues["_lastCpuTotal"] = totalCpuTime
-
- memUsage = int(proc.get_memory_usage(self.processPid)[0])
- totalMemory = proc.get_physical_memory()
- newValues["memUsage"] = memUsage
- newValues["memUsagePercentage"] = float(memUsage) / totalMemory
- else:
- # the ps call formats results as:
- #
- # TIME ELAPSED RSS %MEM
- # 3-08:06:32 21-00:00:12 121844 23.5
- #
- # or if Tor has only recently been started:
- #
- # TIME ELAPSED RSS %MEM
- # 0:04.40 37:57 18772 0.9
-
- psCall = system.call("ps -p %s -o cputime,etime,rss,%%mem" % self.processPid)
-
- isSuccessful = False
- if psCall and len(psCall) >= 2:
- stats = psCall[1].strip().split()
-
- if len(stats) == 4:
- try:
- totalCpuTime = str_tools.parse_short_time_label(stats[0])
- uptime = str_tools.parse_short_time_label(stats[1])
- cpuDelta = totalCpuTime - self._lastCpuTotal
- newValues["cpuSampling"] = cpuDelta / timeSinceReset
- newValues["cpuAvg"] = totalCpuTime / uptime
- newValues["_lastCpuTotal"] = totalCpuTime
-
- newValues["memUsage"] = int(stats[2]) * 1024 # ps size is in kb
- newValues["memUsagePercentage"] = float(stats[3]) / 100.0
- isSuccessful = True
- except ValueError, exc: pass
-
- if not isSuccessful:
- raise IOError("unrecognized output from ps: %s" % psCall)
- except IOError, exc:
- newValues = {}
- self._failureCount += 1
-
- if self._useProc:
- if self._failureCount >= 3:
- # We've failed three times resolving via proc. Warn, and fall back
- # to ps resolutions.
- log.info("Failed three attempts to get process resource usage from proc, falling back to ps (%s)" % exc)
-
- self._useProc = False
- self._failureCount = 1 # prevents lastQueryFailed() from thinking that we succeeded
- else:
- # wait a bit and try again
- log.debug("Unable to query process resource usage from proc (%s)" % exc)
- self._cond.acquire()
- if not self._halt: self._cond.wait(0.5)
- self._cond.release()
- else:
- # exponential backoff on making failed ps calls
- sleepTime = 0.01 * (2 ** self._failureCount) + self._failureCount
- log.debug("Unable to query process resource usage from ps, waiting %0.2f seconds (%s)" % (sleepTime, exc))
- self._cond.acquire()
- if not self._halt: self._cond.wait(sleepTime)
- self._cond.release()
-
- # sets the new values
- if newValues:
- # If this is the first run then the cpuSampling stat is meaningless
- # (there isn't a previous tick to sample from so it's zero at this
- # point). Setting it to the average, which is a fairer estimate.
- if self.lastLookup == -1:
- newValues["cpuSampling"] = newValues["cpuAvg"]
-
- self._valLock.acquire()
- self.cpuSampling = newValues["cpuSampling"]
- self.cpuAvg = newValues["cpuAvg"]
- self.memUsage = newValues["memUsage"]
- self.memUsagePercentage = newValues["memUsagePercentage"]
- self._lastCpuTotal = newValues["_lastCpuTotal"]
- self.lastLookup = time.time()
- self._runCount += 1
- self._failureCount = 0
- self._valLock.release()
-
- def stop(self):
- """
- Halts further resolutions and terminates the thread.
- """
+ self._failureCount += 1
- self._cond.acquire()
- self._halt = True
- self._cond.notifyAll()
- self._cond.release()
+ if self._useProc:
+ if self._failureCount >= 3:
+ # We've failed three times resolving via proc. Warn, and fall back
+ # to ps resolutions.
+ log.info("Failed three attempts to get process resource usage from proc, falling back to ps (%s)" % exc)
+ self._useProc = False
+ self._failureCount = 1 # prevents lastQueryFailed() from thinking that we succeeded
+ else:
+ # wait a bit and try again
+ log.debug("Unable to query process resource usage from proc (%s)" % exc)
+ else:
+ # exponential backoff on making failed ps calls
+ sleepTime = 0.01 * (2 ** self._failureCount) + self._failureCount
+ log.debug("Unable to query process resource usage from ps, waiting %0.2f seconds (%s)" % (sleepTime, exc))
+
+ # sets the new values
+ if newValues:
+ # If this is the first run then the cpuSampling stat is meaningless
+ # (there isn't a previous tick to sample from so it's zero at this
+ # point). Setting it to the average, which is a fairer estimate.
+ if self.lastLookup == -1:
+ newValues["cpuSampling"] = newValues["cpuAvg"]
+
+ self._valLock.acquire()
+ self.cpuSampling = newValues["cpuSampling"]
+ self.cpuAvg = newValues["cpuAvg"]
+ self.memUsage = newValues["memUsage"]
+ self.memUsagePercentage = newValues["memUsagePercentage"]
+ self._lastCpuTotal = newValues["_lastCpuTotal"]
+ self.lastLookup = time.time()
+ self._runCount += 1
+ self._failureCount = 0
+ self._valLock.release()
More information about the tor-commits
mailing list