[tor-commits] [stem/master] Putting all locks under a 'with' clause

atagar at torproject.org atagar at torproject.org
Thu Feb 9 04:27:53 UTC 2012


commit 8f92a27705e78ec6751b6974f6f31d772137b026
Author: Damian Johnson <atagar at torproject.org>
Date:   Wed Feb 8 20:24:10 2012 -0800

    Putting all locks under a 'with' clause
    
    At first I was dubious of the usefulness of 'with' keyword gsathya showed me.
    However, now that I've discovered that locking can be done under it I take that
    all back - it's a wonderful, wonderful thing and I don't know how I got by with
    manual locking/releasing before.
    
    ... and then they ate Sir Robin's minstrels and there was much rejoicing.
---
 stem/socket.py    |  113 +++++++++++++++-----------------------
 stem/util/conf.py |  127 +++++++++++++++++++------------------------
 test/runner.py    |  155 ++++++++++++++++++++++++-----------------------------
 3 files changed, 171 insertions(+), 224 deletions(-)

diff --git a/stem/socket.py b/stem/socket.py
index 5ae4418..9379884 100644
--- a/stem/socket.py
+++ b/stem/socket.py
@@ -110,18 +110,15 @@ class ControlSocket:
       stem.socket.SocketClosed if the socket is known to be shut down
     """
     
-    self._send_lock.acquire()
-    
-    try:
-      if not self.is_alive(): raise SocketClosed()
-      send_message(self._socket_file, message, raw)
-    except SocketClosed, exc:
-      # if send_message raises a SocketClosed then we should properly shut
-      # everything down
-      if self.is_alive(): self.close()
-      raise exc
-    finally:
-      self._send_lock.release()
+    with self._send_lock:
+      try:
+        if not self.is_alive(): raise SocketClosed()
+        send_message(self._socket_file, message, raw)
+      except SocketClosed, exc:
+        # if send_message raises a SocketClosed then we should properly shut
+        # everything down
+        if self.is_alive(): self.close()
+        raise exc
   
   def recv(self):
     """
@@ -137,18 +134,15 @@ class ControlSocket:
         complete message
     """
     
-    self._recv_lock.acquire()
-    
-    try:
-      if not self.is_alive(): raise SocketClosed()
-      return recv_message(self._socket_file)
-    except SocketClosed, exc:
-      # if recv_message raises a SocketClosed then we should properly shut
-      # everything down
-      if self.is_alive(): self.close()
-      raise exc
-    finally:
-      self._recv_lock.release()
+    with self._recv_lock:
+      try:
+        if not self.is_alive(): raise SocketClosed()
+        return recv_message(self._socket_file)
+      except SocketClosed, exc:
+        # if recv_message raises a SocketClosed then we should properly shut
+        # everything down
+        if self.is_alive(): self.close()
+        raise exc
   
   def is_alive(self):
     """
