[tor-commits] [stem/master] Introduce encode-to-decode unittest for v3 descriptors.
atagar at torproject.org
atagar at torproject.org
Sun Nov 17 23:40:39 UTC 2019
commit 6a17f072d74718c73be5fb003b98fd3d8babb02d
Author: George Kadianakis <desnacked at riseup.net>
Date: Thu Oct 10 21:01:01 2019 +0300
Introduce encode-to-decode unittest for v3 descriptors.
This unittest encodes a v3 descriptor, then decodes it, and checks that the
parsing went right by checking the parsed attributes (in this case the
introduction points).
---
stem/descriptor/hsv3_crypto.py | 37 ++++++++++++++
test/unit/descriptor/certificate.py | 36 ++++++++++++++
test/unit/descriptor/hidden_service_v3.py | 82 ++++++++++++++++++++++++++++++-
3 files changed, 153 insertions(+), 2 deletions(-)
diff --git a/stem/descriptor/hsv3_crypto.py b/stem/descriptor/hsv3_crypto.py
index a4fb7bf5..275450c0 100644
--- a/stem/descriptor/hsv3_crypto.py
+++ b/stem/descriptor/hsv3_crypto.py
@@ -11,6 +11,17 @@ from cryptography.hazmat.primitives import serialization
import stem.descriptor.ed25519_exts_ref as ed25519_exts_ref
import stem.descriptor.slow_ed25519 as slow_ed25519
+def pubkeys_are_equal(pubkey1, pubkey2):
+ """
+ Compare the raw bytes of the two pubkeys and return True if they are the same
+ """
+ pubkey1_bytes = pubkey1.public_bytes(encoding=serialization.Encoding.Raw,
+ format=serialization.PublicFormat.Raw)
+ pubkey2_bytes = pubkey2.public_bytes(encoding=serialization.Encoding.Raw,
+ format=serialization.PublicFormat.Raw)
+
+ return pubkey1_bytes == pubkey2_bytes
+
"""
HSv3 Key blinding
@@ -72,6 +83,32 @@ def get_subcredential(public_identity_key, blinded_key):
return subcredential
+"""
+Onion address
+
+ onion_address = base32(PUBKEY | CHECKSUM | VERSION) + ".onion"
+ CHECKSUM = H(".onion checksum" | PUBKEY | VERSION)[:2]
+
+ - PUBKEY is the 32 bytes ed25519 master pubkey of the hidden service.
+ - VERSION is a one-byte version field (default value '\x03')
+ - ".onion checksum" is a constant string
+ - CHECKSUM is truncated to two bytes before inserting it in onion_address
+"""
+CHECKSUM_CONSTANT = b".onion checksum"
+
+def encode_onion_address(ed25519_pub_key_bytes):
+ """
+ Given the public key, return the onion address
+ """
+ version = 3
+ checksum_body = b"%s%s%d" % (CHECKSUM_CONSTANT, ed25519_pub_key_bytes, version)
+ checksum = hashlib.sha3_256(checksum_body).digest()[:2]
+
+ onion_address_bytes = b"%s%s%d" % (ed25519_pub_key_bytes, checksum, version)
+ onion_address = base64.b32encode(onion_address_bytes) + b".onion"
+ assert(len(onion_address) == 56 + len(".onion"))
+
+ return onion_address.lower()
"""
Basic descriptor logic:
diff --git a/test/unit/descriptor/certificate.py b/test/unit/descriptor/certificate.py
index 51960525..29a06b42 100644
--- a/test/unit/descriptor/certificate.py
+++ b/test/unit/descriptor/certificate.py
@@ -15,6 +15,8 @@ import test.require
from stem.descriptor.certificate import ED25519_SIGNATURE_LENGTH, CertType, ExtensionType, ExtensionFlag, Ed25519Certificate, Ed25519CertificateV1, Ed25519Extension
from test.unit.descriptor import get_resource
+from cryptography.hazmat.primitives import serialization
+
ED25519_CERT = """
AQQABhtZAaW2GoBED1IjY3A6f6GNqBEl5A83fD2Za9upGke51JGqAQAgBABnprVR
ptIr43bWPo2fIzo3uOywfoMrryprpbm4HhCkZMaO064LP+1KNuLvlc8sGG8lTjx1
@@ -193,3 +195,37 @@ class TestEd25519Certificate(unittest.TestCase):
cert = Ed25519Certificate.parse(certificate())
self.assertRaisesWith(ValueError, 'Ed25519KeyCertificate signing key is invalid (Signature was forged or corrupt)', cert.validate, desc)
+
+ @test.require.ed25519_support
+ def test_encode_decode_certificate(self):
+ from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
+
+ certified_priv_key = Ed25519PrivateKey.generate()
+ certified_pub_key = certified_priv_key.public_key()
+
+ signing_priv_key = Ed25519PrivateKey.generate()
+
+ expiration_date = datetime.datetime(2037, 8, 28, 17, 0)
+
+ my_ed_cert = stem.descriptor.certificate.MyED25519Certificate(cert_type=CertType.HS_V3_DESC_SIGNING_KEY,
+ expiration_date=expiration_date,
+ cert_key_type=1,
+ certified_pub_key=certified_pub_key,
+ signing_priv_key=signing_priv_key,
+ include_signing_key=True)
+
+ ed_cert_bytes = my_ed_cert.encode()
+ self.assertTrue(my_ed_cert)
+
+ # base64 the cert since that's what the parsing func expects
+ ed_cert_bytes_b64 = base64.b64encode(ed_cert_bytes)
+
+ ed_cert_parsed = stem.descriptor.certificate.Ed25519Certificate.parse(ed_cert_bytes_b64)
+
+ self.assertEqual(ed_cert_parsed.type, my_ed_cert.cert_type)
+ self.assertEqual(ed_cert_parsed.expiration, my_ed_cert.expiration_date)
+ self.assertEqual(ed_cert_parsed.key_type, my_ed_cert.cert_key_type)
+ self.assertEqual(ed_cert_parsed.key, my_ed_cert.certified_pub_key.public_bytes(encoding=serialization.Encoding.Raw,
+ format=serialization.PublicFormat.Raw))
+ self.assertEqual(ed_cert_parsed.get_signing_key(), my_ed_cert.signing_pub_key.public_bytes(encoding=serialization.Encoding.Raw,
+ format=serialization.PublicFormat.Raw))
diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py
index 37781b5f..b2a58fbc 100644
--- a/test/unit/descriptor/hidden_service_v3.py
+++ b/test/unit/descriptor/hidden_service_v3.py
@@ -5,12 +5,19 @@ Unit tests for stem.descriptor.hidden_service for version 3.
import functools
import unittest
+from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
+from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
+from cryptography.hazmat.primitives import serialization
+
import stem.client.datatype
import stem.descriptor
import stem.prereq
+import stem.descriptor.hsv3_crypto as hsv3_crypto
+
from stem.descriptor.hidden_service import (
REQUIRED_V3_FIELDS,
+ IntroductionPointV3,
HiddenServiceDescriptorV3,
OuterLayer,
InnerLayer,
@@ -44,7 +51,6 @@ with open(get_resource('hidden_service_v3_outer_layer')) as outer_layer_file:
with open(get_resource('hidden_service_v3_inner_layer')) as inner_layer_file:
INNER_LAYER_STR = inner_layer_file.read()
-
class TestHiddenServiceDescriptorV3(unittest.TestCase):
def test_real_descriptor(self):
"""
@@ -145,8 +151,10 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
'signature': 'signature',
}
+ private_identity_key = Ed25519PrivateKey.generate()
for line in REQUIRED_V3_FIELDS:
- desc_text = HiddenServiceDescriptorV3.content(exclude = (line,))
+ desc_text = HiddenServiceDescriptorV3.content(exclude = (line,),
+ ed25519_private_identity_key=private_identity_key)
expect_invalid_attr_for_text(self, desc_text, line_to_attr[line], None)
def test_invalid_version(self):
@@ -202,3 +210,73 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
self.assertEqual(b'\x92\xe6\x80\xfaWU.}HL\x9d*>\xdbF\xfb\xc0v\xe5N\xa9\x0bw\xbb\x84\xe3\xe6\xd5e}R\xa1', HiddenServiceDescriptorV3._public_key_from_address(HS_ADDRESS))
self.assertRaisesWith(ValueError, "'boom.onion' isn't a valid hidden service v3 address", HiddenServiceDescriptorV3._public_key_from_address, 'boom')
self.assertRaisesWith(ValueError, 'Bad checksum (expected def7 but was 842e)', HiddenServiceDescriptorV3._public_key_from_address, '5' * 56)
+
+ def _helper_get_intro(self):
+ link_specifiers = []
+
+ link1, _ = stem.client.datatype.LinkSpecifier.pop(b'\x03\x20CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC')
+ link_specifiers.append(link1)
+
+ onion_privkey = X25519PrivateKey.generate()
+ onion_pubkey = onion_privkey.public_key()
+
+ auth_privkey = Ed25519PrivateKey.generate()
+ auth_pubkey = auth_privkey.public_key()
+
+ enc_privkey = X25519PrivateKey.generate()
+ enc_pubkey = enc_privkey.public_key()
+
+ intro = IntroductionPointV3(link_specifiers, onion_key=onion_pubkey, enc_key=enc_pubkey, auth_key=auth_pubkey)
+
+ return intro
+
+ def test_encode_decode_descriptor(self):
+ """
+ Encode an HSv3 descriptor and then decode it and make sure you get the intended results.
+
+ This test is from the point of view of onionbalance, so the object that
+ this test generates is the data that onionbalance also has available when
+ making onion service descriptors.
+ """
+ # Build the service
+ private_identity_key = Ed25519PrivateKey.from_private_bytes(b"a"*32)
+ public_identity_key = private_identity_key.public_key()
+ pubkey_bytes = public_identity_key.public_bytes(encoding=serialization.Encoding.Raw,
+ format=serialization.PublicFormat.Raw)
+
+ onion_address = hsv3_crypto.encode_onion_address(pubkey_bytes).decode()
+
+ # Build the introduction points
+ intro1 = self._helper_get_intro()
+ intro2 = self._helper_get_intro()
+ intro3 = self._helper_get_intro()
+ intro_points = [intro1, intro2, intro3]
+
+ blind_param = bytes.fromhex("677776AE42464CAAB0DF0BF1E68A5FB651A390A6A8243CF4B60EE73A6AC2E4E3")
+
+ # Build the descriptor
+ desc_string = HiddenServiceDescriptorV3.content(ed25519_private_identity_key=private_identity_key,
+ intro_points=intro_points,
+ blinding_param=blind_param)
+ desc_string = desc_string.decode()
+
+ # Parse the descriptor
+ desc = HiddenServiceDescriptorV3.from_str(desc_string)
+ inner_layer = desc.decrypt(onion_address)
+
+ self.assertEqual(len(inner_layer.introduction_points), 3)
+
+ # Match introduction points of the parsed descriptor and the generated
+ # descriptor and do some sanity checks between them to make sure that
+ # parsing was done right!
+ for desc_intro in inner_layer.introduction_points:
+ original_found = False # Make sure we found all the intro points
+
+ for original_intro in intro_points:
+ # Match intro points
+ if hsv3_crypto.pubkeys_are_equal(desc_intro.auth_key, original_intro.auth_key):
+ original_found = True
+ self.assertTrue(hsv3_crypto.pubkeys_are_equal(desc_intro.enc_key, original_intro.enc_key))
+ self.assertTrue(hsv3_crypto.pubkeys_are_equal(desc_intro.onion_key, original_intro.onion_key))
+
+ self.assertTrue(original_found)
More information about the tor-commits
mailing list