[tor-commits] [stem/master] Unit tests for hidden service descriptor edge cases
atagar at torproject.org
atagar at torproject.org
Sun Mar 1 05:16:35 UTC 2015
commit b3c42c61ecf883b311af17665abb7213e2361abc
Author: Damian Johnson <atagar at torproject.org>
Date: Sat Feb 28 14:35:40 2015 -0800
Unit tests for hidden service descriptor edge cases
Expanding its unit test coverage and a few fixes for issues it surfaced.
---
stem/descriptor/hidden_service_descriptor.py | 23 ++--
test/mocking.py | 2 +-
test/unit/descriptor/hidden_service_descriptor.py | 149 +++++++++++++++++++++
3 files changed, 162 insertions(+), 12 deletions(-)
diff --git a/stem/descriptor/hidden_service_descriptor.py b/stem/descriptor/hidden_service_descriptor.py
index f247b07..c1970a0 100644
--- a/stem/descriptor/hidden_service_descriptor.py
+++ b/stem/descriptor/hidden_service_descriptor.py
@@ -16,12 +16,8 @@ the HSDir flag.
HiddenServiceDescriptor - Tor hidden service descriptor.
"""
-# TODO: Add a description for how to retrieve them when tor supports that (#14847).
-
-# TODO: We should add a '@type hidden-service-descriptor 1.0' annotation to
-# CollecTor...
-#
-# https://collector.torproject.org/formats.html
+# TODO: Add a description for how to retrieve them when tor supports that
+# (#14847) and then update #15009.
import base64
import collections
@@ -141,8 +137,11 @@ def _parse_introduction_points_line(descriptor, entries):
descriptor.introduction_points_encoded = block_contents
- blob = ''.join(block_contents.split('\n')[1:-1])
- decoded_field = base64.b64decode(stem.util.str_tools._to_bytes(blob))
+ try:
+ blob = ''.join(block_contents.split('\n')[1:-1])
+ decoded_field = base64.b64decode(stem.util.str_tools._to_bytes(blob))
+ except TypeError:
+ raise ValueError("'introduction-points' isn't base64 encoded content:\n%s" % block_contents)
auth_types = []
@@ -176,11 +175,11 @@ class HiddenServiceDescriptor(Descriptor):
values so our descriptor_id can be validated
:var datetime published: **\*** time in UTC when this descriptor was made
:var list protocol_versions: **\*** list of **int** versions that are supported when establishing a connection
- :var str introduction_points_encoded: **\*** raw introduction points blob
+ :var str introduction_points_encoded: raw introduction points blob
:var list introduction_points_auth: **\*** tuples of the form
(auth_method, auth_data) for our introduction_points_content
- :var str introduction_points_content: **\*** decoded introduction-points
- content without authentication data, if using cookie authentication this is
+ :var str introduction_points_content: decoded introduction-points content
+ without authentication data, if using cookie authentication this is
encrypted
:var str signature: signature of the descriptor content
@@ -196,6 +195,8 @@ class HiddenServiceDescriptor(Descriptor):
'published': (None, _parse_publication_time_line),
'protocol_versions': ([], _parse_protocol_versions_line),
'introduction_points_encoded': (None, _parse_introduction_points_line),
+ 'introduction_points_auth': ([], _parse_introduction_points_line),
+ 'introduction_points_content': (None, _parse_introduction_points_line),
'signature': (None, _parse_signature_line),
}
diff --git a/test/mocking.py b/test/mocking.py
index 604beec..4a3f272 100644
--- a/test/mocking.py
+++ b/test/mocking.py
@@ -531,7 +531,7 @@ def get_hidden_service_descriptor(attr = None, exclude = (), content = False, in
:returns: HidenServiceDescriptor for the requested descriptor content
"""
- if introduction_points_lines is not None:
+ if (not attr or 'introduction-points' not in attr) and introduction_points_lines is not None:
encoded = base64.b64encode(introduction_points_lines('\n'))
attr['introduction-points'] = '\n-----BEGIN MESSAGE-----\n%s\n-----END MESSAGE-----' % '\n'.join(textwrap.wrap(encoded, 64))
diff --git a/test/unit/descriptor/hidden_service_descriptor.py b/test/unit/descriptor/hidden_service_descriptor.py
index 1a1c14d..3b9c652 100644
--- a/test/unit/descriptor/hidden_service_descriptor.py
+++ b/test/unit/descriptor/hidden_service_descriptor.py
@@ -7,9 +7,17 @@ import unittest
import stem.descriptor
+from stem.descriptor.hidden_service_descriptor import REQUIRED_FIELDS, HiddenServiceDescriptor
+
from test.mocking import CRYPTO_BLOB, get_hidden_service_descriptor
from test.unit.descriptor import get_resource
+MESSAGE_BLOCK = """
+-----BEGIN MESSAGE-----
+%s
+-----END MESSAGE-----\
+"""
+
EXPECTED_DDG_PERMANENT_KEY = """\
-----BEGIN RSA PUBLIC KEY-----
MIGJAoGBAJ/SzzgrXPxTlFrKVhXh3buCWv2QfcNgncUpDpKouLn3AtPH5Ocys0jE
@@ -258,3 +266,144 @@ class TestHiddenServiceDescriptor(unittest.TestCase):
self.assertEqual('', desc.introduction_points_content)
self.assertTrue(CRYPTO_BLOB in desc.signature)
self.assertEqual([], desc.introduction_points())
+
+ def test_unrecognized_line(self):
+ """
+ Includes unrecognized content in the descriptor.
+ """
+
+ desc = get_hidden_service_descriptor({'pepperjack': 'is oh so tasty!'})
+ self.assertEqual(['pepperjack is oh so tasty!'], desc.get_unrecognized_lines())
+
+ def test_proceeding_line(self):
+ """
+ Includes a line prior to the 'rendezvous-service-descriptor' entry.
+ """
+
+ desc_text = b'hibernate 1\n' + get_hidden_service_descriptor(content = True)
+ self._expect_invalid_attr(desc_text)
+
+ def test_trailing_line(self):
+ """
+ Includes a line after the 'router-signature' entry.
+ """
+
+ desc_text = get_hidden_service_descriptor(content = True) + b'\nhibernate 1'
+ self._expect_invalid_attr(desc_text)
+
+ def test_required_fields(self):
+ """
+ Check that we require the mandatory fields.
+ """
+
+ line_to_attr = {
+ 'rendezvous-service-descriptor': 'descriptor_id',
+ 'version': 'version',
+ 'permanent-key': 'permanent_key',
+ 'secret-id-part': 'secret_id_part',
+ 'publication-time': 'published',
+ 'introduction-points': 'introduction_points_encoded',
+ 'protocol-versions': 'protocol_versions',
+ 'signature': 'signature',
+ }
+
+ for line in REQUIRED_FIELDS:
+ desc_text = get_hidden_service_descriptor(content = True, exclude = (line,))
+
+ expected = [] if line == 'protocol-versions' else None
+ self._expect_invalid_attr(desc_text, line_to_attr[line], expected)
+
+ def test_invalid_version(self):
+ """
+ Checks that our version field expects a numeric value.
+ """
+
+ test_values = (
+ '',
+ '-10',
+ 'hello',
+ )
+
+ for test_value in test_values:
+ desc_text = get_hidden_service_descriptor({'version': test_value}, content = True)
+ self._expect_invalid_attr(desc_text, 'version')
+
+ def test_invalid_protocol_versions(self):
+ """
+ Checks that our protocol-versions field expects comma separated numeric
+ values.
+ """
+
+ test_values = (
+ '',
+ '-10',
+ 'hello',
+ '10,',
+ ',10',
+ '10,-10',
+ '10,hello',
+ )
+
+ for test_value in test_values:
+ desc_text = get_hidden_service_descriptor({'protocol-versions': test_value}, content = True)
+ self._expect_invalid_attr(desc_text, 'protocol_versions', [])
+
+ def test_introduction_points_when_empty(self):
+ """
+ It's valid to advertise zero introduction points. I'm not clear if this
+ would mean an empty introduction-points field or that it's omitted but either
+ are valid according to the spec.
+ """
+
+ missing_field_desc = get_hidden_service_descriptor(exclude = ('introduction-points',))
+
+ self.assertEqual(None, missing_field_desc.introduction_points_encoded)
+ self.assertEqual([], missing_field_desc.introduction_points_auth)
+ self.assertEqual(None, missing_field_desc.introduction_points_content)
+ self.assertEqual([], missing_field_desc.introduction_points())
+
+ empty_field_desc = get_hidden_service_descriptor({'introduction-points': MESSAGE_BLOCK % ''})
+
+ self.assertEqual((MESSAGE_BLOCK % '').strip(), empty_field_desc.introduction_points_encoded)
+ self.assertEqual([], empty_field_desc.introduction_points_auth)
+ self.assertEqual('', empty_field_desc.introduction_points_content)
+ self.assertEqual([], empty_field_desc.introduction_points())
+
+ def test_introduction_points_when_not_base64(self):
+ """
+ Checks the introduction-points field when the content isn't base64 encoded.
+ """
+
+ test_values = (
+ MESSAGE_BLOCK % '12345',
+ MESSAGE_BLOCK % 'hello',
+ )
+
+ for test_value in test_values:
+ desc_text = get_hidden_service_descriptor({'introduction-points': test_value}, content = True)
+
+ desc = self._expect_invalid_attr(desc_text, 'introduction_points_encoded', test_value.strip())
+ self.assertEqual([], desc.introduction_points_auth)
+ self.assertEqual(None, desc.introduction_points_content)
+ self.assertEqual([], desc.introduction_points())
+
+ def _expect_invalid_attr(self, desc_text, attr = None, expected_value = None):
+ """
+ Asserts that construction will fail due to desc_text having a malformed
+ attribute. If an attr is provided then we check that it matches an expected
+ value when we're constructed without validation.
+ """
+
+ self.assertRaises(ValueError, HiddenServiceDescriptor, desc_text, True)
+ desc = HiddenServiceDescriptor(desc_text, validate = False)
+
+ if attr:
+ # check that the invalid attribute matches the expected value when
+ # constructed without validation
+
+ self.assertEqual(expected_value, getattr(desc, attr))
+ else:
+ # check a default attribute
+ self.assertEqual('y3olqqblqw2gbh6phimfuiroechjjafa', desc.descriptor_id)
+
+ return desc
More information about the tor-commits
mailing list