[tor-commits] [stem/master] Initial KeyCertificate implementation
atagar at torproject.org
atagar at torproject.org
Sat Oct 13 18:35:45 UTC 2012
commit 6ee5abbe5c1137b4a311d5b7ef5003874982008e
Author: Damian Johnson <atagar at torproject.org>
Date: Sun Sep 23 14:20:49 2012 -0700
Initial KeyCertificate implementation
First stab at parsing the authority section's key certificates. This is
completely untested; the next step is to write some unit tests for it.
---
stem/descriptor/networkstatus.py | 170 ++++++++++++++++++++++++++++++++++++++
1 files changed, 170 insertions(+), 0 deletions(-)
diff --git a/stem/descriptor/networkstatus.py b/stem/descriptor/networkstatus.py
index 17a9877..46adbfe 100644
--- a/stem/descriptor/networkstatus.py
+++ b/stem/descriptor/networkstatus.py
@@ -106,6 +106,21 @@ DEFAULT_PARAMS = {
"cbtinitialtimeout": 60000,
}
+# KeyCertificate fields. The order of this listing is significant: when
+# validating, KeyCertificate._parse() passes these keywords (in this order) to
+# _check_for_misordered_fields(). Each tuple is of the form...
+# (keyword, is_mandatory)
+
+KEY_CERTIFICATE_PARAMS = (
+ ('dir-key-certificate-version', True),
+ ('dir-address', False),
+ ('fingerprint', True),
+ ('dir-identity-key', True),
+ ('dir-key-published', True),
+ ('dir-key-expires', True),
+ ('dir-signing-key', True),
+ ('dir-key-crosscert', False),
+ ('dir-key-certification', True),
+)
+
BANDWIDTH_WEIGHT_ENTRIES = (
"Wbd", "Wbe", "Wbg", "Wbm",
"Wdb",
@@ -768,6 +783,161 @@ class DirectoryAuthority(stem.descriptor.Descriptor):
return self.unrecognized_lines
+class KeyCertificate(stem.descriptor.Descriptor):
+  """
+  Directory key certificate for a v3 network status document.
+
+  :var int version: **\*** version of the key certificate
+  :var str address: authority's IP address
+  :var int dir_port: authority's DirPort
+  :var str fingerprint: **\*** authority's fingerprint
+  :var str identity_key: **\*** long term authority identity key
+  :var datetime published: **\*** time when this key was generated
+  :var datetime expires: **\*** time after which this key becomes invalid
+  :var str signing_key: **\*** directory server's public signing key
+  :var str crosscert: signature made using certificate's signing key
+  :var str certification: **\*** signature of this key certificate signed with the identity key
+
+  **\*** mandatory attribute
+  """
+
+  def __init__(self, raw_content, validate):
+    """
+    Initializes all attributes to None and then parses the given content.
+
+    :param str raw_content: key certificate content
+    :param bool validate: checks validity if True
+
+    :raises: ValueError if a validity check fails
+    """
+
+    super(KeyCertificate, self).__init__(raw_content)
+
+    self.version = None
+    self.address = None
+    self.dir_port = None
+    self.fingerprint = None
+    self.identity_key = None
+    self.published = None
+    self.expires = None
+    self.signing_key = None
+    self.crosscert = None
+    self.certification = None
+
+    self._unrecognized_lines = []
+
+    self._parse(raw_content, validate)
+
+  def _parse(self, content, validate):
+    """
+    Parses the given content and applies the attributes.
+
+    :param str content: descriptor content
+    :param bool validate: checks validity if True
+
+    :raises: ValueError if a validity check fails
+    """
+
+    entries, first_keyword, last_keyword, _ = stem.descriptor._get_descriptor_components(content, validate)
+
+    if validate:
+      if first_keyword != 'dir-key-certificate-version':
+        raise ValueError("Key certificates must start with a 'dir-key-certificate-version' line:\n%s" % (content))
+      elif last_keyword != 'dir-key-certification':
+        raise ValueError("Key certificates must end with a 'dir-key-certification' line:\n%s" % (content))
+
+      # check that we have mandatory fields and that our known fields only
+      # appear once
+
+      for keyword, is_mandatory in KEY_CERTIFICATE_PARAMS:
+        if is_mandatory and keyword not in entries:
+          raise ValueError("Key certificates must have a '%s' line:\n%s" % (keyword, content))
+
+        entry_count = len(entries.get(keyword, []))
+        if entry_count > 1:
+          raise ValueError("Key certificates can only have a single '%s' line, got %i:\n%s" % (keyword, entry_count, content))
+
+      # Check that our fields' order matches the spec. This isn't explicitly
+      # stated in the spec, but the network status document requires a specific
+      # order so it stands to reason that the key certificate (which is in it)
+      # needs to match a prescribed order too.
+
+      fields = [attr[0] for attr in KEY_CERTIFICATE_PARAMS]
+      _check_for_misordered_fields(entries, fields)
+
+    for keyword, values in entries.items():
+      # only the first occurrence of a keyword is used
+      value, block_contents = values[0]
+      line = "%s %s" % (keyword, value)
+
+      if keyword == 'dir-key-certificate-version':
+        # "dir-key-certificate-version" version
+
+        if not value.isdigit():
+          if not validate: continue
+          raise ValueError("Key certificate has a non-integer version: %s" % line)
+
+        self.version = int(value)
+
+        if validate and self.version != 3:
+          raise ValueError("Expected a version 3 key certificate, got version '%i' instead" % self.version)
+      elif keyword == 'dir-address':
+        # "dir-address" IPPort
+
+        if ':' not in value:
+          if not validate: continue
+          raise ValueError("Key certificate's 'dir-address' is expected to be of the form ADDRESS:PORT: %s" % line)
+
+        address, dirport = value.split(':', 1)
+
+        if validate:
+          if not stem.util.connection.is_valid_ip_address(address):
+            raise ValueError("Key certificate's address isn't a valid IPv4 address: %s" % line)
+          elif not stem.util.connection.is_valid_port(dirport):
+            raise ValueError("Key certificate's dirport is invalid: %s" % line)
+        elif not dirport.isdigit():
+          # without validation the port is unchecked, so skip anything that
+          # int() below couldn't convert rather than raising
+          continue
+
+        self.address = address
+        self.dir_port = int(dirport)
+      elif keyword == 'fingerprint':
+        # "fingerprint" fingerprint
+
+        if validate and not stem.util.tor_tools.is_valid_fingerprint(value):
+          raise ValueError("Key certificate's fingerprint is malformed: %s" % line)
+
+        self.fingerprint = value
+      elif keyword in ('dir-key-published', 'dir-key-expires'):
+        # "dir-key-published" YYYY-MM-DD HH:MM:SS
+        # "dir-key-expires" YYYY-MM-DD HH:MM:SS
+
+        try:
+          date_value = datetime.datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
+
+          if keyword == 'dir-key-published':
+            self.published = date_value
+          elif keyword == 'dir-key-expires':
+            self.expires = date_value
+        except ValueError:
+          # unparseable timestamps leave the attribute as None unless we're
+          # validating
+          if validate:
+            raise ValueError("Key certificate's '%s' time wasn't parseable: %s" % (keyword, value))
+      elif keyword in ('dir-identity-key', 'dir-signing-key', 'dir-key-crosscert', 'dir-key-certification'):
+        # "dir-identity-key" NL a public key in PEM format
+        # "dir-signing-key" NL a key in PEM format
+        # "dir-key-crosscert" NL CrossSignature
+        # "dir-key-certification" NL Signature
+
+        if validate and not block_contents:
+          raise ValueError("Key certificate's '%s' line must be followed by a key block: %s" % (keyword, line))
+
+        if keyword == 'dir-identity-key':
+          self.identity_key = block_contents
+        elif keyword == 'dir-signing-key':
+          self.signing_key = block_contents
+        elif keyword == 'dir-key-crosscert':
+          self.crosscert = block_contents
+        elif keyword == 'dir-key-certification':
+          self.certification = block_contents
+      else:
+        self._unrecognized_lines.append(line)
+
+  def get_unrecognized_lines(self):
+    """
+    Returns any unrecognized lines.
+
+    :returns: a list of unrecognized lines
+    """
+
+    return self._unrecognized_lines
+
# TODO: microdescriptors have a slightly different format (including a
# 'method') - should probably be a subclass
class DocumentSignature(object):
More information about the tor-commits
mailing list