@@ -178,54 +172,41 @@ class ControlSocket:
       stem.socket.SocketError if unable to make a socket
     """
     
-    # we need both locks for this
-    self._send_lock.acquire()
-    self._recv_lock.acquire()
-    
-    # close the socket if we're currently attached to one
-    if self.is_alive(): self.close()
-    
-    try:
+    with self._send_lock, self._recv_lock:
+      # close the socket if we're currently attached to one
+      if self.is_alive(): self.close()
+      
       self._socket = self._make_socket()
       self._socket_file = self._socket.makefile()
       self._is_alive = True
-    finally:
-      self._send_lock.release()
-      self._recv_lock.release()
   
   def close(self):
     """
     Shuts down the socket. If it's already closed then this is a no-op.
     """
     
-    # we need both locks for this
-    self._send_lock.acquire()
-    self._recv_lock.acquire()
-    
-    if self._socket:
-      # if we haven't yet established a connection then this raises an error
-      # socket.error: [Errno 107] Transport endpoint is not connected
-      try: self._socket.shutdown(socket.SHUT_RDWR)
-      except socket.error: pass
+    with self._send_lock, self._recv_lock:
+      if self._socket:
+        # if we haven't yet established a connection then this raises an error
+        # socket.error: [Errno 107] Transport endpoint is not connected
+        try: self._socket.shutdown(socket.SHUT_RDWR)
+        except socket.error: pass
+        
+        # Suppressing unexpected exceptions from close. For instance, if the
+        # socket's file has already been closed then with python 2.7 that raises
+        # with...
+        # error: [Errno 32] Broken pipe
+        
+        try: self._socket.close()
+        except: pass
       
-      # Suppressing unexpected exceptions from close. For instance, if the
-      # socket's file has already been closed then with python 2.7 that raises
-      # with...
-      # error: [Errno 32] Broken pipe
+      if self._socket_file:
+        try: self._socket_file.close()
+        except: pass
       
-      try: self._socket.close()
-      except: pass
-    
-    if self._socket_file:
-      try: self._socket_file.close()
-      except: pass
-    
-    self._socket = None
-    self._socket_file = None
-    self._is_alive = False
-    
-    self._send_lock.release()
-    self._recv_lock.release()
+      self._socket = None
+      self._socket_file = None
+      self._is_alive = False
   
   def __enter__(self):
     return self
@@ -545,13 +526,10 @@ class ControlLine(str):
       IndexError if we don't have any remaining content left to parse
     """
     
-    try:
-      self._remainder_lock.acquire()
+    with self._remainder_lock:
       next_entry, remainder = _parse_entry(self._remainder, quoted, escaped)
       self._remainder = remainder
       return next_entry
-    finally:
-      self._remainder_lock.release()
   
   def pop_mapping(self, quoted = False, escaped = False):
     """
@@ -570,8 +548,7 @@ class ControlLine(str):
         the value being quoted
     """
     
-    try:
-      self._remainder_lock.acquire()
+    with self._remainder_lock:
       if self.is_empty(): raise IndexError("no remaining content to parse")
       key_match = KEY_ARG.match(self._remainder)
       
@@ -585,8 +562,6 @@ class ControlLine(str):
       next_entry, remainder = _parse_entry(remainder, quoted, escaped)
       self._remainder = remainder
       return (key, next_entry)
