[tor-commits] [stem/master] Test using new IntroductionPoint class
atagar at torproject.org
atagar at torproject.org
Sun Nov 17 23:40:39 UTC 2019
commit a67425403fd90dd0b0e7b55161c8345091dfd36b
Author: Damian Johnson <atagar at torproject.org>
Date: Thu Oct 31 14:50:08 2019 -0700
Test using new IntroductionPoint class
Adjusting our test_encode_decode_descriptor() test to use our present
IntroductionPoint class rather than the prototype.
---
stem/descriptor/certificate.py | 29 +++++++++++++++++++++-----
stem/descriptor/hidden_service.py | 9 +++++---
test/unit/descriptor/hidden_service_v3.py | 34 +++++++++++++++++++++++++++----
3 files changed, 60 insertions(+), 12 deletions(-)
diff --git a/stem/descriptor/certificate.py b/stem/descriptor/certificate.py
index b48a0899..973957f1 100644
--- a/stem/descriptor/certificate.py
+++ b/stem/descriptor/certificate.py
@@ -123,10 +123,10 @@ class Ed25519Extension(Field):
def __init__(self, ext_type, flag_val, data):
self.type = ext_type
self.flags = []
- self.flag_int = flag_val
+ self.flag_int = flag_val if flag_val else 0
self.data = data
- if flag_val % 2 == 1:
+ if flag_val and flag_val % 2 == 1:
self.flags.append(ExtensionFlag.AFFECTS_VALIDATION)
flag_val -= 1
@@ -284,18 +284,34 @@ class Ed25519CertificateV1(Ed25519Certificate):
:var bytes key: key content
:var list extensions: :class:`~stem.descriptor.certificate.Ed25519Extension` in this certificate
:var bytes signature: certificate signature
+
+ :param bytes signature: pre-calculated certificate signature
+ :param cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey signing_key: certificate signing key
"""
- def __init__(self, type_int, expiration, key_type, key, extensions, signature):
+ def __init__(self, cert_type, expiration, key_type, key, extensions, signature = None, signing_key = None):
super(Ed25519CertificateV1, self).__init__(1)
- self.type, self.type_int = ClientCertType.get(type_int)
+ if not signature and not signing_key:
+ raise ValueError('Certificate signature or signing key is required')
+
+ self.type, self.type_int = ClientCertType.get(cert_type)
self.expiration = expiration
self.key_type = key_type
self.key = key
self.extensions = extensions
self.signature = signature
+ if signing_key:
+ calculated_sig = signing_key.sign(self.pack())
+
+ # if caller provides both signing key *and* signature then ensure they match
+
+ if self.signature and self.signature != calculated_sig:
+ raise ValueError("Signature calculated from its key (%s) mismatches '%s'" % (calculated_sig, self.signature))
+
+ self.signature = calculated_sig
+
if self.type in (ClientCertType.LINK, ClientCertType.IDENTITY, ClientCertType.AUTHENTICATE):
raise ValueError('Ed25519 certificate cannot have a type of %i. This is reserved for CERTS cells.' % self.type_int)
elif self.type == ClientCertType.ED25519_IDENTITY:
@@ -315,7 +331,10 @@ class Ed25519CertificateV1(Ed25519Certificate):
for extension in self.extensions:
encoded += extension.pack()
- return bytes(encoded + self.signature)
+ if self.signature:
+ encoded += self.signature
+
+ return bytes(encoded)
@staticmethod
def unpack(content):
diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py
index 09030ca5..68d150b1 100644
--- a/stem/descriptor/hidden_service.py
+++ b/stem/descriptor/hidden_service.py
@@ -935,7 +935,10 @@ def _get_inner_descriptor_layer_body(intro_points, descriptor_signing_privkey):
# Now encode all the intro points
for intro_point in intro_points:
- final_body += intro_point.encode(descriptor_signing_privkey)
+ if isinstance(intro_point, IntroductionPointV3):
+ final_body += intro_point.encode() + b'\n'
+ else:
+ final_body += intro_point.encode(descriptor_signing_privkey)
return final_body
@@ -984,7 +987,7 @@ def _get_superencrypted_blob(intro_points, descriptor_signing_privkey, revision_
should be attached to the descriptor
"""
- inner_descriptor_layer = _get_inner_descriptor_layer_body(intro_points, descriptor_signing_privkey)
+ inner_descriptor_layer = stem.util.str_tools._to_bytes(_get_inner_descriptor_layer_body(intro_points, descriptor_signing_privkey))
inner_ciphertext = hsv3_crypto.encrypt_inner_layer(inner_descriptor_layer, revision_counter, blinded_key_bytes, subcredential)
inner_ciphertext_b64 = b64_and_wrap_desc_layer(inner_ciphertext, b'encrypted')
@@ -1179,7 +1182,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
raise ImportError('Hidden service descriptor decryption requires python 3.6+ or the pysha3 module (https://pypi.org/project/pysha3/)')
if self._inner_layer is None:
- blinded_key = self.signing_cert.signing_key()
+ blinded_key = self.signing_cert.signing_key() if self.signing_cert else None
if not blinded_key:
raise ValueError('No signing key is present')
diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py
index fb190211..060fa4dc 100644
--- a/test/unit/descriptor/hidden_service_v3.py
+++ b/test/unit/descriptor/hidden_service_v3.py
@@ -3,6 +3,7 @@ Unit tests for stem.descriptor.hidden_service for version 3.
"""
import base64
+import datetime
import functools
import hashlib
import unittest
@@ -13,6 +14,9 @@ import stem.prereq
import test.require
+from stem.client.datatype import CertType, LinkByIPv4
+from stem.descriptor.certificate import ExtensionType, Ed25519Extension, Ed25519CertificateV1
+
from stem.descriptor.hidden_service import (
CHECKSUM_CONSTANT,
REQUIRED_V3_FIELDS,
@@ -100,6 +104,28 @@ def _encode_onion_address(ed25519_pub_key_bytes):
def _helper_get_intro():
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
+ from cryptography.hazmat.primitives import serialization
+
+ def public_key(key):
+ return key.public_key().public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
+
+ onion_key = public_key(X25519PrivateKey.generate())
+ enc_key = public_key(X25519PrivateKey.generate())
+ auth_key = public_key(Ed25519PrivateKey.generate())
+ signing_key = Ed25519PrivateKey.generate()
+
+ expiration = datetime.datetime.utcnow() + datetime.timedelta(hours = 54)
+ extensions = [Ed25519Extension(ExtensionType.HAS_SIGNING_KEY, None, public_key(signing_key))]
+
+ auth_key_cert = Ed25519CertificateV1(CertType.HS_V3_INTRO_AUTH, expiration, 1, auth_key, extensions, signing_key = signing_key)
+ enc_key_cert = Ed25519CertificateV1(CertType.HS_V3_NTOR_ENC, expiration, 1, auth_key, extensions, signing_key = signing_key)
+
+ return IntroductionPointV3([LinkByIPv4('1.2.3.4', 9001)], base64.b64encode(onion_key), auth_key_cert, base64.b64encode(enc_key), enc_key_cert, None, None)
+
+
+def _helper_get_intro_old():
+ from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
+ from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
link_specifiers = []
@@ -370,7 +396,7 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
desc = HiddenServiceDescriptorV3.from_str(desc_string)
inner_layer = desc.decrypt(onion_address)
- self.assertEqual(len(inner_layer.introduction_points), 3)
+ self.assertEqual(3, len(inner_layer.introduction_points))
# Match introduction points of the parsed descriptor and the generated
# descriptor and do some sanity checks between them to make sure that
@@ -380,9 +406,9 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
original_intro = intro_points[i]
auth_key_1 = Ed25519PublicKey.from_public_bytes(desc_intro.auth_key_cert.key)
- auth_key_2 = original_intro.auth_key
+ auth_key_2 = Ed25519PublicKey.from_public_bytes(original_intro.auth_key_cert.key)
- self.assertTrue(_pubkeys_are_equal(desc_intro.enc_key(), original_intro.enc_key))
- self.assertTrue(_pubkeys_are_equal(desc_intro.onion_key(), original_intro.onion_key))
+ self.assertTrue(_pubkeys_are_equal(desc_intro.enc_key(), original_intro.enc_key()))
+ self.assertTrue(_pubkeys_are_equal(desc_intro.onion_key(), original_intro.onion_key()))
self.assertTrue(_pubkeys_are_equal(auth_key_1, auth_key_2))
More information about the tor-commits
mailing list