[tor-commits] [stem/master] Function and testing for password authentication

atagar at torproject.org atagar at torproject.org
Tue Nov 29 18:04:03 UTC 2011

commit 7f760f86414ee0bfbd050480e1753555c66e9a5b
Author: Damian Johnson <atagar at torproject.org>
Date:   Tue Nov 29 10:02:24 2011 -0800

    Function and testing for password authentication
    Adding a function for password authentication. This included escaping quotes
    but otherwise is trivial - most of the effort was refactoring the
    authentication integ tests.
 stem/connection.py                      |   30 +++++++
 test/integ/connection/authentication.py |  130 +++++++++++++++++++++++++++----
 2 files changed, 145 insertions(+), 15 deletions(-)

diff --git a/stem/connection.py b/stem/connection.py
index bef8d50..3ea5747 100644
--- a/stem/connection.py
+++ b/stem/connection.py
@@ -62,6 +62,36 @@ def authenticate_none(control_socket):
   if str(auth_response) != "OK":
     raise ValueError(str(auth_response))
+def authenticate_password(control_socket, password):
+  """
+  Authenticates to a control socket that uses a password (via the
+  HashedControlPassword torrc option). Quotes in the password are escaped.
+  If authentication fails then tor will close the control socket.
+  Arguments:
+    control_socket (stem.socket.ControlSocket) - socket to be authenticated
+    password (str) - passphrase to present to the socket
+  Raises:
+    ValueError if the authentication credentials aren't accepted
+    stem.socket.ProtocolError the content from the socket is malformed
+    stem.socket.SocketError if problems arise in using the socket
+  """
+  # Escapes quotes. Tor can include those in the password hash, in which case
+  # it expects escaped quotes from the controller. For more information see...
+  # https://trac.torproject.org/projects/tor/ticket/4600
+  password = password.replace('"', '\\"')
+  control_socket.send("AUTHENTICATE \"%s\"" % password)
+  auth_response = control_socket.recv()
+  # if we got anything but an OK response then error
+  if str(auth_response) != "OK":
+    raise ValueError(str(auth_response))
 def get_protocolinfo_by_port(control_addr = "", control_port = 9051, get_socket = False):
   Issues a PROTOCOLINFO query to a control port, getting information about the
diff --git a/test/integ/connection/authentication.py b/test/integ/connection/authentication.py
index 4ba0bfa..c3759e6 100644
--- a/test/integ/connection/authentication.py
+++ b/test/integ/connection/authentication.py
@@ -4,6 +4,7 @@ stem.connection.authenticate_* functions.
 import unittest
+import functools
 import test.runner
 import stem.connection
@@ -15,6 +16,9 @@ COOKIE_AUTH_FAIL = "Authentication failed: Wrong length on authentication cookie
 PASSWORD_AUTH_FAIL = "Authentication failed: Password did not match HashedControlPassword value from configuration. Maybe you tried a plain text password? If so, the standard requires that you put it in double quotes."
 MULTIPLE_AUTH_FAIL = "Authentication failed: Password did not match HashedControlPassword *or* authentication cookie."
+# this only arises in password-only auth when we authenticate by password
+INCORRECT_PASSWORD_FAIL = "Authentication failed: Password did not match HashedControlPassword value from configuration"
 class TestAuthenticate(unittest.TestCase):
   Tests the authentication methods. This should be run with the 'CONN_ALL'
@@ -32,31 +36,127 @@ class TestAuthenticate(unittest.TestCase):
     if connection_type == test.runner.TorConnection.NONE:
       self.skipTest("(no connection)")
-    # If the connection has authentication then this will fail with a message
-    # based on the authentication type. If not then this will succeed.
+    expect_success = self._is_authenticateable(stem.connection.AuthMethod.NONE)
+    self._check_auth(stem.connection.AuthMethod.NONE, None, expect_success)
+  def test_authenticate_password(self):
+    """
+    Tests the authenticate_password function.
+    """
+    runner = test.runner.get_runner()
+    connection_type = runner.get_connection_type()
+    if connection_type == test.runner.TorConnection.NONE:
+      self.skipTest("(no connection)")
+    expect_success = self._is_authenticateable(stem.connection.AuthMethod.PASSWORD)
+    self._check_auth(stem.connection.AuthMethod.PASSWORD, test.runner.CONTROL_PASSWORD, expect_success)
+    # Check with an empty, invalid, and quoted password. These should work if
+    # we have no authentication, and fail otherwise.
+    expect_success = self._is_authenticateable(stem.connection.AuthMethod.NONE)
+    self._check_auth(stem.connection.AuthMethod.PASSWORD, "", expect_success)
+    self._check_auth(stem.connection.AuthMethod.PASSWORD, "blarg", expect_success)
+    self._check_auth(stem.connection.AuthMethod.PASSWORD, "this has a \" in it", expect_success)
+  def _get_socket_auth(self):
+    """
+    Provides the types of authentication that our current test socket accepts.
-    control_socket = test.runner.get_runner().get_tor_socket(False)
+    Returns:
+      bool tuple of the form (password_auth, cookie_auth)
+    """
+    connection_type = test.runner.get_runner().get_connection_type()
     connection_options = test.runner.CONNECTION_OPTS[connection_type]
