[tor-commits] [sbws/master] new: destination: Recover destination when it failed
juga at torproject.org
juga at torproject.org
Thu Mar 21 18:30:42 UTC 2019
commit 083e7c702313ffd3e4bf2be31ec19cb90bc046a1
Author: juga0 <juga at riseup.net>
Date: Sat Mar 16 10:21:22 2019 +0000
new: destination: Recover destination when it failed
Closes: #29589.
---
sbws/core/scanner.py | 24 ++---
sbws/lib/destination.py | 171 +++++++++++++++++++++++-------
tests/integration/lib/test_destination.py | 52 ++++-----
tests/unit/lib/test_destination.py | 54 ++++++++++
4 files changed, 222 insertions(+), 79 deletions(-)
diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py
index 24a975b..4b06225 100644
--- a/sbws/core/scanner.py
+++ b/sbws/core/scanner.py
@@ -267,16 +267,13 @@ def measure_relay(args, conf, destinations, cb, rl, relay):
log.critical("There are not any functional destinations.\n"
"It is recommended to set several destinations so that "
"the scanner can continue if one fails.")
- # Exit the scanner with error stopping threads first.
- stop_threads(signal.SIGTERM, None, 1)
- # When the destinations can recover would be implemented;
- # reason = 'Unable to get destination'
- # log.debug(reason + ' to measure %s %s',
- # relay.nickname, relay.fingerprint)
- # return [
- # ResultErrorDestination(relay, [], dest.url, our_nick,
- # msg=reason),
- # ]
+ # NOTE: Because this is executed in a thread, stop_threads can not
+ # be called from here, it has to be called from the main thread.
+ # Instead set the singleton end event, that will call stop_threads
+ # from the main process.
+ # Errors with only one destination are set in ResultErrorStream.
+ settings.end_event.set()
+ return None
# Pick a relay to help us measure the given relay. If the given relay is an
# exit, then pick a non-exit. Otherwise pick an exit.
helper = None
@@ -321,10 +318,9 @@ def measure_relay(args, conf, destinations, cb, rl, relay):
log.debug('Destination %s unusable via circuit %s (%s), %s',
dest.url, circ_fps, nicknames, usable_data)
cb.close_circuit(circ_id)
- # TODO: Return a different/new type of ResultError?
- msg = 'The destination seemed to have stopped being usable'
return [
- ResultErrorStream(relay, circ_fps, dest.url, our_nick, msg=msg),
+ ResultErrorStream(relay, circ_fps, dest.url, our_nick,
+ msg=usable_data),
]
assert is_usable
assert 'content_length' in usable_data
@@ -561,7 +557,7 @@ def wait_for_results(num_relays_to_measure, pending_results):
than the time to request over the network.)
"""
num_last_measured = 1
- while num_last_measured > 0:
+ while num_last_measured > 0 and not settings.end_event.is_set():
log.info("Pending measurements: %s out of %s: ",
len(pending_results), num_relays_to_measure)
time.sleep(TIMEOUT_MEASUREMENTS)
diff --git a/sbws/lib/destination.py b/sbws/lib/destination.py
index a92df61..59474a6 100644
--- a/sbws/lib/destination.py
+++ b/sbws/lib/destination.py
@@ -1,3 +1,5 @@
+import collections
+import datetime
import logging
import random
import requests
@@ -6,8 +8,13 @@ from stem.control import EventType
from sbws.globals import DESTINATION_VERIFY_CERTIFICATE
import sbws.util.stem as stem_utils
+from ..globals import (
+ MAX_NUM_DESTINATION_FAILURES,
+ DELTA_SECONDS_RETRY_DESTINATION,
+ NUM_DESTINATION_ATTEMPTS_STORED,
+ FACTOR_INCREMENT_DESTINATION_RETRY
+ )
-from ..globals import MAXIMUM_NUMBER_DESTINATION_FAILURES
log = logging.getLogger(__name__)
@@ -98,73 +105,155 @@ def connect_to_destination_over_circuit(dest, circ_id, session, cont, max_dl):
try:
head = session.head(dest.url, verify=dest.verify)
except requests.exceptions.RequestException as e:
- dest.set_failure()
+ dest.add_failure()
return False, 'Could not connect to {} over circ {} {}: {}'.format(
dest.url, circ_id, stem_utils.circuit_str(cont, circ_id), e)
finally:
stem_utils.remove_event_listener(cont, listener)
if head.status_code != requests.codes.ok:
- dest.set_failure()
+ dest.add_failure()
return False, error_prefix + 'we expected HTTP code '\
'{} not {}'.format(requests.codes.ok, head.status_code)
if 'content-length' not in head.headers:
- dest.set_failure()
+ dest.add_failure()
return False, error_prefix + 'we except the header Content-Length '\
'to exist in the response'
content_length = int(head.headers['content-length'])
if max_dl > content_length:
- dest.set_failure()
+ dest.add_failure()
return False, error_prefix + 'our maximum configured download size '\
'is {} but the content is only {}'.format(max_dl, content_length)
log.debug('Connected to %s over circuit %s', dest.url, circ_id)
- # Any failure connecting to the destination will call set_failure,
- # which will set `failed` to True and count consecutives failures.
- # It can not be set at the start, to be able to know if it failed a
- # a previous time, which is checked by set_failure.
- # Future improvement: use a list to count consecutive failures
- # or calculate it from the results.
- dest.failed = False
+ # Any failure connecting to the destination will call add_failure.
+ # It can not be set at the start, to be able to know whether it is
+ # failing consecutive times.
+ dest.add_success()
return True, {'content_length': content_length}
class Destination:
- def __init__(self, url, max_dl, verify):
+ """Web server from which data is downloaded to measure bandwidth.
+ """
+ # NOTE: max_dl and verify should be optional and have defaults
+ def __init__(self, url, max_dl, verify,
+ max_num_failures=MAX_NUM_DESTINATION_FAILURES,
+ delta_seconds_retry=DELTA_SECONDS_RETRY_DESTINATION,
+ num_attempts_stored=NUM_DESTINATION_ATTEMPTS_STORED,
+ factor_increment_retry=FACTOR_INCREMENT_DESTINATION_RETRY):
+ """Initializes the Web server from which the data is downloaded.
+
+ :param str url: Web server data URL to download.
+ :param int max_dl: Maximum size of the data to download.
+ :param bool verify: Whether to verify or not the TLS certificate.
+ :param int max_num_failures: Number of consecutive failures when the
+ destination is not considered functional.
+ :param int delta_seconds_retry: Delta time to try a destination
+ that was not functional.
+ :param int num_attempts_stored: Number of attempts to store.
+ :param int factor_increment_retry: Factor to increment delta by
+ before trying to use a destination again.
+ """
self._max_dl = max_dl
u = urlparse(url)
self._url = u
self._verify = verify
- # Flag to record whether this destination failed in the last
- # measurement.
- # Failures can happen if:
- # - an HTTPS request can not be made over Tor
- # (which might be the relays fault, not the destination being
- # unreachable)
- # - the destination does not support HTTP Range requests.
- self.failed = False
- self.consecutive_failures = 0
- @property
- def is_functional(self):
+ # Attributes to decide whether a destination is functional or not.
+ self._max_num_failures = max_num_failures
+ self._num_attempts_stored = num_attempts_stored
+ # Default delta time to try a destination that was not functional.
+ self._default_delta_seconds_retry = delta_seconds_retry
+ self._delta_seconds_retry = delta_seconds_retry
+ # Using a deque (FIFO) so that it does not grow forever and
+ # old attempts do not have to be removed.
+ # Store tuples of timestamp and whether the destination succeeded or not
+ # (succeeded, 1, failed, 0).
+ # Initialize it as if it never failed.
+ self._attempts = collections.deque([(datetime.datetime.utcnow(), 1), ],
+ maxlen=self._num_attempts_stored)
+ self._factor = factor_increment_retry
+
+ def _last_attempts(self, n=None):
+ """Return the last ``n`` attempts the destination was used."""
+ # deque does not accept slices,
+ # a new deque is returned with the last n items
+ # (or less if there were less).
+ return collections.deque(self._attempts,
+ maxlen=(n or self._max_num_failures))
+
+ def _are_last_attempts_failures(self, n=None):
+ """
+ Return True if the destination failed the last ``n``
+ times it was used.
+ """
+ # Count the number that there was a failure when used
+ n = n if n else self._max_num_failures
+ return ([i[1] for i in self._last_attempts(n)].count(0)
+ >= self._max_num_failures)
+
+ def _increment_time_to_retry(self, factor=None):
+ """
+ Increment the time a destination will be tried again by a ``factor``.
"""
- Returns True if there has not been a number consecutive measurements.
- Otherwise warn about it and return False.
+ self._delta_seconds_retry *= factor or self._factor
+ log.info("Incremented the time to try destination %s to %s hours.",
+ self.url, self._delta_seconds_retry / 60 / 60)
+ def _is_last_try_old_enough(self, n=None):
+ """
+ Return True if the last time it was used it was ``n`` seconds ago.
"""
- if self.consecutive_failures > MAXIMUM_NUMBER_DESTINATION_FAILURES:
- log.warning("Destination %s is not functional. Please check that "
- "it is correct.", self._url)
+ # Timestamp of the last attempt.
+ last_time = self._attempts[-1][0]
+ # If the last attempt is older than _delta_seconds_retry,
+ if (datetime.datetime.utcnow()
+ - datetime.timedelta(seconds=self._delta_seconds_retry)
+ > last_time):
+ # And try again.
+ return True
+ return False
+
+ def is_functional(self):
+ """Whether connections to a destination are failing or not.
+
+ Return True if:
+ - It did not fail more than n (by default 3) consecutive times.
+ - The last time the destination was tried
+ was x (by default 3h) seconds ago.
+ And False otherwise.
+
+ When the destination is tried again after the consecutive failures,
+ the time to try again is incremented and reset as soon as the
+ destination does not fail.
+ """
+ # Failed the last X consecutive times
+ if self._are_last_attempts_failures():
+ log.warning("The last %s times the destination %s failed."
+ "It will not be used again in %s hours.\n",
+ self._max_num_failures, self.url,
+ self._delta_seconds_retry / 60 / 60)
+ log.warning("Please, add more destinations or increment the "
+ "number of maximum number of consecutive failures "
+ "in the configuration.")
+ # It was not used for a while and the last time it was used
+ # was long ago, then try again
+ if self._is_last_try_old_enough():
+ log.info("The destination %s was not tried for %s hours, "
+ "it is going to by tried again.")
+ # Set the next time to retry higher, in case this attempt fails
+ self._increment_time_to_retry()
+ return True
return False
+ # Reset the time to retry to the initial value
+ # In case it was incremented
+ self._delta_seconds_retry = self._default_delta_seconds_retry
return True
- def set_failure(self):
- """Set failed to True and increase the number of consecutive failures.
- Only if it also failed in the previous measuremnt.
+ def add_failure(self, dt=None):
+ self._attempts.append((dt or datetime.datetime.utcnow(), 0))
- """
- # if it failed in the last measurement
- if self.failed:
- self.consecutive_failures += 1
- self.failed = True
+ def add_success(self, dt=None):
+ self._attempts.append((dt or datetime.datetime.utcnow(), 1))
@property
def url(self):
@@ -213,7 +302,7 @@ class DestinationList:
@property
def functional_destinations(self):
- return [d for d in self._all_dests if d.is_functional]
+ return [d for d in self._all_dests if d.is_functional()]
@staticmethod
def from_config(conf, circuit_builder, relay_list, controller):
@@ -250,4 +339,8 @@ class DestinationList:
# This removes the need for an extra lock for every measurement.
# Do not change the order of the destinations, just return a
# destination.
- return self._rng.choice(self.functional_destinations)
+ # random.choice raises IndexError with an empty list.
+ if self.functional_destinations:
+ return self._rng.choice(self.functional_destinations)
+ else:
+ return None
diff --git a/tests/integration/lib/test_destination.py b/tests/integration/lib/test_destination.py
index 54cbacc..98ed89f 100644
--- a/tests/integration/lib/test_destination.py
+++ b/tests/integration/lib/test_destination.py
@@ -1,5 +1,4 @@
"""Integration tests for destination.py"""
-from sbws.globals import MAXIMUM_NUMBER_DESTINATION_FAILURES
import sbws.util.requests as requests_utils
from sbws.lib.destination import (DestinationList, Destination,
connect_to_destination_over_circuit)
@@ -36,16 +35,12 @@ def test_connect_to_destination_over_circuit_success(persistent_launch_tor,
destination, circuit_id, session, persistent_launch_tor, 1024)
assert is_usable is True
assert 'content_length' in response
- assert not destination.failed
- assert destination.consecutive_failures == 0
- assert destination.is_functional
+ assert destination.is_functional()
def test_connect_to_destination_over_circuit_fail(persistent_launch_tor,
dests, cb, rl):
bad_destination = Destination('https://example.example', 1024, False)
- # dests._all_dests.append(bad_destination)
- # dests._usable_dests.append(bad_destination)
session = requests_utils.make_session(persistent_launch_tor, 10)
# Choose a relay that is not an exit
relay = [r for r in rl.relays
@@ -61,35 +56,40 @@ def test_connect_to_destination_over_circuit_fail(persistent_launch_tor,
assert is_usable is False
# because it is the first time it fails, failures aren't count
- assert bad_destination.failed
- assert bad_destination.consecutive_failures == 0
- assert bad_destination.is_functional
+ assert bad_destination.is_functional()
- # fail twice in a row
+ # fail three times in a row
is_usable, response = connect_to_destination_over_circuit(
bad_destination, circuit_id, session, persistent_launch_tor, 1024)
- assert bad_destination.failed
- assert bad_destination.consecutive_failures == 1
- assert bad_destination.is_functional
+ is_usable, response = connect_to_destination_over_circuit(
+ bad_destination, circuit_id, session, persistent_launch_tor, 1024)
+ assert not bad_destination.is_functional()
def test_functional_destinations(conf, cb, rl, persistent_launch_tor):
good_destination = Destination('https://127.0.0.1:28888', 1024, False)
- # Mock that it failed before and just now, but it's still considered
- # functional.
- good_destination.consecutive_failures = 3
- good_destination.failed = True
bad_destination = Destination('https://example.example', 1024, False)
- # Mock that it didn't fail now, but it already failed 11 consecutive
- # times.
- bad_destination.consecutive_failures = \
- MAXIMUM_NUMBER_DESTINATION_FAILURES + 1
- bad_destination.failed = False
- # None of the arguments are used, move to unit tests when this get
- # refactored
+
+ session = requests_utils.make_session(persistent_launch_tor, 10)
+ # Choose a relay that is not an exit
+ relay = [r for r in rl.relays
+ if r.nickname == 'relay1mbyteMAB'][0]
+ # Choose an exit, for this test it does not matter the bandwidth
+ helper = rl.exits_not_bad_allowing_port(bad_destination.port)[0]
+ circuit_path = [relay.fingerprint, helper.fingerprint]
+ # Build a circuit.
+ circuit_id, _ = cb.build_circuit(circuit_path)
+
+ # fail three times in a row
+ is_usable, response = connect_to_destination_over_circuit(
+ bad_destination, circuit_id, session, persistent_launch_tor, 1024)
+ is_usable, response = connect_to_destination_over_circuit(
+ bad_destination, circuit_id, session, persistent_launch_tor, 1024)
+ is_usable, response = connect_to_destination_over_circuit(
+ bad_destination, circuit_id, session, persistent_launch_tor, 1024)
+
destination_list = DestinationList(
conf, [good_destination, bad_destination], cb, rl,
persistent_launch_tor)
- expected_functional_destinations = [good_destination]
functional_destinations = destination_list.functional_destinations
- assert expected_functional_destinations == functional_destinations
+ assert [good_destination] == functional_destinations
diff --git a/tests/unit/lib/test_destination.py b/tests/unit/lib/test_destination.py
new file mode 100644
index 0000000..fc8b489
--- /dev/null
+++ b/tests/unit/lib/test_destination.py
@@ -0,0 +1,54 @@
+"""Unit tests for sbws.lib.destination."""
+from datetime import datetime, timedelta
+
+from sbws.lib import destination
+
+
+def test_destination_is_functional():
+ eight_hours_ago = datetime.utcnow() - timedelta(hours=8)
+ four_hours_ago = datetime.utcnow() - timedelta(hours=4)
+ two_hours_ago = datetime.utcnow() - timedelta(hours=2)
+
+ d = destination.Destination('unexistenturl', 0, False)
+ assert d.is_functional()
+
+ # Fail 3 consecutive times
+ d.add_failure()
+ d.add_failure()
+ d.add_failure()
+ assert d._are_last_attempts_failures()
+ assert not d._is_last_try_old_enough()
+ assert not d.is_functional()
+
+ # Then doesn't fail and it's functional again
+ d.add_success()
+ assert not d._are_last_attempts_failures()
+ assert d.is_functional()
+
+ # Fail again 3 times
+ d.add_failure()
+ d.add_failure()
+ # And last failure was 2h ago
+ d.add_failure(two_hours_ago)
+ assert d._are_last_attempts_failures()
+ assert not d._is_last_try_old_enough()
+ assert not d.is_functional()
+
+ # But if the last failure was 4h ago, try to use it again
+ # And last failure was 4h ago
+ d.add_failure(four_hours_ago)
+ assert d._is_last_try_old_enough()
+ assert d.is_functional()
+
+ # If last failure was 8h ago, try to use it again again
+ d.add_failure(eight_hours_ago)
+ assert d._is_last_try_old_enough()
+ assert d.is_functional()
+
+ # Whenever it does not fail again, reset the time to try again
+ # on 3 consecutive failures
+ d.add_success()
+ assert not d._are_last_attempts_failures()
+ assert d.is_functional()
+ # And the delta to try is resetted
+ assert not d._is_last_try_old_enough()
More information about the tor-commits
mailing list