[tor-commits] [stem/master] Refactor and move layer encryption
atagar at torproject.org
atagar at torproject.org
Sun Nov 17 23:40:39 UTC 2019
commit 33a96a6a754ca7e65f1d5741c6f813c447849ef8
Author: Damian Johnson <atagar at torproject.org>
Date: Fri Nov 8 16:48:25 2019 -0800
Refactor and move layer encryption
We already had a _decrypt_layer() helper from the last branch. While doing the
same for encryption it became evedent that these helpers are mostly identical
so refactoring the common crypto into a third function.
Still not perfectly happy, but closer. :P
---
stem/descriptor/hidden_service.py | 54 ++++++++++++++--------
stem/descriptor/hsv3_crypto.py | 95 ---------------------------------------
2 files changed, 36 insertions(+), 113 deletions(-)
diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py
index 5fb7a189..f0baf678 100644
--- a/stem/descriptor/hidden_service.py
+++ b/stem/descriptor/hidden_service.py
@@ -445,12 +445,6 @@ def _parse_file(descriptor_file, desc_type = None, validate = False, **kwargs):
def _decrypt_layer(encrypted_block, constant, revision_counter, subcredential, blinded_key):
- from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
- from cryptography.hazmat.backends import default_backend
-
- def pack(val):
- return struct.pack('>Q', val)
-
if encrypted_block.startswith('-----BEGIN MESSAGE-----\n') and encrypted_block.endswith('\n-----END MESSAGE-----'):
encrypted_block = encrypted_block[24:-22]
@@ -466,22 +460,42 @@ def _decrypt_layer(encrypted_block, constant, revision_counter, subcredential, b
ciphertext = encrypted[SALT_LEN:-MAC_LEN]
expected_mac = encrypted[-MAC_LEN:]
- kdf = hashlib.shake_256(blinded_key + subcredential + pack(revision_counter) + salt + constant)
+ cipher, mac_for = _layer_cipher(constant, revision_counter, subcredential, blinded_key, salt)
+
+ if expected_mac != mac_for(ciphertext):
+ raise ValueError('Malformed mac (expected %s, but was %s)' % (expected_mac, mac_for(ciphertext)))
+
+ decryptor = cipher.decryptor()
+ plaintext = decryptor.update(ciphertext) + decryptor.finalize()
+
+ return stem.util.str_tools._to_unicode(plaintext)
+
+
+def _encrypt_layer(plaintext, constant, revision_counter, subcredential, blinded_key):
+ salt = os.urandom(16)
+ cipher, mac_for = _layer_cipher(constant, revision_counter, subcredential, blinded_key, salt)
+
+ encryptor = cipher.encryptor()
+ ciphertext = encryptor.update(plaintext) + encryptor.finalize()
+
+ return salt + ciphertext + mac_for(ciphertext)
+
+
+def _layer_cipher(constant, revision_counter, subcredential, blinded_key, salt):
+ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
+ from cryptography.hazmat.backends import default_backend
+
+ kdf = hashlib.shake_256(blinded_key + subcredential + struct.pack('>Q', revision_counter) + salt + constant)
keys = kdf.digest(S_KEY_LEN + S_IV_LEN + MAC_LEN)
secret_key = keys[:S_KEY_LEN]
secret_iv = keys[S_KEY_LEN:S_KEY_LEN + S_IV_LEN]
mac_key = keys[S_KEY_LEN + S_IV_LEN:]
- mac = hashlib.sha3_256(pack(len(mac_key)) + mac_key + pack(len(salt)) + salt + ciphertext).digest()
-
- if mac != expected_mac:
- raise ValueError('Malformed mac (expected %s, but was %s)' % (expected_mac, mac))
-
cipher = Cipher(algorithms.AES(secret_key), modes.CTR(secret_iv), default_backend())
- decryptor = cipher.decryptor()
+ mac_prefix = struct.pack('>Q', len(mac_key)) + mac_key + struct.pack('>Q', len(salt)) + salt
- return stem.util.str_tools._to_unicode(decryptor.update(ciphertext) + decryptor.finalize())
+ return cipher, lambda ciphertext: hashlib.sha3_256(mac_prefix + ciphertext).digest()
def _parse_protocol_versions_line(descriptor, entries):
@@ -917,18 +931,22 @@ def _get_middle_descriptor_layer_body(encrypted):
b'%s' % (fake_pub_key_bytes_b64, fake_clients, encrypted)
-def _get_superencrypted_blob(intro_points, descriptor_signing_privkey, revision_counter, blinded_key_bytes, subcredential):
+def _get_superencrypted_blob(intro_points, descriptor_signing_privkey, revision_counter, blinded_key, subcredential):
"""
Get the superencrypted blob (which also includes the encrypted blob) that
should be attached to the descriptor
"""
inner_descriptor_layer = stem.util.str_tools._to_bytes('create2-formats 2\n' + '\n'.join(map(IntroductionPointV3.encode, intro_points)) + '\n')
- inner_ciphertext = stem.descriptor.hsv3_crypto.encrypt_inner_layer(inner_descriptor_layer, revision_counter, blinded_key_bytes, subcredential)
+ inner_ciphertext = _encrypt_layer(inner_descriptor_layer, b'hsdir-encrypted-data', revision_counter, subcredential, blinded_key)
inner_ciphertext_b64 = b64_and_wrap_desc_layer(inner_ciphertext, b'encrypted')
middle_descriptor_layer = _get_middle_descriptor_layer_body(inner_ciphertext_b64)
- outter_ciphertext = stem.descriptor.hsv3_crypto.encrypt_outter_layer(middle_descriptor_layer, revision_counter, blinded_key_bytes, subcredential)
+
+ padding_bytes_needed = stem.descriptor.hsv3_crypto._get_padding_needed(len(middle_descriptor_layer))
+ middle_descriptor_layer = middle_descriptor_layer + b'\x00' * padding_bytes_needed
+
+ outter_ciphertext = _encrypt_layer(middle_descriptor_layer, b'hsdir-superencrypted-data', revision_counter, subcredential, blinded_key)
return b64_and_wrap_desc_layer(outter_ciphertext)
@@ -1279,7 +1297,7 @@ class InnerLayer(Descriptor):
super(InnerLayer, self).__init__(content, lazy_load = not validate)
self.outer = outer_layer
- # inner layer begins with a few header fields, followed by multiple any
+ # inner layer begins with a few header fields, followed by any
# number of introduction-points
div = content.find('\nintroduction-point ')
diff --git a/stem/descriptor/hsv3_crypto.py b/stem/descriptor/hsv3_crypto.py
index 80759aa2..b762c5ee 100644
--- a/stem/descriptor/hsv3_crypto.py
+++ b/stem/descriptor/hsv3_crypto.py
@@ -1,7 +1,3 @@
-import hashlib
-import struct
-import os
-
from stem.descriptor import slow_ed25519
@@ -109,82 +105,6 @@ Descriptor encryption
"""
-def pack(val):
- return struct.pack('>Q', val)
-
-
-def get_desc_keys(secret_data, string_constant, subcredential, revision_counter, salt):
- """
- secret_input = SECRET_DATA | subcredential | INT_8(revision_counter)
-
- keys = KDF(secret_input | salt | STRING_CONSTANT, S_KEY_LEN + S_IV_LEN + MAC_KEY_LEN)
-
- SECRET_KEY = first S_KEY_LEN bytes of keys
- SECRET_IV = next S_IV_LEN bytes of keys
- MAC_KEY = last MAC_KEY_LEN bytes of keys
-
- where
-
- 2.5.1.1. First layer encryption logic
- SECRET_DATA = blinded-public-key
- STRING_CONSTANT = "hsdir-superencrypted-data"
-
- 2.5.2.1. Second layer encryption keys
- SECRET_DATA = blinded-public-key | descriptor_cookie
- STRING_CONSTANT = "hsdir-encrypted-data"
- """
-
- secret_input = b'%s%s%s' % (secret_data, subcredential, pack(revision_counter))
-
- kdf = hashlib.shake_256(secret_input + salt + string_constant)
-
- keys = kdf.digest(S_KEY_LEN + S_IV_LEN + MAC_LEN)
-
- secret_key = keys[:S_KEY_LEN]
- secret_iv = keys[S_KEY_LEN:S_KEY_LEN + S_IV_LEN]
- mac_key = keys[S_KEY_LEN + S_IV_LEN:]
-
- return secret_key, secret_iv, mac_key
-
-
-def get_desc_encryption_mac(key, salt, ciphertext):
- mac = hashlib.sha3_256(pack(len(key)) + key + pack(len(salt)) + salt + ciphertext).digest()
- return mac
-
-
-def _encrypt_descriptor_layer(plaintext, revision_counter, subcredential, secret_data, string_constant):
- """
- Encrypt descriptor layer at 'plaintext'
- """
-
- from cryptography.hazmat.backends import default_backend
- from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
-
- salt = os.urandom(16)
-
- secret_key, secret_iv, mac_key = get_desc_keys(secret_data, string_constant, subcredential, revision_counter, salt)
-
- # Now time to encrypt descriptor
- cipher = Cipher(algorithms.AES(secret_key), modes.CTR(secret_iv), default_backend())
- encryptor = cipher.encryptor()
- ciphertext = encryptor.update(plaintext) + encryptor.finalize()
-
- mac = get_desc_encryption_mac(mac_key, salt, ciphertext)
-
- return salt + ciphertext + mac
-
-
-def encrypt_inner_layer(plaintext, revision_counter, blinded_key_bytes, subcredential):
- """
- Encrypt the inner layer of the descriptor
- """
-
- secret_data = blinded_key_bytes
- string_constant = b'hsdir-encrypted-data'
-
- return _encrypt_descriptor_layer(plaintext, revision_counter, subcredential, secret_data, string_constant)
-
-
def ceildiv(a, b):
"""
Like // division but return the ceiling instead of the floor
@@ -205,18 +125,3 @@ def _get_padding_needed(plaintext_len):
final_size = ceildiv(plaintext_len, PAD_MULTIPLE_BYTES) * PAD_MULTIPLE_BYTES
return final_size - plaintext_len
-
-
-def encrypt_outter_layer(plaintext, revision_counter, blinded_key_bytes, subcredential):
- """
- Encrypt the outer layer of the descriptor
- """
-
- secret_data = blinded_key_bytes
- string_constant = b'hsdir-superencrypted-data'
-
- # In the outter layer we first need to pad the plaintext
- padding_bytes_needed = _get_padding_needed(len(plaintext))
- padded_plaintext = plaintext + b'\x00' * padding_bytes_needed
-
- return _encrypt_descriptor_layer(padded_plaintext, revision_counter, subcredential, secret_data, string_constant)
More information about the tor-commits
mailing list