[tor-commits] [stem/master] Test using new IntroductionPoint class

atagar at torproject.org atagar at torproject.org
Sun Nov 17 23:40:39 UTC 2019


commit a67425403fd90dd0b0e7b55161c8345091dfd36b
Author: Damian Johnson <atagar at torproject.org>
Date:   Thu Oct 31 14:50:08 2019 -0700

    Test using new IntroductionPoint class
    
    Adjusting our test_encode_decode_descriptor() test to use our present
    IntroductionPoint class rather than the prototype.
---
 stem/descriptor/certificate.py            | 29 +++++++++++++++++++++-----
 stem/descriptor/hidden_service.py         |  9 +++++---
 test/unit/descriptor/hidden_service_v3.py | 34 +++++++++++++++++++++++++++----
 3 files changed, 60 insertions(+), 12 deletions(-)

diff --git a/stem/descriptor/certificate.py b/stem/descriptor/certificate.py
index b48a0899..973957f1 100644
--- a/stem/descriptor/certificate.py
+++ b/stem/descriptor/certificate.py
@@ -123,10 +123,10 @@ class Ed25519Extension(Field):
   def __init__(self, ext_type, flag_val, data):
     self.type = ext_type
     self.flags = []
-    self.flag_int = flag_val
+    self.flag_int = flag_val if flag_val else 0
     self.data = data
 
-    if flag_val % 2 == 1:
+    if flag_val and flag_val % 2 == 1:
       self.flags.append(ExtensionFlag.AFFECTS_VALIDATION)
       flag_val -= 1
 
@@ -284,18 +284,34 @@ class Ed25519CertificateV1(Ed25519Certificate):
   :var bytes key: key content
   :var list extensions: :class:`~stem.descriptor.certificate.Ed25519Extension` in this certificate
   :var bytes signature: certificate signature
+
+  :param bytes signature: pre-calculated certificate signature
+  :param cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey signing_key: certificate signing key
   """
 
-  def __init__(self, type_int, expiration, key_type, key, extensions, signature):
+  def __init__(self, cert_type, expiration, key_type, key, extensions, signature = None, signing_key = None):
     super(Ed25519CertificateV1, self).__init__(1)
 
-    self.type, self.type_int = ClientCertType.get(type_int)
+    if not signature and not signing_key:
+      raise ValueError('Certificate signature or signing key is required')
+
+    self.type, self.type_int = ClientCertType.get(cert_type)
     self.expiration = expiration
     self.key_type = key_type
     self.key = key
     self.extensions = extensions
     self.signature = signature
 
+    if signing_key:
+      calculated_sig = signing_key.sign(self.pack())
+
+      # if caller provides both signing key *and* signature then ensure they match
+
+      if self.signature and self.signature != calculated_sig:
+        raise ValueError("Signature calculated from its key (%s) mismatches '%s'" % (calculated_sig, self.signature))
+
+      self.signature = calculated_sig
+
     if self.type in (ClientCertType.LINK, ClientCertType.IDENTITY, ClientCertType.AUTHENTICATE):
       raise ValueError('Ed25519 certificate cannot have a type of %i. This is reserved for CERTS cells.' % self.type_int)
     elif self.type == ClientCertType.ED25519_IDENTITY:
@@ -315,7 +331,10 @@ class Ed25519CertificateV1(Ed25519Certificate):
     for extension in self.extensions:
       encoded += extension.pack()
 
-    return bytes(encoded + self.signature)
+    if self.signature:
+      encoded += self.signature
+
+    return bytes(encoded)
 
   @staticmethod
   def unpack(content):
diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py
index 09030ca5..68d150b1 100644
--- a/stem/descriptor/hidden_service.py
+++ b/stem/descriptor/hidden_service.py
@@ -935,7 +935,10 @@ def _get_inner_descriptor_layer_body(intro_points, descriptor_signing_privkey):
 
   # Now encode all the intro points
   for intro_point in intro_points:
-    final_body += intro_point.encode(descriptor_signing_privkey)
+    if isinstance(intro_point, IntroductionPointV3):
+      final_body += intro_point.encode() + b'\n'
+    else:
+      final_body += intro_point.encode(descriptor_signing_privkey)
 
   return final_body
 
@@ -984,7 +987,7 @@ def _get_superencrypted_blob(intro_points, descriptor_signing_privkey, revision_
   should be attached to the descriptor
   """
 
