[tor-commits] [stem/master] Helper for generating public bytes
atagar at torproject.org
atagar at torproject.org
Sun Nov 17 23:40:39 UTC 2019
commit 1fbf2c0fc0bb2daa1dc354c75d183adfac71337b
Author: Damian Johnson <atagar at torproject.org>
Date: Thu Nov 7 14:20:38 2019 -0800
Helper for generating public bytes
We convert keys to bytes in enough places that this warrants a helper. Oddly,
the HSv3PublicBlindedKey class's public_bytes() method didn't actually serialize
anything (it just returned its public_key attribute), so this replaces its calls
with direct access to that attribute.
---
stem/descriptor/hidden_service.py | 46 +++++++++----------------------
stem/descriptor/hsv3_crypto.py | 5 +---
stem/util/__init__.py | 30 ++++++++++++++++++++
test/unit/descriptor/hidden_service_v3.py | 18 ++++--------
4 files changed, 50 insertions(+), 49 deletions(-)
diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py
index a052d965..7b6651b7 100644
--- a/stem/descriptor/hidden_service.py
+++ b/stem/descriptor/hidden_service.py
@@ -43,6 +43,7 @@ import stem.client.datatype
import stem.descriptor.certificate
import stem.descriptor.hsv3_crypto
import stem.prereq
+import stem.util
import stem.util.connection
import stem.util.str_tools
import stem.util.tor_tools
@@ -225,7 +226,7 @@ class IntroductionPointV3(collections.namedtuple('IntroductionPointV3', ['link_s
if not stem.prereq.is_crypto_available(ed25519 = True):
raise ImportError('Introduction point creation requires the cryptography module ed25519 support')
- if not stem.util.connection.is_valid_port(port):
+ elif not stem.util.connection.is_valid_port(port):
raise ValueError("'%s' is an invalid port" % port)
if stem.util.connection.is_valid_ipv4_address(address):
@@ -235,35 +236,18 @@ class IntroductionPointV3(collections.namedtuple('IntroductionPointV3', ['link_s
else:
raise ValueError("'%s' is not a valid IPv4 or IPv6 address" % address)
- from cryptography.hazmat.primitives import serialization
- from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey
- from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey, X25519PublicKey
-
- def _key_bytes(key):
- if isinstance(key, str):
- return key
- elif isinstance(key, (X25519PrivateKey, Ed25519PrivateKey)):
- return key.public_key().public_bytes(
- encoding = serialization.Encoding.Raw,
- format = serialization.PublicFormat.Raw,
- )
- elif isinstance(key, (X25519PublicKey, Ed25519PublicKey)):
- return key.public_bytes(
- encoding = serialization.Encoding.Raw,
- format = serialization.PublicFormat.Raw,
- )
- else:
- raise ValueError('Key must be a string or cryptographic public/private key (was %s)' % type(key).__name__)
+ from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
+ from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
if expiration is None:
expiration = datetime.datetime.utcnow() + datetime.timedelta(hours = stem.descriptor.certificate.DEFAULT_EXPIRATION_HOURS)
- onion_key = base64.b64encode(_key_bytes(onion_key if onion_key else X25519PrivateKey.generate()))
- enc_key = base64.b64encode(_key_bytes(enc_key if enc_key else X25519PrivateKey.generate()))
- auth_key = _key_bytes(auth_key if auth_key else Ed25519PrivateKey.generate())
+ onion_key = base64.b64encode(stem.util._pubkey_bytes(onion_key if onion_key else X25519PrivateKey.generate()))
+ enc_key = base64.b64encode(stem.util._pubkey_bytes(enc_key if enc_key else X25519PrivateKey.generate()))
+ auth_key = stem.util._pubkey_bytes(auth_key if auth_key else Ed25519PrivateKey.generate())
signing_key = signing_key if signing_key else Ed25519PrivateKey.generate()
- extensions = [Ed25519Extension(ExtensionType.HAS_SIGNING_KEY, None, _key_bytes(signing_key))]
+ extensions = [Ed25519Extension(ExtensionType.HAS_SIGNING_KEY, None, stem.util._pubkey_bytes(signing_key))]
auth_key_cert = Ed25519CertificateV1(CertType.HS_V3_INTRO_AUTH, expiration, 1, auth_key, extensions, signing_key = signing_key)
enc_key_cert = Ed25519CertificateV1(CertType.HS_V3_NTOR_ENC, expiration, 1, auth_key, extensions, signing_key = signing_key)
@@ -861,13 +845,11 @@ def _get_descriptor_signing_cert(descriptor_signing_public_key, blinded_priv_key
'blinded_priv_key' key that signs the certificate
"""
- from cryptography.hazmat.primitives import serialization
-
# 54 hours expiration date like tor does
expiration_date = datetime.datetime.utcnow() + datetime.timedelta(hours=54)
- signing_key = descriptor_signing_public_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
- extensions = [Ed25519Extension(ExtensionType.HAS_SIGNING_KEY, None, blinded_priv_key.public_key().public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw))]
+ signing_key = stem.util._pubkey_bytes(descriptor_signing_public_key)
+ extensions = [Ed25519Extension(ExtensionType.HAS_SIGNING_KEY, None, blinded_priv_key.public_key().public_key)]
desc_signing_cert = Ed25519CertificateV1(CertType.HS_V3_DESC_SIGNING, expiration_date, 1, signing_key, extensions, signing_key = blinded_priv_key)
@@ -932,10 +914,9 @@ def _get_middle_descriptor_layer_body(encrypted):
"""
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
- from cryptography.hazmat.primitives import serialization
fake_pub_key = X25519PrivateKey.generate().public_key()
- fake_pub_key_bytes = fake_pub_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
+ fake_pub_key_bytes = stem.util._pubkey_bytes(fake_pub_key)
fake_pub_key_bytes_b64 = base64.b64encode(fake_pub_key_bytes)
fake_clients = _get_fake_clients_bytes()
@@ -1012,7 +993,6 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
"""
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
- from cryptography.hazmat.primitives import serialization
if sign:
raise NotImplementedError('Signing of %s not implemented' % cls.__name__)
@@ -1048,12 +1028,12 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
# Get the identity public key
public_identity_key = ed25519_private_identity_key.public_key()
- public_identity_key_bytes = public_identity_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
+ public_identity_key_bytes = stem.util._pubkey_bytes(public_identity_key)
# Blind the identity key to get ephemeral blinded key
blinded_privkey = stem.descriptor.hsv3_crypto.HSv3PrivateBlindedKey(ed25519_private_identity_key, blinding_param = blinding_param)
blinded_pubkey = blinded_privkey.public_key()
- blinded_pubkey_bytes = blinded_pubkey.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
+ blinded_pubkey_bytes = blinded_pubkey.public_key
# Generate descriptor signing key
signing_key = Ed25519PrivateKey.generate()
diff --git a/stem/descriptor/hsv3_crypto.py b/stem/descriptor/hsv3_crypto.py
index 69c8c185..73654866 100644
--- a/stem/descriptor/hsv3_crypto.py
+++ b/stem/descriptor/hsv3_crypto.py
@@ -42,16 +42,13 @@ class HSv3PrivateBlindedKey(object):
return self.blinded_public_key
def sign(self, msg):
- return ed25519_exts_ref.signatureWithESK(msg, self.blinded_secret_key, self.blinded_public_key.public_bytes())
+ return ed25519_exts_ref.signatureWithESK(msg, self.blinded_secret_key, self.blinded_public_key.public_key)
class HSv3PublicBlindedKey(object):
def __init__(self, public_key):
self.public_key = public_key
- def public_bytes(self, encoding=None, format=None):
- return self.public_key
-
def verify(self, signature, message):
"""
raises exception if sig not valid
diff --git a/stem/util/__init__.py b/stem/util/__init__.py
index bff894dc..a8870499 100644
--- a/stem/util/__init__.py
+++ b/stem/util/__init__.py
@@ -127,6 +127,36 @@ def datetime_to_unix(timestamp):
return (timestamp - datetime.datetime(1970, 1, 1)).total_seconds()
+def _pubkey_bytes(key):
+ """
+ Normalizes X25519 and ED25519 keys into their public key bytes.
+ """
+
+ if not stem.prereq.is_crypto_available():
+ raise ImportError('Key normalization requires the cryptography module')
+ elif not stem.prereq.is_crypto_available(ed25519 = True):
+ raise ImportError('Key normalization requires the cryptography ed25519 support')
+
+ from cryptography.hazmat.primitives import serialization
+ from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey
+ from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey, X25519PublicKey
+
+ if isinstance(key, str):
+ return key
+ elif isinstance(key, (X25519PrivateKey, Ed25519PrivateKey)):
+ return key.public_key().public_bytes(
+ encoding = serialization.Encoding.Raw,
+ format = serialization.PublicFormat.Raw,
+ )
+ elif isinstance(key, (X25519PublicKey, Ed25519PublicKey)):
+ return key.public_bytes(
+ encoding = serialization.Encoding.Raw,
+ format = serialization.PublicFormat.Raw,
+ )
+ else:
+ raise ValueError('Key must be a string or cryptographic public/private key (was %s)' % type(key).__name__)
+
+
def _hash_attr(obj, *attributes, **kwargs):
"""
Provide a hash value for the given set of attributes.
diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py
index b0f1829a..d7b32485 100644
--- a/test/unit/descriptor/hidden_service_v3.py
+++ b/test/unit/descriptor/hidden_service_v3.py
@@ -63,11 +63,6 @@ with open(get_resource('hidden_service_v3_intro_point')) as intro_point_file:
INTRO_POINT_STR = intro_point_file.read()
-def key_bytes(key):
- from cryptography.hazmat.primitives import serialization
- return key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
-
-
class TestHiddenServiceDescriptorV3(unittest.TestCase):
def test_real_descriptor(self):
"""
@@ -258,8 +253,8 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
self.assertTrue(isinstance(intro_point.onion_key(), X25519PublicKey))
self.assertTrue(isinstance(intro_point.enc_key(), X25519PublicKey))
- self.assertEqual(intro_point.onion_key_raw, base64.b64encode(key_bytes(intro_point.onion_key())))
- self.assertEqual(intro_point.enc_key_raw, base64.b64encode(key_bytes(intro_point.enc_key())))
+ self.assertEqual(intro_point.onion_key_raw, base64.b64encode(stem.util._pubkey_bytes(intro_point.onion_key())))
+ self.assertEqual(intro_point.enc_key_raw, base64.b64encode(stem.util._pubkey_bytes(intro_point.enc_key())))
self.assertEqual(None, intro_point.legacy_key_raw)
self.assertEqual(None, intro_point.legacy_key())
@@ -296,9 +291,8 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
# Build the service
private_identity_key = Ed25519PrivateKey.from_private_bytes(b'a' * 32)
public_identity_key = private_identity_key.public_key()
- pubkey_bytes = key_bytes(public_identity_key)
- onion_address = HiddenServiceDescriptorV3.address_from_public_key(pubkey_bytes)
+ onion_address = HiddenServiceDescriptorV3.address_from_public_key(stem.util._pubkey_bytes(public_identity_key))
intro_points = [
IntroductionPointV3.create('1.1.1.1', 9001, expiration, onion_key, enc_key, auth_key, signing_key),
@@ -326,6 +320,6 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
self.assertEqual(original.onion_key_raw, intro_point.onion_key_raw)
self.assertEqual(original.auth_key_cert.key, intro_point.auth_key_cert.key)
- self.assertEqual(intro_point.enc_key_raw, base64.b64encode(key_bytes(intro_point.enc_key())))
- self.assertEqual(intro_point.onion_key_raw, base64.b64encode(key_bytes(intro_point.onion_key())))
- self.assertEqual(intro_point.auth_key_cert.key, key_bytes(intro_point.auth_key()))
+ self.assertEqual(intro_point.enc_key_raw, base64.b64encode(stem.util._pubkey_bytes(intro_point.enc_key())))
+ self.assertEqual(intro_point.onion_key_raw, base64.b64encode(stem.util._pubkey_bytes(intro_point.onion_key())))
+ self.assertEqual(intro_point.auth_key_cert.key, stem.util._pubkey_bytes(intro_point.auth_key()))
More information about the tor-commits
mailing list