[tor-commits] [stem/master] stem.descriptor.networkstatus.KeyCertificate => stem.descriptor.KeyCertificate
atagar at torproject.org
atagar at torproject.org
Sat Oct 13 18:35:44 UTC 2012
commit 92f46907b3d84c53ab10c8f19851d65be86f7b9f
Author: Ravi Chandra Padmala <neenaoffline at gmail.com>
Date: Tue Aug 7 13:19:57 2012 +0530
stem.descriptor.networkstatus.KeyCertificate => stem.descriptor.KeyCertificate
From section 4.7 of the dir-spec,
A concatenated set of all the current key certificates should be
available at: http://<hostname>/tor/keys/all.z
The key certificate for this server (if it is an authority) should be
available at: http://<hostname>/tor/keys/authority.z
...
We might want to parse these separately at some point, so this moves
KeyCertificate out of stem.descriptor.networkstatus.
---
stem/descriptor/__init__.py | 108 ++++++++++++++++++++++++++++++++++++++
stem/descriptor/networkstatus.py | 105 +------------------------------------
2 files changed, 110 insertions(+), 103 deletions(-)
diff --git a/stem/descriptor/__init__.py b/stem/descriptor/__init__.py
index 5f9d144..f5da97a 100644
--- a/stem/descriptor/__init__.py
+++ b/stem/descriptor/__init__.py
@@ -24,6 +24,7 @@ __all__ = [
import os
import re
+import datetime
KEYWORD_CHAR = "a-zA-Z0-9-"
WHITESPACE = " \t"
@@ -283,6 +284,12 @@ def _get_descriptor_components(raw_contents, validate, extra_keywords):
return entries, first_keyword, last_keyword, extra_entries
+def _strptime(string, validate = True, optional = False):
+ try:
+ return datetime.datetime.strptime(string, "%Y-%m-%d %H:%M:%S")
+ except ValueError, exc:
+ if validate or not optional: raise exc
+
class DescriptorParser:
"""
Helper class to parse documents.
@@ -423,3 +430,104 @@ class DescriptorParser:
else:
return []
+class KeyCertificate(Descriptor):
+ """
+ Directory key certificate.
+
+ :var str key_certificate_version: **\*** version of the key certificate (Should be "3")
+ :var str ip: IP address on which the directory authority is listening
+ :var int port: port on which the directory authority is listening
+ :var str fingerprint: **\*** hex encoded fingerprint of the authority's identity key
+ :var str identity_key: **\*** long term authority identity key
+ :var datetime published: **\*** time (in GMT) when this document & the key were last generated
+ :var str expires: **\*** time (in GMT) after which this key becomes invalid
+ :var str signing_key: **\*** directory server's public signing key
+ :var str crosscert: signature made using certificate's signing key
+ :var str certification: **\*** signature of this key certificate signed with the identity key
+
+ **\*** attribute is either required when we're parsed with validation or has a default value, others are left as None if undefined
+ """
+
+ def __init__(self, raw_content, validate = True):
+ """
+ Parse a key certificate entry and provide a KeyCertificate object.
+
+ :param str raw_content: raw key certificate information
+ :param bool validate: True if the document is to be validated, False otherwise
+
+ :raises: ValueError if the raw data is invalid
+ """
+
+ super(KeyCertificate, self).__init__(raw_content)
+ self.key_certificate_version, self.ip, self.port = None, None, None
+ self.fingerprint, self.identity_key, self.published = None, None, None
+ self.expires, self.signing_key, self.crosscert = None, None, None
+ self.certification = None
+ parser = DescriptorParser(raw_content, validate)
+ peek_check_kw = lambda keyword: keyword == parser.peek_keyword()
+ seen_keywords = set()
+
+ self.key_certificate_version = parser.read_keyword_line("dir-key-certificate-version")
+ if validate and self.key_certificate_version != "3": raise ValueError("Unrecognized dir-key-certificate-version")
+
+ def _read_keyword_line(keyword):
+ if validate and keyword in seen_keywords:
+ raise ValueError("Invalid key certificate: '%s' appears twice" % keyword)
+ seen_keywords.add(keyword)
+ return parser.read_keyword_line(keyword)
+
+ while parser.line:
+ if peek_check_kw("dir-address"):
+ line = _read_keyword_line("dir-address")
+ try:
+ self.ip, self.port = line.rsplit(":", 1)
+ self.port = int(self.port)
+ except Exception:
+ if validate: raise ValueError("Invalid dir-address line: %s" % line)
+
+ elif peek_check_kw("fingerprint"):
+ self.fingerprint = _read_keyword_line("fingerprint")
+
+ elif peek_check_kw("dir-identity-key"):
+ _read_keyword_line("dir-identity-key")
+ self.identity_key = parser.read_block("RSA PUBLIC KEY")
+
+ elif peek_check_kw("dir-key-published"):
+ self.published = _strptime(_read_keyword_line("dir-key-published"))
+
+ elif peek_check_kw("dir-key-expires"):
+ self.expires = _strptime(_read_keyword_line("dir-key-expires"))
+
+ elif peek_check_kw("dir-signing-key"):
+ _read_keyword_line("dir-signing-key")
+ self.signing_key = parser.read_block("RSA PUBLIC KEY")
+
+ elif peek_check_kw("dir-key-crosscert"):
+ _read_keyword_line("dir-key-crosscert")
+ self.crosscert = parser.read_block("ID SIGNATURE")
+
+ elif peek_check_kw("dir-key-certification"):
+ _read_keyword_line("dir-key-certification")
+ self.certification = parser.read_block("SIGNATURE")
+ break
+
+ elif validate:
+ raise ValueError("Key certificate contains unrecognized lines: %s" % parser.line)
+
+ else:
+ # ignore unrecognized lines if we aren't validating
+ self._unrecognized_lines.append(parser.read_line())
+
+ self._unrecognized_lines = parser.remaining()
+ if self._unrecognized_lines and validate:
+ raise ValueError("Unrecognized trailing data in key certificate")
+
+ def get_unrecognized_lines(self):
+ """
+ Returns any unrecognized lines.
+
+ :returns: a list of unrecognized lines
+ """
+
+ return self._unrecognized_lines
+
diff --git a/stem/descriptor/networkstatus.py b/stem/descriptor/networkstatus.py
index 876acd9..988c71c 100644
--- a/stem/descriptor/networkstatus.py
+++ b/stem/descriptor/networkstatus.py
@@ -249,7 +249,7 @@ class DirectoryAuthority(stem.descriptor.Descriptor):
:var int orport: current orport
:var str contact: directory authority's contact information
:var str legacy_dir_key: fingerprint of and obsolete identity key
- :var :class:`stem.descriptor.networkstatus_descriptor.KeyCertificate` key_certificate: directory authority's current key certificate
+ :var :class:`stem.descriptor.KeyCertificate` key_certificate: directory authority's current key certificate
:var str vote_digest: digest of the authority that contributed to the consensus
"""
@@ -278,114 +278,13 @@ class DirectoryAuthority(stem.descriptor.Descriptor):
self.contact = parser.read_keyword_line("contact")
if vote:
self.legacy_dir_key = parser.read_keyword_line("legacy-dir-key", True)
- self.key_certificate = KeyCertificate("\n".join(parser.remaining()), validate)
+ self.key_certificate = stem.descriptor.KeyCertificate("\n".join(parser.remaining()), validate)
else:
self.vote_digest = parser.read_keyword_line("vote-digest", True)
rmng = parser.remaining()
if rmng and validate:
raise ValueError("Unrecognized trailing data in directory authority information")
-class KeyCertificate(stem.descriptor.Descriptor):
- """
- Directory key certificate.
-
- :var str key_certificate_version: **\*** version of the key certificate (Should be "3")
- :var str ip: IP address on which the directory authority is listening
- :var int port: port on which the directory authority is listening
- :var str fingerprint: **\*** hex encoded fingerprint of the authority's identity key
- :var str identity_key: **\*** long term authority identity key
- :var datetime published: **\*** time (in GMT) when this document & the key were last generated
- :var str expires: **\*** time (in GMT) after which this key becomes invalid
- :var str signing_key: **\*** directory server's public signing key
- :var str crosscert: signature made using certificate's signing key
- :var str certification: **\*** signature of this key certificate signed with the identity key
-
- **\*** attribute is either required when we're parsed with validation or has a default value, others are left as None if undefined
- """
-
- def __init__(self, raw_content, validate = True):
- """
- Parse a key certificate entry and provide a KeyCertificate object.
-
- :param str raw_content: raw key certificate information
- :param bool validate: True if the document is to be validated, False otherwise
-
- :raises: ValueError if the raw data is invalid
- """
-
- super(KeyCertificate, self).__init__(raw_content)
- self.key_certificate_version, self.ip, self.port = None, None, None
- self.fingerprint, self.identity_key, self.published = None, None, None
- self.expires, self.signing_key, self.crosscert = None, None, None
- self.certification = None
- parser = stem.descriptor.DescriptorParser(raw_content, validate)
- peek_check_kw = lambda keyword: keyword == parser.peek_keyword()
- seen_keywords = set()
-
- self.key_certificate_version = parser.read_keyword_line("dir-key-certificate-version")
- if validate and self.key_certificate_version != "3": raise ValueError("Unrecognized dir-key-certificate-version")
-
- def _read_keyword_line(keyword):
- if validate and keyword in seen_keywords:
- raise ValueError("Invalid key certificate: '%s' appears twice" % keyword)
- seen_keywords.add(keyword)
- return parser.read_keyword_line(keyword)
-
- while parser.line:
- if peek_check_kw("dir-address"):
- line = _read_keyword_line("dir-address")
- try:
- self.ip, self.port = line.rsplit(":", 1)
- self.port = int(self.port)
- except Exception:
- if validate: raise ValueError("Invalid dir-address line: %s" % line)
-
- elif peek_check_kw("fingerprint"):
- self.fingerprint = _read_keyword_line("fingerprint")
-
- elif peek_check_kw("dir-identity-key"):
- _read_keyword_line("dir-identity-key")
- self.identity_key = parser.read_block("RSA PUBLIC KEY")
-
- elif peek_check_kw("dir-key-published"):
- self.published = _strptime(_read_keyword_line("dir-key-published"))
-
- elif peek_check_kw("dir-key-expires"):
- self.expires = _strptime(_read_keyword_line("dir-key-expires"))
-
- elif peek_check_kw("dir-signing-key"):
- _read_keyword_line("dir-signing-key")
- self.signing_key = parser.read_block("RSA PUBLIC KEY")
-
- elif peek_check_kw("dir-key-crosscert"):
- _read_keyword_line("dir-key-crosscert")
- self.crosscert = parser.read_block("ID SIGNATURE")
-
- elif peek_check_kw("dir-key-certification"):
- _read_keyword_line("dir-key-certification")
- self.certification = parser.read_block("SIGNATURE")
- break
-
- elif validate:
- raise ValueError("Key certificate contains unrecognized lines: %s" % parser.line)
-
- else:
- # ignore unrecognized lines if we aren't validating
- self._unrecognized_lines.append(parser.read_line())
-
- self._unrecognized_lines = parser.remaining()
- if self._unrecognized_lines and validate:
- raise ValueError("Unrecognized trailing data in key certificate")
-
- def get_unrecognized_lines(self):
- """
- Returns any unrecognized lines.
-
- :returns: a list of unrecognized lines
- """
-
- return self._unrecognized_lines
-
class DirectorySignature(stem.descriptor.Descriptor):
"""
Contains directory signatures in a v3 network status document.
More information about the tor-commits
mailing list