[tor-commits] [stem/master] Initial KeyCertificate implementation

atagar at torproject.org atagar at torproject.org
Sat Oct 13 18:35:45 UTC 2012


commit 6ee5abbe5c1137b4a311d5b7ef5003874982008e
Author: Damian Johnson <atagar at torproject.org>
Date:   Sun Sep 23 14:20:49 2012 -0700

    Initial KeyCertificate implementation
    
    First stab at parsing the authority section's key certificates. This is
    completely untested; the next step is to write some unit tests for it.
---
 stem/descriptor/networkstatus.py |  170 ++++++++++++++++++++++++++++++++++++++
 1 files changed, 170 insertions(+), 0 deletions(-)

diff --git a/stem/descriptor/networkstatus.py b/stem/descriptor/networkstatus.py
index 17a9877..46adbfe 100644
--- a/stem/descriptor/networkstatus.py
+++ b/stem/descriptor/networkstatus.py
@@ -106,6 +106,21 @@ DEFAULT_PARAMS = {
   "cbtinitialtimeout": 60000,
 }
 
+# KeyCertificate fields in the order validation expects, as tuples of...
+# (keyword, is_mandatory)
+
+KEY_CERTIFICATE_PARAMS = (
+  ('dir-key-certificate-version', True),
+  ('dir-address', False),
+  ('fingerprint', True),
+  ('dir-identity-key', True),
+  ('dir-key-published', True),
+  ('dir-key-expires', True),
+  ('dir-signing-key', True),
+  ('dir-key-crosscert', False),
+  ('dir-key-certification', True),
+)
+
 BANDWIDTH_WEIGHT_ENTRIES = (
   "Wbd", "Wbe", "Wbg", "Wbm",
   "Wdb",
@@ -768,6 +783,161 @@ class DirectoryAuthority(stem.descriptor.Descriptor):
     
     return self.unrecognized_lines
 
+class KeyCertificate(stem.descriptor.Descriptor):
+  """
+  Directory key certificate for a v3 network status document.
+  
+  :var int version: **\*** version of the key certificate
+  :var str address: authority's IP address
+  :var int dir_port: authority's DirPort
+  :var str fingerprint: **\*** authority's fingerprint
+  :var str identity_key: **\*** long term authority identity key
+  :var datetime published: **\*** time when this key was generated
+  :var datetime expires: **\*** time after which this key becomes invalid
+  :var str signing_key: **\*** directory server's public signing key
+  :var str crosscert: signature made using certificate's signing key
+  :var str certification: **\*** signature of this key certificate signed with the identity key
+  
+  **\*** mandatory attribute
+  """
+  
+  def __init__(self, raw_content, validate):
+    """Parses raw_content, see _parse() for the arguments and errors raised."""
+    super(KeyCertificate, self).__init__(raw_content)
+    
+    self.version = None
+    self.address = None
+    self.dir_port = None
+    self.fingerprint = None
+    self.identity_key = None
+    self.published = None
+    self.expires = None
+    self.signing_key = None
+    self.crosscert = None
+    self.certification = None
+    self._unrecognized_lines = []  # "keyword value" lines we have no handler for
+    
+    self._parse(raw_content, validate)
+  
+  def _parse(self, content, validate):
+    """
+    Parses the given content and applies the attributes.
+    
+    :param str content: descriptor content
+    :param bool validate: checks validity if True, skips malformed fields otherwise
+    
+    :raises: ValueError if a validity check fails
+    """
+    
+    entries, first_keyword, last_keyword, _ = stem.descriptor._get_descriptor_components(content, validate)
+    
+    if validate:
+      if first_keyword != 'dir-key-certificate-version':
+        raise ValueError("Key certificates must start with a 'dir-key-certificate-version' line:\n%s" % (content))
+      elif last_keyword != 'dir-key-certification':
+        raise ValueError("Key certificates must end with a 'dir-key-certification' line:\n%s" % (content))
+      
+      # check that we have mandatory fields and that our known fields only
+      # appear once
+      
+      for keyword, is_mandatory in KEY_CERTIFICATE_PARAMS:
+        if is_mandatory and keyword not in entries:
+          raise ValueError("Key certificates must have a '%s' line:\n%s" % (keyword, content))
+        
+        entry_count = len(entries.get(keyword, []))
+        if entry_count > 1:
+          raise ValueError("Key certificates can only have a single '%s' line, got %i:\n%s" % (keyword, entry_count, content))
+      
+      # Check that our field's order matches the spec. This isn't explicitly
+      # stated in the spec, but the network status document requires a specific
+      # order so it stands to reason that the key certificate (which is in it)
+      # needs to match a prescribed order too.
+      
+      fields = [attr[0] for attr in KEY_CERTIFICATE_PARAMS]
+      _check_for_misordered_fields(entries, fields)
+    
+    for keyword, values in entries.items():
+      value, block_contents = values[0]  # only the first entry of a keyword is used
+      line = "%s %s" % (keyword, value)
+      
+      if keyword == 'dir-key-certificate-version':
+        # "dir-key-certificate-version" version
+        
+        if not value.isdigit():
+          if not validate: continue  # best effort, leave the attribute unset
+          raise ValueError("Key certificate has a non-integer version: %s" % line)
+        
+        self.version = int(value)
+        
+        if validate and self.version != 3:
+          raise ValueError("Expected a version 3 key certificate, got version '%i' instead" % self.version)
+      elif keyword == 'dir-address':
+        # "dir-address" IPPort
+        
+        if ':' not in value:
+          if not validate: continue  # best effort, leave the attributes unset
+          raise ValueError("Key certificate's 'dir-address' is expected to be of the form ADDRESS:PORT: %s" % line)
+        
+        address, dirport = value.split(':', 1)
+        
+        if validate:
+          if not stem.util.connection.is_valid_ip_address(address):
+            raise ValueError("Key certificate's address isn't a valid IPv4 address: %s" % line)
+          elif not stem.util.connection.is_valid_port(dirport):
+            raise ValueError("Key certificate's dirport is invalid: %s" % line)
+        
+        self.address = address
+        self.dir_port = int(dirport) if dirport.isdigit() else None  # int as documented, None if unparseable
+      elif keyword == 'fingerprint':
+        # "fingerprint" fingerprint
+        
+        if validate and not stem.util.tor_tools.is_valid_fingerprint(value):
+          raise ValueError("Key certificate's fingerprint is malformed: %s" % line)
+        
+        self.fingerprint = value
+      elif keyword in ('dir-key-published', 'dir-key-expires'):
+        # "dir-key-published" YYYY-MM-DD HH:MM:SS
+        # "dir-key-expires" YYYY-MM-DD HH:MM:SS
+        
+        try:
+          date_value = datetime.datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
+          
+          if keyword == 'dir-key-published':
+            self.published = date_value
+          elif keyword == 'dir-key-expires':
+            self.expires = date_value
+        except ValueError:
+          if validate:
+            raise ValueError("Key certificate's '%s' time wasn't parseable: %s" % (keyword, value))
+      elif keyword in ('dir-identity-key', 'dir-signing-key', 'dir-key-crosscert', 'dir-key-certification'):
+        # "dir-identity-key" NL a public key in PEM format
+        # "dir-signing-key" NL a key in PEM format
+        # "dir-key-crosscert" NL CrossSignature
+        # "dir-key-certification" NL Signature
+        
+        if validate and not block_contents:
+          raise ValueError("Key certificate's '%s' line must be followed by a key block: %s" % (keyword, line))
+        
+        if keyword == 'dir-identity-key':
+          self.identity_key = block_contents
+        elif keyword == 'dir-signing-key':
+          self.signing_key = block_contents
+        elif keyword == 'dir-key-crosscert':
+          self.crosscert = block_contents
+        elif keyword == 'dir-key-certification':
+          self.certification = block_contents
+      else:
+        self._unrecognized_lines.append(line)
+  
+  def get_unrecognized_lines(self):
+    """
+    Returns any unrecognized lines.
+    
+    :returns: a list of the "keyword value" lines we didn't recognize
+    """
+    
+    return self._unrecognized_lines
+
 # TODO: microdescriptors have a slightly different format (including a
 # 'method') - should probably be a subclass
 class DocumentSignature(object):





More information about the tor-commits mailing list