[tor-commits] [stem/master] HidenServiceDescriptor creation
atagar at torproject.org
atagar at torproject.org
Tue May 2 19:57:29 UTC 2017
commit cf869a877683254ca7b262a989551bc9c6b76204
Author: Damian Johnson <atagar at torproject.org>
Date: Mon May 1 12:56:36 2017 -0700
HidenServiceDescriptor creation
---
stem/descriptor/hidden_service_descriptor.py | 31 ++++++++++-
test/mocking.py | 51 ------------------
test/unit/descriptor/hidden_service_descriptor.py | 65 ++++++++---------------
3 files changed, 52 insertions(+), 95 deletions(-)
diff --git a/stem/descriptor/hidden_service_descriptor.py b/stem/descriptor/hidden_service_descriptor.py
index 129bc83..a3b12b5 100644
--- a/stem/descriptor/hidden_service_descriptor.py
+++ b/stem/descriptor/hidden_service_descriptor.py
@@ -31,8 +31,10 @@ import stem.util.connection
import stem.util.str_tools
from stem.descriptor import (
+ CRYPTO_BLOB,
PGP_BLOCK_END,
Descriptor,
+ _descriptor_content,
_descriptor_components,
_read_until_keywords,
_bytes_for_block,
@@ -80,6 +82,20 @@ SINGLE_INTRODUCTION_POINT_FIELDS = [
BASIC_AUTH = 1
STEALTH_AUTH = 2
+HIDDEN_SERVICE_HEADER = (
+ ('rendezvous-service-descriptor', 'y3olqqblqw2gbh6phimfuiroechjjafa'),
+ ('version', '2'),
+ ('permanent-key', '\n-----BEGIN RSA PUBLIC KEY-----%s-----END RSA PUBLIC KEY-----' % CRYPTO_BLOB),
+ ('secret-id-part', 'e24kgecavwsznj7gpbktqsiwgvngsf4e'),
+ ('publication-time', '2015-02-23 20:00:00'),
+ ('protocol-versions', '2,3'),
+ ('introduction-points', '\n-----BEGIN MESSAGE-----\n-----END MESSAGE-----'),
+)
+
+HIDDEN_SERVICE_FOOTER = (
+ ('signature', '\n-----BEGIN SIGNATURE-----%s-----END SIGNATURE-----' % CRYPTO_BLOB),
+)
+
class IntroductionPoints(collections.namedtuple('IntroductionPoints', INTRODUCTION_POINTS_ATTR.keys())):
"""
@@ -204,6 +220,9 @@ class HiddenServiceDescriptor(Descriptor):
Moved from the deprecated `pycrypto
<https://www.dlitz.net/software/pycrypto/>`_ module to `cryptography
<https://pypi.python.org/pypi/cryptography>`_ for validating signatures.
+
+ .. versionchanged:: 1.6.0
+ Added the **skip_crypto_validation** constructor argument.
"""
ATTRIBUTES = {
@@ -230,7 +249,15 @@ class HiddenServiceDescriptor(Descriptor):
'signature': _parse_signature_line,
}
- def __init__(self, raw_contents, validate = False):
+ @classmethod
+ def content(cls, attr = None, exclude = ()):
+ return _descriptor_content(attr, exclude, HIDDEN_SERVICE_HEADER, HIDDEN_SERVICE_FOOTER)
+
+ @classmethod
+ def create(cls, attr = None, exclude = (), validate = True):
+ return cls(cls.content(attr, exclude), validate = validate, skip_crypto_validation = True)
+
+ def __init__(self, raw_contents, validate = False, skip_crypto_validation = False):
super(HiddenServiceDescriptor, self).__init__(raw_contents, lazy_load = not validate)
entries = _descriptor_components(raw_contents, validate, non_ascii_fields = ('introduction-points'))
@@ -248,7 +275,7 @@ class HiddenServiceDescriptor(Descriptor):
self._parse(entries, validate)
- if stem.prereq.is_crypto_available():
+ if not skip_crypto_validation and stem.prereq.is_crypto_available():
signed_digest = self._digest_for_signature(self.permanent_key, self.signature)
content_digest = self._digest_for_content(b'rendezvous-service-descriptor ', b'\nsignature\n')
diff --git a/test/mocking.py b/test/mocking.py
index f1d859e..09d3631 100644
--- a/test/mocking.py
+++ b/test/mocking.py
@@ -23,17 +23,12 @@ Helper functions for creating mock objects.
get_router_status_entry_v2 - RouterStatusEntryV2
get_router_status_entry_v3 - RouterStatusEntryV3
get_router_status_entry_micro_v3 - RouterStatusEntryMicroV3
-
- stem.descriptor.hidden-service_descriptor
- get_hidden_service_descriptor - HiddenServiceDescriptor
"""
-import base64
import hashlib
import itertools
import os
import re
-import textwrap
import stem.descriptor.extrainfo_descriptor
import stem.descriptor.hidden_service_descriptor
@@ -46,12 +41,6 @@ import stem.response
import stem.util.str_tools
try:
- # added in python 3.3
- from unittest.mock import Mock, patch
-except ImportError:
- from mock import Mock, patch
-
-try:
# added in python 2.7
from collections import OrderedDict
except ImportError:
@@ -138,20 +127,6 @@ NETWORK_STATUS_DOCUMENT_FOOTER = (
('directory-signature', '%s %s\n%s' % (DOC_SIG.identity, DOC_SIG.key_digest, DOC_SIG.signature)),
)
-HIDDEN_SERVICE_HEADER = (
- ('rendezvous-service-descriptor', 'y3olqqblqw2gbh6phimfuiroechjjafa'),
- ('version', '2'),
- ('permanent-key', '\n-----BEGIN RSA PUBLIC KEY-----%s-----END RSA PUBLIC KEY-----' % CRYPTO_BLOB),
- ('secret-id-part', 'e24kgecavwsznj7gpbktqsiwgvngsf4e'),
- ('publication-time', '2015-02-23 20:00:00'),
- ('protocol-versions', '2,3'),
- ('introduction-points', '\n-----BEGIN MESSAGE-----\n-----END MESSAGE-----'),
-)
-
-HIDDEN_SERVICE_FOOTER = (
- ('signature', '\n-----BEGIN SIGNATURE-----%s-----END SIGNATURE-----' % CRYPTO_BLOB),
-)
-
def get_all_combinations(attr, include_empty = False):
"""
@@ -372,32 +347,6 @@ def get_router_status_entry_micro_v3(attr = None, exclude = (), content = False)
return stem.descriptor.router_status_entry.RouterStatusEntryMicroV3(desc_content, validate = True)
-def get_hidden_service_descriptor(attr = None, exclude = (), content = False, introduction_points_lines = None):
- """
- Provides the descriptor content for...
- stem.descriptor.hidden_service_descriptor.HidenServiceDescriptor
-
- :param dict attr: keyword/value mappings to be included in the descriptor
- :param list exclude: mandatory keywords to exclude from the descriptor
- :param bool content: provides the str content of the descriptor rather than the class if True
- :param list introduction_points_lines: lines to be included in the introduction-points field
-
- :returns: HidenServiceDescriptor for the requested descriptor content
- """
-
- if (not attr or 'introduction-points' not in attr) and introduction_points_lines is not None:
- encoded = base64.b64encode(introduction_points_lines('\n'))
- attr['introduction-points'] = '\n-----BEGIN MESSAGE-----\n%s\n-----END MESSAGE-----' % '\n'.join(textwrap.wrap(encoded, 64))
-
- desc_content = _get_descriptor_content(attr, exclude, HIDDEN_SERVICE_HEADER, HIDDEN_SERVICE_FOOTER)
-
- if content:
- return desc_content
- else:
- with patch('stem.prereq.is_crypto_available', Mock(return_value = False)):
- return stem.descriptor.hidden_service_descriptor.HiddenServiceDescriptor(desc_content, validate = True)
-
-
def get_directory_authority(attr = None, exclude = (), is_vote = False, content = False):
"""
Provides the descriptor content for...
diff --git a/test/unit/descriptor/hidden_service_descriptor.py b/test/unit/descriptor/hidden_service_descriptor.py
index 6f9fb26..48f1d29 100644
--- a/test/unit/descriptor/hidden_service_descriptor.py
+++ b/test/unit/descriptor/hidden_service_descriptor.py
@@ -3,13 +3,12 @@ Unit tests for stem.descriptor.hidden_service_descriptor.
"""
import datetime
+import functools
import unittest
import stem.descriptor
import stem.prereq
-from test.mocking import CRYPTO_BLOB, get_hidden_service_descriptor
-from test.unit.descriptor import get_resource
from test.util import require_cryptography
from stem.descriptor.hidden_service_descriptor import (
@@ -18,6 +17,12 @@ from stem.descriptor.hidden_service_descriptor import (
HiddenServiceDescriptor,
)
+from test.unit.descriptor import (
+ get_resource,
+ base_expect_invalid_attr,
+ base_expect_invalid_attr_for_text,
+)
+
MESSAGE_BLOCK = """
-----BEGIN MESSAGE-----
%s
@@ -232,6 +237,9 @@ lj/7xMZWDrfyw5H86L0QiaZnkmD+nig1+S+Rn39mmuEgl2iwZO/ihlncUJQTEULb
-----END MESSAGE-----\
"""
+expect_invalid_attr = functools.partial(base_expect_invalid_attr, HiddenServiceDescriptor, 'descriptor_id', 'y3olqqblqw2gbh6phimfuiroechjjafa')
+expect_invalid_attr_for_text = functools.partial(base_expect_invalid_attr_for_text, HiddenServiceDescriptor, 'descriptor_id', 'y3olqqblqw2gbh6phimfuiroechjjafa')
+
class TestHiddenServiceDescriptor(unittest.TestCase):
def test_for_duckduckgo_with_validation(self):
@@ -403,18 +411,18 @@ class TestHiddenServiceDescriptor(unittest.TestCase):
Basic sanity check that we can parse a hidden service descriptor with minimal attributes.
"""
- desc = get_hidden_service_descriptor()
+ desc = HiddenServiceDescriptor.create()
self.assertEqual('y3olqqblqw2gbh6phimfuiroechjjafa', desc.descriptor_id)
self.assertEqual(2, desc.version)
- self.assertTrue(CRYPTO_BLOB in desc.permanent_key)
+ self.assertTrue(stem.descriptor.CRYPTO_BLOB in desc.permanent_key)
self.assertEqual('e24kgecavwsznj7gpbktqsiwgvngsf4e', desc.secret_id_part)
self.assertEqual(datetime.datetime(2015, 2, 23, 20, 0, 0), desc.published)
self.assertEqual([2, 3], desc.protocol_versions)
self.assertEqual('-----BEGIN MESSAGE-----\n-----END MESSAGE-----', desc.introduction_points_encoded)
self.assertEqual([], desc.introduction_points_auth)
self.assertEqual(b'', desc.introduction_points_content)
- self.assertTrue(CRYPTO_BLOB in desc.signature)
+ self.assertTrue(stem.descriptor.CRYPTO_BLOB in desc.signature)
self.assertEqual([], desc.introduction_points())
def test_unrecognized_line(self):
@@ -422,7 +430,7 @@ class TestHiddenServiceDescriptor(unittest.TestCase):
Includes unrecognized content in the descriptor.
"""
- desc = get_hidden_service_descriptor({'pepperjack': 'is oh so tasty!'})
+ desc = HiddenServiceDescriptor.create({'pepperjack': 'is oh so tasty!'})
self.assertEqual(['pepperjack is oh so tasty!'], desc.get_unrecognized_lines())
def test_proceeding_line(self):
@@ -430,16 +438,14 @@ class TestHiddenServiceDescriptor(unittest.TestCase):
Includes a line prior to the 'rendezvous-service-descriptor' entry.
"""
- desc_text = b'hibernate 1\n' + get_hidden_service_descriptor(content = True)
- self._expect_invalid_attr(desc_text)
+ expect_invalid_attr_for_text(self, b'hibernate 1\n' + HiddenServiceDescriptor.content())
def test_trailing_line(self):
"""
Includes a line after the 'router-signature' entry.
"""
- desc_text = get_hidden_service_descriptor(content = True) + b'\nhibernate 1'
- self._expect_invalid_attr(desc_text)
+ expect_invalid_attr_for_text(self, HiddenServiceDescriptor.content() + b'\nhibernate 1')
def test_required_fields(self):
"""
@@ -458,10 +464,10 @@ class TestHiddenServiceDescriptor(unittest.TestCase):
}
for line in REQUIRED_FIELDS:
- desc_text = get_hidden_service_descriptor(content = True, exclude = (line,))
+ desc_text = HiddenServiceDescriptor.content(exclude = (line,))
expected = [] if line == 'protocol-versions' else None
- self._expect_invalid_attr(desc_text, line_to_attr[line], expected)
+ expect_invalid_attr_for_text(self, desc_text, line_to_attr[line], expected)
def test_invalid_version(self):
"""
@@ -475,8 +481,7 @@ class TestHiddenServiceDescriptor(unittest.TestCase):
)
for test_value in test_values:
- desc_text = get_hidden_service_descriptor({'version': test_value}, content = True)
- self._expect_invalid_attr(desc_text, 'version')
+ expect_invalid_attr(self, {'version': test_value}, 'version')
def test_invalid_protocol_versions(self):
"""
@@ -495,8 +500,7 @@ class TestHiddenServiceDescriptor(unittest.TestCase):
)
for test_value in test_values:
- desc_text = get_hidden_service_descriptor({'protocol-versions': test_value}, content = True)
- self._expect_invalid_attr(desc_text, 'protocol_versions', [])
+ expect_invalid_attr(self, {'protocol-versions': test_value}, 'protocol_versions', [])
def test_introduction_points_when_empty(self):
"""
@@ -505,14 +509,14 @@ class TestHiddenServiceDescriptor(unittest.TestCase):
are valid according to the spec.
"""
- missing_field_desc = get_hidden_service_descriptor(exclude = ('introduction-points',))
+ missing_field_desc = HiddenServiceDescriptor.create(exclude = ('introduction-points',))
self.assertEqual(None, missing_field_desc.introduction_points_encoded)
self.assertEqual([], missing_field_desc.introduction_points_auth)
self.assertEqual(None, missing_field_desc.introduction_points_content)
self.assertEqual([], missing_field_desc.introduction_points())
- empty_field_desc = get_hidden_service_descriptor({'introduction-points': MESSAGE_BLOCK % ''})
+ empty_field_desc = HiddenServiceDescriptor.create({'introduction-points': MESSAGE_BLOCK % ''})
self.assertEqual((MESSAGE_BLOCK % '').strip(), empty_field_desc.introduction_points_encoded)
self.assertEqual([], empty_field_desc.introduction_points_auth)
@@ -530,30 +534,7 @@ class TestHiddenServiceDescriptor(unittest.TestCase):
)
for test_value in test_values:
- desc_text = get_hidden_service_descriptor({'introduction-points': test_value}, content = True)
-
- desc = self._expect_invalid_attr(desc_text, 'introduction_points_encoded', test_value.strip())
+ desc = expect_invalid_attr(self, {'introduction-points': test_value}, 'introduction_points_encoded', test_value.strip())
self.assertEqual([], desc.introduction_points_auth)
self.assertEqual(None, desc.introduction_points_content)
self.assertEqual([], desc.introduction_points())
-
- def _expect_invalid_attr(self, desc_text, attr = None, expected_value = None):
- """
- Asserts that construction will fail due to desc_text having a malformed
- attribute. If an attr is provided then we check that it matches an expected
- value when we're constructed without validation.
- """
-
- self.assertRaises(ValueError, HiddenServiceDescriptor, desc_text, True)
- desc = HiddenServiceDescriptor(desc_text, validate = False)
-
- if attr:
- # check that the invalid attribute matches the expected value when
- # constructed without validation
-
- self.assertEqual(expected_value, getattr(desc, attr))
- else:
- # check a default attribute
- self.assertEqual('y3olqqblqw2gbh6phimfuiroechjjafa', desc.descriptor_id)
-
- return desc
More information about the tor-commits
mailing list