[tor-commits] [stem/master] Simplified introduction point constructor
atagar at torproject.org
atagar at torproject.org
Sun Nov 17 23:40:39 UTC 2019
commit 76fe67b0f377ffe340ea77b79e1055012ed7fc56
Author: Damian Johnson <atagar at torproject.org>
Date: Mon Nov 4 15:39:25 2019 -0800
Simplified introduction point constructor
Our test's _helper_get_intro() provided a good recipe for creating
introduction points. This productionizes it into a helper we can provide
so callers can make these without worrying about too many details.
---
stem/descriptor/hidden_service.py | 164 +++++++++++++++++++++++++-----
stem/prereq.py | 3 +
test/unit/descriptor/hidden_service_v3.py | 147 ++++++++------------------
3 files changed, 189 insertions(+), 125 deletions(-)
diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py
index 9168123d..70ca6d25 100644
--- a/stem/descriptor/hidden_service.py
+++ b/stem/descriptor/hidden_service.py
@@ -41,12 +41,14 @@ import time
import stem.client.datatype
import stem.descriptor.certificate
+import stem.descriptor.hsv3_crypto
import stem.prereq
import stem.util.connection
import stem.util.str_tools
import stem.util.tor_tools
-from stem.descriptor import hsv3_crypto
+from stem.client.datatype import CertType
+from stem.descriptor.certificate import ExtensionType, Ed25519Extension, Ed25519Certificate, Ed25519CertificateV1
from stem.descriptor import (
PGP_BLOCK_END,
@@ -183,7 +185,7 @@ class IntroductionPointV3(collections.namedtuple('IntroductionPointV3', ['link_s
onion_key = onion_key_line[5:] if onion_key_line.startswith('ntor ') else None
_, block_type, auth_key_cert = entry['auth-key'][0]
- auth_key_cert = stem.descriptor.certificate.Ed25519Certificate.from_base64(auth_key_cert)
+ auth_key_cert = Ed25519Certificate.from_base64(auth_key_cert)
if block_type != 'ED25519 CERT':
raise ValueError('Expected auth-key to have an ed25519 certificate, but was %s' % block_type)
@@ -192,7 +194,7 @@ class IntroductionPointV3(collections.namedtuple('IntroductionPointV3', ['link_s
enc_key = enc_key_line[5:] if enc_key_line.startswith('ntor ') else None
_, block_type, enc_key_cert = entry['enc-key-cert'][0]
- enc_key_cert = stem.descriptor.certificate.Ed25519Certificate.from_base64(enc_key_cert)
+ enc_key_cert = Ed25519Certificate.from_base64(enc_key_cert)
if block_type != 'ED25519 CERT':
raise ValueError('Expected enc-key-cert to have an ed25519 certificate, but was %s' % block_type)
@@ -202,6 +204,64 @@ class IntroductionPointV3(collections.namedtuple('IntroductionPointV3', ['link_s
return IntroductionPointV3(link_specifiers, onion_key, auth_key_cert, enc_key, enc_key_cert, legacy_key, legacy_key_cert)
+ @staticmethod
+ def create(address, port, expiration, onion_key, enc_key, auth_key, signing_key):
+ """
+ Simplified constructor. For more sophisticated use cases you can use this
+ as a template for how introduction points are properly created.
+
+ :param str address: IPv4 or IPv6 address where the service is reachable
+ :param int port: port where the service is reachable
+ :param datetime.datetime expiration: when certificates should expire
+ :param str onion_key: encoded, X25519PublicKey, or X25519PrivateKey onion key
+ :param str enc_key: encoded, X25519PublicKey, or X25519PrivateKey encryption key
+ :param str auth_key: encoded, Ed25519PublicKey, or Ed25519PrivateKey authentication key
+ :param cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey signing_key: service signing key
+
+ :returns: :class:`~stem.descriptor.hidden_service.IntroductionPointV3` with these attributes
+
+ :raises: **ValueError** if the address, port, or keys are malformed
+ """
+
+ def _key_bytes(key):
+ class_name = type(key).__name__
+
+ if isinstance(key, str):
+ return key
+ elif not class_name.endswith('PublicKey') and not class_name.endswith('PrivateKey'):
+ raise ValueError('Key must be a string or cryptographic public/private key (was %s)' % class_name)
+ elif not stem.prereq.is_crypto_available():
+ raise ImportError('Serializing keys requires the cryptography module')
+
+ from cryptography.hazmat.primitives import serialization
+
+ if class_name.endswith('PrivateKey'):
+ key = key.public_key()
+
+ return key.public_bytes(
+ encoding = serialization.Encoding.Raw,
+ format = serialization.PublicFormat.Raw,
+ )
+
+ if not stem.util.connection.is_valid_port(port):
+ raise ValueError("'%s' is an invalid port" % port)
+
+ if stem.util.connection.is_valid_ipv4_address(address):
+ link_specifiers = [stem.client.datatype.LinkByIPv4(address, port)]
+ elif stem.util.connection.is_valid_ipv6_address(address):
+ link_specifiers = [stem.client.datatype.LinkByIPv6(address, port)]
+ else:
+ raise ValueError("'%s' is not a valid IPv4 or IPv6 address" % address)
+
+ onion_key = base64.b64encode(_key_bytes(onion_key))
+ enc_key = base64.b64encode(_key_bytes(enc_key))
+
+ extensions = [Ed25519Extension(ExtensionType.HAS_SIGNING_KEY, None, _key_bytes(signing_key))]
+ auth_key_cert = Ed25519CertificateV1(CertType.HS_V3_INTRO_AUTH, expiration, 1, _key_bytes(auth_key), extensions, signing_key = signing_key)
+ enc_key_cert = Ed25519CertificateV1(CertType.HS_V3_NTOR_ENC, expiration, 1, _key_bytes(auth_key), extensions, signing_key = signing_key)
+
+ return IntroductionPointV3(link_specifiers, onion_key, auth_key_cert, enc_key, enc_key_cert, None, None)
+
def encode(self):
"""
Descriptor representation of this introduction point.
@@ -244,7 +304,20 @@ class IntroductionPointV3(collections.namedtuple('IntroductionPointV3', ['link_s
* **EnvironmentError** if OpenSSL x25519 unsupported
"""
- return IntroductionPointV3._parse_key(self.onion_key_raw)
+ return IntroductionPointV3._key_as(self.onion_key_raw, x25519 = True)
+
+ def auth_key(self):
+ """
+ Provides our authentication certificate's public key.
+
+ :returns: :class:`~cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey`
+
+ :raises:
+ * **ImportError** if required the cryptography module is unavailable
+ * **EnvironmentError** if OpenSSL x25519 unsupported
+ """
+
+ return IntroductionPointV3._key_as(self.auth_key_cert.key, ed25519 = True)
def enc_key(self):
"""
@@ -257,7 +330,7 @@ class IntroductionPointV3(collections.namedtuple('IntroductionPointV3', ['link_s
* **EnvironmentError** if OpenSSL x25519 unsupported
"""
- return IntroductionPointV3._parse_key(self.enc_key_raw)
+ return IntroductionPointV3._key_as(self.enc_key_raw, x25519 = True)
def legacy_key(self):
"""
@@ -270,23 +343,31 @@ class IntroductionPointV3(collections.namedtuple('IntroductionPointV3', ['link_s
* **EnvironmentError** if OpenSSL x25519 unsupported
"""
- return IntroductionPointV3._parse_key(self.legacy_key_raw)
+ return IntroductionPointV3._key_as(self.legacy_key_raw, x25519 = True)
@staticmethod
- def _parse_key(value):
- if value is None:
+ def _key_as(value, x25519 = False, ed25519 = False):
+ if value is None or (not x25519 and not ed25519):
return value
elif not stem.prereq.is_crypto_available():
raise ImportError('cryptography module unavailable')
- elif not X25519_AVAILABLE:
- # without this the cryptography raises...
- # cryptography.exceptions.UnsupportedAlgorithm: X25519 is not supported by this version of OpenSSL.
- raise EnvironmentError('OpenSSL x25519 unsupported')
+ if x25519:
+ if not X25519_AVAILABLE:
+ # without this the cryptography raises...
+ # cryptography.exceptions.UnsupportedAlgorithm: X25519 is not supported by this version of OpenSSL.
+
+ raise EnvironmentError('OpenSSL x25519 unsupported')
- from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PublicKey
+ from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PublicKey
+ return X25519PublicKey.from_public_bytes(base64.b64decode(value))
- return X25519PublicKey.from_public_bytes(base64.b64decode(value))
+ if ed25519:
+ if not stem.prereq.is_crypto_available(ed25519 = True):
+ raise EnvironmentError('cryptography ed25519 unsupported')
+
+ from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
+ return Ed25519PublicKey.from_public_bytes(value)
@staticmethod
def _parse_link_specifiers(content):
@@ -482,7 +563,7 @@ _parse_v2_signature_line = _parse_key_block('signature', 'signature', 'SIGNATURE
_parse_v3_version_line = _parse_int_line('hs-descriptor', 'version', allow_negative = False)
_parse_lifetime_line = _parse_int_line('descriptor-lifetime', 'lifetime', allow_negative = False)
-_parse_signing_cert = stem.descriptor.certificate.Ed25519Certificate._from_descriptor('descriptor-signing-key-cert', 'signing_cert')
+_parse_signing_cert = Ed25519Certificate._from_descriptor('descriptor-signing-key-cert', 'signing_cert')
_parse_revision_counter_line = _parse_int_line('revision-counter', 'revision_counter', allow_negative = False)
_parse_superencrypted_line = _parse_key_block('superencrypted', 'superencrypted', 'MESSAGE')
_parse_v3_signature_line = _parse_simple_line('signature', 'signature')
@@ -779,9 +860,9 @@ def _get_descriptor_signing_cert(descriptor_signing_public_key, blinded_priv_key
expiration_date = datetime.datetime.utcnow() + datetime.timedelta(hours=54)
signing_key = descriptor_signing_public_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
- extensions = [stem.descriptor.certificate.Ed25519Extension(stem.descriptor.certificate.ExtensionType.HAS_SIGNING_KEY, None, blinded_priv_key.public_key().public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw))]
+ extensions = [Ed25519Extension(ExtensionType.HAS_SIGNING_KEY, None, blinded_priv_key.public_key().public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw))]
- desc_signing_cert = stem.descriptor.certificate.Ed25519CertificateV1(stem.client.datatype.CertType.HS_V3_DESC_SIGNING, expiration_date, 1, signing_key, extensions, signing_key = blinded_priv_key)
+ desc_signing_cert = Ed25519CertificateV1(CertType.HS_V3_DESC_SIGNING, expiration_date, 1, signing_key, extensions, signing_key = blinded_priv_key)
return '\n' + desc_signing_cert.to_base64(pem = True)
@@ -864,11 +945,11 @@ def _get_superencrypted_blob(intro_points, descriptor_signing_privkey, revision_
"""
inner_descriptor_layer = stem.util.str_tools._to_bytes(_get_inner_descriptor_layer_body(intro_points, descriptor_signing_privkey))
- inner_ciphertext = hsv3_crypto.encrypt_inner_layer(inner_descriptor_layer, revision_counter, blinded_key_bytes, subcredential)
+ inner_ciphertext = stem.descriptor.hsv3_crypto.encrypt_inner_layer(inner_descriptor_layer, revision_counter, blinded_key_bytes, subcredential)
inner_ciphertext_b64 = b64_and_wrap_desc_layer(inner_ciphertext, b'encrypted')
middle_descriptor_layer = _get_middle_descriptor_layer_body(inner_ciphertext_b64)
- outter_ciphertext = hsv3_crypto.encrypt_outter_layer(middle_descriptor_layer, revision_counter, blinded_key_bytes, subcredential)
+ outter_ciphertext = stem.descriptor.hsv3_crypto.encrypt_outter_layer(middle_descriptor_layer, revision_counter, blinded_key_bytes, subcredential)
return b64_and_wrap_desc_layer(outter_ciphertext)
@@ -898,6 +979,8 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
**\\*** attribute is either required when we're parsed with validation or has
a default value, others are left as **None** if undefined
+
+ .. versionadded:: 1.8.0
"""
# TODO: requested this @type on https://trac.torproject.org/projects/tor/ticket/31481
@@ -973,7 +1056,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
public_identity_key_bytes = public_identity_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
# Blind the identity key to get ephemeral blinded key
- blinded_privkey = hsv3_crypto.HSv3PrivateBlindedKey(ed25519_private_identity_key, blinding_param = blinding_param)
+ blinded_privkey = stem.descriptor.hsv3_crypto.HSv3PrivateBlindedKey(ed25519_private_identity_key, blinding_param = blinding_param)
blinded_pubkey = blinded_privkey.public_key()
blinded_pubkey_bytes = blinded_pubkey.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
@@ -1063,7 +1146,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
if not blinded_key:
raise ValueError('No signing key is present')
- identity_public_key = HiddenServiceDescriptorV3._public_key_from_address(onion_address)
+ identity_public_key = HiddenServiceDescriptorV3.public_key_from_address(onion_address)
subcredential = HiddenServiceDescriptorV3._subcredential(identity_public_key, blinded_key)
outer_layer = OuterLayer._decrypt(self.superencrypted, self.revision_counter, subcredential, blinded_key)
@@ -1072,8 +1155,43 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
return self._inner_layer
@staticmethod
- def _public_key_from_address(onion_address):
- # provides our hidden service ed25519 public key
+ def address_from_public_key(pubkey, suffix = True):
+ """
+ Converts a hidden service public key into its address.
+
+ :param bytes pubkey: hidden service public key
+ :param bool suffix: includes the '.onion' suffix if true, excluded otherwise
+
+ :returns: **unicode** hidden service address
+
+ :raises: **ImportError** if sha3 unsupported
+ """
+
+ if not stem.prereq._is_sha3_available():
+ raise ImportError('Hidden service address conversion requires python 3.6+ or the pysha3 module (https://pypi.org/project/pysha3/)')
+
+ version = stem.client.datatype.Size.CHAR.pack(3)
+ checksum = hashlib.sha3_256(CHECKSUM_CONSTANT + pubkey + version).digest()[:2]
+ onion_address = base64.b32encode(pubkey + checksum + version)
+
+ return stem.util.str_tools._to_unicode(onion_address + b'.onion' if suffix else onion_address).lower()
+
+ @staticmethod
+ def public_key_from_address(onion_address):
+ """
+ Converts a hidden service address into its public key.
+
+ :param str onion_address: hidden service address
+
+ :returns: **bytes** for the hidden service's public key
+
+ :raises:
+ * **ImportError** if sha3 unsupported
+ * **ValueError** if address malformed or checksum is invalid
+ """
+
+ if not stem.prereq._is_sha3_available():
+ raise ImportError('Hidden service address conversion requires python 3.6+ or the pysha3 module (https://pypi.org/project/pysha3/)')
if onion_address.endswith('.onion'):
onion_address = onion_address[:-6]
diff --git a/stem/prereq.py b/stem/prereq.py
index 40c2f006..4af6c093 100644
--- a/stem/prereq.py
+++ b/stem/prereq.py
@@ -26,6 +26,9 @@ import inspect
import platform
import sys
+# TODO: in stem 2.x consider replacing these functions with requirement
+# annotations (like our tests)
+
CRYPTO_UNAVAILABLE = "Unable to import the cryptography module. Because of this we'll be unable to verify descriptor signature integrity. You can get cryptography from: https://pypi.org/project/cryptography/"
ZSTD_UNAVAILABLE = 'ZSTD compression requires the zstandard module (https://pypi.org/project/zstandard/)'
LZMA_UNAVAILABLE = 'LZMA compression requires the lzma module (https://docs.python.org/3/library/lzma.html)'
diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py
index 14df30ec..61752e11 100644
--- a/test/unit/descriptor/hidden_service_v3.py
+++ b/test/unit/descriptor/hidden_service_v3.py
@@ -5,34 +5,28 @@ Unit tests for stem.descriptor.hidden_service for version 3.
import base64
import datetime
import functools
-import hashlib
import unittest
import stem.client.datatype
import stem.descriptor
+import stem.descriptor.hidden_service
import stem.prereq
import test.require
-from stem.client.datatype import CertType, LinkByIPv4
-from stem.descriptor.certificate import ExtensionType, Ed25519Extension, Ed25519CertificateV1
+from test.unit.descriptor import (
+ get_resource,
+ base_expect_invalid_attr,
+ base_expect_invalid_attr_for_text,
+)
from stem.descriptor.hidden_service import (
- CHECKSUM_CONSTANT,
- REQUIRED_V3_FIELDS,
- X25519_AVAILABLE,
IntroductionPointV3,
HiddenServiceDescriptorV3,
OuterLayer,
InnerLayer,
)
-from test.unit.descriptor import (
- get_resource,
- base_expect_invalid_attr,
- base_expect_invalid_attr_for_text,
-)
-
try:
# added in python 3.3
from unittest.mock import patch, Mock
@@ -40,12 +34,13 @@ except ImportError:
from mock import patch, Mock
require_sha3 = test.require.needs(stem.prereq._is_sha3_available, 'requires sha3')
-require_x25519 = test.require.needs(lambda: X25519_AVAILABLE, 'requires openssl x5509')
+require_x25519 = test.require.needs(lambda: stem.descriptor.hidden_service.X25519_AVAILABLE, 'requires openssl x5509')
expect_invalid_attr = functools.partial(base_expect_invalid_attr, HiddenServiceDescriptorV3, 'version', 3)
expect_invalid_attr_for_text = functools.partial(base_expect_invalid_attr_for_text, HiddenServiceDescriptorV3, 'version', 3)
-HS_ADDRESS = 'sltib6sxkuxh2scmtuvd5w2g7pahnzkovefxpo4e4ptnkzl5kkq5h2ad.onion'
+HS_ADDRESS = u'sltib6sxkuxh2scmtuvd5w2g7pahnzkovefxpo4e4ptnkzl5kkq5h2ad.onion'
+HS_PUBKEY = b'\x92\xe6\x80\xfaWU.}HL\x9d*>\xdbF\xfb\xc0v\xe5N\xa9\x0bw\xbb\x84\xe3\xe6\xd5e}R\xa1'
EXPECTED_SIGNING_CERT = """\
-----BEGIN ED25519 CERT-----
@@ -68,58 +63,9 @@ with open(get_resource('hidden_service_v3_intro_point')) as intro_point_file:
INTRO_POINT_STR = intro_point_file.read()
-def _pubkeys_are_equal(pubkey1, pubkey2):
- """
- Compare the raw bytes of the two pubkeys and return True if they are the same
- """
-
- from cryptography.hazmat.primitives import serialization
-
- pubkey1_bytes = pubkey1.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
- pubkey2_bytes = pubkey2.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
-
- return pubkey1_bytes == pubkey2_bytes
-
-
-def _encode_onion_address(ed25519_pub_key_bytes):
- """
- Given the public key, return the onion address
- """
-
- if not stem.prereq._is_sha3_available():
- raise ImportError('Encoding onion addresses requires python 3.6+ or the pysha3 module (https://pypi.org/project/pysha3/)')
-
- version = 3
- checksum_body = b'%s%s%d' % (CHECKSUM_CONSTANT, ed25519_pub_key_bytes, version)
- checksum = hashlib.sha3_256(checksum_body).digest()[:2]
-
- onion_address_bytes = b'%s%s%d' % (ed25519_pub_key_bytes, checksum, version)
- onion_address = base64.b32encode(onion_address_bytes) + b'.onion'
- assert(len(onion_address) == 56 + len('.onion'))
-
- return onion_address.lower()
-
-
-def _helper_get_intro():
- from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
- from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
+def key_bytes(key):
from cryptography.hazmat.primitives import serialization
-
- def public_key(key):
- return key.public_key().public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
-
- onion_key = public_key(X25519PrivateKey.generate())
- enc_key = public_key(X25519PrivateKey.generate())
- auth_key = public_key(Ed25519PrivateKey.generate())
- signing_key = Ed25519PrivateKey.generate()
-
- expiration = datetime.datetime.utcnow() + datetime.timedelta(hours = 54)
- extensions = [Ed25519Extension(ExtensionType.HAS_SIGNING_KEY, None, public_key(signing_key))]
-
- auth_key_cert = Ed25519CertificateV1(CertType.HS_V3_INTRO_AUTH, expiration, 1, auth_key, extensions, signing_key = signing_key)
- enc_key_cert = Ed25519CertificateV1(CertType.HS_V3_NTOR_ENC, expiration, 1, auth_key, extensions, signing_key = signing_key)
-
- return IntroductionPointV3([LinkByIPv4('1.2.3.4', 9001)], base64.b64encode(onion_key), auth_key_cert, base64.b64encode(enc_key), enc_key_cert, None, None)
+ return key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
class TestHiddenServiceDescriptorV3(unittest.TestCase):
@@ -221,7 +167,7 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
'signature': 'signature',
}
- for line in REQUIRED_V3_FIELDS:
+ for line in stem.descriptor.hidden_service.REQUIRED_V3_FIELDS:
desc_text = HiddenServiceDescriptorV3.content(exclude = (line,))
expect_invalid_attr_for_text(self, desc_text, line_to_attr[line], None)
@@ -268,11 +214,14 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
expect_invalid_attr(self, {'revision-counter': test_value}, 'revision_counter')
@require_sha3
- @test.require.ed25519_support
+ def test_address_from_public_key(self):
+ self.assertEqual(HS_ADDRESS, HiddenServiceDescriptorV3.address_from_public_key(HS_PUBKEY))
+
+ @require_sha3
def test_public_key_from_address(self):
- self.assertEqual(b'\x92\xe6\x80\xfaWU.}HL\x9d*>\xdbF\xfb\xc0v\xe5N\xa9\x0bw\xbb\x84\xe3\xe6\xd5e}R\xa1', HiddenServiceDescriptorV3._public_key_from_address(HS_ADDRESS))
- self.assertRaisesWith(ValueError, "'boom.onion' isn't a valid hidden service v3 address", HiddenServiceDescriptorV3._public_key_from_address, 'boom')
- self.assertRaisesWith(ValueError, 'Bad checksum (expected def7 but was 842e)', HiddenServiceDescriptorV3._public_key_from_address, '5' * 56)
+ self.assertEqual(HS_PUBKEY, HiddenServiceDescriptorV3.public_key_from_address(HS_ADDRESS))
+ self.assertRaisesWith(ValueError, "'boom.onion' isn't a valid hidden service v3 address", HiddenServiceDescriptorV3.public_key_from_address, 'boom')
+ self.assertRaisesWith(ValueError, 'Bad checksum (expected def7 but was 842e)', HiddenServiceDescriptorV3.public_key_from_address, '5' * 56)
def test_intro_point_parse(self):
"""
@@ -304,7 +253,6 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
"""
from cryptography.hazmat.backends.openssl.x25519 import X25519PublicKey
- from cryptography.hazmat.primitives import serialization
intro_point = InnerLayer(INNER_LAYER_STR).introduction_points[0]
@@ -314,15 +262,8 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
self.assertTrue(isinstance(intro_point.onion_key(), X25519PublicKey))
self.assertTrue(isinstance(intro_point.enc_key(), X25519PublicKey))
- self.assertEqual(intro_point.onion_key_raw, base64.b64encode(intro_point.onion_key().public_bytes(
- encoding = serialization.Encoding.Raw,
- format = serialization.PublicFormat.Raw,
- )))
-
- self.assertEqual(intro_point.enc_key_raw, base64.b64encode(intro_point.enc_key().public_bytes(
- encoding = serialization.Encoding.Raw,
- format = serialization.PublicFormat.Raw,
- )))
+ self.assertEqual(intro_point.onion_key_raw, base64.b64encode(key_bytes(intro_point.onion_key())))
+ self.assertEqual(intro_point.enc_key_raw, base64.b64encode(key_bytes(intro_point.enc_key())))
self.assertEqual(None, intro_point.legacy_key_raw)
self.assertEqual(None, intro_point.legacy_key())
@@ -346,21 +287,28 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
making onion service descriptors.
"""
- from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey, Ed25519PrivateKey
- from cryptography.hazmat.primitives import serialization
+ from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
+ from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
+
+ onion_key = X25519PrivateKey.generate()
+ enc_key = X25519PrivateKey.generate()
+ auth_key = Ed25519PrivateKey.generate()
+ signing_key = Ed25519PrivateKey.generate()
+
+ expiration = datetime.datetime.utcnow() + datetime.timedelta(hours = 54)
# Build the service
private_identity_key = Ed25519PrivateKey.from_private_bytes(b'a' * 32)
public_identity_key = private_identity_key.public_key()
- pubkey_bytes = public_identity_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
+ pubkey_bytes = key_bytes(public_identity_key)
- onion_address = _encode_onion_address(pubkey_bytes).decode()
+ onion_address = HiddenServiceDescriptorV3.address_from_public_key(pubkey_bytes)
- # Build the introduction points
- intro1 = _helper_get_intro()
- intro2 = _helper_get_intro()
- intro3 = _helper_get_intro()
- intro_points = [intro1, intro2, intro3]
+ intro_points = [
+ IntroductionPointV3.create('1.1.1.1', 9001, expiration, onion_key, enc_key, auth_key, signing_key),
+ IntroductionPointV3.create('2.2.2.2', 9001, expiration, onion_key, enc_key, auth_key, signing_key),
+ IntroductionPointV3.create('3.3.3.3', 9001, expiration, onion_key, enc_key, auth_key, signing_key),
+ ]
# TODO: replace with bytes.fromhex() when we drop python 2.x support
@@ -368,7 +316,6 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
# Build the descriptor
desc_string = HiddenServiceDescriptorV3.content(ed25519_private_identity_key = private_identity_key, intro_points = intro_points, blinding_param = blind_param)
- desc_string = desc_string.decode()
# Parse the descriptor
desc = HiddenServiceDescriptorV3.from_str(desc_string)
@@ -376,17 +323,13 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
self.assertEqual(3, len(inner_layer.introduction_points))
- # Match introduction points of the parsed descriptor and the generated
- # descriptor and do some sanity checks between them to make sure that
- # parsing was done right!
-
- for i, desc_intro in enumerate(inner_layer.introduction_points):
- original_intro = intro_points[i]
-
- auth_key_1 = Ed25519PublicKey.from_public_bytes(desc_intro.auth_key_cert.key)
- auth_key_2 = Ed25519PublicKey.from_public_bytes(original_intro.auth_key_cert.key)
+ for i, intro_point in enumerate(inner_layer.introduction_points):
+ original = intro_points[i]
- self.assertTrue(_pubkeys_are_equal(desc_intro.enc_key(), original_intro.enc_key()))
- self.assertTrue(_pubkeys_are_equal(desc_intro.onion_key(), original_intro.onion_key()))
+ self.assertEqual(original.enc_key_raw, intro_point.enc_key_raw)
+ self.assertEqual(original.onion_key_raw, intro_point.onion_key_raw)
+ self.assertEqual(original.auth_key_cert.key, intro_point.auth_key_cert.key)
- self.assertTrue(_pubkeys_are_equal(auth_key_1, auth_key_2))
+ self.assertEqual(intro_point.enc_key_raw, base64.b64encode(key_bytes(intro_point.enc_key())))
+ self.assertEqual(intro_point.onion_key_raw, base64.b64encode(key_bytes(intro_point.onion_key())))
+ self.assertEqual(intro_point.auth_key_cert.key, key_bytes(intro_point.auth_key()))
More information about the tor-commits
mailing list