[tor-commits] [stem/master] Unit tests for DirectoryAuthority fields
atagar at torproject.org
atagar at torproject.org
Sat Oct 13 18:35:45 UTC 2012
commit dc89b293bb3f8a282766971597ada90723f17fa7
Author: Damian Johnson <atagar at torproject.org>
Date: Sat Oct 6 13:51:29 2012 -0700
Unit tests for DirectoryAuthority fields
Tests for the DirectoryAuthority's individual fields, and fixes for a couple of
issues they uncovered.
---
stem/descriptor/networkstatus.py | 33 +++--
.../networkstatus/directory_authority.py | 142 ++++++++++++++++++++
2 files changed, 164 insertions(+), 11 deletions(-)
diff --git a/stem/descriptor/networkstatus.py b/stem/descriptor/networkstatus.py
index 039c90c..80eb17b 100644
--- a/stem/descriptor/networkstatus.py
+++ b/stem/descriptor/networkstatus.py
@@ -726,14 +726,13 @@ class DirectoryAuthority(stem.descriptor.Descriptor):
"""
# separate the directory authority entry from its key certificate
- key_cert_content = None
+ key_div = content.find('\ndir-key-certificate-version')
- if is_vote:
- key_div = content.find('\ndir-key-certificate-version')
-
- if key_div != -1:
- key_cert_content = content[key_div + 1:]
- content = content[:key_div + 1]
+ if key_div != -1:
+ key_cert_content = content[key_div + 1:]
+ content = content[:key_div + 1]
+ else:
+ key_cert_content = None
entries, first_keyword, _, _ = stem.descriptor._get_descriptor_components(content, validate)
@@ -743,16 +742,28 @@ class DirectoryAuthority(stem.descriptor.Descriptor):
# check that we have mandatory fields
if validate:
- required_fields = ["dir-source", "contact"]
+ required_fields, excluded_fields = ["dir-source", "contact"], []
- if is_vote and not key_cert_content:
- raise ValueError("Authority votes must have a key certificate:\n%s" % content)
+ if is_vote:
+ if not key_cert_content:
+ raise ValueError("Authority votes must have a key certificate:\n%s" % content)
+
+ excluded_fields += ["vote-digest"]
elif not is_vote:
+ if key_cert_content:
+ raise ValueError("Authority consensus entries shouldn't have a key certificate:\n%s" % content)
+
required_fields += ["vote-digest"]
+ excluded_fields += ["legacy-dir-key"]
for keyword in required_fields:
if not keyword in entries:
raise ValueError("Authority entries must have a '%s' line:\n%s" % (keyword, content))
+
+ for keyword in entries:
+ if keyword in excluded_fields:
+ type_label = "votes" if is_vote else "consensus entries"
+ raise ValueError("Authority %s shouldn't have a '%s' line:\n%s" % (type_label, keyword, content))
for keyword, values in entries.items():
value, block_contents = values[0]
@@ -814,7 +825,7 @@ class DirectoryAuthority(stem.descriptor.Descriptor):
self._unrecognized_lines.append(line)
if key_cert_content:
- self.key_certificate = KeyCertificate(key_cert_content)
+ self.key_certificate = KeyCertificate(key_cert_content, validate)
def get_unrecognized_lines(self):
"""
diff --git a/test/unit/descriptor/networkstatus/directory_authority.py b/test/unit/descriptor/networkstatus/directory_authority.py
index 1405d12..b032b48 100644
--- a/test/unit/descriptor/networkstatus/directory_authority.py
+++ b/test/unit/descriptor/networkstatus/directory_authority.py
@@ -106,6 +106,25 @@ class TestDirectoryAuthority(unittest.TestCase):
authority = DirectoryAuthority(content, False)
self.assertEqual("turtles", authority.nickname)
+ def test_missing_dir_source_field(self):
+ """
+ Excludes fields from the 'dir-source' line.
+ """
+
+ for missing_value in AUTHORITY_HEADER[0][1].split(' '):
+ dir_source = AUTHORITY_HEADER[0][1].replace(missing_value, '').replace('  ', ' ')
+ content = get_directory_authority({"dir-source": dir_source}, content = True)
+ self.assertRaises(ValueError, DirectoryAuthority, content)
+
+ authority = DirectoryAuthority(content, False)
+
+ self.assertEqual(None, authority.nickname)
+ self.assertEqual(None, authority.fingerprint)
+ self.assertEqual(None, authority.hostname)
+ self.assertEqual(None, authority.address)
+ self.assertEqual(None, authority.dir_port)
+ self.assertEqual(None, authority.or_port)
+
def test_empty_values(self):
"""
The 'dir-source' line has a couple string values where anything (without
@@ -137,4 +156,127 @@ class TestDirectoryAuthority(unittest.TestCase):
dir_source = AUTHORITY_HEADER[0][1].replace('no.place.com', '')
authority = get_directory_authority({"dir-source": dir_source})
self.assertEqual('', authority.hostname)
+
+ def test_malformed_fingerprint(self):
+ """
+ Includes a malformed fingerprint on the 'dir-source' line.
+ """
+
+ test_values = (
+ "",
+ "zzzzz",
+ "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
+ )
+
+ for value in test_values:
+ dir_source = AUTHORITY_HEADER[0][1].replace('27B6B5996C426270A5C95488AA5BCEB6BCC86956', value)
+ content = get_directory_authority({"dir-source": dir_source}, content = True)
+ self.assertRaises(ValueError, DirectoryAuthority, content)
+
+ authority = DirectoryAuthority(content, False)
+ self.assertEqual(value, authority.fingerprint)
+
+ def test_malformed_address(self):
+ """
+ Includes a malformed ip address on the 'dir-source' line.
+ """
+
+ test_values = (
+ "",
+ "71.35.150.",
+ "71.35..29",
+ "71.35.150",
+ "71.35.150.256",
+ "[fd9f:2e19:3bcf::02:9970]",
+ )
+
+ for value in test_values:
+ dir_source = AUTHORITY_HEADER[0][1].replace('76.73.17.194', value)
+ content = get_directory_authority({"dir-source": dir_source}, content = True)
+ self.assertRaises(ValueError, DirectoryAuthority, content)
+
+ authority = DirectoryAuthority(content, False)
+ self.assertEqual(value, authority.address)
+
+ def test_malformed_port(self):
+ """
+ Includes a malformed orport or dirport on the 'dir-source' line.
+ """
+
+ test_values = (
+ "",
+ "-1",
+ "399482",
+ "blarg",
+ )
+
+ for value in test_values:
+ for include_or_port in (False, True):
+ for include_dir_port in (False, True):
+ if not include_or_port and not include_dir_port:
+ continue
+
+ dir_source = AUTHORITY_HEADER[0][1]
+
+ if include_or_port:
+ dir_source = dir_source.replace('9090', value)
+
+ if include_dir_port:
+ dir_source = dir_source.replace('9030', value)
+
+ content = get_directory_authority({"dir-source": dir_source}, content = True)
+ self.assertRaises(ValueError, DirectoryAuthority, content)
+
+ authority = DirectoryAuthority(content, False)
+
+ expected_value = 399482 if value == "399482" else None
+ actual_value = authority.or_port if include_or_port else authority.dir_port
+ self.assertEqual(expected_value, actual_value)
+
+ def test_legacy_dir_key(self):
+ """
+ Includes a 'legacy-dir-key' line with both valid and invalid content.
+ """
+
+ test_value = "65968CCB6BECB5AA88459C5A072624C6995B6B72"
+ authority = get_directory_authority({"legacy-dir-key": test_value}, is_vote = True)
+ self.assertEqual(test_value, authority.legacy_dir_key)
+
+ # check that we'll fail if legacy-dir-key appears in a consensus
+ content = get_directory_authority({"legacy-dir-key": test_value}, content = True)
+ self.assertRaises(ValueError, DirectoryAuthority, content)
+
+ test_values = (
+ "",
+ "zzzzz",
+ "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
+ )
+
+ for value in test_values:
+ content = get_directory_authority({"legacy-dir-key": value}, content = True)
+ self.assertRaises(ValueError, DirectoryAuthority, content)
+
+ authority = DirectoryAuthority(content, False)
+ self.assertEqual(value, authority.legacy_dir_key)
+
+ def test_key_certificate(self):
+ """
+ Includes or excludes a key certificate from the directory entry.
+ """
+
+ key_cert = get_key_certificate()
+
+ # include a key cert with a consensus
+ content = get_directory_authority(content = True) + "\n" + str(key_cert)
+ self.assertRaises(ValueError, DirectoryAuthority, content)
+
+ authority = DirectoryAuthority(content, False)
+ self.assertEqual('turtles', authority.nickname)
+
+ # exclude key cert from a vote
+ content = get_directory_authority(content = True, is_vote = True).replace("\n" + str(key_cert), '')
+ self.assertRaises(ValueError, DirectoryAuthority, content, True, True)
+
+ authority = DirectoryAuthority(content, False, True)
+ self.assertEqual('turtles', authority.nickname)
More information about the tor-commits
mailing list