-    cookie_auth = test.runner.OPT_COOKIE in connection_options
     password_auth = test.runner.OPT_PASSWORD in connection_options
+    cookie_auth = test.runner.OPT_COOKIE in connection_options
-    if cookie_auth or password_auth:
-      if cookie_auth and password_auth: failure_msg = MULTIPLE_AUTH_FAIL
-      elif cookie_auth: failure_msg = COOKIE_AUTH_FAIL
-      else: failure_msg = PASSWORD_AUTH_FAIL
-      try:
-        stem.connection.authenticate_none(control_socket)
-        self.fail()
-      except ValueError, exc:
-        self.assertEqual(failure_msg, str(exc))
+    return password_auth, cookie_auth
+  def _is_authenticateable(self, auth_type):
+    """
+    Checks if the given authentication type should be able to authenticate to
+    our current socket.
+    Arguments:
+      auth_type (stem.connection.AuthMethod) - authentication method to check
+    Returns:
+      bool that's True if we should be able to authenticate and False otherwise
+    """
+    password_auth, cookie_auth = self._get_socket_auth()
+    # If the control socket is open then all authentication methods will be
+    # accepted. Otherwise check if our auth type matches what the socket
+    # accepts.
+    if not password_auth and not cookie_auth: return True
+    elif auth_type == stem.connection.AuthMethod.PASSWORD: return password_auth
+    elif auth_type == stem.connection.AuthMethod.COOKIE: return cookie_auth
+    else: return False
+  def _check_auth(self, auth_type, auth_value, expect_success):
+    """
+    Attempts to use the given authentication function against our connection.
+    If this works then checks that we can use the connection. If not then we
+    check that the error message is what we'd expect.
+    Arguments:
+      auth_type (stem.connection.AuthMethod) - method by which we should
+          authentiate to the control socket
+      auth_value (str) - value to be provided to the authentication function
+      expect_success (bool) - true if the authentication should succeed, false
+          otherwise
+    """
+    runner = test.runner.get_runner()
+    control_socket = runner.get_tor_socket(False)
+    password_auth, cookie_auth = self._get_socket_auth()
+    # construct the function call
+    if auth_type == stem.connection.AuthMethod.NONE:
+      auth_function = stem.connection.authenticate_none
+    elif auth_type == stem.connection.AuthMethod.PASSWORD:
+      auth_function = stem.connection.authenticate_password
+    elif auth_type == stem.connection.AuthMethod.COOKIE:
+      auth_function = None # TODO: fill in
+    else:
+      raise ValueError("unexpected auth type: %s" % auth_type)
+    if auth_value != None:
+      auth_function = functools.partial(auth_function, control_socket, auth_value)
-      stem.connection.authenticate_none(control_socket)
+      auth_function = functools.partial(auth_function, control_socket)
+    if expect_success:
+      auth_function()
       # issues a 'GETINFO config-file' query to confirm that we can use the socket
       control_socket.send("GETINFO config-file")
       config_file_response = control_socket.recv()
       self.assertEquals("config-file=%s\nOK" % runner.get_torrc_path(), str(config_file_response))
+      control_socket.close()
+    else:
+      if cookie_auth and password_auth: failure_msg = MULTIPLE_AUTH_FAIL
+      elif cookie_auth: failure_msg = COOKIE_AUTH_FAIL
+      else:
+        # if we're attempting to authenticate with a password then it's a
+        # truncated message
+        if auth_type == stem.connection.AuthMethod.PASSWORD:
+          failure_msg = INCORRECT_PASSWORD_FAIL
+        else:
+          failure_msg = PASSWORD_AUTH_FAIL
+      try:
+        auth_function()
+        self.fail()
+      except ValueError, exc:
+        self.assertEqual(failure_msg, str(exc))

More information about the tor-commits mailing list