[tor-commits] [stem/master] Support for STREAM events
atagar at torproject.org
Mon Dec 3 02:35:44 UTC 2012
commit d735e1e267877d9e9e7fa0686c2cd49c63930f0b
Author: Damian Johnson <atagar at torproject.org>
Date: Sun Nov 18 11:48:00 2012 -0800
Support for STREAM events
Implementation and tests for STREAM events. I got the test data by...
* starting TBB
* using netstat to get the control port (shouldn't have needed to do this -
https://trac.torproject.org/7512)
* connecting to it with telnet
* issuing AUTHENTICATE
* issuing SETEVENTS STREAM
* visiting google's front page in firefox
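The telnet steps above could also be scripted. The following is only a rough sketch using Python's standard socket module, not anything from this commit: send_command and collect_stream_events are hypothetical helper names, the port must be supplied by the caller, and it assumes (as in the session here) that the control port accepts a bare AUTHENTICATE with no password or cookie.

```python
import socket


def send_command(conn_file, command):
    """Send one control-port command and return its reply lines.

    Enough for simple commands such as AUTHENTICATE and SETEVENTS; it does
    not handle multi-line data replies.
    """
    conn_file.write((command + "\r\n").encode("ascii"))
    conn_file.flush()

    reply = []
    while True:
        raw = conn_file.readline()
        if not raw:
            raise IOError("connection closed while awaiting a reply")
        line = raw.decode("ascii").rstrip("\r\n")
        reply.append(line)
        # The final line of a reply has a space after the status code
        # ("250 OK"); earlier lines use "-" or "+" in that position.
        if len(line) >= 4 and line[3] == " ":
            return reply


def collect_stream_events(port, limit, host="127.0.0.1"):
    """Authenticate, subscribe to STREAM events, and return up to `limit` lines."""
    events = []
    with socket.create_connection((host, port)) as conn:
        with conn.makefile("rwb") as conn_file:
            for command in ("AUTHENTICATE", "SETEVENTS STREAM"):
                reply = send_command(conn_file, command)
                if not reply[-1].startswith("250"):
                    raise RuntimeError("%s failed: %s" % (command, reply[-1]))

            while len(events) < limit:
                raw = conn_file.readline()
                if not raw:
                    break  # tor closed the connection
                line = raw.decode("ascii").rstrip("\r\n")
                if line.startswith("650 STREAM "):
                    events.append(line)
    return events
```

Calling collect_stream_events with the port found via netstat and then loading a page in the browser should yield lines like the ones in the test data, provided the assumptions above hold.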
Full test data:
AUTHENTICATE
250 OK
SETEVENTS STREAM
250 OK
650 STREAM 18 NEW 0 encrypted.google.com:443 SOURCE_ADDR=127.0.0.1:47849 PURPOSE=USER
650 STREAM 18 SENTCONNECT 26 encrypted.google.com:443
650 STREAM 19 NEW 0 encrypted.google.com:443 SOURCE_ADDR=127.0.0.1:47850 PURPOSE=USER
650 STREAM 19 SENTCONNECT 26 encrypted.google.com:443
650 STREAM 18 REMAP 26 74.125.227.129:443 SOURCE=EXIT
650 STREAM 18 SUCCEEDED 26 74.125.227.129:443
650 STREAM 19 REMAP 26 74.125.227.129:443 SOURCE=EXIT
650 STREAM 19 SUCCEEDED 26 74.125.227.129:443
650 STREAM 20 NEW 0 encrypted.google.com:443 SOURCE_ADDR=127.0.0.1:47851 PURPOSE=USER
650 STREAM 20 REMAP 0 74.125.227.129:443 SOURCE=CACHE
650 STREAM 20 SENTCONNECT 26 74.125.227.129:443
650 STREAM 21 NEW 0 encrypted.google.com:443 SOURCE_ADDR=127.0.0.1:47852 PURPOSE=USER
650 STREAM 21 REMAP 0 74.125.227.129:443 SOURCE=CACHE
650 STREAM 21 SENTCONNECT 26 74.125.227.129:443
650 STREAM 20 REMAP 26 74.125.227.129:443 SOURCE=EXIT
650 STREAM 20 SUCCEEDED 26 74.125.227.129:443
650 STREAM 21 REMAP 26 74.125.227.129:443 SOURCE=EXIT
650 STREAM 21 SUCCEEDED 26 74.125.227.129:443
650 STREAM 22 NEW 0 www.google.com:443 SOURCE_ADDR=127.0.0.1:47853 PURPOSE=USER
650 STREAM 22 SENTCONNECT 26 www.google.com:443
650 STREAM 23 NEW 0 www.google.com:443 SOURCE_ADDR=127.0.0.1:47854 PURPOSE=USER
650 STREAM 23 SENTCONNECT 26 www.google.com:443
650 STREAM 21 CLOSED 26 74.125.227.129:443 REASON=CONNRESET
650 STREAM 20 CLOSED 26 74.125.227.129:443 REASON=CONNRESET
650 STREAM 22 REMAP 26 74.125.227.147:443 SOURCE=EXIT
650 STREAM 22 SUCCEEDED 26 74.125.227.147:443
650 STREAM 23 REMAP 26 74.125.227.147:443 SOURCE=EXIT
650 STREAM 23 SUCCEEDED 26 74.125.227.147:443
650 STREAM 24 NEW 0 ocsp.thawte.com:80 SOURCE_ADDR=127.0.0.1:47855 PURPOSE=USER
650 STREAM 24 SENTCONNECT 26 ocsp.thawte.com:80
650 STREAM 25 NEW 0 ocsp.thawte.com:80 SOURCE_ADDR=127.0.0.1:47856 PURPOSE=USER
650 STREAM 25 SENTCONNECT 26 ocsp.thawte.com:80
650 STREAM 24 REMAP 26 199.7.52.72:80 SOURCE=EXIT
650 STREAM 24 SUCCEEDED 26 199.7.52.72:80
650 STREAM 25 REMAP 26 199.7.52.72:80 SOURCE=EXIT
650 STREAM 25 SUCCEEDED 26 199.7.52.72:80
650 STREAM 26 NEW 0 ssl.gstatic.com:443 SOURCE_ADDR=127.0.0.1:47857 PURPOSE=USER
650 STREAM 26 SENTCONNECT 26 ssl.gstatic.com:443
650 STREAM 27 NEW 0 ssl.gstatic.com:443 SOURCE_ADDR=127.0.0.1:47858 PURPOSE=USER
650 STREAM 27 SENTCONNECT 26 ssl.gstatic.com:443
650 STREAM 23 CLOSED 26 74.125.227.147:443 REASON=CONNRESET
650 STREAM 26 REMAP 26 74.125.227.143:443 SOURCE=EXIT
650 STREAM 26 SUCCEEDED 26 74.125.227.143:443
650 STREAM 27 REMAP 26 74.125.227.143:443 SOURCE=EXIT
650 STREAM 27 SUCCEEDED 26 74.125.227.143:443
650 STREAM 25 CLOSED 26 199.7.52.72:80 REASON=DONE
650 STREAM 27 CLOSED 26 74.125.227.143:443 REASON=CONNRESET
650 STREAM 26 CLOSED 26 74.125.227.143:443 REASON=DONE
650 STREAM 24 CLOSED 26 199.7.52.72:80 REASON=DONE
650 STREAM 22 CLOSED 26 74.125.227.147:443 REASON=DONE
650 STREAM 19 CLOSED 26 74.125.227.129:443 REASON=DONE
650 STREAM 18 CLOSED 26 74.125.227.129:443 REASON=DONE
Connection closed by foreign host.
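To make the layout of the lines above easier to read: each event has four positional fields (stream id, status, circuit id, target) followed by optional KEY=VALUE arguments. The snippet below is a minimal standalone sketch of that breakdown. It is not the StreamEvent class added by this commit, and parse_stream_line is a hypothetical helper that assumes whitespace-separated fields with no quoting.

```python
def parse_stream_line(line):
    """Split a '650 STREAM ...' line into positional and keyword fields."""
    parts = line.split()
    if parts[:2] != ["650", "STREAM"] or len(parts) < 6:
        raise ValueError("not a STREAM event: %r" % line)

    stream_id, status, circ_id, target = parts[2:6]

    # Split on the last colon so the port is always the final component.
    address, _, port = target.rpartition(":")

    # Everything after the target is treated as KEY=VALUE.
    keywords = dict(arg.split("=", 1) for arg in parts[6:])

    return {
        "id": stream_id,
        "status": status,
        # A circuit id of zero means the stream is not attached to a circuit.
        "circ_id": None if circ_id == "0" else circ_id,
        "target_address": address,
        "target_port": int(port),
        "keywords": keywords,
    }


new_event = parse_stream_line(
    "650 STREAM 18 NEW 0 encrypted.google.com:443 "
    "SOURCE_ADDR=127.0.0.1:47849 PURPOSE=USER"
)
closed_event = parse_stream_line(
    "650 STREAM 21 CLOSED 26 74.125.227.129:443 REASON=CONNRESET"
)
```

Here new_event has no circuit yet (its circ_id is None), while closed_event carries circuit 26 and a REASON keyword.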
---
stem/control.py | 115 ++++++++++++++++++++++++++++++++++++
stem/response/events.py | 83 ++++++++++++++++++++++++++-
test/unit/response/events.py | 131 +++++++++++++++++++++++++++++++++++++++++-
3 files changed, 327 insertions(+), 2 deletions(-)
diff --git a/stem/control.py b/stem/control.py
index 083082e..d0e8462 100644
--- a/stem/control.py
+++ b/stem/control.py
@@ -182,6 +182,77 @@ providing its own for interacting at a higher level.
**HSSR_CONNECTING** connecting to the introductory point
**HSSR_JOINED** connected to the rendezvous-point
============================= ===========
+
+.. data:: StreamStatus (enum)
+
+  State that a stream going through tor can have. Tor may provide states not in
+  this enum.
+
+  ================= ===========
+  StreamStatus      Description
+  ================= ===========
+  **NEW**           request for a new connection
+  **NEWRESOLVE**    request to resolve an address
+  **REMAP**         address is being re-mapped to another
+  **SENTCONNECT**   sent a connect cell along a circuit
+  **SENTRESOLVE**   sent a resolve cell along a circuit
+  **SUCCEEDED**     stream has been established
+  **FAILED**        stream is detached, and won't be re-established
+  **DETACHED**      stream is detached, but might be re-established
+  **CLOSED**        stream has closed
+  ================= ===========
+
+.. data:: StreamClosureReason (enum)
+
+  Reason that a stream is being closed or failed to be established. Tor may
+  provide reasons not in this enum.
+
+  ===================== ===========
+  StreamClosureReason   Description
+  ===================== ===========
+  **MISC**              none of the following reasons
+  **RESOLVEFAILED**     unable to resolve the hostname
+  **CONNECTREFUSED**    remote host refused the connection
+  **EXITPOLICY**        rejected by the exit due to its exit policy
+  **DESTROY**           circuit is being shut down
+  **DONE**              connection has been closed
+  **TIMEOUT**           connection timed out
+  **NOROUTE**           routing error while contacting the destination
+  **HIBERNATING**       relay is hibernating
+  **INTERNAL**          internal error
+  **RESOURCELIMIT**     relay has insufficient resources to service the request
+  **CONNRESET**         connection has been reset
+  **TORPROTOCOL**       violation in the tor protocol
+  **NOTDIRECTORY**      directory information requested from a relay that isn't mirroring it
+  **END**               endpoint has sent a RELAY_END cell
+  **PRIVATE_ADDR**      endpoint was a private address (127.0.0.1, 10.0.0.1, etc)
+  ===================== ===========
+
+.. data:: StreamSource (enum)
+
+  Cause of a stream being remapped to another address.
+
+  ============= ===========
+  StreamSource  Description
+  ============= ===========
+  **CACHE**     tor is remapping because of a cached answer
+  **EXIT**      exit relay requested the remap
+  ============= ===========
+
+.. data:: StreamPurpose (enum)
+
+  Purpose of the stream. This is only provided with new streams and tor may
+  provide purposes not in this enum.
+
+  ================= ===========
+  StreamPurpose     Description
+  ================= ===========
+  **DIR_FETCH**     unknown (https://trac.torproject.org/7508)
+  **UPLOAD_DESC**   unknown (https://trac.torproject.org/7508)
+  **DNS_REQUEST**   unknown (https://trac.torproject.org/7508)
+  **USER**          unknown (https://trac.torproject.org/7508)
+  **DIRPORT_TEST**  unknown (https://trac.torproject.org/7508)
+  ================= ===========
"""
from __future__ import with_statement
@@ -289,6 +360,50 @@ HiddenServiceState = stem.util.enum.UppercaseEnum(
"HSSR_JOINED",
)
+StreamStatus = stem.util.enum.UppercaseEnum(
+  "NEW",
+  "NEWRESOLVE",
+  "REMAP",
+  "SENTCONNECT",
+  "SENTRESOLVE",
+  "SUCCEEDED",
+  "FAILED",
+  "DETACHED",
+  "CLOSED",
+)
+
+StreamClosureReason = stem.util.enum.UppercaseEnum(
+  "MISC",
+  "RESOLVEFAILED",
+  "CONNECTREFUSED",
+  "EXITPOLICY",
+  "DESTROY",
+  "DONE",
+  "TIMEOUT",
+  "NOROUTE",
+  "HIBERNATING",
+  "INTERNAL",
+  "RESOURCELIMIT",
+  "CONNRESET",
+  "TORPROTOCOL",
+  "NOTDIRECTORY",
+  "END",
+  "PRIVATE_ADDR",
+)
+
+StreamSource = stem.util.enum.UppercaseEnum(
+  "CACHE",
+  "EXIT",
+)
+
+StreamPurpose = stem.util.enum.UppercaseEnum(
+  "DIR_FETCH",
+  "UPLOAD_DESC",
+  "DNS_REQUEST",
+  "USER",
+  "DIRPORT_TEST",
+)
+
+
# Constant to indicate an undefined argument default. Usually we'd use None for
# this, but users will commonly provide None as the argument so need something
# else fairly unique...
diff --git a/stem/response/events.py b/stem/response/events.py
index 4c8d8bd..8a0e428 100644
--- a/stem/response/events.py
+++ b/stem/response/events.py
@@ -3,7 +3,7 @@ import re
import stem.control
import stem.response
-from stem.util import log, str_tools, tor_tools
+from stem.util import connection, log, str_tools, tor_tools
# Matches keyword=value arguments. This can't be a simple "(.*)=(.*)" pattern
# because some positional arguments, like circuit paths, can have an equal
@@ -156,6 +156,86 @@ class CircuitEvent(Event):
log_id = "event.circ.unknown_remote_reason.%s" % self.remote_reason
log.log_once(log_id, log.INFO, unrecognized_msg % ('remote reason', self.remote_reason))
+class StreamEvent(Event):
+  """
+  Event that indicates that a stream has changed.
+
+  :var str id: stream identifier
+  :var stem.control.StreamStatus status: reported status for the stream
+  :var str circ_id: circuit that the stream is attached to
+  :var str target: destination of the stream
+  :var str target_address: destination address (ip or hostname)
+  :var int target_port: destination port
+  :var stem.control.StreamClosureReason reason: reason for the stream to be closed
+  :var stem.control.StreamClosureReason remote_reason: remote side's reason for the stream to be closed
+  :var stem.control.StreamSource source: origin of the REMAP request
+  :var str source_addr: requester of the connection
+  :var str source_address: requester address (ip or hostname)
+  :var int source_port: requester port
+  :var stem.control.StreamPurpose purpose: purpose for the stream
+  """
+
+  _POSITIONAL_ARGS = ("id", "status", "circ_id", "target")
+  _KEYWORD_ARGS = {
+    "REASON": "reason",
+    "REMOTE_REASON": "remote_reason",
+    "SOURCE": "source",
+    "SOURCE_ADDR": "source_addr",
+    "PURPOSE": "purpose",
+  }
+
+  def _parse(self):
+    if self.target is None:
+      self.target_address = None
+      self.target_port = None
+    else:
+      if not ':' in self.target:
+        raise stem.ProtocolError("Target location must be of the form 'address:port': %s" % self)
+
+      address, port = self.target.rsplit(':', 1)
+
+      if not connection.is_valid_port(port):
+        raise stem.ProtocolError("Target location's port is invalid: %s" % self)
+
+      self.target_address = address
+      self.target_port = int(port)
+
+    if self.source_addr is None:
+      self.source_address = None
+      self.source_port = None
+    else:
+      if not ':' in self.source_addr:
+        raise stem.ProtocolError("Source location must be of the form 'address:port': %s" % self)
+
+      address, port = self.source_addr.rsplit(':', 1)
+
+      if not connection.is_valid_port(port):
+        raise stem.ProtocolError("Source location's port is invalid: %s" % self)
+
+      self.source_address = address
+      self.source_port = int(port)
+
+    # spec specifies a circ_id of zero if the stream is unattached
+
+    if self.circ_id == "0":
+      self.circ_id = None
+
+    # log if we have an unrecognized closure reason or purpose
+
+    unrecognized_msg = "STREAM event had an unrecognised %%s (%%s). Maybe a new addition to the control protocol? Full Event: '%s'" % self
+
+    if self.reason and (not self.reason in stem.control.StreamClosureReason):
+      log_id = "event.stream.reason.%s" % self.reason
+      log.log_once(log_id, log.INFO, unrecognized_msg % ('reason', self.reason))
+
+    if self.remote_reason and (not self.remote_reason in stem.control.StreamClosureReason):
+      log_id = "event.stream.remote_reason.%s" % self.remote_reason
+      log.log_once(log_id, log.INFO, unrecognized_msg % ('remote reason', self.remote_reason))
+
+    if self.purpose and (not self.purpose in stem.control.StreamPurpose):
+      log_id = "event.stream.purpose.%s" % self.purpose
+      log.log_once(log_id, log.INFO, unrecognized_msg % ('purpose', self.purpose))
+
class BandwidthEvent(Event):
"""
Event emitted every second with the bytes sent and received by tor.
@@ -196,6 +276,7 @@ class LogEvent(Event):
EVENT_TYPE_TO_CLASS = {
"CIRC": CircuitEvent,
+ "STREAM": StreamEvent,
"BW": BandwidthEvent,
"DEBUG": LogEvent,
"INFO": LogEvent,
diff --git a/test/unit/response/events.py b/test/unit/response/events.py
index d0c0b4e..ac4a22f 100644
--- a/test/unit/response/events.py
+++ b/test/unit/response/events.py
@@ -11,7 +11,14 @@ import stem.response.events
import test.mocking as mocking
from stem import ProtocolError
-from stem.control import CircStatus, CircBuildFlag, CircPurpose, CircClosureReason
+from stem.control import CircStatus,\
+ CircBuildFlag,\
+ CircPurpose,\
+ CircClosureReason,\
+ StreamStatus,\
+ StreamClosureReason,\
+ StreamSource,\
+ StreamPurpose
# CIRC events from tor v0.2.3.16
@@ -42,6 +49,19 @@ $E57A476CD4DFBD99B4EE52A100A58610AD6E80B9,hamburgerphone"
CIRC_BUILT_OLD = "650 CIRC 1 BUILT \
$E57A476CD4DFBD99B4EE52A100A58610AD6E80B9,hamburgerphone,PrivacyRepublic14"
+# STREAM events from tor 0.2.3.16 for visiting the google front page
+
+STREAM_NEW = "650 STREAM 18 NEW 0 \
+encrypted.google.com:443 \
+SOURCE_ADDR=127.0.0.1:47849 \
+PURPOSE=USER"
+
+STREAM_SENTCONNECT = "650 STREAM 18 SENTCONNECT 26 encrypted.google.com:443"
+STREAM_REMAP = "650 STREAM 18 REMAP 26 74.125.227.129:443 SOURCE=EXIT"
+STREAM_SUCCEEDED = "650 STREAM 18 SUCCEEDED 26 74.125.227.129:443"
+STREAM_CLOSED_RESET = "650 STREAM 21 CLOSED 26 74.125.227.129:443 REASON=CONNRESET"
+STREAM_CLOSED_DONE = "650 STREAM 25 CLOSED 26 199.7.52.72:80 REASON=DONE"
+
def _get_event(content):
controller_event = mocking.get_message(content)
stem.response.convert("EVENT", controller_event, arrived_at = 25)
@@ -169,6 +189,115 @@ class TestEvents(unittest.TestCase):
self.assertEqual(None, event.reason)
self.assertEqual(None, event.remote_reason)
+  def test_stream_event(self):
+    event = _get_event(STREAM_NEW)
+
+    self.assertTrue(isinstance(event, stem.response.events.StreamEvent))
+    self.assertEqual(STREAM_NEW.lstrip("650 "), str(event))
+    self.assertEqual("18", event.id)
+    self.assertEqual(StreamStatus.NEW, event.status)
+    self.assertEqual(None, event.circ_id)
+    self.assertEqual("encrypted.google.com:443", event.target)
+    self.assertEqual("encrypted.google.com", event.target_address)
+    self.assertEqual(443, event.target_port)
+    self.assertEqual(None, event.reason)
+    self.assertEqual(None, event.remote_reason)
+    self.assertEqual(None, event.source)
+    self.assertEqual("127.0.0.1:47849", event.source_addr)
+    self.assertEqual("127.0.0.1", event.source_address)
+    self.assertEqual(47849, event.source_port)
+    self.assertEqual(StreamPurpose.USER, event.purpose)
+
+    event = _get_event(STREAM_SENTCONNECT)
+
+    self.assertTrue(isinstance(event, stem.response.events.StreamEvent))
+    self.assertEqual(STREAM_SENTCONNECT.lstrip("650 "), str(event))
+    self.assertEqual("18", event.id)
+    self.assertEqual(StreamStatus.SENTCONNECT, event.status)
+    self.assertEqual("26", event.circ_id)
+    self.assertEqual("encrypted.google.com:443", event.target)
+    self.assertEqual("encrypted.google.com", event.target_address)
+    self.assertEqual(443, event.target_port)
+    self.assertEqual(None, event.reason)
+    self.assertEqual(None, event.remote_reason)
+    self.assertEqual(None, event.source)
+    self.assertEqual(None, event.source_addr)
+    self.assertEqual(None, event.source_address)
+    self.assertEqual(None, event.source_port)
+    self.assertEqual(None, event.purpose)
+
+    event = _get_event(STREAM_REMAP)
+
+    self.assertTrue(isinstance(event, stem.response.events.StreamEvent))
+    self.assertEqual(STREAM_REMAP.lstrip("650 "), str(event))
+    self.assertEqual("18", event.id)
+    self.assertEqual(StreamStatus.REMAP, event.status)
+    self.assertEqual("26", event.circ_id)
+    self.assertEqual("74.125.227.129:443", event.target)
+    self.assertEqual("74.125.227.129", event.target_address)
+    self.assertEqual(443, event.target_port)
+    self.assertEqual(None, event.reason)
+    self.assertEqual(None, event.remote_reason)
+    self.assertEqual(StreamSource.EXIT, event.source)
+    self.assertEqual(None, event.source_addr)
+    self.assertEqual(None, event.source_address)
+    self.assertEqual(None, event.source_port)
+    self.assertEqual(None, event.purpose)
+
+    event = _get_event(STREAM_SUCCEEDED)
+
+    self.assertTrue(isinstance(event, stem.response.events.StreamEvent))
+    self.assertEqual(STREAM_SUCCEEDED.lstrip("650 "), str(event))
+    self.assertEqual("18", event.id)
+    self.assertEqual(StreamStatus.SUCCEEDED, event.status)
+    self.assertEqual("26", event.circ_id)
+    self.assertEqual("74.125.227.129:443", event.target)
+    self.assertEqual("74.125.227.129", event.target_address)
+    self.assertEqual(443, event.target_port)
+    self.assertEqual(None, event.reason)
+    self.assertEqual(None, event.remote_reason)
+    self.assertEqual(None, event.source)
+    self.assertEqual(None, event.source_addr)
+    self.assertEqual(None, event.source_address)
+    self.assertEqual(None, event.source_port)
+    self.assertEqual(None, event.purpose)
+
+    event = _get_event(STREAM_CLOSED_RESET)
+
+    self.assertTrue(isinstance(event, stem.response.events.StreamEvent))
+    self.assertEqual(STREAM_CLOSED_RESET.lstrip("650 "), str(event))
+    self.assertEqual("21", event.id)
+    self.assertEqual(StreamStatus.CLOSED, event.status)
+    self.assertEqual("26", event.circ_id)
+    self.assertEqual("74.125.227.129:443", event.target)
+    self.assertEqual("74.125.227.129", event.target_address)
+    self.assertEqual(443, event.target_port)
+    self.assertEqual(StreamClosureReason.CONNRESET, event.reason)
+    self.assertEqual(None, event.remote_reason)
+    self.assertEqual(None, event.source)
+    self.assertEqual(None, event.source_addr)
+    self.assertEqual(None, event.source_address)
+    self.assertEqual(None, event.source_port)
+    self.assertEqual(None, event.purpose)
+
+    event = _get_event(STREAM_CLOSED_DONE)
+
+    self.assertTrue(isinstance(event, stem.response.events.StreamEvent))
+    self.assertEqual(STREAM_CLOSED_DONE.lstrip("650 "), str(event))
+    self.assertEqual("25", event.id)
+    self.assertEqual(StreamStatus.CLOSED, event.status)
+    self.assertEqual("26", event.circ_id)
+    self.assertEqual("199.7.52.72:80", event.target)
+    self.assertEqual("199.7.52.72", event.target_address)
+    self.assertEqual(80, event.target_port)
+    self.assertEqual(StreamClosureReason.DONE, event.reason)
+    self.assertEqual(None, event.remote_reason)
+    self.assertEqual(None, event.source)
+    self.assertEqual(None, event.source_addr)
+    self.assertEqual(None, event.source_address)
+    self.assertEqual(None, event.source_port)
+    self.assertEqual(None, event.purpose)
+
def test_bw_event(self):
event = _get_event("650 BW 15 25")