[tor-commits] [stem/master] Unit tests for hidden service descriptor edge cases

atagar at torproject.org atagar at torproject.org
Sun Mar 1 05:16:35 UTC 2015


commit b3c42c61ecf883b311af17665abb7213e2361abc
Author: Damian Johnson <atagar at torproject.org>
Date:   Sat Feb 28 14:35:40 2015 -0800

    Unit tests for hidden service descriptor edge cases
    
    Expanding its unit test coverage, plus a few fixes for issues it surfaced.
---
 stem/descriptor/hidden_service_descriptor.py      |   23 ++--
 test/mocking.py                                   |    2 +-
 test/unit/descriptor/hidden_service_descriptor.py |  149 +++++++++++++++++++++
 3 files changed, 162 insertions(+), 12 deletions(-)

diff --git a/stem/descriptor/hidden_service_descriptor.py b/stem/descriptor/hidden_service_descriptor.py
index f247b07..c1970a0 100644
--- a/stem/descriptor/hidden_service_descriptor.py
+++ b/stem/descriptor/hidden_service_descriptor.py
@@ -16,12 +16,8 @@ the HSDir flag.
   HiddenServiceDescriptor - Tor hidden service descriptor.
 """
 
-# TODO: Add a description for how to retrieve them when tor supports that (#14847).
-
-# TODO: We should add a '@type hidden-service-descriptor 1.0' annotation to
-# CollecTor...
-#
-# https://collector.torproject.org/formats.html
+# TODO: Add a description for how to retrieve them when tor supports that
+# (#14847) and then update #15009.
 
 import base64
 import collections
@@ -141,8 +137,11 @@ def _parse_introduction_points_line(descriptor, entries):
 
   descriptor.introduction_points_encoded = block_contents
 
-  blob = ''.join(block_contents.split('\n')[1:-1])
-  decoded_field = base64.b64decode(stem.util.str_tools._to_bytes(blob))
+  try:
+    blob = ''.join(block_contents.split('\n')[1:-1])
+    decoded_field = base64.b64decode(stem.util.str_tools._to_bytes(blob))
+  except TypeError:
+    raise ValueError("'introduction-points' isn't base64 encoded content:\n%s" % block_contents)
 
   auth_types = []
 
@@ -176,11 +175,11 @@ class HiddenServiceDescriptor(Descriptor):
     values so our descriptor_id can be validated
   :var datetime published: **\*** time in UTC when this descriptor was made
   :var list protocol_versions: **\*** list of **int** versions that are supported when establishing a connection
-  :var str introduction_points_encoded: **\*** raw introduction points blob
+  :var str introduction_points_encoded: raw introduction points blob
   :var list introduction_points_auth: **\*** tuples of the form
     (auth_method, auth_data) for our introduction_points_content
-  :var str introduction_points_content: **\*** decoded introduction-points
-    content without authentication data, if using cookie authentication this is
+  :var str introduction_points_content: decoded introduction-points content
+    without authentication data, if using cookie authentication this is
     encrypted
   :var str signature: signature of the descriptor content
 
@@ -196,6 +195,8 @@ class HiddenServiceDescriptor(Descriptor):
     'published': (None, _parse_publication_time_line),
     'protocol_versions': ([], _parse_protocol_versions_line),
     'introduction_points_encoded': (None, _parse_introduction_points_line),
+    'introduction_points_auth': ([], _parse_introduction_points_line),
+    'introduction_points_content': (None, _parse_introduction_points_line),
     'signature': (None, _parse_signature_line),
   }
 
diff --git a/test/mocking.py b/test/mocking.py
index 604beec..4a3f272 100644
--- a/test/mocking.py
+++ b/test/mocking.py
@@ -531,7 +531,7 @@ def get_hidden_service_descriptor(attr = None, exclude = (), content = False, in
   :returns: HidenServiceDescriptor for the requested descriptor content
   """
 
-  if introduction_points_lines is not None:
+  if (not attr or 'introduction-points' not in attr) and introduction_points_lines is not None:
     encoded = base64.b64encode(introduction_points_lines('\n'))
     attr['introduction-points'] = '\n-----BEGIN MESSAGE-----\n%s\n-----END MESSAGE-----' % '\n'.join(textwrap.wrap(encoded, 64))
 
diff --git a/test/unit/descriptor/hidden_service_descriptor.py b/test/unit/descriptor/hidden_service_descriptor.py
index 1a1c14d..3b9c652 100644
--- a/test/unit/descriptor/hidden_service_descriptor.py
+++ b/test/unit/descriptor/hidden_service_descriptor.py
@@ -7,9 +7,17 @@ import unittest
 
 import stem.descriptor
 
+from stem.descriptor.hidden_service_descriptor import REQUIRED_FIELDS, HiddenServiceDescriptor
+
 from test.mocking import CRYPTO_BLOB, get_hidden_service_descriptor
 from test.unit.descriptor import get_resource
 