-  inner_descriptor_layer = _get_inner_descriptor_layer_body(intro_points, descriptor_signing_privkey)
+  inner_descriptor_layer = stem.util.str_tools._to_bytes(_get_inner_descriptor_layer_body(intro_points, descriptor_signing_privkey))
   inner_ciphertext = hsv3_crypto.encrypt_inner_layer(inner_descriptor_layer, revision_counter, blinded_key_bytes, subcredential)
   inner_ciphertext_b64 = b64_and_wrap_desc_layer(inner_ciphertext, b'encrypted')
 
@@ -1179,7 +1182,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
       raise ImportError('Hidden service descriptor decryption requires python 3.6+ or the pysha3 module (https://pypi.org/project/pysha3/)')
 
     if self._inner_layer is None:
-      blinded_key = self.signing_cert.signing_key()
+      blinded_key = self.signing_cert.signing_key() if self.signing_cert else None
 
       if not blinded_key:
         raise ValueError('No signing key is present')
diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py
index fb190211..060fa4dc 100644
--- a/test/unit/descriptor/hidden_service_v3.py
+++ b/test/unit/descriptor/hidden_service_v3.py
@@ -3,6 +3,7 @@ Unit tests for stem.descriptor.hidden_service for version 3.
 """
 
 import base64
+import datetime
 import functools
 import hashlib
 import unittest
@@ -13,6 +14,9 @@ import stem.prereq
 
 import test.require
 
+from stem.client.datatype import CertType, LinkByIPv4
+from stem.descriptor.certificate import ExtensionType, Ed25519Extension, Ed25519CertificateV1
+
 from stem.descriptor.hidden_service import (
   CHECKSUM_CONSTANT,
   REQUIRED_V3_FIELDS,
@@ -100,6 +104,28 @@ def _encode_onion_address(ed25519_pub_key_bytes):
 def _helper_get_intro():
   from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
   from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
+  from cryptography.hazmat.primitives import serialization
+
+  def public_key(key):
+    return key.public_key().public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
+
+  onion_key = public_key(X25519PrivateKey.generate())
+  enc_key = public_key(X25519PrivateKey.generate())
+  auth_key = public_key(Ed25519PrivateKey.generate())
+  signing_key = Ed25519PrivateKey.generate()
+
+  expiration = datetime.datetime.utcnow() + datetime.timedelta(hours = 54)
+  extensions = [Ed25519Extension(ExtensionType.HAS_SIGNING_KEY, None, public_key(signing_key))]
+
+  auth_key_cert = Ed25519CertificateV1(CertType.HS_V3_INTRO_AUTH, expiration, 1, auth_key, extensions, signing_key = signing_key)
+  enc_key_cert = Ed25519CertificateV1(CertType.HS_V3_NTOR_ENC, expiration, 1, auth_key, extensions, signing_key = signing_key)
+
+  return IntroductionPointV3([LinkByIPv4('1.2.3.4', 9001)], base64.b64encode(onion_key), auth_key_cert, base64.b64encode(enc_key), enc_key_cert, None, None)
+
+
+def _helper_get_intro_old():
+  from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
+  from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
 
   link_specifiers = []
 
@@ -370,7 +396,7 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
     desc = HiddenServiceDescriptorV3.from_str(desc_string)
     inner_layer = desc.decrypt(onion_address)
 
-    self.assertEqual(len(inner_layer.introduction_points), 3)
+    self.assertEqual(3, len(inner_layer.introduction_points))
 
     # Match introduction points of the parsed descriptor and the generated
     # descriptor and do some sanity checks between them to make sure that
@@ -380,9 +406,9 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
       original_intro = intro_points[i]
 
       auth_key_1 = Ed25519PublicKey.from_public_bytes(desc_intro.auth_key_cert.key)
-      auth_key_2 = original_intro.auth_key
+      auth_key_2 = Ed25519PublicKey.from_public_bytes(original_intro.auth_key_cert.key)
 
-      self.assertTrue(_pubkeys_are_equal(desc_intro.enc_key(), original_intro.enc_key))
-      self.assertTrue(_pubkeys_are_equal(desc_intro.onion_key(), original_intro.onion_key))
+      self.assertTrue(_pubkeys_are_equal(desc_intro.enc_key(), original_intro.enc_key()))
+      self.assertTrue(_pubkeys_are_equal(desc_intro.onion_key(), original_intro.onion_key()))
 
       self.assertTrue(_pubkeys_are_equal(auth_key_1, auth_key_2))





More information about the tor-commits mailing list