[tor-commits] [stem/master] Public decryption method
atagar at torproject.org
atagar at torproject.org
Sun Oct 6 02:07:35 UTC 2019
commit 3498ebedccdcd59cfc9072e2f24f5c9eb0c3daae
Author: Damian Johnson <atagar at torproject.org>
Date: Sat Oct 5 15:06:27 2019 -0700
Public decryption method
After considering a few APIs, decided to keep this simple and simply return
the InnerLayer, which references the OuterLayer.
---
stem/descriptor/hidden_service.py | 58 ++++++++++++++++++++-----------
test/unit/descriptor/hidden_service_v3.py | 36 ++++++++++++++-----
2 files changed, 64 insertions(+), 30 deletions(-)
diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py
index 6911fa3c..e4ee13e3 100644
--- a/stem/descriptor/hidden_service.py
+++ b/stem/descriptor/hidden_service.py
@@ -671,8 +671,10 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
def create(cls, attr = None, exclude = (), validate = True, sign = False):
return cls(cls.content(attr, exclude, sign), validate = validate, skip_crypto_validation = not sign)
- def __init__(self, raw_contents, validate = False, skip_crypto_validation = False):
+ def __init__(self, raw_contents, validate = False):
super(HiddenServiceDescriptorV3, self).__init__(raw_contents, lazy_load = not validate)
+
+ self._inner_layer = None
entries = _descriptor_components(raw_contents, validate)
if validate:
@@ -691,39 +693,49 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
else:
self._entries = entries
- # TODO: The following is only marked as private because it is a work in
- # progress. This will probably become something like "body()" which decrypts
- # and parses the internal descriptor content.
+ def decrypt(self, onion_address, validate = False):
+ """
+ Decrypt this descriptor. Hidden service descriptors contain two encryption
+ layers (:class:`~stem.descriptor.hidden_service.OuterLayer` and
+ :class:`~stem.descriptor.hidden_service.InnerLayer`).
+
+ :param str onion_address: hidden service address this descriptor is from
+ :param bool validate: perform validation checks on decrypted content
+
+ :returns: :class:`~stem.descriptor.hidden_service.InnerLayer` with our
+ decrypted content
+
+ :raises:
+ * **ImportError** if required cryptography or sha3 module is unavailable
+ * **ValueError** if unable to decrypt or validation fails
+ """
- def _decrypt(self, onion_address, outer_layer = False):
if not stem.prereq.is_crypto_available(ed25519 = True):
raise ImportError('Hidden service descriptor decryption requires cryptography version 2.6')
elif not stem.prereq._is_sha3_available():
raise ImportError('Hidden service descriptor decryption requires python 3.6+ or the pysha3 module (https://pypi.org/project/pysha3/)')
- blinded_key = self.signing_cert.signing_key()
-
- if not blinded_key:
- raise ValueError('No signing key is present')
+ if self._inner_layer is None:
+ blinded_key = self.signing_cert.signing_key()
- identity_public_key = HiddenServiceDescriptorV3._public_key_from_address(onion_address)
+ if not blinded_key:
+ raise ValueError('No signing key is present')
- # credential = H('credential' | public-identity-key)
- # subcredential = H('subcredential' | credential | blinded-public-key)
+ identity_public_key = HiddenServiceDescriptorV3._public_key_from_address(onion_address)
- credential = hashlib.sha3_256(b'credential%s' % (identity_public_key)).digest()
- subcredential = hashlib.sha3_256(b'subcredential%s%s' % (credential, blinded_key)).digest()
+ # credential = H('credential' | public-identity-key)
+ # subcredential = H('subcredential' | credential | blinded-public-key)
- outter_layer_plaintext = stem.descriptor.hsv3_crypto.decrypt_outter_layer(self.superencrypted, self.revision_counter, blinded_key, subcredential)
+ credential = hashlib.sha3_256(b'credential%s' % (identity_public_key)).digest()
+ subcredential = hashlib.sha3_256(b'subcredential%s%s' % (credential, blinded_key)).digest()
- if outer_layer:
- return outter_layer_plaintext
+ outer_layer = OuterLayer(stem.descriptor.hsv3_crypto.decrypt_outter_layer(self.superencrypted, self.revision_counter, blinded_key, subcredential), validate)
- inner_layer_ciphertext = OuterLayer(outter_layer_plaintext).encrypted
+ inner_layer_plaintext = stem.descriptor.hsv3_crypto.decrypt_inner_layer(outer_layer.encrypted, self.revision_counter, blinded_key, subcredential)
- inner_layer_plaintext = stem.descriptor.hsv3_crypto.decrypt_inner_layer(inner_layer_ciphertext, self.revision_counter, blinded_key, subcredential)
+ self._inner_layer = InnerLayer(inner_layer_plaintext, validate, outer_layer)
- return inner_layer_plaintext
+ return self._inner_layer
@staticmethod
def _public_key_from_address(onion_address):
@@ -805,6 +817,8 @@ class InnerLayer(Descriptor):
.. versionadded:: 1.8.0
+ :var stem.descriptor.hidden_service.OuterLayer outer: enclosing encryption layer
+
:var list formats: **\\*** recognized CREATE2 cell formats
:var list intro_auth: **\\*** introduction-layer authentication types
:var bool is_single_service: **\\*** **True** if this is a `single onion service <https://gitweb.torproject.org/torspec.git/tree/proposals/260-rend-single-onion.txt>`_, **False** otherwise
@@ -827,9 +841,11 @@ class InnerLayer(Descriptor):
'single-onion-service': _parse_v3_inner_single_service,
}
- def __init__(self, content, validate = False):
+ def __init__(self, content, validate = False, outer_layer = None):
super(InnerLayer, self).__init__(content, lazy_load = not validate)
+ self.outer = outer_layer
+
# inner layer begins with a few header fields, followed by multiple any
# number of introduction-points
diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py
index 36b85f7c..093d5cb6 100644
--- a/test/unit/descriptor/hidden_service_v3.py
+++ b/test/unit/descriptor/hidden_service_v3.py
@@ -35,6 +35,15 @@ BDwQZ8rhp05oCqhhY3oFHqG9KS7HGzv9g2v1/PrVJMbkfpwu1YK4b3zIZAk=
-----END ED25519 CERT-----\
"""
+with open(get_resource('hidden_service_v3'), 'rb') as descriptor_file:
+ HS_DESC_STR = descriptor_file.read()
+
+with open(get_resource('hidden_service_v3_outer_layer'), 'rb') as outer_layer_file:
+ OUTER_LAYER_STR = outer_layer_file.read()
+
+with open(get_resource('hidden_service_v3_inner_layer'), 'rb') as inner_layer_file:
+ INNER_LAYER_STR = inner_layer_file.read()
+
class TestHiddenServiceDescriptorV3(unittest.TestCase):
def test_real_descriptor(self):
@@ -54,20 +63,30 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
self.assertTrue('eaH8VdaTKS' in desc.superencrypted)
self.assertEqual('aglChCQF+lbzKgyxJJTpYGVShV/GMDRJ4+cRGCp+a2y/yX/tLSh7hzqI7rVZrUoGj74Xr1CLMYO3fXYCS+DPDQ', desc.signature)
- if stem.prereq.is_crypto_available(ed25519 = True) and stem.prereq._is_sha3_available():
- with open(get_resource('hidden_service_v3_outer_layer'), 'rb') as outer_layer_file:
- self.assertEqual(outer_layer_file.read(), desc._decrypt(HS_ADDRESS, outer_layer = True))
+ def test_decryption(self):
+ """
+ Decrypt our descriptor and validate its content.
+ """
+
+ if not stem.prereq.is_crypto_available(ed25519 = True):
+ self.skipTest('(requires cryptography ed25519 support)')
+ return
+ elif not stem.prereq._is_sha3_available():
+ self.skipTest('(requires sha3 support)')
+ return
+
+ desc = HiddenServiceDescriptorV3.from_str(HS_DESC_STR)
+ inner_layer = desc.decrypt(HS_ADDRESS)
- with open(get_resource('hidden_service_v3_inner_layer'), 'rb') as outer_layer_file:
- self.assertEqual(outer_layer_file.read(), desc._decrypt(HS_ADDRESS, outer_layer = False))
+ self.assertEqual(INNER_LAYER_STR, str(inner_layer))
+ self.assertEqual(OUTER_LAYER_STR.rstrip(b'\x00'), str(inner_layer.outer))
def test_outer_layer(self):
"""
Parse the outer layer of our test descriptor.
"""
- with open(get_resource('hidden_service_v3_outer_layer'), 'rb') as descriptor_file:
- desc = OuterLayer(descriptor_file.read())
+ desc = OuterLayer(OUTER_LAYER_STR)
self.assertEqual('x25519', desc.auth_type)
self.assertEqual('WjZCU9sV1oxkxaPcd7/YozeZgq0lEs6DhWyrdYRNJR4=', desc.ephemeral_key)
@@ -85,8 +104,7 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
Parse the inner layer of our test descriptor.
"""
- with open(get_resource('hidden_service_v3_inner_layer'), 'rb') as descriptor_file:
- desc = InnerLayer(descriptor_file.read())
+ desc = InnerLayer(INNER_LAYER_STR)
self.assertEqual([2], desc.formats)
self.assertEqual(['ed25519'], desc.intro_auth)
More information about the tor-commits
mailing list