-    finally:
-      self._remainder_lock.release()
 
 def _parse_entry(line, quoted, escaped):
   """
diff --git a/stem/util/conf.py b/stem/util/conf.py
index 7d086f4..05e98f8 100644
--- a/stem/util/conf.py
+++ b/stem/util/conf.py
@@ -30,7 +30,7 @@ three things...
 
 There are many ways of using the Config class but the most common ones are...
 
-- Call config_dict to get a dictionary that's always synced with with a Config.
+- Call config_dict to get a dictionary that's always synced with a Config.
 
 - Make a dictionary and call synchronize() to bring it into sync with the
   Config. This does not keep it in sync as the Config changes. See the Config
@@ -216,43 +216,41 @@ class Config():
     with open(self._path, "r") as config_file:
       read_contents = config_file.readlines()
     
-    self._contents_lock.acquire()
-    self._raw_contents = read_contents
-    remainder = list(self._raw_contents)
-    
-    while remainder:
-      line = remainder.pop(0)
-      
-      # strips any commenting or excess whitespace
-      comment_start = line.find("#")
-      if comment_start != -1: line = line[:comment_start]
-      line = line.strip()
+    with self._contents_lock:
+      self._raw_contents = read_contents
+      remainder = list(self._raw_contents)
       
-      # parse the key/value pair
-      if line:
-        try:
-          key, value = line.split(" ", 1)
-          value = value.strip()
-        except ValueError:
-          log.debug("Config entry '%s' is expected to be of the format 'Key Value', defaulting to '%s' -> ''" % (line, line))
-          key, value = line, ""
+      while remainder:
+        line = remainder.pop(0)
+        
+        # strips any commenting or excess whitespace
+        comment_start = line.find("#")
+        if comment_start != -1: line = line[:comment_start]
+        line = line.strip()
         
-        if not value:
-          # this might be a multi-line entry, try processing it as such
-          multiline_buffer = []
+        # parse the key/value pair
+        if line:
+          try:
+            key, value = line.split(" ", 1)
+            value = value.strip()
+          except ValueError:
+            log.debug("Config entry '%s' is expected to be of the format 'Key Value', defaulting to '%s' -> ''" % (line, line))
+            key, value = line, ""
           
-          while remainder and remainder[0].lstrip().startswith("|"):
-            content = remainder.pop(0).lstrip()[1:] # removes '\s+|' prefix
-            content = content.rstrip("\n")          # trailing newline
-            multiline_buffer.append(content)
+          if not value:
+            # this might be a multi-line entry, try processing it as such
+            multiline_buffer = []
+            
+            while remainder and remainder[0].lstrip().startswith("|"):
+              content = remainder.pop(0).lstrip()[1:] # removes '\s+|' prefix
+              content = content.rstrip("\n")          # trailing newline
+              multiline_buffer.append(content)
+            
+            if multiline_buffer:
+              self.set(key, "\n".join(multiline_buffer), False)
+              continue
           
-          if multiline_buffer:
-            self.set(key, "\n".join(multiline_buffer), False)
-            continue
-        
-        self.set(key, value, False)
-    
-    self._contents_lock.release()
+          self.set(key, value, False)
   
   def save(self, path = None):
     """
@@ -272,17 +270,13 @@ class Config():
     elif not self._path:
       raise ValueError("Unable to save configuration: no path provided")
     
-    self._contents_lock.acquire()
-    
-    with open(self._path, 'w') as output_file:
+    with self._contents_lock, open(self._path, 'w') as output_file:
       for entry_key in sorted(self.keys()):
         for entry_value in self.get_value(entry_key, multiple = True):
           # check for multi line entries
           if "\n" in entry_value: entry_value = "\n|" + entry_value.replace("\n", "\n|")
           
           output_file.write('%s %s\n' % (entry_key, entry_value))
-    
-    self._contents_lock.release()
   
   def clear(self):
     """
@@ -290,11 +284,10 @@ class Config():
     state.
     """
     
-    self._contents_lock.acquire()
-    self._contents.clear()
-    self._raw_contents = []
-    self._requested_keys = set()
-    self._contents_lock.release()
+    with self._contents_lock:
+      self._contents.clear()
+      self._raw_contents = []
+      self._requested_keys = set()
   
   def synchronize(self, conf_mappings, limits = None):
     """
@@ -344,14 +337,12 @@ class Config():
       backfill (bool)    - calls the function with our current values if true
     """
     
-    self._contents_lock.acquire()
-    self._listeners.append(listener)
-    
-    if backfill:
-      for key in self.keys():
-        listener(self, key)
-    
-    self._contents_lock.release()
+    with self._contents_lock:
+      self._listeners.append(listener)
+      
+      if backfill:
+        for key in self.keys():
+          listener(self, key)
   
   def clear_listeners(self):
     """
@@ -393,9 +384,7 @@ class Config():
                             the values are appended
     """
     
-    try:
-      self._contents_lock.acquire()
-      
+    with self._contents_lock:
       if isinstance(value, str):
         if not overwrite and key in self._contents: self._contents[key].append(value)
         else: self._contents[key] = [value]
@@ -409,8 +398,6 @@ class Config():
         for listener in self._listeners: listener(self, key)
       else:
         raise ValueError("Config.set() only accepts str, list, or tuple. Provided value was a '%s'" % type(value))
-    finally:
-      self._contents_lock.release()
   
   def get(self, key, default = None):
     """
@@ -498,20 +485,18 @@ class Config():
       key, providing the default if no such key exists
     """
     
