[tor-commits] [stem/master] Rewriting authentication integ tests
atagar at torproject.org
atagar at torproject.org
Sun Jan 29 08:54:42 UTC 2012
commit a73be509009942b4b2711b49a3b30e3c3f6e609b
Author: Damian Johnson <atagar at torproject.org>
Date: Sat Jan 28 17:06:36 2012 -0800
Rewriting authentication integ tests
Excessive testing helper functions are bad. They hurt test readability and
make the code a PITA to trace through. This cleans up the authentication
integration tests to collapse the helper functions into just the few that make
life better. Much more maintainable now. \o/
---
stem/socket.py | 6 +
test/integ/connection/authentication.py | 276 ++++++++++++-------------------
2 files changed, 116 insertions(+), 166 deletions(-)
diff --git a/stem/socket.py b/stem/socket.py
index f3aeaf8..889713d 100644
--- a/stem/socket.py
+++ b/stem/socket.py
@@ -217,6 +217,12 @@ class ControlSocket:
self._send_cond.release()
self._recv_cond.release()
+ def __enter__(self):
+ return self
+
+ def __exit__(self, type, value, traceback):
+ self.close()
+
def _make_socket(self):
"""
Constructs and connects new socket. This is implemented by subclasses.
diff --git a/test/integ/connection/authentication.py b/test/integ/connection/authentication.py
index bc94ead..cc36c66 100644
--- a/test/integ/connection/authentication.py
+++ b/test/integ/connection/authentication.py
@@ -5,7 +5,6 @@ stem.connection.authenticate* functions.
import os
import unittest
-import functools
import test.runner
import stem.connection
@@ -23,6 +22,60 @@ MULTIPLE_AUTH_FAIL = "Authentication failed: Password did not match HashedContro
INCORRECT_COOKIE_FAIL = "Authentication failed: Authentication cookie did not match expected value."
INCORRECT_PASSWORD_FAIL = "Authentication failed: Password did not match HashedControlPassword value from configuration"
+def _can_authenticate(auth_type):
+ """
+ Checks if a given authentication method can authenticate to our control
+ socket.
+
+ Arguments:
+ auth_type (stem.connection.AuthMethod) - authentication method to check
+
+ Returns:
+ bool that's True if we should be able to authenticate and False otherwise
+ """
+
+ tor_options = test.runner.get_runner().get_options()
+ password_auth = test.runner.Torrc.PASSWORD in tor_options
+ cookie_auth = test.runner.Torrc.COOKIE in tor_options
+
+ if not password_auth and not cookie_auth: return True # open socket
+ elif auth_type == stem.connection.AuthMethod.PASSWORD: return password_auth
+ elif auth_type == stem.connection.AuthMethod.COOKIE: return cookie_auth
+ else: return False
+
+def _get_auth_failure_message(auth_type):
+ """
+ Provides the message that tor will respond with if our current method of
+ authentication fails. Note that this test will need to be updated if tor
+ changes its rejection reponse.
+
+ Arguments:
+ auth_type (stem.connection.AuthMethod) - authentication method to check
+
+ Returns:
+ string with the rejection message that tor would provide
+ """
+
+ tor_options = test.runner.get_runner().get_options()
+ password_auth = test.runner.Torrc.PASSWORD in tor_options
+ cookie_auth = test.runner.Torrc.COOKIE in tor_options
+
+ if cookie_auth and password_auth:
+ return MULTIPLE_AUTH_FAIL
+ elif cookie_auth:
+ if auth_type == stem.connection.AuthMethod.COOKIE:
+ return INCORRECT_COOKIE_FAIL
+ else:
+ return COOKIE_AUTH_FAIL
+ elif password_auth:
+ if auth_type == stem.connection.AuthMethod.PASSWORD:
+ return INCORRECT_PASSWORD_FAIL
+ else:
+ return PASSWORD_AUTH_FAIL
+ else:
+ # shouldn't happen, if so then the test has a bug
+ raise ValueError("No methods of authentication. If this is an open socket then auth shoulnd't fail.")
+
class TestAuthenticate(unittest.TestCase):
def setUp(self):
# none of these tests apply if there's no control connection
@@ -34,10 +87,9 @@ class TestAuthenticate(unittest.TestCase):
Tests that the authenticate function can authenticate to our socket.
"""
- control_socket = test.runner.get_runner().get_tor_socket(False)
- stem.connection.authenticate(control_socket, test.runner.CONTROL_PASSWORD)
- test.runner.exercise_socket(self, control_socket)
- control_socket.close()
+ with test.runner.get_runner().get_tor_socket(False) as control_socket:
+ stem.connection.authenticate(control_socket, test.runner.CONTROL_PASSWORD)
+ test.runner.exercise_socket(self, control_socket)
def test_authenticate_general_example(self):
"""
@@ -87,34 +139,25 @@ class TestAuthenticate(unittest.TestCase):
is_password_only = test.runner.Torrc.PASSWORD in tor_options and not test.runner.Torrc.COOKIE in tor_options
# tests without a password
- control_socket = runner.get_tor_socket(False)
- auth_function = functools.partial(stem.connection.authenticate, control_socket)
-
- if is_password_only:
- self.assertRaises(stem.connection.MissingPassword, auth_function)
- else:
- auth_function()
- test.runner.exercise_socket(self, control_socket)
-
- control_socket.close()
+ with runner.get_tor_socket(False) as control_socket:
+ if is_password_only:
+ self.assertRaises(stem.connection.MissingPassword, stem.connection.authenticate, control_socket)
+ else:
+ stem.connection.authenticate(control_socket)
+ test.runner.exercise_socket(self, control_socket)
# tests with the incorrect password
- control_socket = runner.get_tor_socket(False)
- auth_function = functools.partial(stem.connection.authenticate, control_socket, "blarg")
-
- if is_password_only:
- self.assertRaises(stem.connection.IncorrectPassword, auth_function)
- else:
- auth_function()
- test.runner.exercise_socket(self, control_socket)
-
- control_socket.close()
+ with runner.get_tor_socket(False) as control_socket:
+ if is_password_only:
+ self.assertRaises(stem.connection.IncorrectPassword, stem.connection.authenticate, control_socket, "blarg")
+ else:
+ stem.connection.authenticate(control_socket, "blarg")
+ test.runner.exercise_socket(self, control_socket)
# tests with the right password
- control_socket = runner.get_tor_socket(False)
- stem.connection.authenticate(control_socket, test.runner.CONTROL_PASSWORD)
- test.runner.exercise_socket(self, control_socket)
- control_socket.close()
+ with runner.get_tor_socket(False) as control_socket:
+ stem.connection.authenticate(control_socket, test.runner.CONTROL_PASSWORD)
+ test.runner.exercise_socket(self, control_socket)
def test_authenticate_none(self):
"""
@@ -122,11 +165,11 @@ class TestAuthenticate(unittest.TestCase):
"""
auth_type = stem.connection.AuthMethod.NONE
- if self._can_authenticate(auth_type):
+
+ if _can_authenticate(auth_type):
self._check_auth(auth_type)
else:
self.assertRaises(stem.connection.OpenAuthRejected, self._check_auth, auth_type)
- self._assert_auth_rejected_msg(auth_type)
def test_authenticate_password(self):
"""
@@ -136,26 +179,24 @@ class TestAuthenticate(unittest.TestCase):
auth_type = stem.connection.AuthMethod.PASSWORD
auth_value = test.runner.CONTROL_PASSWORD
- if self._can_authenticate(auth_type):
+ if _can_authenticate(auth_type):
self._check_auth(auth_type, auth_value)
else:
self.assertRaises(stem.connection.PasswordAuthRejected, self._check_auth, auth_type, auth_value)
- self._assert_auth_rejected_msg(auth_type, auth_value)
# Check with an empty, invalid, and quoted password. These should work if
# we have no authentication, and fail otherwise.
for auth_value in ("", "blarg", "this has a \" in it"):
- if self._can_authenticate(stem.connection.AuthMethod.NONE):
+ if _can_authenticate(stem.connection.AuthMethod.NONE):
self._check_auth(auth_type, auth_value)
else:
- if self._can_authenticate(stem.connection.AuthMethod.PASSWORD):
+ if _can_authenticate(stem.connection.AuthMethod.PASSWORD):
exc_type = stem.connection.IncorrectPassword
else:
exc_type = stem.connection.PasswordAuthRejected
self.assertRaises(exc_type, self._check_auth, auth_type, auth_value)
- self._assert_auth_rejected_msg(auth_type, auth_value)
def test_authenticate_cookie(self):
"""
@@ -173,11 +214,10 @@ class TestAuthenticate(unittest.TestCase):
# missing file.
self.assertRaises(stem.connection.UnreadableCookieFile, self._check_auth, auth_type, auth_value)
- elif self._can_authenticate(auth_type):
+ elif _can_authenticate(auth_type):
self._check_auth(auth_type, auth_value)
else:
self.assertRaises(stem.connection.CookieAuthRejected, self._check_auth, auth_type, auth_value)
- self._assert_auth_rejected_msg(auth_type, auth_value)
def test_authenticate_cookie_invalid(self):
"""
@@ -193,17 +233,16 @@ class TestAuthenticate(unittest.TestCase):
fake_cookie.write("0" * 32)
fake_cookie.close()
- if self._can_authenticate(stem.connection.AuthMethod.NONE):
+ if _can_authenticate(stem.connection.AuthMethod.NONE):
# authentication will work anyway
self._check_auth(auth_type, auth_value)
else:
- if self._can_authenticate(auth_type):
+ if _can_authenticate(auth_type):
exc_type = stem.connection.IncorrectCookieValue
else:
exc_type = stem.connection.CookieAuthRejected
self.assertRaises(exc_type, self._check_auth, auth_type, auth_value)
- self._assert_auth_rejected_msg(auth_type, auth_value)
os.remove(auth_value)
@@ -215,7 +254,7 @@ class TestAuthenticate(unittest.TestCase):
auth_type = stem.connection.AuthMethod.COOKIE
auth_value = "/if/this/exists/then/they're/asking/for/a/failure"
- self.assertRaises(stem.connection.UnreadableCookieFile, self._check_auth, auth_type, auth_value)
+ self.assertRaises(stem.connection.UnreadableCookieFile, self._check_auth, auth_type, auth_value, False)
def test_authenticate_cookie_wrong_size(self):
"""
@@ -231,139 +270,44 @@ class TestAuthenticate(unittest.TestCase):
# Weird coincidence? Fail so we can pick another file to check against.
self.fail("Our torrc is 32 bytes, preventing the test_authenticate_cookie_wrong_size test from running.")
else:
- self.assertRaises(stem.connection.IncorrectCookieSize, self._check_auth, auth_type, auth_value)
-
- def _get_socket_auth(self):
- """
- Provides the types of authentication that our current test socket accepts.
-
- Returns:
- bool tuple of the form (password_auth, cookie_auth)
- """
-
- tor_options = test.runner.get_runner().get_options()
- password_auth = test.runner.Torrc.PASSWORD in tor_options
- cookie_auth = test.runner.Torrc.COOKIE in tor_options
-
- return password_auth, cookie_auth
-
- def _can_authenticate(self, auth_type):
- """
- Checks if the given authentication type should be able to authenticate to
- our current socket.
-
- Arguments:
- auth_type (stem.connection.AuthMethod) - authentication method to check
-
- Returns:
- bool that's True if we should be able to authenticate and False otherwise
- """
-
- password_auth, cookie_auth = self._get_socket_auth()
-
- # If the control socket is open then all authentication methods will be
- # accepted. Otherwise check if our auth type matches what the socket
- # accepts.
-
- if not password_auth and not cookie_auth: return True
- elif auth_type == stem.connection.AuthMethod.PASSWORD: return password_auth
- elif auth_type == stem.connection.AuthMethod.COOKIE: return cookie_auth
- else: return False
-
- def _get_auth_function(self, control_socket, auth_type, *auth_args):
- """
- Constructs a functor that performs the given authentication without
- additional arguments.
-
- Arguments:
- control_socket (stem.socket.ControlSocket) - socket for the function to
- authenticate to
- auth_type (stem.connection.AuthMethod) - method by which we should
- authentiate to the control socket
- auth_args (str) - arguments to be passed to the authentication function
- """
-
- if auth_type == stem.connection.AuthMethod.NONE:
- auth_function = stem.connection.authenticate_none
- elif auth_type == stem.connection.AuthMethod.PASSWORD:
- auth_function = stem.connection.authenticate_password
- elif auth_type == stem.connection.AuthMethod.COOKIE:
- auth_function = stem.connection.authenticate_cookie
- else:
- raise ValueError("unexpected auth type: %s" % auth_type)
-
- if auth_args:
- return functools.partial(auth_function, control_socket, *auth_args)
- else:
- return functools.partial(auth_function, control_socket)
-
- def _assert_auth_rejected_msg(self, auth_type, *auth_args):
- """
- This asserts that authentication will fail with the rejection message given
- by tor. Note that this test will need to be updated if tor changes its
- rejection reponse.
-
- Arguments:
- auth_type (stem.connection.AuthMethod) - method by which we should
- authentiate to the control socket
- auth_args (str) - arguments to be passed to the authentication function
- """
-
- control_socket = test.runner.get_runner().get_tor_socket(False)
- auth_function = self._get_auth_function(control_socket, auth_type, *auth_args)
- password_auth, cookie_auth = self._get_socket_auth()
-
- if cookie_auth and password_auth:
- failure_msg = MULTIPLE_AUTH_FAIL
- elif cookie_auth:
- if auth_type == stem.connection.AuthMethod.COOKIE:
- failure_msg = INCORRECT_COOKIE_FAIL
- else:
- failure_msg = COOKIE_AUTH_FAIL
- elif password_auth:
- if auth_type == stem.connection.AuthMethod.PASSWORD:
- failure_msg = INCORRECT_PASSWORD_FAIL
- else:
- failure_msg = PASSWORD_AUTH_FAIL
- else:
- # shouldn't happen, if so then the test has a bug
- raise ValueError("No methods of authentication. If this is an open socket then auth shoulnd't fail.")
-
- try:
- auth_function()
- control_socket.close()
- self.fail()
- except stem.connection.AuthenticationFailure, exc:
- self.assertTrue(control_socket.is_alive())
- self.assertEqual(failure_msg, str(exc))
- control_socket.close()
+ self.assertRaises(stem.connection.IncorrectCookieSize, self._check_auth, auth_type, auth_value, False)
- def _check_auth(self, auth_type, *auth_args):
+ def _check_auth(self, auth_type, auth_arg = None, check_message = True):
"""
- Attempts to use the given authentication function against our connection.
- If this works then checks that we can use the connection. If not then this
- raises the exception.
+ Attempts to use the given type of authentication against tor's control
+ socket. If it succeeds then we check that the socket can then be used. If
+ not then we check that this gives a message that we'd expect then raises
+ the exception.
Arguments:
auth_type (stem.connection.AuthMethod) - method by which we should
authentiate to the control socket
- auth_args (str) - arguments to be passed to the authentication function
+ auth_arg (str) - argument to be passed to the authentication function
+ check_message (bool) - checks that failure messages are what we'd expect
Raises:
stem.connection.AuthenticationFailure if the authentication fails
"""
- control_socket = test.runner.get_runner().get_tor_socket(False)
- auth_function = self._get_auth_function(control_socket, auth_type, *auth_args)
-
- # run the authentication, re-raising if there's a problem
- try:
- auth_function()
- except stem.connection.AuthenticationFailure, exc:
- self.assertTrue(control_socket.is_alive())
- control_socket.close()
- raise exc
-
- test.runner.exercise_socket(self, control_socket)
- control_socket.close()
+ with test.runner.get_runner().get_tor_socket(False) as control_socket:
+ # run the authentication, re-raising if there's a problem
+ try:
+ if auth_type == stem.connection.AuthMethod.NONE:
+ stem.connection.authenticate_none(control_socket)
+ elif auth_type == stem.connection.AuthMethod.PASSWORD:
+ stem.connection.authenticate_password(control_socket, auth_arg)
+ elif auth_type == stem.connection.AuthMethod.COOKIE:
+ stem.connection.authenticate_cookie(control_socket, auth_arg)
+
+ test.runner.exercise_socket(self, control_socket)
+ except stem.connection.AuthenticationFailure, exc:
+ # authentication functions should re-attach on failure
+ self.assertTrue(control_socket.is_alive())
+
+ # check that we got the failure message that we'd expect
+ if check_message:
+ failure_msg = _get_auth_failure_message(auth_type)
+ self.assertEqual(failure_msg, str(exc))
+
+ raise exc
More information about the tor-commits
mailing list