+MESSAGE_BLOCK = """
+-----BEGIN MESSAGE-----
+%s
+-----END MESSAGE-----\
+"""
+
 EXPECTED_DDG_PERMANENT_KEY = """\
 -----BEGIN RSA PUBLIC KEY-----
 MIGJAoGBAJ/SzzgrXPxTlFrKVhXh3buCWv2QfcNgncUpDpKouLn3AtPH5Ocys0jE
@@ -258,3 +266,144 @@ class TestHiddenServiceDescriptor(unittest.TestCase):
     self.assertEqual('', desc.introduction_points_content)
     self.assertTrue(CRYPTO_BLOB in desc.signature)
     self.assertEqual([], desc.introduction_points())
+
+  def test_unrecognized_line(self):
+    """
+    Includes unrecognized content in the descriptor.
+    """
+
+    desc = get_hidden_service_descriptor({'pepperjack': 'is oh so tasty!'})
+    self.assertEqual(['pepperjack is oh so tasty!'], desc.get_unrecognized_lines())
+
+  def test_proceeding_line(self):
+    """
+    Includes a line prior to the 'rendezvous-service-descriptor' entry.
+    """
+
+    desc_text = b'hibernate 1\n' + get_hidden_service_descriptor(content = True)
+    self._expect_invalid_attr(desc_text)
+
+  def test_trailing_line(self):
+    """
+    Includes a line after the 'router-signature' entry.
+    """
+
+    desc_text = get_hidden_service_descriptor(content = True) + b'\nhibernate 1'
+    self._expect_invalid_attr(desc_text)
+
+  def test_required_fields(self):
+    """
+    Check that we require the mandatory fields.
+    """
+
+    line_to_attr = {
+      'rendezvous-service-descriptor': 'descriptor_id',
+      'version': 'version',
+      'permanent-key': 'permanent_key',
+      'secret-id-part': 'secret_id_part',
+      'publication-time': 'published',
+      'introduction-points': 'introduction_points_encoded',
+      'protocol-versions': 'protocol_versions',
+      'signature': 'signature',
+    }
+
+    for line in REQUIRED_FIELDS:
+      desc_text = get_hidden_service_descriptor(content = True, exclude = (line,))
+
+      expected = [] if line == 'protocol-versions' else None
+      self._expect_invalid_attr(desc_text, line_to_attr[line], expected)
+
+  def test_invalid_version(self):
+    """
+    Checks that our version field expects a numeric value.
+    """
+
+    test_values = (
+      '',
+      '-10',
+      'hello',
+    )
+
+    for test_value in test_values:
+      desc_text = get_hidden_service_descriptor({'version': test_value}, content = True)
+      self._expect_invalid_attr(desc_text, 'version')
+
+  def test_invalid_protocol_versions(self):
+    """
+    Checks that our protocol-versions field expects comma separated numeric
+    values.
+    """
+
+    test_values = (
+      '',
+      '-10',
+      'hello',
+      '10,',
+      ',10',
+      '10,-10',
+      '10,hello',
+    )
+
+    for test_value in test_values:
+      desc_text = get_hidden_service_descriptor({'protocol-versions': test_value}, content = True)
+      self._expect_invalid_attr(desc_text, 'protocol_versions', [])
+
+  def test_introduction_points_when_empty(self):
+    """
+    It's valid to advertise zero introduction points. I'm not clear if this
+    would mean an empty introduction-points field or that it's omitted, but
+    either is valid according to the spec.
+    """
+
+    missing_field_desc = get_hidden_service_descriptor(exclude = ('introduction-points',))
+
+    self.assertEqual(None, missing_field_desc.introduction_points_encoded)
+    self.assertEqual([], missing_field_desc.introduction_points_auth)
+    self.assertEqual(None, missing_field_desc.introduction_points_content)
+    self.assertEqual([], missing_field_desc.introduction_points())
+
+    empty_field_desc = get_hidden_service_descriptor({'introduction-points': MESSAGE_BLOCK % ''})
+
+    self.assertEqual((MESSAGE_BLOCK % '').strip(), empty_field_desc.introduction_points_encoded)
+    self.assertEqual([], empty_field_desc.introduction_points_auth)
+    self.assertEqual('', empty_field_desc.introduction_points_content)
+    self.assertEqual([], empty_field_desc.introduction_points())
+
+  def test_introduction_points_when_not_base64(self):
+    """
+    Checks the introduction-points field when the content isn't base64 encoded.
+    """
+
+    test_values = (
+      MESSAGE_BLOCK % '12345',
+      MESSAGE_BLOCK % 'hello',
+    )
+
+    for test_value in test_values:
+      desc_text = get_hidden_service_descriptor({'introduction-points': test_value}, content = True)
+
+      desc = self._expect_invalid_attr(desc_text, 'introduction_points_encoded', test_value.strip())
+      self.assertEqual([], desc.introduction_points_auth)
+      self.assertEqual(None, desc.introduction_points_content)
+      self.assertEqual([], desc.introduction_points())
+
+  def _expect_invalid_attr(self, desc_text, attr = None, expected_value = None):
+    """
+    Asserts that construction will fail due to desc_text having a malformed
+    attribute. If an attr is provided then we check that it matches an expected
+    value when we're constructed without validation.
+    """
+
+    self.assertRaises(ValueError, HiddenServiceDescriptor, desc_text, True)
+    desc = HiddenServiceDescriptor(desc_text, validate = False)
+
+    if attr:
+      # check that the invalid attribute matches the expected value when
+      # constructed without validation
+
+      self.assertEqual(expected_value, getattr(desc, attr))
+    else:
+      # check a default attribute
+      self.assertEqual('y3olqqblqw2gbh6phimfuiroechjjafa', desc.descriptor_id)
+
+    return desc





More information about the tor-commits mailing list