[tor-commits] [stem/master] Function and testing for password authentication
atagar at torproject.org
atagar at torproject.org
Tue Nov 29 18:04:03 UTC 2011
commit 7f760f86414ee0bfbd050480e1753555c66e9a5b
Author: Damian Johnson <atagar at torproject.org>
Date: Tue Nov 29 10:02:24 2011 -0800
Function and testing for password authentication
Adding a function for password authentication. This included escaping quotes
but otherwise is trivial - most of the effort was refactoring the
authentication integ tests.
---
stem/connection.py | 30 +++++++
test/integ/connection/authentication.py | 130 +++++++++++++++++++++++++++----
2 files changed, 145 insertions(+), 15 deletions(-)
diff --git a/stem/connection.py b/stem/connection.py
index bef8d50..3ea5747 100644
--- a/stem/connection.py
+++ b/stem/connection.py
@@ -62,6 +62,36 @@ def authenticate_none(control_socket):
if str(auth_response) != "OK":
raise ValueError(str(auth_response))
+def authenticate_password(control_socket, password):
+ """
+ Authenticates to a control socket that uses a password (via the
+ HashedControlPassword torrc option). Quotes in the password are escaped.
+
+ If authentication fails then tor will close the control socket.
+
+ Arguments:
+ control_socket (stem.socket.ControlSocket) - socket to be authenticated
+ password (str) - passphrase to present to the socket
+
+ Raises:
+ ValueError if the authentication credentials aren't accepted
+ stem.socket.ProtocolError the content from the socket is malformed
+ stem.socket.SocketError if problems arise in using the socket
+ """
+
+ # Escapes quotes. Tor can include those in the password hash, in which case
+ # it expects escaped quotes from the controller. For more information see...
+ # https://trac.torproject.org/projects/tor/ticket/4600
+
+ password = password.replace('"', '\\"')
+
+ control_socket.send("AUTHENTICATE \"%s\"" % password)
+ auth_response = control_socket.recv()
+
+ # if we got anything but an OK response then error
+ if str(auth_response) != "OK":
+ raise ValueError(str(auth_response))
+
def get_protocolinfo_by_port(control_addr = "127.0.0.1", control_port = 9051, get_socket = False):
"""
Issues a PROTOCOLINFO query to a control port, getting information about the
diff --git a/test/integ/connection/authentication.py b/test/integ/connection/authentication.py
index 4ba0bfa..c3759e6 100644
--- a/test/integ/connection/authentication.py
+++ b/test/integ/connection/authentication.py
@@ -4,6 +4,7 @@ stem.connection.authenticate_* functions.
"""
import unittest
+import functools
import test.runner
import stem.connection
@@ -15,6 +16,9 @@ COOKIE_AUTH_FAIL = "Authentication failed: Wrong length on authentication cookie
PASSWORD_AUTH_FAIL = "Authentication failed: Password did not match HashedControlPassword value from configuration. Maybe you tried a plain text password? If so, the standard requires that you put it in double quotes."
MULTIPLE_AUTH_FAIL = "Authentication failed: Password did not match HashedControlPassword *or* authentication cookie."
+# this only arises in password-only auth when we authenticate by password
+INCORRECT_PASSWORD_FAIL = "Authentication failed: Password did not match HashedControlPassword value from configuration"
+
class TestAuthenticate(unittest.TestCase):
"""
Tests the authentication methods. This should be run with the 'CONN_ALL'
@@ -32,31 +36,127 @@ class TestAuthenticate(unittest.TestCase):
if connection_type == test.runner.TorConnection.NONE:
self.skipTest("(no connection)")
- # If the connection has authentication then this will fail with a message
- # based on the authentication type. If not then this will succeed.
+ expect_success = self._is_authenticateable(stem.connection.AuthMethod.NONE)
+ self._check_auth(stem.connection.AuthMethod.NONE, None, expect_success)
+
+ def test_authenticate_password(self):
+ """
+ Tests the authenticate_password function.
+ """
+
+ runner = test.runner.get_runner()
+ connection_type = runner.get_connection_type()
+
+ if connection_type == test.runner.TorConnection.NONE:
+ self.skipTest("(no connection)")
+
+ expect_success = self._is_authenticateable(stem.connection.AuthMethod.PASSWORD)
+ self._check_auth(stem.connection.AuthMethod.PASSWORD, test.runner.CONTROL_PASSWORD, expect_success)
+
+ # Check with an empty, invalid, and quoted password. These should work if
+ # we have no authentication, and fail otherwise.
+
+ expect_success = self._is_authenticateable(stem.connection.AuthMethod.NONE)
+ self._check_auth(stem.connection.AuthMethod.PASSWORD, "", expect_success)
+ self._check_auth(stem.connection.AuthMethod.PASSWORD, "blarg", expect_success)
+ self._check_auth(stem.connection.AuthMethod.PASSWORD, "this has a \" in it", expect_success)
+
+ def _get_socket_auth(self):
+ """
+ Provides the types of authentication that our current test socket accepts.
- control_socket = test.runner.get_runner().get_tor_socket(False)
+ Returns:
+ bool tuple of the form (password_auth, cookie_auth)
+ """
+ connection_type = test.runner.get_runner().get_connection_type()
connection_options = test.runner.CONNECTION_OPTS[connection_type]
- cookie_auth = test.runner.OPT_COOKIE in connection_options
password_auth = test.runner.OPT_PASSWORD in connection_options
+ cookie_auth = test.runner.OPT_COOKIE in connection_options
- if cookie_auth or password_auth:
- if cookie_auth and password_auth: failure_msg = MULTIPLE_AUTH_FAIL
- elif cookie_auth: failure_msg = COOKIE_AUTH_FAIL
- else: failure_msg = PASSWORD_AUTH_FAIL
-
- try:
- stem.connection.authenticate_none(control_socket)
- self.fail()
- except ValueError, exc:
- self.assertEqual(failure_msg, str(exc))
+ return password_auth, cookie_auth
+
+ def _is_authenticateable(self, auth_type):
+ """
+ Checks if the given authentication type should be able to authenticate to
+ our current socket.
+
+ Arguments:
+ auth_type (stem.connection.AuthMethod) - authentication method to check
+
+ Returns:
+ bool that's True if we should be able to authenticate and False otherwise
+ """
+
+ password_auth, cookie_auth = self._get_socket_auth()
+
+ # If the control socket is open then all authentication methods will be
+ # accepted. Otherwise check if our auth type matches what the socket
+ # accepts.
+
+ if not password_auth and not cookie_auth: return True
+ elif auth_type == stem.connection.AuthMethod.PASSWORD: return password_auth
+ elif auth_type == stem.connection.AuthMethod.COOKIE: return cookie_auth
+ else: return False
+
+ def _check_auth(self, auth_type, auth_value, expect_success):
+ """
+ Attempts to use the given authentication function against our connection.
+ If this works then checks that we can use the connection. If not then we
+ check that the error message is what we'd expect.
+
+ Arguments:
+ auth_type (stem.connection.AuthMethod) - method by which we should
+ authentiate to the control socket
+ auth_value (str) - value to be provided to the authentication function
+ expect_success (bool) - true if the authentication should succeed, false
+ otherwise
+ """
+
+ runner = test.runner.get_runner()
+ control_socket = runner.get_tor_socket(False)
+ password_auth, cookie_auth = self._get_socket_auth()
+
+ # construct the function call
+
+ if auth_type == stem.connection.AuthMethod.NONE:
+ auth_function = stem.connection.authenticate_none
+ elif auth_type == stem.connection.AuthMethod.PASSWORD:
+ auth_function = stem.connection.authenticate_password
+ elif auth_type == stem.connection.AuthMethod.COOKIE:
+ auth_function = None # TODO: fill in
+ else:
+ raise ValueError("unexpected auth type: %s" % auth_type)
+
+ if auth_value != None:
+ auth_function = functools.partial(auth_function, control_socket, auth_value)
else:
- stem.connection.authenticate_none(control_socket)
+ auth_function = functools.partial(auth_function, control_socket)
+
+ if expect_success:
+ auth_function()
# issues a 'GETINFO config-file' query to confirm that we can use the socket
control_socket.send("GETINFO config-file")
config_file_response = control_socket.recv()
self.assertEquals("config-file=%s\nOK" % runner.get_torrc_path(), str(config_file_response))
+ control_socket.close()
+ else:
+ if cookie_auth and password_auth: failure_msg = MULTIPLE_AUTH_FAIL
+ elif cookie_auth: failure_msg = COOKIE_AUTH_FAIL
+ else:
+ # if we're attempting to authenticate with a password then it's a
+ # truncated message
+
+ if auth_type == stem.connection.AuthMethod.PASSWORD:
+ failure_msg = INCORRECT_PASSWORD_FAIL
+ else:
+ failure_msg = PASSWORD_AUTH_FAIL
+
+ try:
+ auth_function()
+ self.fail()
+ except ValueError, exc:
+ self.assertEqual(failure_msg, str(exc))
More information about the tor-commits
mailing list