[tor-commits] [stem/master] Moving retry functionality to Query class
atagar at torproject.org
atagar at torproject.org
Mon Jul 22 03:10:17 UTC 2013
commit 7d04653d908f0c62e197bafa27a5cd94634cbb53
Author: Damian Johnson <atagar at torproject.org>
Date: Mon Jul 15 09:16:54 2013 -0700
Moving retry functionality to Query class
It's far better if the Query class handles retries rather than the
DescriptorDownloader. Advantages include...
* The DescriptorDownloader no longer has a reason to support blocking queries.
There's no advantage to them since the Query knows if/when the request fails
(and hence can retry it on our behalf).
* The Query class is easier to test. The more functionality we can push down
into it, the simpler the DescriptorDownloader will be.
* More advanced use cases will be using Query instances rather than the
DescriptorDownloader. By having retry functionality there it'll be readily
available to them.
---
stem/descriptor/remote.py | 88 ++++++++++++++++++++++-----------------
test/integ/descriptor/remote.py | 5 +--
test/unit/descriptor/remote.py | 17 ++++----
3 files changed, 59 insertions(+), 51 deletions(-)
diff --git a/stem/descriptor/remote.py b/stem/descriptor/remote.py
index 464e7cd..88e3258 100644
--- a/stem/descriptor/remote.py
+++ b/stem/descriptor/remote.py
@@ -48,6 +48,8 @@ import urllib2
import stem.descriptor
+from stem.util import log
+
# Tor directory authorities as of commit f631b73 (7/4/13). This should only
# include authorities with 'v3ident':
#
@@ -72,27 +74,34 @@ class Query(object):
mirror. The caller can block on the response by either calling
:func:~stem.descriptor.remote.run: or iterating over our descriptor content.
- :var str address: address of the authority or mirror we're querying
- :var int port: directory port we're querying
:var str resource: resource being fetched, such as '/tor/status-vote/current/consensus.z'
+ :var str descriptor_type: type of descriptors being fetched, see
+ :func:`~stem.descriptor.__init__.parse_file`
+
+ :var list endpoints: (address, dirport) tuples of the authority or mirror
+ we're querying, this uses authorities if undefined
+ :var int retries: number of times to attempt the request if it fails
+ :var bool fall_back_to_authority: when retrying request issues the last
+ request to a directory authority if **True**
:var Exception error: exception if a problem occured
:var bool is_done: flag that indicates if our request has finished
- :var str descriptor_type: type of descriptors being fetched, see :func:`~stem.descriptor.__init__.parse_file`
:var float start_time: unix timestamp when we first started running
:var float timeout: duration before we'll time out our request
:var float runtime: time our query took, this is **None** if it's not yet finished
"""
- def __init__(self, address, port, resource, descriptor_type, timeout = None, start = True):
- self.address = address
- self.port = port
+ def __init__(self, resource, descriptor_type, endpoints = None, retries = 2, fall_back_to_authority = True, timeout = None, start = True):
self.resource = resource
+ self.descriptor_type = descriptor_type
+
+ self.endpoints = endpoints if endpoints else []
+ self.retries = retries
+ self.fall_back_to_authority = fall_back_to_authority
self.error = None
self.is_done = False
- self.descriptor_type = descriptor_type
self.start_time = None
self.timeout = timeout
@@ -106,14 +115,23 @@ class Query(object):
if start:
self.start()
- def get_url(self):
+ def pick_url(self, use_authority = False):
"""
- Provides the url being queried.
+ Provides a url that can be queried. If we have multiple endpoints then one
+ will be picked randomly.
+
+ :param bool use_authority: ignores our endpoints and uses a directory
+ authority instead
:returns: **str** for the url being queried by this request
"""
- return "http://%s:%i/%s" % (self.address, self.port, self.resource.lstrip('/'))
+ if use_authority or not self.endpoints:
+ address, dirport = random.choice(DIRECTORY_AUTHORITIES.values())
+ else:
+ address, dirport = random.choice(self.endpoints)
+
+ return "http://%s:%i/%s" % (address, dirport, self.resource.lstrip('/'))
def start(self):
"""
@@ -122,7 +140,7 @@ class Query(object):
with self._downloader_thread_lock:
if self._downloader_thread is None:
- self._downloader_thread = threading.Thread(target = self._download_descriptors, name="Descriptor Query")
+ self._downloader_thread = threading.Thread(target = self._download_descriptors, name="Descriptor Query", args = (self.retries,))
self._downloader_thread.setDaemon(True)
self._downloader_thread.start()
@@ -174,10 +192,13 @@ class Query(object):
for desc in self.run(True):
yield desc
- def _download_descriptors(self):
+ def _download_descriptors(self, retries):
try:
+ use_authority = retries == 0 and self.fall_back_to_authority
+ resource_url = self.pick_url(use_authority)
+
self.start_time = time.time()
- response = urllib2.urlopen(self.get_url(), timeout = self.timeout)
+ response = urllib2.urlopen(resource_url, timeout = self.timeout)
self.runtime = time.time() - self.start_time
# This sucks. We need to read the full response into memory before
@@ -188,8 +209,16 @@ class Query(object):
response = io.BytesIO(response.read().strip())
self._results = stem.descriptor.parse_file(response, self.descriptor_type)
+ log.trace("Descriptors retrieved from '%s' in %0.2fs" % (resource_url, self.runtime))
except:
- self.error = sys.exc_info()[1]
+ exc = sys.exc_info()[1]
+
+ if retries > 0:
+ log.debug("Unable to download descriptors from '%s' (%i retries remaining): %s" % (resource_url, retries, exc))
+ return self._download_descriptors(retries - 1)
+ else:
+ log.debug("Unable to download descriptors from '%s': %s" % (resource_url, exc))
+ self.error = exc
finally:
self.is_done = True
@@ -200,15 +229,9 @@ class DescriptorDownloader(object):
caching, retries, and other capabilities to make downloading descriptors easy
and efficient.
- Queries can be made in either a blocking or non-blocking fashion. If
- non-blocking then retries cannot be performed (since we do not know at the
- time of the request if it succeeded or failed).
-
For more advanced use cases you can use the
:class:`~stem.descriptor.remote.Query` class directly.
- :var bool block: blocks until requests have been concluded if **True**,
- otherwise provides the query as soon as its been issued
:var int retries: number of times to attempt the request if it fails
:var float timeout: duration before we'll time out our request, no timeout is
applied if **None**
@@ -219,37 +242,24 @@ class DescriptorDownloader(object):
request to a directory authority if **True**
"""
- def __init__(self, block = True, retries = 2, timeout = None, start_when_requested = True, fall_back_to_authority = True):
- self.block = block
+ def __init__(self, retries = 2, timeout = None, start_when_requested = True, fall_back_to_authority = True):
self.retries = retries
self.timeout = timeout
self.start_when_requested = start_when_requested
self.fall_back_to_authority = fall_back_to_authority
- self._directories = DIRECTORY_AUTHORITIES.values()
+ self._endpoints = DIRECTORY_AUTHORITIES.values()
def _query(self, resource, descriptor_type, retries):
"""
Issues a request for the given resource.
"""
- if self.fall_back_to_authority and retries == 0:
- address, dirport = random.choice(DIRECTORY_AUTHORITIES.values())
- else:
- address, dirport = random.choice(self._directories)
-
- query = Query(
- address,
- dirport,
+ return Query(
resource,
descriptor_type,
+ endpoints = self._endpoints,
+ retries = self.retries,
+ fall_back_to_authority = self.fall_back_to_authority,
timeout = self.timeout,
start = self.start_when_requested,
)
-
- if self.block:
- query.run(True)
-
- if query.error and retries > 0:
- return self.query(resource, descriptor_type, retries - 1)
-
- return query
diff --git a/test/integ/descriptor/remote.py b/test/integ/descriptor/remote.py
index 50d6d28..8509556 100644
--- a/test/integ/descriptor/remote.py
+++ b/test/integ/descriptor/remote.py
@@ -29,11 +29,10 @@ class TestDescriptorReader(unittest.TestCase):
for authority, (address, dirport) in stem.descriptor.remote.DIRECTORY_AUTHORITIES.items():
queries.append(stem.descriptor.remote.Query(
- address,
- dirport,
'/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31',
'server-descriptor 1.0',
- 30,
+ endpoints = [(address, dirport)],
+ timeout = 30,
))
for query in queries:
diff --git a/test/unit/descriptor/remote.py b/test/unit/descriptor/remote.py
index ab2276e..3aadaac 100644
--- a/test/unit/descriptor/remote.py
+++ b/test/unit/descriptor/remote.py
@@ -65,14 +65,13 @@ class TestDescriptorDownloader(unittest.TestCase):
urlopen_mock.return_value = io.BytesIO(TEST_DESCRIPTOR)
query = stem.descriptor.remote.Query(
- '128.31.0.39',
- 9131,
'/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31',
'server-descriptor 1.0',
+ endpoints = [('128.31.0.39', 9131)],
)
expeced_url = 'http://128.31.0.39:9131/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31'
- self.assertEqual(expeced_url, query.get_url())
+ self.assertEqual(expeced_url, query.pick_url())
descriptors = list(query)
self.assertEqual(1, len(descriptors))
@@ -95,10 +94,9 @@ class TestDescriptorDownloader(unittest.TestCase):
urlopen_mock.return_value = io.BytesIO(descriptor_content)
query = stem.descriptor.remote.Query(
- '128.31.0.39',
- 9131,
'/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31',
'server-descriptor 1.0',
+ endpoints = [('128.31.0.39', 9131)],
)
# checking via the iterator
@@ -119,15 +117,16 @@ class TestDescriptorDownloader(unittest.TestCase):
urlopen_mock.side_effect = socket.timeout('connection timed out')
query = stem.descriptor.remote.Query(
- '128.31.0.39',
- 9131,
'/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31',
'server-descriptor 1.0',
- 5,
+ endpoints = [('128.31.0.39', 9131)],
+ fall_back_to_authority = False,
+ timeout = 5,
)
self.assertRaises(socket.timeout, list, query.run())
- urlopen_mock.assert_called_once_with(
+ urlopen_mock.assert_called_with(
'http://128.31.0.39:9131/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31',
timeout = 5,
)
+ self.assertEqual(3, urlopen_mock.call_count)
More information about the tor-commits
mailing list