-    self._contents_lock.acquire()
-    
-    if key in self._contents:
-      val = self._contents[key]
-      if not multiple: val = val[-1]
-      self._requested_keys.add(key)
-    else:
-      message_id = "stem.util.conf.missing_config_key_%s" % key
-      log.log_once(message_id, log.TRACE, "config entry '%s' not found, defaulting to '%s'" % (key, default))
-      val = default
-    
-    self._contents_lock.release()
-    
-    return val
+    with self._contents_lock:
+      if key in self._contents:
+        self._requested_keys.add(key)
+        
+        if multiple:
+          return self._contents[key]
+        else:
+          return self._contents[key][-1]
+      else:
+        message_id = "stem.util.conf.missing_config_key_%s" % key
+        log.log_once(message_id, log.TRACE, "config entry '%s' not found, defaulting to '%s'" % (key, default))
+        return default
   
   def get_str_csv(self, key, default = None, count = None, sub_key = None):
     """
diff --git a/test/runner.py b/test/runner.py
index 07a54fd..cf596d8 100644
--- a/test/runner.py
+++ b/test/runner.py
@@ -160,79 +160,75 @@ class Runner:
       OSError if unable to run test preparations or start tor
     """
     
-    self._runner_lock.acquire()
-    
-    # if we're holding on to a tor process (running or not) then clean up after
-    # it so we can start a fresh instance
-    if self._tor_process: self.stop()
-    
-    test.output.print_line("Setting up a test instance...", *STATUS_ATTR)
-    
-    # if 'test_directory' is unset then we make a new data directory in /tmp
-    # and clean it up when we're done
-    
-    config_test_dir = CONFIG["integ.test_directory"]
-    
-    if config_test_dir:
-      self._test_dir = stem.util.system.expand_path(config_test_dir, STEM_BASE)
-    else:
-      self._test_dir = tempfile.mktemp("-stem-integ")
-    
-    original_cwd, data_dir_path = os.getcwd(), self._test_dir
-    
-    if CONFIG["test.target.relative_data_dir"]:
-      tor_cwd = os.path.dirname(self._test_dir)
-      if not os.path.exists(tor_cwd): os.makedirs(tor_cwd)
+    with self._runner_lock:
+      # if we're holding on to a tor process (running or not) then clean up after
+      # it so we can start a fresh instance
+      if self._tor_process: self.stop()
       
-      os.chdir(tor_cwd)
-      data_dir_path = "./%s" % os.path.basename(self._test_dir)
-    
-    self._tor_cmd = tor_cmd
-    self._custom_opts = extra_torrc_opts
-    self._torrc_contents = BASE_TORRC % data_dir_path
-    
-    if extra_torrc_opts:
-      self._torrc_contents += "\n".join(extra_torrc_opts) + "\n"
-    
-    try:
-      self._tor_cwd = os.getcwd()
-      self._run_setup()
-      self._start_tor(tor_cmd)
+      test.output.print_line("Setting up a test instance...", *STATUS_ATTR)
+      
+      # if 'test_directory' is unset then we make a new data directory in /tmp
+      # and clean it up when we're done
+      
+      config_test_dir = CONFIG["integ.test_directory"]
+      
+      if config_test_dir:
+        self._test_dir = stem.util.system.expand_path(config_test_dir, STEM_BASE)
+      else:
+        self._test_dir = tempfile.mktemp("-stem-integ")
+      
+      original_cwd, data_dir_path = os.getcwd(), self._test_dir
       
-      # revert our cwd back to normal
       if CONFIG["test.target.relative_data_dir"]:
