[tor-commits] [sbws/m12] fix: scanner: Rename functions

juga at torproject.org
Wed Jun 30 08:53:27 UTC 2021


commit dbf50208a1a76e5a17f927d92fce43e2e15778fb
Author: juga0 <juga at riseup.net>
Date:   Tue Jun 29 09:20:45 2021 +0000

    fix: scanner: Rename functions
    
    to more appropriate names, after switching to concurrent.futures.
---
 sbws/core/scanner.py            | 27 ++++++++++++++-------------
 tests/unit/core/test_scanner.py | 29 +++++++++++++++++------------
 2 files changed, 31 insertions(+), 25 deletions(-)

diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py
index bb723bf..25ad01a 100644
--- a/sbws/core/scanner.py
+++ b/sbws/core/scanner.py
@@ -599,7 +599,7 @@ def _next_expected_amount(
     return expected_amount
 
 
-def result_putter(result_dump, measurement):
+def measurement_writer(result_dump, measurement):
     # Since result_dump thread is calling queue.get() every second,
     # the queue should be full for only 1 second.
     # This call blocks at maximum timeout seconds.
@@ -615,7 +615,7 @@ def result_putter(result_dump, measurement):
         )
 
 
-def result_putter_error(target, exception):
+def log_measurement_exception(target, exception):
     print("in result putter error")
     if settings.end_event.is_set():
         return
@@ -666,8 +666,8 @@ def main_loop(
     After that, it will reuse a thread that has finished for every relay to
     measure.
 
-    Then ``wait_for_results`` is call, to obtain the results in the completed
-    ``future``\s.
+    Then ``process_completed_futures`` is called to obtain the results in the
+    completed ``future``\s.
 
     """
     log.info("Started the main loop to measure the relays.")
@@ -710,18 +710,19 @@ def main_loop(
             # `Future`s.
 
             # Each target relay_recent_measurement_attempt is incremented in
-            # `wait_for_results` as well as hbeat measured fingerprints.
+            # `process_completed_futures`, as are the hbeat measured
+            # fingerprints.
             num_relays = len(pending_results)
             # Without a callback, it's needed to pass `result_dump` here to
             # call the function that writes the measurement when it's
             # finished.
-            wait_for_results(
+            process_completed_futures(
                 executor,
                 hbeat,
                 result_dump,
                 pending_results,
             )
-            force_get_results(pending_results)
+            wait_futures_completed(pending_results)
 
         # Print the heartbeat message
         hbeat.print_heartbeat_message()
@@ -742,11 +743,11 @@ def main_loop(
             stop_threads(signal.SIGTERM, None)
 
 
-def wait_for_results(executor, hbeat, result_dump, pending_results):
+def process_completed_futures(executor, hbeat, result_dump, pending_results):
     """Obtain the relays' measurements as they finish.
 
     For every ``Future`` measurements that gets completed, obtain the
-    ``result`` and call ``result_putter``, which put the ``Result`` in
+    ``result`` and call ``measurement_writer``, which puts the ``Result`` in
     ``ResultDump`` queue and complete immediately.
 
     ``ResultDump`` thread (started before and out of this function) will get
@@ -754,7 +755,7 @@ def wait_for_results(executor, hbeat, result_dump, pending_results):
     the measurement threads.
 
     If there was an exception not caught by ``measure_relay``, it will call
-    instead ``result_putter_error``, which logs the error and complete
+    instead ``log_measurement_exception``, which logs the error and completes
     immediately.
 
     """
@@ -779,7 +780,7 @@ def wait_for_results(executor, hbeat, result_dump, pending_results):
             try:
                 measurement = future_measurement.result()
             except Exception as e:
-                result_putter_error(target, e)
+                log_measurement_exception(target, e)
                 import psutil
 
                 log.warning(psutil.Process(os.getpid()).memory_full_info())
@@ -791,7 +792,7 @@ def wait_for_results(executor, hbeat, result_dump, pending_results):
                 dumpstacks()
             else:
                 log.info("Measurement ready: %s" % (measurement))
-                result_putter(result_dump, measurement)
+                measurement_writer(result_dump, measurement)
             # `pending_results` has all the initial queued `Future`s,
             # they don't decrease as they get completed, but we know 1 has be
             # completed in each loop,
@@ -803,7 +804,7 @@ def wait_for_results(executor, hbeat, result_dump, pending_results):
             )
 
 
-def force_get_results(pending_results):
+def wait_futures_completed(pending_results):
     """Wait for last futures to finish, before starting new loop."""
     log.info("Wait for any remaining measurements.")
     done, not_done = concurrent.futures.wait(
diff --git a/tests/unit/core/test_scanner.py b/tests/unit/core/test_scanner.py
index f7ec69d..f805d50 100644
--- a/tests/unit/core/test_scanner.py
+++ b/tests/unit/core/test_scanner.py
@@ -14,24 +14,26 @@ from sbws.lib.relayprioritizer import RelayPrioritizer
 log = logging.getLogger(__name__)
 
 
-def test_result_putter(sbwshome_only_datadir, result_success, rd, end_event):
+def test_measurement_writer(
+    sbwshome_only_datadir, result_success, rd, end_event
+):
     if rd is None:
         pytest.skip("ResultDump is None")
     # Put one item in the queue
-    scanner.result_putter(rd, result_success)
+    scanner.measurement_writer(rd, result_success)
     assert rd.queue.qsize() == 1
 
     # Make queue maxsize 1, so that it'll be full after the first callback.
     # The second callback will wait 1 second, then the queue will be empty
     # again.
     rd.queue.maxsize = 1
-    scanner.result_putter(rd, result_success)
+    scanner.measurement_writer(rd, result_success)
     # after putting 1 result, the queue will be full
     assert rd.queue.qsize() == 1
     assert rd.queue.full()
     # it's still possible to put another results, because the callback will
     # wait 1 second and the queue will be empty again.
-    scanner.result_putter(rd, result_success)
+    scanner.measurement_writer(rd, result_success)
     assert rd.queue.qsize() == 1
     assert rd.queue.full()
     end_event.set()
@@ -49,9 +51,10 @@ def test_complete_measurements(
 ):
     """
     Test that the ``ThreadPoolExecutor``` creates the epexted number of
-    futures, ``wait_for_results``process all of them and ``force_get_results``
-    completes them if they were not already completed by the time
-    ``wait_for_results`` has already processed them.
+    futures, ``process_completed_futures`` processes all of them and
+    ``wait_futures_completed`` completes them if they were not already
+    completed by the time ``process_completed_futures`` has processed
+    them.
     There are not real measurements done and the ``results`` are None objects.
     Running the scanner with the test network, test the real measurements.
 
@@ -90,12 +93,14 @@ def test_complete_measurements(
 
             assert len(pending_results) == 321
             assert len(hbeat.measured_fp_set) == 0
-            log.debug("Before wait_for_results.")
-            scanner.wait_for_results(executor, hbeat, rd, pending_results)
-            log.debug("After wait_for_results")
+            log.debug("Before process_completed_futures.")
+            scanner.process_completed_futures(
+                executor, hbeat, rd, pending_results
+            )
+            log.debug("After process_completed_futures")
             for pending_result in pending_results:
                 assert pending_result.done() is True
             assert len(hbeat.measured_fp_set) == 321
-            scanner.force_get_results(pending_results)
-            log.debug("After force_get_results.")
+            scanner.wait_futures_completed(pending_results)
+            log.debug("After wait_futures_completed.")
             assert concurrent.futures.ALL_COMPLETED
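
For context, the new names map onto the standard concurrent.futures flow:
futures are submitted for every relay, handled as they complete
(process_completed_futures hands each result to measurement_writer, or to
log_measurement_exception when the measurement raised), and any stragglers
are then waited on with concurrent.futures.wait (wait_futures_completed).
The following is only a self-contained sketch of that flow, not sbws code:
the measure() stand-in, the plain queue.Queue, the worker count and the
simplified function signatures are illustrative assumptions.

    import concurrent.futures
    import logging
    import queue
    import random

    log = logging.getLogger(__name__)


    def measure(relay):
        """Stand-in for a per-relay measurement; may raise."""
        if random.random() < 0.2:
            raise RuntimeError("measurement of %s failed" % relay)
        return {"relay": relay, "bandwidth": random.randint(1, 1000)}


    def measurement_writer(results_queue, measurement):
        # Analogue of putting the Result in the ResultDump queue; block
        # briefly if the consumer thread is behind.
        results_queue.put(measurement, timeout=3)


    def log_measurement_exception(relay, exception):
        # Analogue of logging an exception not caught by the measurement.
        log.warning("Measurement of %s failed: %s", relay, exception)


    def process_completed_futures(pending, results_queue):
        # Handle each future as it finishes: write the measurement or log
        # the exception, without waiting for the slowest relay first.
        for future in concurrent.futures.as_completed(pending):
            relay = pending[future]
            try:
                measurement = future.result()
            except Exception as e:
                log_measurement_exception(relay, e)
            else:
                measurement_writer(results_queue, measurement)


    def wait_futures_completed(pending):
        # Block until every remaining future is done before the next loop.
        return concurrent.futures.wait(
            pending, return_when=concurrent.futures.ALL_COMPLETED
        )


    if __name__ == "__main__":
        logging.basicConfig(level=logging.INFO)
        relays = ["relay%d" % i for i in range(10)]
        results_queue = queue.Queue()
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            # Map each Future to the relay it measures, mirroring the
            # pending_results queued in main_loop.
            pending = {executor.submit(measure, r): r for r in relays}
            process_completed_futures(pending, results_queue)
            wait_futures_completed(pending)
        log.info("Collected %d measurements.", results_queue.qsize())

Run as a script, the sketch logs a warning for each stand-in measurement
that failed and the count of collected results; in sbws the consumer side
is the ResultDump thread rather than a bare queue.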