[tor-commits] [stem/master] Implement CollecTor download retries
atagar at torproject.org
atagar at torproject.org
Sat Aug 17 20:44:27 UTC 2019
commit d3de8b55528f4ed186fb9f99d032f421a4c1f301
Author: Damian Johnson <atagar at torproject.org>
Date: Thu Jul 4 17:27:32 2019 -0700
Implement CollecTor download retries
---
stem/descriptor/collector.py | 63 +++++++++++++++++++++++++++++++--------
test/unit/descriptor/collector.py | 10 ++++++-
2 files changed, 60 insertions(+), 13 deletions(-)
diff --git a/stem/descriptor/collector.py b/stem/descriptor/collector.py
index 21c9d91d..0bbf4251 100644
--- a/stem/descriptor/collector.py
+++ b/stem/descriptor/collector.py
@@ -47,9 +47,11 @@ With this you can either download and read directly from CollecTor...
"""
import json
+import sys
import time
from stem.descriptor import Compression
+from stem.util import log
try:
# account for urllib's change between python 2.x and 3.x
@@ -88,6 +90,53 @@ def url(resource, compression = Compression.PLAINTEXT):
return COLLECTOR_URL + '/'.join(path) + extension
+def _download(url, compression, timeout, retries):
+ """
+ Download from the given url.
+
+ :param str url: url to download from
+ :param descriptor.Compression compression: decompression type
+ :param int timeout: timeout when connection becomes idle, no timeout applied
+ if **None**
+ :param int retries: maximum number of retries after the initial attempt fails
+
+ :returns: content of the given url
+
+ :raises:
+ * **IOError** if unable to decompress
+ * **socket.timeout** if our request timed out
+ * **urllib2.URLError** for most request failures
+
+ Note that the urllib2 module may fail with other exception types, in
+ which case we'll pass it along.
+ """
+
+ start_time = time.time()
+
+ try:
+ response = urllib.urlopen(url, timeout = timeout).read()
+ except:
+ exc = sys.exc_info()[1]
+
+ if timeout is not None:
+ timeout -= time.time() - start_time
+
+ if retries > 0 and (timeout is None or timeout > 0):
+ log.debug("Failed to download from CollecTor at '%s' (%i retries remaining): %s" % (url, retries, exc))
+ return _download(url, compression, timeout, retries - 1)
+ else:
+ log.debug("Failed to download from CollecTor at '%s': %s" % (url, exc))
+ raise
+
+ if compression not in (None, Compression.PLAINTEXT):
+ try:
+ response = compression.decompress(response)
+ except Exception as exc:
+ raise IOError('Unable to decompress %s response from %s: %s' % (compression, url, exc))
+
+ return stem.util.str_tools._to_unicode(response)
+
+
class CollecTor(object):
"""
Downloader for descriptors from CollecTor. The contents of CollecTor are
@@ -110,7 +159,6 @@ class CollecTor(object):
self._cached_index_at = 0
if compression == 'best':
-
for option in (Compression.LZMA, Compression.BZ2, Compression.GZIP):
if option.available:
self.compression = option
@@ -134,17 +182,8 @@ class CollecTor(object):
"""
if not self._cached_index or time.time() - self._cached_index_at >= REFRESH_INDEX_RATE:
- # TODO: add retry support
-
- response = urllib.urlopen(url('index', self.compression), timeout = self.timeout).read()
-
- if self.compression:
- try:
- response = self.compression.decompress(response)
- except Exception as exc:
- raise IOError('Unable to decompress response as %s: %s' % (self.compression, exc))
-
- self._cached_index = json.loads(stem.util.str_tools._to_unicode(response))
+ response = _download(url('index', self.compression), self.compression, self.timeout, self.retries)
+ self._cached_index = json.loads(response)
self._cached_index_at = time.time()
return self._cached_index
diff --git a/test/unit/descriptor/collector.py b/test/unit/descriptor/collector.py
index 6aeda8f0..c46fb60c 100644
--- a/test/unit/descriptor/collector.py
+++ b/test/unit/descriptor/collector.py
@@ -28,6 +28,14 @@ class TestCollector(unittest.TestCase):
self.assertEqual('https://collector.torproject.org/index/index.json.bz2', url('index', compression = Compression.BZ2))
self.assertEqual('https://collector.torproject.org/index/index.json.xz', url('index', compression = Compression.LZMA))
+ @patch(URL_OPEN)
+ def test_retries(self, urlopen_mock):
+ collector = CollecTor(retries = 4)
+ urlopen_mock.side_effect = IOError('boom')
+
+ self.assertRaisesRegexp(IOError, 'boom', collector.index)
+ self.assertEqual(5, urlopen_mock.call_count)
+
@patch(URL_OPEN, Mock(return_value = io.BytesIO(b'{"index_created":"2017-12-25 21:06","build_revision":"56a303e","path":"https://collector.torproject.org"}')))
def test_index(self):
expected = {
@@ -52,4 +60,4 @@ class TestCollector(unittest.TestCase):
for compression in (Compression.GZIP, Compression.BZ2, Compression.LZMA):
with patch(URL_OPEN, Mock(return_value = io.BytesIO(b'not compressed'))):
collector = CollecTor(compression = compression)
- self.assertRaisesRegexp(IOError, 'Unable to decompress response as %s' % compression, collector.index)
+ self.assertRaisesRegexp(IOError, 'Unable to decompress %s response' % compression, collector.index)
More information about the tor-commits
mailing list