-        os.chdir(original_cwd)
-    except OSError, exc:
-      self.stop()
-      raise exc
-    finally:
-      self._runner_lock.release()
+        tor_cwd = os.path.dirname(self._test_dir)
+        if not os.path.exists(tor_cwd): os.makedirs(tor_cwd)
+        
+        os.chdir(tor_cwd)
+        data_dir_path = "./%s" % os.path.basename(self._test_dir)
+      
+      self._tor_cmd = tor_cmd
+      self._custom_opts = extra_torrc_opts
+      self._torrc_contents = BASE_TORRC % data_dir_path
+      
+      if extra_torrc_opts:
+        self._torrc_contents += "\n".join(extra_torrc_opts) + "\n"
+      
+      try:
+        self._tor_cwd = os.getcwd()
+        self._run_setup()
+        self._start_tor(tor_cmd)
+        
+        # revert our cwd back to normal
+        if CONFIG["test.target.relative_data_dir"]:
+          os.chdir(original_cwd)
+      except OSError, exc:
+        self.stop()
+        raise exc
   
   def stop(self):
     """
     Stops our tor test instance and cleans up any temporary resources.
     """
     
-    self._runner_lock.acquire()
-    test.output.print_noline("Shutting down tor... ", *STATUS_ATTR)
-    
-    if self._tor_process:
-      self._tor_process.kill()
-      self._tor_process.communicate() # blocks until the process is done
-    
-    # if we've made a temporary data directory then clean it up
-    if self._test_dir and CONFIG["integ.test_directory"] == "":
-      shutil.rmtree(self._test_dir, ignore_errors = True)
-    
-    self._test_dir = ""
-    self._tor_cmd = None
-    self._tor_cwd = ""
-    self._torrc_contents = ""
-    self._custom_opts = None
-    self._tor_process = None
-    
-    test.output.print_line("done", *STATUS_ATTR)
-    self._runner_lock.release()
+    with self._runner_lock:
+      test.output.print_noline("Shutting down tor... ", *STATUS_ATTR)
+      
+      if self._tor_process:
+        self._tor_process.kill()
+        self._tor_process.communicate() # blocks until the process is done
+      
+      # if we've made a temporary data directory then clean it up
+      if self._test_dir and CONFIG["integ.test_directory"] == "":
+        shutil.rmtree(self._test_dir, ignore_errors = True)
+      
+      self._test_dir = ""
+      self._tor_cmd = None
+      self._tor_cwd = ""
+      self._torrc_contents = ""
+      self._custom_opts = None
+      self._tor_process = None
+      
+      test.output.print_line("done", *STATUS_ATTR)
   
   def is_running(self):
     """
@@ -242,21 +238,16 @@ class Runner:
       True if we have a running tor test instance, False otherwise
     """
     
-    # subprocess.Popen.poll() checks the return code, returning None if it's
-    # still going
-    
-    self._runner_lock.acquire()
-    is_running = self._tor_process and self._tor_process.poll() == None
-    
-    # If the tor process closed unexpectedly then this is probably the first
-    # place that we're realizing it. Clean up the temporary resources now since
-    # we might not end up calling stop() as normal.
-    
-    if not is_running: self.stop(True)
-    
-    self._runner_lock.release()
-    
-    return is_running
+    with self._runner_lock:
+      # Check for an unexpected shutdown by calling subprocess.Popen.poll(),
+      # which returns the exit code or None if we're still running.
+      
+      if self._tor_process and self._tor_process.poll() != None:
+        # clean up the temporary resources and note the unexpected shutdown
+        self.stop()
+        test.output.print_line("tor shut down unexpectedly", *ERROR_ATTR)
+      
+      return bool(self._tor_process)
   
   def is_accessible(self):
     """
@@ -439,14 +430,10 @@ class Runner:
       RunnerStopped if we aren't running
     """
     
-    try:
-      self._runner_lock.acquire()
-      
+    with self._runner_lock:
       if self.is_running():
         return self.__dict__[attr]
       else: raise RunnerStopped()
-    finally:
-      self._runner_lock.release()
   
   def _run_setup(self):
     """



More information about the tor-commits mailing list