[tor-commits] [stem/master] Support for STREAM events

atagar at torproject.org atagar at torproject.org
Mon Dec 3 02:35:44 UTC 2012


commit d735e1e267877d9e9e7fa0686c2cd49c63930f0b
Author: Damian Johnson <atagar at torproject.org>
Date:   Sun Nov 18 11:48:00 2012 -0800

    Support for STREAM events
    
    Implementaton and tests for STREAM events. I got the test data by...
    
    * starting TBB
    
    * used netstat to get the control port (shouldn't have needed to do this -
      https://trac.torproject.org/7512)
    
    * connecting to it with telnet
      * AUTHENTICATE
      * SETEVENTS STREAM
    
    * visited google's front page in firefox
    
    Full test data:
    
    AUTHENTICATE
    250 OK
    SETEVENTS STREAM
    250 OK
    650 STREAM 18 NEW 0 encrypted.google.com:443 SOURCE_ADDR=127.0.0.1:47849 PURPOSE=USER
    650 STREAM 18 SENTCONNECT 26 encrypted.google.com:443
    650 STREAM 19 NEW 0 encrypted.google.com:443 SOURCE_ADDR=127.0.0.1:47850 PURPOSE=USER
    650 STREAM 19 SENTCONNECT 26 encrypted.google.com:443
    650 STREAM 18 REMAP 26 74.125.227.129:443 SOURCE=EXIT
    650 STREAM 18 SUCCEEDED 26 74.125.227.129:443
    650 STREAM 19 REMAP 26 74.125.227.129:443 SOURCE=EXIT
    650 STREAM 19 SUCCEEDED 26 74.125.227.129:443
    650 STREAM 20 NEW 0 encrypted.google.com:443 SOURCE_ADDR=127.0.0.1:47851 PURPOSE=USER
    650 STREAM 20 REMAP 0 74.125.227.129:443 SOURCE=CACHE
    650 STREAM 20 SENTCONNECT 26 74.125.227.129:443
    650 STREAM 21 NEW 0 encrypted.google.com:443 SOURCE_ADDR=127.0.0.1:47852 PURPOSE=USER
    650 STREAM 21 REMAP 0 74.125.227.129:443 SOURCE=CACHE
    650 STREAM 21 SENTCONNECT 26 74.125.227.129:443
    650 STREAM 20 REMAP 26 74.125.227.129:443 SOURCE=EXIT
    650 STREAM 20 SUCCEEDED 26 74.125.227.129:443
    650 STREAM 21 REMAP 26 74.125.227.129:443 SOURCE=EXIT
    650 STREAM 21 SUCCEEDED 26 74.125.227.129:443
    650 STREAM 22 NEW 0 www.google.com:443 SOURCE_ADDR=127.0.0.1:47853 PURPOSE=USER
    650 STREAM 22 SENTCONNECT 26 www.google.com:443
    650 STREAM 23 NEW 0 www.google.com:443 SOURCE_ADDR=127.0.0.1:47854 PURPOSE=USER
    650 STREAM 23 SENTCONNECT 26 www.google.com:443
    650 STREAM 21 CLOSED 26 74.125.227.129:443 REASON=CONNRESET
    650 STREAM 20 CLOSED 26 74.125.227.129:443 REASON=CONNRESET
    650 STREAM 22 REMAP 26 74.125.227.147:443 SOURCE=EXIT
    650 STREAM 22 SUCCEEDED 26 74.125.227.147:443
    650 STREAM 23 REMAP 26 74.125.227.147:443 SOURCE=EXIT
    650 STREAM 23 SUCCEEDED 26 74.125.227.147:443
    650 STREAM 24 NEW 0 ocsp.thawte.com:80 SOURCE_ADDR=127.0.0.1:47855 PURPOSE=USER
    650 STREAM 24 SENTCONNECT 26 ocsp.thawte.com:80
    650 STREAM 25 NEW 0 ocsp.thawte.com:80 SOURCE_ADDR=127.0.0.1:47856 PURPOSE=USER
    650 STREAM 25 SENTCONNECT 26 ocsp.thawte.com:80
    650 STREAM 24 REMAP 26 199.7.52.72:80 SOURCE=EXIT
    650 STREAM 24 SUCCEEDED 26 199.7.52.72:80
    650 STREAM 25 REMAP 26 199.7.52.72:80 SOURCE=EXIT
    650 STREAM 25 SUCCEEDED 26 199.7.52.72:80
    650 STREAM 26 NEW 0 ssl.gstatic.com:443 SOURCE_ADDR=127.0.0.1:47857 PURPOSE=USER
    650 STREAM 26 SENTCONNECT 26 ssl.gstatic.com:443
    650 STREAM 27 NEW 0 ssl.gstatic.com:443 SOURCE_ADDR=127.0.0.1:47858 PURPOSE=USER
    650 STREAM 27 SENTCONNECT 26 ssl.gstatic.com:443
    650 STREAM 23 CLOSED 26 74.125.227.147:443 REASON=CONNRESET
    650 STREAM 26 REMAP 26 74.125.227.143:443 SOURCE=EXIT
    650 STREAM 26 SUCCEEDED 26 74.125.227.143:443
    650 STREAM 27 REMAP 26 74.125.227.143:443 SOURCE=EXIT
    650 STREAM 27 SUCCEEDED 26 74.125.227.143:443
    650 STREAM 25 CLOSED 26 199.7.52.72:80 REASON=DONE
    650 STREAM 27 CLOSED 26 74.125.227.143:443 REASON=CONNRESET
    650 STREAM 26 CLOSED 26 74.125.227.143:443 REASON=DONE
    650 STREAM 24 CLOSED 26 199.7.52.72:80 REASON=DONE
    650 STREAM 22 CLOSED 26 74.125.227.147:443 REASON=DONE
    650 STREAM 19 CLOSED 26 74.125.227.129:443 REASON=DONE
    650 STREAM 18 CLOSED 26 74.125.227.129:443 REASON=DONE
    Connection closed by foreign host.
---
 stem/control.py              |  115 ++++++++++++++++++++++++++++++++++++
 stem/response/events.py      |   83 ++++++++++++++++++++++++++-
 test/unit/response/events.py |  131 +++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 327 insertions(+), 2 deletions(-)

diff --git a/stem/control.py b/stem/control.py
index 083082e..d0e8462 100644
--- a/stem/control.py
+++ b/stem/control.py
@@ -182,6 +182,77 @@ providing its own for interacting at a higher level.
   **HSSR_CONNECTING**           connecting to the introductory point
   **HSSR_JOINED**               connected to the rendezvous-point
   ============================= ===========
+
+.. data:: StreamStatus (enum)
+  
+  State that a stream going through tor can have. Tor may provide states not in
+  this enum.
+  
+  ================= ===========
+  StreamStatus      Description
+  ================= ===========
+  **NEW**           request for a new connection
+  **NEWRESOLVE**    request to resolve an address
+  **REMAP**         address is being re-mapped to another
+  **SENTCONNECT**   sent a connect cell along a circuit
+  **SENTRESOLVE**   sent a resolve cell along a circuit
+  **SUCCEEDED**     stream has been established
+  **FAILED**        stream is detached, and won't be re-established
+  **DETACHED**      stream is detached, but might be re-established
+  **CLOSED**        stream has closed
+  ================= ===========
+
+.. data:: StreamClosureReason (enum)
+  
+  Reason that a stream is being closed or failed to be established. Tor may
+  provide purposes not in this enum.
+  
+  ===================== ===========
+  StreamClosureReason   Description
+  ===================== ===========
+  **MISC**              none of the following reasons
+  **RESOLVEFAILED**     unable to resolve the hostname
+  **CONNECTREFUSED**    remote host refused the connection
+  **EXITPOLICY**        rejected by the exit due to its exit policy
+  **DESTROY**           circuit is being shut down
+  **DONE**              connection has been closed
+  **TIMEOUT**           connection timed out
+  **NOROUTE**           routing error while contacting the destinaiton
+  **HIBERNATING**       relay is hibernating
+  **INTERNAL**          internal error
+  **RESOURCELIMIT**     relay has insufficient resources to service the request
+  **CONNRESET**         connection has been reset
+  **TORPROTOCOL**       violation in the tor protocol
+  **NOTDIRECTORY**      directory information requested from a relay that isn't mirroring it
+  **END**               endpoint has sent a RELAY_END cell
+  **PRIVATE_ADDR**      endpoint was a private address (127.0.0.1, 10.0.0.1, etc)
+  ===================== ===========
+
+.. data:: StreamSource (enum)
+  
+  Cause of a stream being remapped to another address.
+  
+  ============= ===========
+  StreamSource  Description
+  ============= ===========
+  **CACHE**     tor is remapping because of a cached answer
+  **EXIT**      exit relay requested the remap
+  ============= ===========
+
+.. data:: StreamPurpose (enum)
+  
+  Purpsoe of the stream. This is only provided with new streams and tor may
+  provide purposes not in this enum.
+  
+  ================= ===========
+  StreamPurpose     Description
+  ================= ===========
+  **DIR_FETCH**     unknown (https://trac.torproject.org/7508)
+  **UPLOAD_DESC**   unknown (https://trac.torproject.org/7508)
+  **DNS_REQUEST**   unknown (https://trac.torproject.org/7508)
+  **USER**          unknown (https://trac.torproject.org/7508)
+  **DIRPORT_TEST**  unknown (https://trac.torproject.org/7508)
+  ================= ===========
 """
 
 from __future__ import with_statement
@@ -289,6 +360,50 @@ HiddenServiceState = stem.util.enum.UppercaseEnum(
   "HSSR_JOINED",
 )
 
+StreamStatus = stem.util.enum.UppercaseEnum(
+  "NEW",
+  "NEWRESOLVE",
+  "REMAP",
+  "SENTCONNECT",
+  "SENTRESOLVE",
+  "SUCCEEDED",
+  "FAILED",
+  "DETACHED",
+  "CLOSED",
+)
+
+StreamClosureReason = stem.util.enum.UppercaseEnum(
+  "MISC",
+  "RESOLVEFAILED",
+  "CONNECTREFUSED",
+  "EXITPOLICY",
+  "DESTROY",
+  "DONE",
+  "TIMEOUT",
+  "NOROUTE",
+  "HIBERNATING",
+  "INTERNAL",
+  "RESOURCELIMIT",
+  "CONNRESET",
+  "TORPROTOCOL",
+  "NOTDIRECTORY",
+  "END",
+  "PRIVATE_ADDR",
+)
+
+StreamSource = stem.util.enum.UppercaseEnum(
+  "CACHE",
+  "EXIT",
+)
+
+StreamPurpose = stem.util.enum.UppercaseEnum(
+  "DIR_FETCH",
+  "UPLOAD_DESC",
+  "DNS_REQUEST",
+  "USER",
+  "DIRPORT_TEST",
+)
+
 # Constant to indicate an undefined argument default. Usually we'd use None for
 # this, but users will commonly provide None as the argument so need something
 # else fairly unique...
diff --git a/stem/response/events.py b/stem/response/events.py
index 4c8d8bd..8a0e428 100644
--- a/stem/response/events.py
+++ b/stem/response/events.py
@@ -3,7 +3,7 @@ import re
 import stem.control
 import stem.response
 
-from stem.util import log, str_tools, tor_tools
+from stem.util import connection, log, str_tools, tor_tools
 
 # Matches keyword=value arguments. This can't be a simple "(.*)=(.*)" pattern
 # because some positional arguments, like circuit paths, can have an equal
@@ -156,6 +156,86 @@ class CircuitEvent(Event):
       log_id = "event.circ.unknown_remote_reason.%s" % self.remote_reason
       log.log_once(log_id, log.INFO, unrecognized_msg % ('remote reason', self.remote_reason))
 
+class StreamEvent(Event):
+  """
+  Event that indicates that a stream has changed.
+  
+  :var str id: stream identifier
+  :var stem.control.StreamStatus status: reported status for the stream
+  :var str circ_id: circuit that the stream is attached to
+  :var str target: destination of the stream
+  :var str target_address: destination address (ip or hostname)
+  :var int target_port: destination port
+  :var stem.control.StreamClosureReason reason: reason for the stream to be closed
+  :var stem.control.StreamClosureReason remote_reason: remote side's reason for the stream to be closed
+  :var stem.control.StreamSource source: origin of the REMAP request
+  :var str source_addr: requester of the connection
+  :var str source_address: requester address (ip or hostname)
+  :var int source_port: requester port
+  :var stem.control.StreamPurpose purpose: purpose for the stream
+  """
+  
+  _POSITIONAL_ARGS = ("id", "status", "circ_id", "target")
+  _KEYWORD_ARGS = {
+    "REASON": "reason",
+    "REMOTE_REASON": "remote_reason",
+    "SOURCE": "source",
+    "SOURCE_ADDR": "source_addr",
+    "PURPOSE": "purpose",
+  }
+  
+  def _parse(self):
+    if self.target is None:
+      self.target_address = None
+      self.target_port = None
+    else:
+      if not ':' in self.target:
+        raise stem.ProtocolError("Target location must be of the form 'address:port': %s" % self)
+      
+      address, port = self.target.split(':')
+      
+      if not connection.is_valid_port(port):
+        raise stem.ProtocolError("Target location's port is invalid: %s" % self)
+      
+      self.target_address = address
+      self.target_port = int(port)
+    
+    if self.source_addr is None:
+      self.source_address = None
+      self.source_port = None
+    else:
+      if not ':' in self.source_addr:
+        raise stem.ProtocolError("Source location must be of the form 'address:port': %s" % self)
+      
+      address, port = self.source_addr.split(':')
+      
+      if not connection.is_valid_port(port):
+        raise stem.ProtocolError("Source location's port is invalid: %s" % self)
+      
+      self.source_address = address
+      self.source_port = int(port)
+    
+    # spec specifies a circ_id of zero if the stream is unattached
+    
+    if self.circ_id == "0":
+      self.circ_id = None
+    
+    # log if we have an unrecognized closure reason or purpose
+    
+    unrecognized_msg = "STREAM event had an unrecognised %%s (%%s). Maybe a new addition to the control protocol? Full Event: '%s'" % self
+    
+    if self.reason and (not self.reason in stem.control.StreamClosureReason):
+      log_id = "event.stream.reason.%s" % self.reason
+      log.log_once(log_id, log.INFO, unrecognized_msg % ('reason', self.reason))
+    
+    if self.remote_reason and (not self.remote_reason in stem.control.StreamClosureReason):
+      log_id = "event.stream.remote_reason.%s" % self.remote_reason
+      log.log_once(log_id, log.INFO, unrecognized_msg % ('remote reason', self.remote_reason))
+    
+    if self.purpose and (not self.purpose in stem.control.StreamPurpose):
+      log_id = "event.stream.purpose.%s" % self.purpose
+      log.log_once(log_id, log.INFO, unrecognized_msg % ('purpose', self.purpose))
+
 class BandwidthEvent(Event):
   """
   Event emitted every second with the bytes sent and received by tor.
@@ -196,6 +276,7 @@ class LogEvent(Event):
 
 EVENT_TYPE_TO_CLASS = {
   "CIRC": CircuitEvent,
+  "STREAM": StreamEvent,
   "BW": BandwidthEvent,
   "DEBUG": LogEvent,
   "INFO": LogEvent,
diff --git a/test/unit/response/events.py b/test/unit/response/events.py
index d0c0b4e..ac4a22f 100644
--- a/test/unit/response/events.py
+++ b/test/unit/response/events.py
@@ -11,7 +11,14 @@ import stem.response.events
 import test.mocking as mocking
 
 from stem import ProtocolError
-from stem.control import CircStatus, CircBuildFlag, CircPurpose, CircClosureReason
+from stem.control import CircStatus,\
+                         CircBuildFlag,\
+                         CircPurpose,\
+                         CircClosureReason,\
+                         StreamStatus,\
+                         StreamClosureReason,\
+                         StreamSource,\
+                         StreamPurpose
 
 # CIRC events from tor v0.2.3.16
 
@@ -42,6 +49,19 @@ $E57A476CD4DFBD99B4EE52A100A58610AD6E80B9,hamburgerphone"
 CIRC_BUILT_OLD = "650 CIRC 1 BUILT \
 $E57A476CD4DFBD99B4EE52A100A58610AD6E80B9,hamburgerphone,PrivacyRepublic14"
 
+# STREAM events from tor 0.2.3.16 for visiting the google front page
+
+STREAM_NEW = "650 STREAM 18 NEW 0 \
+encrypted.google.com:443 \
+SOURCE_ADDR=127.0.0.1:47849 \
+PURPOSE=USER"
+
+STREAM_SENTCONNECT = "650 STREAM 18 SENTCONNECT 26 encrypted.google.com:443"
+STREAM_REMAP = "650 STREAM 18 REMAP 26 74.125.227.129:443 SOURCE=EXIT"
+STREAM_SUCCEEDED = "650 STREAM 18 SUCCEEDED 26 74.125.227.129:443"
+STREAM_CLOSED_RESET = "650 STREAM 21 CLOSED 26 74.125.227.129:443 REASON=CONNRESET"
+STREAM_CLOSED_DONE = "650 STREAM 25 CLOSED 26 199.7.52.72:80 REASON=DONE"
+
 def _get_event(content):
   controller_event = mocking.get_message(content)
   stem.response.convert("EVENT", controller_event, arrived_at = 25)
@@ -169,6 +189,115 @@ class TestEvents(unittest.TestCase):
     self.assertEqual(None, event.reason)
     self.assertEqual(None, event.remote_reason)
   
+  def test_stream_event(self):
+    event = _get_event(STREAM_NEW)
+    
+    self.assertTrue(isinstance(event, stem.response.events.StreamEvent))
+    self.assertEqual(STREAM_NEW.lstrip("650 "), str(event))
+    self.assertEqual("18", event.id)
+    self.assertEqual(StreamStatus.NEW, event.status)
+    self.assertEqual(None, event.circ_id)
+    self.assertEqual("encrypted.google.com:443", event.target)
+    self.assertEqual("encrypted.google.com", event.target_address)
+    self.assertEqual(443, event.target_port)
+    self.assertEqual(None, event.reason)
+    self.assertEqual(None, event.remote_reason)
+    self.assertEqual(None, event.source)
+    self.assertEqual("127.0.0.1:47849", event.source_addr)
+    self.assertEqual("127.0.0.1", event.source_address)
+    self.assertEqual(47849, event.source_port)
+    self.assertEqual(StreamPurpose.USER, event.purpose)
+    
+    event = _get_event(STREAM_SENTCONNECT)
+    
+    self.assertTrue(isinstance(event, stem.response.events.StreamEvent))
+    self.assertEqual(STREAM_SENTCONNECT.lstrip("650 "), str(event))
+    self.assertEqual("18", event.id)
+    self.assertEqual(StreamStatus.SENTCONNECT, event.status)
+    self.assertEqual("26", event.circ_id)
+    self.assertEqual("encrypted.google.com:443", event.target)
+    self.assertEqual("encrypted.google.com", event.target_address)
+    self.assertEqual(443, event.target_port)
+    self.assertEqual(None, event.reason)
+    self.assertEqual(None, event.remote_reason)
+    self.assertEqual(None, event.source)
+    self.assertEqual(None, event.source_addr)
+    self.assertEqual(None, event.source_address)
+    self.assertEqual(None, event.source_port)
+    self.assertEqual(None, event.purpose)
+    
+    event = _get_event(STREAM_REMAP)
+    
+    self.assertTrue(isinstance(event, stem.response.events.StreamEvent))
+    self.assertEqual(STREAM_REMAP.lstrip("650 "), str(event))
+    self.assertEqual("18", event.id)
+    self.assertEqual(StreamStatus.REMAP, event.status)
+    self.assertEqual("26", event.circ_id)
+    self.assertEqual("74.125.227.129:443", event.target)
+    self.assertEqual("74.125.227.129", event.target_address)
+    self.assertEqual(443, event.target_port)
+    self.assertEqual(None, event.reason)
+    self.assertEqual(None, event.remote_reason)
+    self.assertEqual(StreamSource.EXIT, event.source)
+    self.assertEqual(None, event.source_addr)
+    self.assertEqual(None, event.source_address)
+    self.assertEqual(None, event.source_port)
+    self.assertEqual(None, event.purpose)
+    
+    event = _get_event(STREAM_SUCCEEDED)
+    
+    self.assertTrue(isinstance(event, stem.response.events.StreamEvent))
+    self.assertEqual(STREAM_SUCCEEDED.lstrip("650 "), str(event))
+    self.assertEqual("18", event.id)
+    self.assertEqual(StreamStatus.SUCCEEDED, event.status)
+    self.assertEqual("26", event.circ_id)
+    self.assertEqual("74.125.227.129:443", event.target)
+    self.assertEqual("74.125.227.129", event.target_address)
+    self.assertEqual(443, event.target_port)
+    self.assertEqual(None, event.reason)
+    self.assertEqual(None, event.remote_reason)
+    self.assertEqual(None, event.source)
+    self.assertEqual(None, event.source_addr)
+    self.assertEqual(None, event.source_address)
+    self.assertEqual(None, event.source_port)
+    self.assertEqual(None, event.purpose)
+    
+    event = _get_event(STREAM_CLOSED_RESET)
+    
+    self.assertTrue(isinstance(event, stem.response.events.StreamEvent))
+    self.assertEqual(STREAM_CLOSED_RESET.lstrip("650 "), str(event))
+    self.assertEqual("21", event.id)
+    self.assertEqual(StreamStatus.CLOSED, event.status)
+    self.assertEqual("26", event.circ_id)
+    self.assertEqual("74.125.227.129:443", event.target)
+    self.assertEqual("74.125.227.129", event.target_address)
+    self.assertEqual(443, event.target_port)
+    self.assertEqual(StreamClosureReason.CONNRESET, event.reason)
+    self.assertEqual(None, event.remote_reason)
+    self.assertEqual(None, event.source)
+    self.assertEqual(None, event.source_addr)
+    self.assertEqual(None, event.source_address)
+    self.assertEqual(None, event.source_port)
+    self.assertEqual(None, event.purpose)
+    
+    event = _get_event(STREAM_CLOSED_DONE)
+    
+    self.assertTrue(isinstance(event, stem.response.events.StreamEvent))
+    self.assertEqual(STREAM_CLOSED_DONE.lstrip("650 "), str(event))
+    self.assertEqual("25", event.id)
+    self.assertEqual(StreamStatus.CLOSED, event.status)
+    self.assertEqual("26", event.circ_id)
+    self.assertEqual("199.7.52.72:80", event.target)
+    self.assertEqual("199.7.52.72", event.target_address)
+    self.assertEqual(80, event.target_port)
+    self.assertEqual(StreamClosureReason.DONE, event.reason)
+    self.assertEqual(None, event.remote_reason)
+    self.assertEqual(None, event.source)
+    self.assertEqual(None, event.source_addr)
+    self.assertEqual(None, event.source_address)
+    self.assertEqual(None, event.source_port)
+    self.assertEqual(None, event.purpose)
+  
   def test_bw_event(self):
     event = _get_event("650 BW 15 25")
     





More information about the tor-commits mailing list