[or-cvs] Add python interface to new controller protocol; split old ...
Nick Mathewson
nickm at seul.org
Sun Jun 19 22:38:33 UTC 2005
Update of /home/or/cvsroot/control/python
In directory moria:/tmp/cvs-serv9506/python
Modified Files:
TorCtl.py
Added Files:
TorCtl0.py TorCtl1.py
Log Message:
Add python interface to new controller protocol; split old protocol into separate module
Index: TorCtl.py
===================================================================
RCS file: /home/or/cvsroot/control/python/TorCtl.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -d -r1.2 -r1.3
--- TorCtl.py 15 Jun 2005 15:18:22 -0000 1.2
+++ TorCtl.py 19 Jun 2005 22:38:31 -0000 1.3
@@ -7,129 +7,8 @@
TorCtl -- Library to control Tor processes. See TorCtlDemo.py for example use.
"""
-import binascii
-import os
-import sha
-import socket
-import struct
-import sys
-import threading
-import Queue
-
-__all__ = [
- "MSG_TYPE", "EVENT_TYPE", "CIRC_STATUS", "STREAM_STATUS",
- "OR_CONN_STATUS", "SIGNAL", "ERR_CODES",
- "TorCtlError", "ProtocolError", "ErrorReply", "Connection", "EventHandler",
- "DebugEventHandler", "parseHostAndPort"
- ]
-
-class _Enum:
- # Helper: define an ordered dense name-to-number 1-1 mapping.
- def __init__(self, start, names):
- self.nameOf = {}
- idx = start
- for name in names:
- setattr(self,name,idx)
- self.nameOf[idx] = name
- idx += 1
-
-class _Enum2:
- # Helper: define an ordered sparse name-to-number 1-1 mapping.
- def __init__(self, **args):
- self.__dict__.update(args)
- self.nameOf = {}
- for k,v in args.items():
- self.nameOf[v] = k
-
-# Message types that client or server can send.
-MSG_TYPE = _Enum(0x0000,
- ["ERROR",
- "DONE",
- "SETCONF",
- "GETCONF",
- "CONFVALUE",
- "SETEVENTS",
- "EVENT",
- "AUTH",
- "SAVECONF",
- "SIGNAL",
- "MAPADDRESS",
- "GETINFO",
- "INFOVALUE",
- "EXTENDCIRCUIT",
- "ATTACHSTREAM",
- "POSTDESCRIPTOR",
- "FRAGMENTHEADER",
- "FRAGMENT",
- "REDIRECTSTREAM",
- "CLOSESTREAM",
- "CLOSECIRCUIT",
- ])
-
-# Make sure that the enumeration code is working.
-assert MSG_TYPE.SAVECONF == 0x0008
-assert MSG_TYPE.CLOSECIRCUIT == 0x0014
-
-# Types of "EVENT" message.
-EVENT_TYPE = _Enum(0x0001,
- ["CIRCSTATUS",
- "STREAMSTATUS",
- "ORCONNSTATUS",
- "BANDWIDTH",
- "OBSOLETE_LOG",
- "NEWDESC",
- "DEBUG_MSG",
- "INFO_MSG",
- "NOTICE_MSG",
- "WARN_MSG",
- "ERR_MSG",
- ])
-
-assert EVENT_TYPE.ERR_MSG == 0x000B
-assert EVENT_TYPE.OBSOLETE_LOG == 0x0005
-
-# Status codes for "CIRCSTATUS" events.
-CIRC_STATUS = _Enum(0x00,
- ["LAUNCHED",
- "BUILT",
- "EXTENDED",
- "FAILED",
- "CLOSED"])
-
-# Status codes for "STREAMSTATUS" events
-STREAM_STATUS = _Enum(0x00,
- ["SENT_CONNECT",
- "SENT_RESOLVE",
- "SUCCEEDED",
- "FAILED",
- "CLOSED",
- "NEW_CONNECT",
- "NEW_RESOLVE",
- "DETACHED"])
-
-# Status codes for "ORCONNSTATUS" events
-OR_CONN_STATUS = _Enum(0x00,
- ["LAUNCHED","CONNECTED","FAILED","CLOSED"])
-
-# Signal codes for "SIGNAL" events.
-SIGNAL = _Enum2(HUP=0x01,INT=0x02,USR1=0x0A,USR2=0x0C,TERM=0x0F)
-
-# Error codes for "ERROR" events.
-ERR_CODES = {
- 0x0000 : "Unspecified error",
- 0x0001 : "Internal error",
- 0x0002 : "Unrecognized message type",
- 0x0003 : "Syntax error",
- 0x0004 : "Unrecognized configuration key",
- 0x0005 : "Invalid configuration value",
- 0x0006 : "Unrecognized byte code",
- 0x0007 : "Unauthorized",
- 0x0008 : "Failed authentication attempt",
- 0x0009 : "Resource exhausted",
- 0x000A : "No such stream",
- 0x000B : "No such circuit",
- 0x000C : "No such OR"
-}
+import TorCtl0
+import TorCtl1
class TorCtlError(Exception):
"Generic error raised by TorControl code."
@@ -143,392 +22,42 @@
"Raised when Tor controller returns an error"
pass
-def parseHostAndPort(h):
- """Given a string of the form 'address:port' or 'address' or
- 'port' or '', return a two-tuple of (address, port)
- """
- host, port = "localhost", 9100
- if ":" in h:
- i = h.index(":")
- host = h[:i]
- try:
- port = int(h[i+1:])
- except ValueError:
- print "Bad hostname %r"%h
- sys.exit(1)
- elif h:
- try:
- port = int(h)
- except ValueError:
- host = h
-
- return host, port
-
-def _unpack_singleton_msg(msg):
- """Helper: unpack a single packet. Return (None, minLength, body-so-far)
- on incomplete packet or (type,body,rest) on somplete packet
- """
- if len(msg) < 4:
- return None, 4, msg
- length,type = struct.unpack("!HH",msg)
- if len(msg) >= 4+length:
- return type,msg[4:4+length],msg[4+length:]
- else:
- return None,4+length,msg
-
-def _minLengthToPack(bytes):
- """Return the minimum number of bytes needed to pack the message 'smg'"""
- whole,left = divmod(bytes,65535)
- if left:
- return whole*(65535+4)+4+left
- else:
- return whole*(65535+4)
-
-def _unpack_msg(msg):
- "returns as for _unpack_singleton_msg"
- tp,body,rest = _unpack_singleton_msg(msg)
- if tp != MSG_TYPE.FRAGMENTHEADER:
- return tp, body, rest
-
- if len(body) < 6:
- raise ProtocolError("FRAGMENTHEADER message too short")
-
- realType,realLength = struct.unpack("!HL", body[:6])
-
- # Okay; could the message _possibly_ be here?
- minLength = _minLengthToPack(realLength+6)
- if len(msg) < minLength:
- return None, minLength, msg
-
- # Okay; optimistically try to build up the msg.
- soFar = [ body[6:] ]
- lenSoFarLen = len(body)-6
- while len(rest)>=4 and lenSoFar < realLength:
- ln, tp = struct.unpack("!HH", rest[:4])
- if tp != MSG_TYPE.FRAGMENT:
- raise ProtocolError("Missing FRAGMENT message")
- soFar.append(rest[4:4+ln])
- lenSoFar += ln
- if 4+ln > len(rest):
- rest = ""
- leftInPacket = 4+ln-len(rest)
- else:
- rest = rest[4+ln:]
- leftInPacket=0
-
- if lenSoFar == realLength:
- return realType, "".join(soFar), rest
- elif lenSoFar > realLength:
- raise ProtocolError("Bad fragmentation: message longer than declared")
- else:
- inOtherPackets = realLength-lenSoFar-leftInPacket
- minLength = _minLengthToPack(inOtherPackets)
- return None, len(msg)+leftInPacket+inOtherPackets, msg
-
-def _receive_singleton_msg(s):
- """Read a single packet from the socket s.
- """
- body = ""
- header = s.recv(4)
- length,type = struct.unpack("!HH",header)
- if length:
- while length > len(body):
- body += s.recv(length-len(body))
- return length,type,body
-
-def _receive_message(s):
- """Read a single message (possibly multi-packet) from the socket s."""
- length, tp, body = _receive_singleton_msg(s)
- if tp != MSG_TYPE.FRAGMENTHEADER:
- return length, tp, body
- if length < 6:
- raise ProtocolError("FRAGMENTHEADER message too short")
- realType,realLength = struct.unpack("!HL", body[:6])
- data = [ body[6:] ]
- soFar = len(data[0])
- while 1:
- length, tp, body = _receive_singleton_msg(s)
- if tp != MSG_TYPE.FRAGMENT:
- raise ProtocolError("Missing FRAGMENT message")
- soFar += length
- data.append(body)
- if soFar == realLength:
- return realLength, realType, "".join(data)
- elif soFar > realLength:
- raise ProtocolError("FRAGMENT message too long!")
-
-def pack_message(type, body=""):
- """Given a message type and optional message body, generate a set of
- packets to send.
- """
- length = len(body)
- if length < 65536:
- reqheader = struct.pack("!HH", length, type)
- return "%s%s"%(reqheader,body)
-
- fragheader = struct.pack("!HHHL",
- 65535, MSG_TYPE.FRAGMENTHEADER, type, length)
- msgs = [ fragheader, body[:65535-6] ]
- body = body[65535-6:]
- while body:
- if len(body) > 65535:
- fl = 65535
- else:
- fl = len(body)
- fragheader = struct.pack("!HH", MSG_TYPE.FRAGMENT, fl)
- msgs.append(fragheader)
- msgs.append(body[:fl])
- body = body[fl:]
-
- return "".join(msgs)
-
-def _parseKV(body,sep=" ",term="\n"):
- """Helper: parse a key/value list of the form [key sep value term]* .
- Return a list of (k,v)."""
- res = []
- for line in body.split(term):
- if not line: continue
- k, v = line.split(sep,1)
- res.append((k,v))
- return res
-
-def _unterminate(s):
- """Strip trailing NUL characters from s."""
- if s[-1] == '\0':
- return s[:-1]
- else:
- return s
-
-class Connection:
- """A Connection represents a connection to the Tor process."""
- def __init__(self, sock):
- """Create a Connection to communicate with the Tor process over the
- socket 'sock'.
- """
- self._s = sock
- self._handler = None
- self._sendLock = threading.RLock()
- self._queue = Queue.Queue()
- self._thread = None
-
- def setEventHandler(self, handler):
- """Cause future events from the Tor process to be sent to 'handler'.
- """
- self._handler = handler
-
- def launchThread(self, daemon=1):
- """Launch a background thread to handle messages from the Tor process."""
- assert self._thread is None
- t = threading.Thread(target=self._loop)
- if daemon:
- t.setDaemon(daemon)
- t.start()
- self._thread = t
- return t
-
- def _send(self, type, body=""):
- """Helper: Deliver a command of type 'type' and body 'body' to Tor.
- """
- self._s.sendall(pack_message(type, body))
-
- def _loop(self):
- """Main subthread loop: Read commands from Tor, and handle them either
- as events or as responses to other commands.
- """
- while 1:
- try:
- length, tp, body = _receive_message(self._s)
- except OSError:
- if self._queue.get(timeout=0) != "CLOSE":
- raise
- if tp == MSG_TYPE.EVENT:
- if self._handler is not None:
- self._handler.handle(body)
- else:
- cb = self._queue.get()
- cb(tp, body)
-
- def _sendAndRecv(self, tp, msg="", expectedTypes=(MSG_TYPE.DONE,)):
- """Helper: Send a command of type 'tp' and body 'msg' to Tor,
- and wait for a command in response. If the response type is
- in expectedTypes, return a (tp,body) tuple. If it is an error,
- raise ErrorReply. Otherwise, raise ProtocolError.
- """
- # This condition will get notified when we've got a result...
- condition = threading.Condition()
- # Here's where the result goes...
- result = []
-
- def cb(tp,body,condition=condition,result=result):
- condition.acquire()
- try:
- result.append((tp, body))
- condition.notify()
- finally:
- condition.release()
-
- # Sends a message to Tor...
- self._sendLock.acquire()
- try:
- self._queue.put(cb)
- self._send(tp, msg)
- finally:
- self._sendLock.release()
-
- # Now wait till the answer is in...
- condition.acquire()
- try:
- while not result:
- condition.wait()
- finally:
- condition.release()
-
- # ...And handle the answer appropriately.
- assert len(result) == 1
- tp, msg = result[0]
- if tp in expectedTypes:
- return tp, msg
- elif tp == MSG_TYPE.ERROR:
- if len(msg)<2:
- raise ProtocolError("(Truncated error message)")
- errCode, = struct.unpack("!H", msg[:2])
- raise ErrorReply((errCode,
- ERR_CODES.get(errCode,"[unrecognized]"),
- msg[2:]))
- else:
- raise ProtocolError("Unexpectd message type 0x%04x"%tp)
-
- def close(self):
- """Shut down this controller connection"""
- self._sendLock.acquire()
- try:
- self._queue.put("CLOSE")
- self._s.close()
- finally:
- self._sendLock.release()
-
- def authenticate(self, secret=""):
- """Send an authenticating secret to Tor. You'll need to call this
- method before Tor can start.
- """
- self._sendAndRecv(MSG_TYPE.AUTH,secret)
-
- def get_option(self,name):
- """Return the value of the configuration option named 'name'.
- """
- if not isinstance(name, str):
- name = "".join(["%s\n"%s for s in name])
- tp,body = self._sendAndRecv(MSG_TYPE.GETCONF,name,[MSG_TYPE.CONFVALUE])
- return _parseKV(body)
-
- def set_option(self,key,value):
- """Set the value of the configuration option 'key' to the value 'value'.
- """
- self.set_options([key, value])
-
- def set_options(self,kvlist):
- """Given a list of [(key,value)] pairs, set them as configuration
- options.
- """
- msg = "".join(["%s %s\n" for k,v in kvlist])
- self._sendAndRecv(MSG_TYPE.SETCONF,msg)
-
- def get_info(self,name):
- """Return the value of the internal information field named 'named'.
- """
- if not isinstance(name, str):
- name = "".join(["%s\n"%s for s in name])
- tp, body = self._sendAndRecv(MSG_TYPE.GETINFO,name,[MSG_TYPE.INFOVALUE])
- kvs = body.split("\0")
- d = {}
- for i in xrange(0,len(kvs)-1,2):
- d[kvs[i]] = kvs[i+1]
- return d
-
- def set_events(self,events):
- """Change the list of events that the event handler is interested
- in to those in 'events', which is a list of EVENT_TYPE members.
- """
- self._sendAndRecv(MSG_TYPE.SETEVENTS,
- "".join([struct.pack("!H", event) for event in events]))
-
- def save_conf(self):
- """Flush all configuration changes to disk.
- """
- self._sendAndRecv(s,MSG_TYPE.SAVECONF)
-
- def send_signal(self, sig):
- """Send the signal 'sig' to the Tor process; 'sig' must be a member of
- SIGNAL.
- """
- self._sendAndRecv(MSG_TYPE.SIGNAL,struct.pack("B",sig))
-
- def map_address(self, kvList):
- """Given a list of (old-address,new-address), have Tor redirect
- streams from old-address to new-address. Old-address can be in a
- special "dont-care" form of "0.0.0.0" or ".".
- """
- msg = [ "%s %s\n"%(k,v) for k,v in kvList ]
- tp, body = self._sendAndRecv(MSG_TYPE.MAPADDRESS,"".join(msg))
- return _parseKV(body)
-
- def extend_circuit(self, circid, hops):
- """Tell Tor to extend the circuit identified by 'circid' through the
- servers named in the list "hops".
- """
- msg = struct.pack("!L",circid) + ",".join(hops) + "\0"
- tp, body = self._sendAndRecv(MSG_TYPE.EXTENDCIRCUIT,msg)
- if len(body) != 4:
- raise ProtocolError("Extendcircuit reply too short or long")
- return struct.unpack("!L",body)[0]
-
- def redirect_stream(self, streamid, newtarget):
- """Tell Tor to change the target address of the stream identified by
- 'streamid' from its old value to 'newtarget'."""
- msg = struct.pack("!L",streamid) + newtarget + "\0"
- self._sendAndRecv(MSG_TYPE.REDIRECTSTREAM,msg)
-
- def attach_stream(self, streamid, circid):
- """Tell Tor To attach stream 'streamid' to circuit 'circid'."""
- msg = struct.pack("!LL",streamid, circid)
- self._sendAndRecv(MSG_TYPE.ATTACHSTREAM,msg)
-
- def close_stream(self, streamid, reason=0, flags=0):
- """Close the stream 'streamid'. """
- msg = struct.pack("!LBB",streamid,reason,flags)
- self._sendAndRecv(MSG_TYPE.CLOSESTREAM,msg)
-
- def close_circuit(self, circid, flags=0):
- """Close the circuit 'circid'."""
- msg = struct.pack("!LB",circid,flags)
- self._sendAndRecv(MSG_TYPE.CLOSECIRCUIT,msg)
-
- def post_descriptor(self, descriptor):
- """Tell Tor about a new descriptor in 'descriptor'."""
- self._sendAndRecv(MSG_TYPE.POSTDESCRIPTOR,descriptor)
-
class EventHandler:
"""An 'EventHandler' wraps callbacks for the events Tor can return."""
def __init__(self):
"""Create a new EventHandler."""
- self._map = {
+ from TorCtl0 import EVENT_TYPE
+ self._map0 = {
EVENT_TYPE.CIRCSTATUS : self.circStatus,
EVENT_TYPE.STREAMSTATUS : self.streamStatus,
EVENT_TYPE.ORCONNSTATUS : self.orConnStatus,
- EVENT_TYPE.BANDWIDTH : self.circStatus,
+ EVENT_TYPE.BANDWIDTH : self.bandwidth,
EVENT_TYPE.NEWDESC : self.newDesc,
EVENT_TYPE.INFO_MSG : self.msg,
EVENT_TYPE.NOTICE_MSG : self.msg,
EVENT_TYPE.WARN_MSG : self.msg,
EVENT_TYPE.ERR_MSG : self.msg,
}
+ self._map1 = {
+ "CIRC" : self.circStatus,
+ "STREAM" : self.streamStatus,
+ "ORCONN" : self.orConnStatus,
+ "BW" : self.bandwidth,
+ "DEBUG" : self.msg,
+ "INFO" : self.msg,
+ "NOTICE" : self.msg,
+ "WARN" : self.msg,
+ "ERR" : self.msg,
+ "NEWDESC" : self.newDesc,
+ "ADDRMAP" : self.addressMapped
+ },
- def handle(self, evbody):
+ def handle0(self, evbody):
"""Dispatcher: called from Connection when an event is received."""
- evtype, args = self.decode(evbody)
- self._map.get(evtype, self.unknownEvent)(evtype, *args)
+ evtype, args = self.decode0(evbody)
+ self._map0.get(evtype, self.unknownEvent)(evtype, *args)
- def decode(self, body):
+ def decode0(self, body):
"""Unpack an event message into a type/arguments-tuple tuple."""
if len(body)<2:
raise ProtocolError("EVENT body too short.")
@@ -568,6 +97,66 @@
return evtype, args
+ def handle1(self, evbody):
+ """Dispatcher: called from Connection when an event is received."""
+ evtype, args = self.decode1(evbody)
+ self._map1.get(evtype, self.unknownEvent)(evtype, *args)
+
+ def decode1(self, body):
+ """Unpack an event message into a type/arguments-tuple tuple."""
+ if " " in body:
+ evtype,body = body.split(" ",1)
+ else:
+ evtype,body = body,""
+ evtype = evtype.upper()
+ if evtype == "CIRC":
+ m = re.match(r"(\S+)\s+(\S+)(\s\S+)?", body)
+ if not m:
+ raise ProtocolError("CIRC event misformatted.")
+ status,ident,path = m.groups()
+ if path:
+ path = path.strip().split(",")
+ else:
+ path = []
+ args = status, ident, path
+ elif evtype == "STREAM":
+ m = re.match(r"(\S+)\s+(\S+)\s+(\S+)\s+(\S+)", body)
+ if not m:
+ raise ProtocolError("STREAM event misformatted.")
+ ident,status,circ,target = m.groups()
+ args = status, ident, target, circ
+ elif evtype == "ORCONN":
+ m = re.match(r"(\S+)\s+(\S+)", body)
+ if not m:
+ raise ProtocolError("ORCONN event misformatted.")
+ target, status = m.groups()
+ args = status, target
+ elif evtype == "BW":
+ m = re.match(r"(\d+)\s+(\d+)", body)
+ if not m:
+ raise ProtocolError("BANDWIDTH event misformatted.")
+ read, written = map(long, m.groups())
+ args = read, written
+ elif evtype in ("DEBUG", "INFO", "NOTICE", "WARN", "ERR"):
+ args = evtype, body
+ elif evtype == "NEWDESC":
+ args = ((" ".split(body)),)
+ elif evtype == "ADDRMAP":
+ m = re.match(r'(\S+)\s+(\S+)\s+(\"[^"]+\"|\w+)')
+ if not m:
+ raise ProtocolError("BANDWIDTH event misformatted.")
+ fromaddr, toaddr, when = m.groups()
+ if when.upper() == "NEVER":
+ when = None
+ else:
+ when = time.localtime(
+ time.strptime(when[1:-1], "%Y-%m-%d %H:%M:%S"))
+ args = fromaddr, toaddr, when
+ else:
+ args = (body,)
+
+ return evtype, args
+
def circStatus(self, status, circID, path):
"""Called when a circuit status changes if listening to CIRCSTATUS
events. 'status' is a member of CIRC_STATUS; circID is a numeric
@@ -576,7 +165,7 @@
"""
raise NotImplemented
- def streamStatus(self, status, circID, target):
+ def streamStatus(self, status, circID, target, circID="0"):
"""Called when a stream status changes if listening to STREAMSTATUS
events. 'status' is a member of STREAM_STATUS; streamID is a
numeric stream ID, and 'target' is the destination of the stream.
@@ -607,16 +196,30 @@
to INFO_MSG, NOTICE_MSG, WARN_MSG, or ERR_MSG events."""
raise NotImplemented
-class DebugEventHandler(EventHandler):
- """Trivial event handler: dumps all events to stdout."""
- def __init__(self, out=None):
- if out is None:
- out = sys.stdout
- self._out = out
+ def addressMapped(self, fromAddr, toAddr, expiry=None):
+ """DOCDOC"""
+ raise NotImplemented
- def handle(self, body):
- evtype, args = self.decode(body)
- print >>self._out,EVENT_TYPE.nameOf[evtype],args
+def parseHostAndPort(h):
+ """Given a string of the form 'address:port' or 'address' or
+ 'port' or '', return a two-tuple of (address, port)
+ """
+ host, port = "localhost", 9100
+ if ":" in h:
+ i = h.index(":")
+ host = h[:i]
+ try:
+ port = int(h[i+1:])
+ except ValueError:
+ print "Bad hostname %r"%h
+ sys.exit(1)
+ elif h:
+ try:
+ port = int(h)
+ except ValueError:
+ host = h
+
+ return host, port
def secret_to_key(secret, s2k_specifier):
c = ord(s2k_specifier[8])
--- NEW FILE: TorCtl0.py ---
#!/usr/bin/python
# TorCtl.py -- Python module to interface with Tor Control interface.
# Copyright 2005 Nick Mathewson -- See LICENSE for licensing information.
#$Id: TorCtl0.py,v 1.1 2005/06/19 22:38:31 nickm Exp $
"""
TorCtl0 -- Library to control Tor processes. See TorCtlDemo.py for example use.
"""
import binascii
import os
import sha
import socket
import struct
import sys
import threading
import Queue
# Public names exported by "from TorCtl0 import *".  Only names that are
# actually defined in this module may be listed here: a name in __all__ that
# the module does not define makes the star-import fail with AttributeError.
# (The exception classes, EventHandler and parseHostAndPort are not defined
# in this module, so they are not exported from here.)
__all__ = [
    "MSG_TYPE", "EVENT_TYPE", "CIRC_STATUS", "STREAM_STATUS",
    "OR_CONN_STATUS", "SIGNAL", "ERR_CODES",
    "Connection", "DebugEventHandler",
    ]
class _Enum:
# Helper: define an ordered dense name-to-number 1-1 mapping.
def __init__(self, start, names):
self.nameOf = {}
idx = start
for name in names:
setattr(self,name,idx)
self.nameOf[idx] = name
idx += 1
class _Enum2:
# Helper: define an ordered sparse name-to-number 1-1 mapping.
def __init__(self, **args):
self.__dict__.update(args)
self.nameOf = {}
for k,v in args.items():
self.nameOf[v] = k
# Message types that client or server can send.
# The names are numbered consecutively starting at 0x0000, so the order of
# this list determines the numeric value of every message type.
MSG_TYPE = _Enum(0x0000,
["ERROR",
"DONE",
"SETCONF",
"GETCONF",
"CONFVALUE",
"SETEVENTS",
"EVENT",
"AUTH",
"SAVECONF",
"SIGNAL",
"MAPADDRESS",
"GETINFO",
"INFOVALUE",
"EXTENDCIRCUIT",
"ATTACHSTREAM",
"POSTDESCRIPTOR",
"FRAGMENTHEADER",
"FRAGMENT",
"REDIRECTSTREAM",
"CLOSESTREAM",
"CLOSECIRCUIT",
])
# Make sure that the enumeration code is working: spot-check two values so
# that an accidental insertion or reordering above is caught at import time.
assert MSG_TYPE.SAVECONF == 0x0008
assert MSG_TYPE.CLOSECIRCUIT == 0x0014
# Types of "EVENT" message.  Numbering starts at 0x0001, not 0.
EVENT_TYPE = _Enum(0x0001,
["CIRCSTATUS",
"STREAMSTATUS",
"ORCONNSTATUS",
"BANDWIDTH",
"OBSOLETE_LOG",
"NEWDESC",
"DEBUG_MSG",
"INFO_MSG",
"NOTICE_MSG",
"WARN_MSG",
"ERR_MSG",
])
# Spot-check the event numbering, as for MSG_TYPE above.
assert EVENT_TYPE.ERR_MSG == 0x000B
assert EVENT_TYPE.OBSOLETE_LOG == 0x0005
# Status codes for "CIRCSTATUS" events.
CIRC_STATUS = _Enum(0x00,
["LAUNCHED",
"BUILT",
"EXTENDED",
"FAILED",
"CLOSED"])
# Status codes for "STREAMSTATUS" events.
STREAM_STATUS = _Enum(0x00,
["SENT_CONNECT",
"SENT_RESOLVE",
"SUCCEEDED",
"FAILED",
"CLOSED",
"NEW_CONNECT",
"NEW_RESOLVE",
"DETACHED"])
# Status codes for "ORCONNSTATUS" events.
OR_CONN_STATUS = _Enum(0x00,
["LAUNCHED","CONNECTED","FAILED","CLOSED"])
# Signal codes carried in the body of a "SIGNAL" message (see
# Connection.send_signal).  The values are sparse, hence _Enum2.
SIGNAL = _Enum2(HUP=0x01,INT=0x02,USR1=0x0A,USR2=0x0C,TERM=0x0F)
# Human-readable descriptions for the error codes carried in "ERROR"
# messages; Connection._sendAndRecv looks codes up here when it builds an
# ErrorReply.
ERR_CODES = {
0x0000 : "Unspecified error",
0x0001 : "Internal error",
0x0002 : "Unrecognized message type",
0x0003 : "Syntax error",
0x0004 : "Unrecognized configuration key",
0x0005 : "Invalid configuration value",
0x0006 : "Unrecognized byte code",
0x0007 : "Unauthorized",
0x0008 : "Failed authentication attempt",
0x0009 : "Resource exhausted",
0x000A : "No such stream",
0x000B : "No such circuit",
0x000C : "No such OR"
}
def _unpack_singleton_msg(msg):
"""Helper: unpack a single packet. Return (None, minLength, body-so-far)
on incomplete packet or (type,body,rest) on somplete packet
"""
if len(msg) < 4:
return None, 4, msg
length,type = struct.unpack("!HH",msg)
if len(msg) >= 4+length:
return type,msg[4:4+length],msg[4+length:]
else:
return None,4+length,msg
def _minLengthToPack(bytes):
"""Return the minimum number of bytes needed to pack the message 'smg'"""
whole,left = divmod(bytes,65535)
if left:
return whole*(65535+4)+4+left
else:
return whole*(65535+4)
def _unpack_msg(msg):
    """Unpack one logical message, reassembling fragments if necessary, from
       the front of the buffer 'msg'.

       Returns as for _unpack_singleton_msg: (None, minLength, msg) when the
       buffer does not yet hold the whole message, or (type, body, rest) when
       it does.

       Raises ProtocolError if the fragment header is too short, if a packet
       other than FRAGMENT appears where a fragment is expected, or if the
       fragments carry more data than the header declared.
    """
    # NOTE(review): ProtocolError is neither defined nor imported in the
    # visible part of this module -- confirm where it is meant to come from.
    tp, body, rest = _unpack_singleton_msg(msg)
    if tp != MSG_TYPE.FRAGMENTHEADER:
        # Either an ordinary packet, or "need more data" (tp is None).
        return tp, body, rest

    if len(body) < 6:
        raise ProtocolError("FRAGMENTHEADER message too short")

    # The header body starts with the real type and the total length.
    realType, realLength = struct.unpack("!HL", body[:6])

    # Okay; could the message _possibly_ be here?
    minLength = _minLengthToPack(realLength+6)
    if len(msg) < minLength:
        return None, minLength, msg

    # Okay; optimistically try to build up the msg.
    soFar = [ body[6:] ]
    lenSoFar = len(body)-6
    while lenSoFar < realLength:
        consumed = len(msg) - len(rest)
        if len(rest) < 4:
            # Not even the next fragment's header has arrived yet.
            needed = consumed + _minLengthToPack(realLength - lenSoFar)
            return None, needed, msg
        ln, tp = struct.unpack("!HH", rest[:4])
        if tp != MSG_TYPE.FRAGMENT:
            raise ProtocolError("Missing FRAGMENT message")
        if lenSoFar + ln > realLength:
            raise ProtocolError(
                "Bad fragmentation: message longer than declared")
        if len(rest) < 4+ln:
            # This fragment is truncated.  Do not consume it; report how
            # much we need for it plus whatever must follow it.
            afterThis = realLength - lenSoFar - ln
            needed = consumed + 4 + ln + _minLengthToPack(afterThis)
            return None, needed, msg
        soFar.append(rest[4:4+ln])
        lenSoFar += ln
        rest = rest[4+ln:]

    if lenSoFar > realLength:
        # The header packet alone carried more than it declared.
        raise ProtocolError("Bad fragmentation: message longer than declared")
    return realType, "".join(soFar), rest
def _receive_singleton_msg(s):
"""Read a single packet from the socket s.
"""
body = ""
header = s.recv(4)
length,type = struct.unpack("!HH",header)
if length:
while length > len(body):
body += s.recv(length-len(body))
return length,type,body
def _receive_message(s):
    """Read a single message (possibly spread over several packets) from
       the socket s, and return (length, type, body).

       An unfragmented packet is returned as-is.  For a fragmented message,
       FRAGMENT packets are read until exactly the declared number of bytes
       has arrived; the returned length and type are the ones declared in
       the fragment header.  Raises ProtocolError on a malformed sequence.
    """
    packetLen, packetType, packetBody = _receive_singleton_msg(s)
    if packetType != MSG_TYPE.FRAGMENTHEADER:
        return packetLen, packetType, packetBody
    if packetLen < 6:
        raise ProtocolError("FRAGMENTHEADER message too short")

    declaredType, declaredLen = struct.unpack("!HL", packetBody[:6])
    pieces = [ packetBody[6:] ]
    received = len(pieces[0])
    while 1:
        packetLen, packetType, packetBody = _receive_singleton_msg(s)
        if packetType != MSG_TYPE.FRAGMENT:
            raise ProtocolError("Missing FRAGMENT message")
        received += packetLen
        pieces.append(packetBody)
        if received > declaredLen:
            raise ProtocolError("FRAGMENT message too long!")
        if received == declaredLen:
            return declaredLen, declaredType, "".join(pieces)
def pack_message(type, body=""):
    """Given a message type and optional message body, generate the packet
       or packets to send, and return them as a single string.

       A body that fits in one packet (at most 65535 bytes) is sent as
       [2-byte length][2-byte type][body].  A longer body is sent as a
       FRAGMENTHEADER packet, whose body begins with the real type and the
       total length, followed by as many FRAGMENT packets as are needed.
    """
    length = len(body)
    if length < 65536:
        reqheader = struct.pack("!HH", length, type)
        return reqheader + body

    # The first packet is full: 6 bytes of (real type, real length) followed
    # by the first 65535-6 bytes of the body.
    fragheader = struct.pack("!HHHL",
                             65535, MSG_TYPE.FRAGMENTHEADER, type, length)
    msgs = [ fragheader, body[:65535-6] ]
    body = body[65535-6:]
    while body:
        fl = min(len(body), 65535)
        # Every packet header is (length, type), in that order; that is how
        # the readers in this module unpack it.  Packing the fragment type
        # first would produce packets no reader can parse.
        msgs.append(struct.pack("!HH", fl, MSG_TYPE.FRAGMENT))
        msgs.append(body[:fl])
        body = body[fl:]

    return "".join(msgs)
def _parseKV(body,sep=" ",term="\n"):
"""Helper: parse a key/value list of the form [key sep value term]* .
Return a list of (k,v)."""
res = []
for line in body.split(term):
if not line: continue
k, v = line.split(sep,1)
res.append((k,v))
return res
def _unterminate(s):
"""Strip trailing NUL characters from s."""
if s[-1] == '\0':
return s[:-1]
else:
return s
class Connection:
    """A Connection represents a connection to the Tor process, using the
       packet format produced by pack_message and read by _receive_message.

       Commands are sent from the calling thread; replies and events are
       read by a background thread (see launchThread), which must be running
       before any command method is called, or the call will block forever.
    """
    def __init__(self, sock):
        """Create a Connection to communicate with the Tor process over the
           socket 'sock'.
        """
        self._s = sock
        self._handler = None
        # Serializes "enqueue callback + send command", so that callbacks
        # sit in the queue in the same order as commands go out.
        self._sendLock = threading.RLock()
        # Holds one callback per outstanding command, or the marker string
        # "CLOSE" once close() has been called.
        self._queue = Queue.Queue()
        self._thread = None

    def setEventHandler(self, handler):
        """Cause future events from the Tor process to be sent to 'handler'.
        """
        self._handler = handler

    def launchThread(self, daemon=1):
        """Launch a background thread to handle messages from the Tor
           process, and return it.  May only be called once.
        """
        assert self._thread is None
        t = threading.Thread(target=self._loop)
        if daemon:
            t.setDaemon(daemon)
        t.start()
        self._thread = t
        return t

    def _send(self, type, body=""):
        """Helper: Deliver a command of type 'type' and body 'body' to Tor.
        """
        self._s.sendall(pack_message(type, body))

    def _loop(self):
        """Main subthread loop: Read commands from Tor, and handle them either
           as events or as responses to other commands.  Returns once the
           connection has been shut down with close().
        """
        while 1:
            try:
                length, tp, body = _receive_message(self._s)
            except OSError:
                # A read error is expected once close() has run.  This
                # thread is the only consumer of the queue, so checking
                # empty() before get() cannot race with another reader.
                if not self._queue.empty() and self._queue.get() == "CLOSE":
                    return
                raise
            if tp == MSG_TYPE.EVENT:
                if self._handler is not None:
                    self._handler.handle0(body)
            else:
                cb = self._queue.get()
                if cb == "CLOSE":
                    # close() was called; nobody is waiting for this reply.
                    return
                cb(tp, body)

    def _sendAndRecv(self, tp, msg="", expectedTypes=(MSG_TYPE.DONE,)):
        """Helper: Send a command of type 'tp' and body 'msg' to Tor,
           and wait for a command in response.  If the response type is
           in expectedTypes, return a (tp,body) tuple.  If it is an error,
           raise ErrorReply.  Otherwise, raise ProtocolError.
        """
        # This condition will get notified when we've got a result...
        condition = threading.Condition()
        # Here's where the result goes...
        result = []

        def cb(tp, body):
            # Runs in the reader thread: store the reply, wake the sender.
            condition.acquire()
            try:
                result.append((tp, body))
                condition.notify()
            finally:
                condition.release()

        # Sends a message to Tor...
        self._sendLock.acquire()
        try:
            self._queue.put(cb)
            self._send(tp, msg)
        finally:
            self._sendLock.release()

        # Now wait till the answer is in...
        condition.acquire()
        try:
            while not result:
                condition.wait()
        finally:
            condition.release()

        # ...And handle the answer appropriately.
        assert len(result) == 1
        tp, msg = result[0]
        if tp in expectedTypes:
            return tp, msg
        elif tp == MSG_TYPE.ERROR:
            if len(msg)<2:
                raise ProtocolError("(Truncated error message)")
            # An error body is a 2-byte code followed by free-form text.
            errCode, = struct.unpack("!H", msg[:2])
            raise ErrorReply((errCode,
                              ERR_CODES.get(errCode,"[unrecognized]"),
                              msg[2:]))
        else:
            raise ProtocolError("Unexpectd message type 0x%04x"%tp)

    def close(self):
        """Shut down this controller connection"""
        self._sendLock.acquire()
        try:
            # Tell the reader thread that the coming read error is expected.
            self._queue.put("CLOSE")
            self._s.close()
        finally:
            self._sendLock.release()

    def authenticate(self, secret=""):
        """Send an authenticating secret to Tor, and wait for Tor to
           accept it.  Raises ErrorReply if Tor answers with an error.
        """
        self._sendAndRecv(MSG_TYPE.AUTH,secret)

    def get_option(self,name):
        """Return the value of the configuration option named 'name', as a
           list of (key,value) pairs.  'name' may also be a sequence of
           option names.
        """
        if not isinstance(name, str):
            name = "".join(["%s\n"%s for s in name])
        tp,body = self._sendAndRecv(MSG_TYPE.GETCONF,name,[MSG_TYPE.CONFVALUE])
        return _parseKV(body)

    def set_option(self,key,value):
        """Set the value of the configuration option 'key' to the value
           'value'.
        """
        # set_options wants a list of pairs, not a flat [key, value] list.
        self.set_options([(key, value)])

    def set_options(self,kvlist):
        """Given a list of (key,value) pairs, set them as configuration
           options.
        """
        msg = "".join(["%s %s\n"%(k,v) for k,v in kvlist])
        self._sendAndRecv(MSG_TYPE.SETCONF,msg)

    def get_info(self,name):
        """Return the value of the internal information field named 'name',
           as a dictionary from field name to value.  'name' may also be a
           sequence of field names.
        """
        if not isinstance(name, str):
            name = "".join(["%s\n"%s for s in name])
        tp, body = self._sendAndRecv(MSG_TYPE.GETINFO,name,[MSG_TYPE.INFOVALUE])
        # The reply is key NUL value NUL key NUL value NUL ...; pair up the
        # even-indexed items (keys) with the odd-indexed ones (values).
        kvs = body.split("\0")
        return dict(zip(kvs[0::2], kvs[1::2]))

    def set_events(self,events):
        """Change the list of events that the event handler is interested
           in to those in 'events', which is a list of EVENT_TYPE members.
        """
        self._sendAndRecv(MSG_TYPE.SETEVENTS,
                     "".join([struct.pack("!H", event) for event in events]))

    def save_conf(self):
        """Flush all configuration changes to disk.
        """
        self._sendAndRecv(MSG_TYPE.SAVECONF)

    def send_signal(self, sig):
        """Send the signal 'sig' to the Tor process; 'sig' must be a member of
           SIGNAL.
        """
        self._sendAndRecv(MSG_TYPE.SIGNAL,struct.pack("B",sig))

    def map_address(self, kvList):
        """Given a list of (old-address,new-address), have Tor redirect
           streams from old-address to new-address.  Old-address can be in a
           special "dont-care" form of "0.0.0.0" or ".".  Returns the reply
           parsed as a list of (key,value) pairs.
        """
        msg = [ "%s %s\n"%(k,v) for k,v in kvList ]
        tp, body = self._sendAndRecv(MSG_TYPE.MAPADDRESS,"".join(msg))
        return _parseKV(body)

    def extend_circuit(self, circid, hops):
        """Tell Tor to extend the circuit identified by 'circid' through the
           servers named in the list "hops".  Returns the 32-bit number
           carried in Tor's reply.
        """
        msg = struct.pack("!L",circid) + ",".join(hops) + "\0"
        tp, body = self._sendAndRecv(MSG_TYPE.EXTENDCIRCUIT,msg)
        if len(body) != 4:
            raise ProtocolError("Extendcircuit reply too short or long")
        return struct.unpack("!L",body)[0]

    def redirect_stream(self, streamid, newtarget):
        """Tell Tor to change the target address of the stream identified by
           'streamid' from its old value to 'newtarget'."""
        msg = struct.pack("!L",streamid) + newtarget + "\0"
        self._sendAndRecv(MSG_TYPE.REDIRECTSTREAM,msg)

    def attach_stream(self, streamid, circid):
        """Tell Tor To attach stream 'streamid' to circuit 'circid'."""
        msg = struct.pack("!LL",streamid, circid)
        self._sendAndRecv(MSG_TYPE.ATTACHSTREAM,msg)

    def close_stream(self, streamid, reason=0, flags=()):
        """Close the stream 'streamid'.  'reason' is sent as a single byte.
           'flags' may be an integer, which is sent as the flags byte, or a
           sequence of flag names.
        """
        if isinstance(flags, int):
            flagByte = flags
        else:
            # NOTE(review): this module defines no symbolic stream flags, so
            # any sequence is sent as 0 -- confirm against the protocol spec.
            flagByte = 0
        msg = struct.pack("!LBB",streamid,reason,flagByte)
        self._sendAndRecv(MSG_TYPE.CLOSESTREAM,msg)

    def close_circuit(self, circid, flags=()):
        """Close the circuit 'circid'.  'flags' may be an integer, which is
           sent as the flags byte, or a sequence of flag names; the only
           name recognized is "IFUNUSED", which is sent as 1.
        """
        if isinstance(flags, int):
            flagByte = flags
        elif "IFUNUSED" in flags:
            flagByte = 1
        else:
            flagByte = 0
        msg = struct.pack("!LB",circid,flagByte)
        self._sendAndRecv(MSG_TYPE.CLOSECIRCUIT,msg)

    def post_descriptor(self, descriptor):
        """Tell Tor about a new descriptor in 'descriptor'."""
        self._sendAndRecv(MSG_TYPE.POSTDESCRIPTOR,descriptor)
# NOTE(review): EventHandler is neither defined nor imported in the visible
# part of this module -- confirm where the base class is meant to come from.
class DebugEventHandler(EventHandler):
    """Trivial event handler: dumps all events to a file (stdout by
       default) instead of dispatching them to callbacks.
    """
    def __init__(self, out=None):
        """Create a handler that writes to the file-like object 'out', or to
           sys.stdout if 'out' is None.
        """
        if out is None:
            out = sys.stdout
        self._out = out

    def handle0(self, body):
        """Decode the event in 'body' and print its type name and arguments.

           This overrides the method that Connection._loop actually calls
           (handle0), and decodes with decode0, the name the decoder has in
           the base class after the protocol split.
        """
        evtype, args = self.decode0(body)
        # Fall back to the raw number for event types we have no name for.
        print >>self._out,EVENT_TYPE.nameOf.get(evtype, evtype),args

    # Old name of this method, kept so that existing callers still work.
    handle = handle0
--- NEW FILE: TorCtl1.py ---
#!/usr/bin/python
# TorCtl.py -- Python module to interface with Tor Control interface.
# Copyright 2005 Nick Mathewson -- See LICENSE for licensing information.
#$Id: TorCtl1.py,v 1.1 2005/06/19 22:38:31 nickm Exp $
import binascii
import os
import re
import socket
import sys
import threading
import types
import Queue
def _quote(s):
return re.sub(r'([\r\n\\\"])', r'\\\1', s)
def _escape_dots(s, translate_nl=1):
    """Encode the text 's' as a dot-terminated data section.

    If 'translate_nl' is true, 's' is split on either "\\n" or "\\r\\n";
    otherwise it is split on "\\r\\n" only.  Every line that begins with
    "." gets an extra leading ".", the lines are joined with "\\r\\n", and
    a terminating ".\\r\\n" line is appended.
    """
    # The original tested the misspelled name 'translate_ln', which raised
    # NameError on every call.
    if translate_nl:
        lines = re.split(r"\r?\n", s)
    else:
        lines = s.split("\r\n")
    # A trailing line terminator leaves an empty final element; drop it so
    # no blank line is emitted before the terminator.
    if lines and not lines[-1]:
        del lines[-1]
    escaped = []
    for line in lines:
        if line.startswith("."):
            line = "." + line
        escaped.append(line)
    escaped.append(".\r\n")
    return "\r\n".join(escaped)
def _unescape_dots(s, translate_nl=1):
    """Decode the body of a dot-escaped data section.

    's' is split on "\\r\\n" and one leading "." is removed from every
    line that starts with one.  The result always ends with a line
    terminator.  Lines are joined with "\\n" if 'translate_nl' is true,
    and with "\\r\\n" otherwise.
    """
    lines = []
    for line in s.split("\r\n"):
        if line.startswith("."):
            line = line[1:]
        lines.append(line)
    # Make sure the joined text ends with a terminator: if the last element
    # is non-empty, the input did not end with one.
    if lines and lines[-1]:
        lines.append("")
    if translate_nl:
        return "\n".join(lines)
    else:
        return "\r\n".join(lines)
def _parseKV(body, sep=" ", term="\n"):
    """Helper: split 'body' into entries on 'term', skip empty entries, and
    split each remaining entry once on 'sep'.

    Returns a list of (key, value) tuples.  An entry that does not contain
    'sep' raises ValueError.
    """
    pairs = []
    for entry in filter(None, body.split(term)):
        key, value = entry.split(sep, 1)
        pairs.append((key, value))
    return pairs
def _read_reply(f, debugFile=None):
    """Read one complete reply from the file-like object 'f'.

    Every reply line has the form <3-character code><type character><text>.
    A type of "-" marks an intermediate line, " " marks the final line of
    the reply, and "+" introduces a data section that runs until a line
    consisting of a single ".".

    If 'debugFile' is given, every line read is also written to it.

    Returns a list of (code, text, data) tuples, one per reply line; 'data'
    is the dot-unescaped data section for "+" lines and None otherwise.

    Raises ProtocolError if a line is shorter than four characters, has an
    unknown type character, or if the input ends inside a data section.
    """
    lines = []
    while 1:
        line = f.readline().strip()
        if debugFile:
            debugFile.write(" %s\n" % line)
        if len(line) < 4:
            raise ProtocolError("Badly formatted reply line: Too short")
        code = line[:3]
        tp = line[3]
        # The text starts after the type character.  The original kept the
        # type character in 's' and then compared 's' with "-", " " and
        # "+", which could never match a real reply line.
        s = line[4:]
        # Callers inspect the first tuple element as the status code, so
        # store 'code' there rather than the type character.
        if tp == "-":
            lines.append((code, s, None))
        elif tp == " ":
            lines.append((code, s, None))
            return lines
        elif tp != "+":
            raise ProtocolError(
                "Badly formatted reply line: unknown type %r" % tp)
        else:
            more = []
            while 1:
                line = f.readline()
                if debugFile:
                    debugFile.write(" %s" % line)
                if line in (".\r\n", ".\n"):
                    break
                if not line:
                    # readline() returns "" at end of input; without this
                    # check the loop would never terminate.
                    raise ProtocolError(
                        "Connection closed inside a data section")
                more.append(line)
            lines.append((code, s, _unescape_dots("".join(more))))
class Connection:
    """A Connection represents a connection to the Tor process.

    Commands are written to the connection by _sendAndRecv(); replies and
    asynchronous events are read by _loop(), which launchThread() runs in
    a background thread.
    """
    # NOTE(review): ProtocolError and ErrorReply are used below but are not
    # defined or imported in the visible part of this module -- confirm
    # where they are expected to come from.

    def __init__(self, sock, file=None):
        """Create a Connection to communicate with the Tor process over the
        socket 'sock'.  If 'file' is given, it is used for all reads and
        writes instead of a file object made from 'sock'.
        """
        if file:
            self._s = file
        else:
            assert sock
            self._s = sock.makefile("r+b")
        self._debugFile = None
        self._handler = None
        self._sendLock = threading.RLock()
        # Holds one callback per command still awaiting a reply, in the
        # order the commands were sent.
        self._queue = Queue.Queue()
        self._thread = None

    def debug(self, f):
        """Log traffic to the file-like object 'f': each outgoing command is
        written prefixed with ">>> ", and each line read from Tor is
        written as it arrives.
        """
        self._debugFile = f

    def setEventHandler(self, handler):
        """Cause future events from the Tor process to be sent to 'handler'.
        """
        self._handler = handler

    def launchThread(self, daemon=1):
        """Launch a background thread to handle messages from the Tor
        process, and return the thread.  The thread is marked as a daemon
        thread if 'daemon' is true.
        """
        assert self._thread is None
        t = threading.Thread(target=self._loop)
        if daemon:
            t.setDaemon(daemon)
        t.start()
        self._thread = t
        return t

    def close(self):
        """Shut down this controller connection"""
        self._sendLock.acquire()
        try:
            # Tell _loop() that a read error from now on is expected.
            self._queue.put("CLOSE")
            self._s.close()
        finally:
            self._sendLock.release()

    def _loop(self):
        """Main subthread loop: Read commands from Tor, and handle them either
        as events or as responses to other commands.
        """
        while 1:
            try:
                lines = _read_reply(self._s, self._debugFile)
            except (OSError, IOError, socket.error):
                # A read error is expected once close() has queued "CLOSE";
                # in that case stop quietly instead of falling through with
                # no reply to process.  Older Pythons do not derive
                # socket.error from OSError, hence the explicit tuple.
                if (not self._queue.empty()
                        and self._queue.get_nowait() == "CLOSE"):
                    return
                raise
            assert lines
            # Replies whose status code starts with "6" are asynchronous
            # events; everything else answers the oldest pending command.
            if lines[0][0][0] == "6":
                if self._handler is not None:
                    self._handler.handle1(lines)
            else:
                cb = self._queue.get()
                cb(lines)

    def _sendAndRecv(self, msg="", expectedTypes=("250",)):
        """Helper: Send a command 'msg' to Tor, and wait for a command
        in response.  'msg' may be a string or a list of strings to be
        concatenated, and must end with CRLF.  If every line of the
        response has a type in expectedTypes, return the list of
        (tp,body,extra) tuples.  If a line's type starts with "4" or "5",
        raise ErrorReply.  Otherwise, raise ProtocolError.
        """
        # This condition will get notified when we've got a result...
        condition = threading.Condition()
        # Here's where the result goes...
        result = []

        def cb(lines, condition=condition, result=result):
            condition.acquire()
            try:
                result.append((lines))
                condition.notify()
            finally:
                condition.release()

        if isinstance(msg, list):
            msg = "".join(msg)
        assert msg.endswith("\r\n")

        # Sends a message to Tor...
        self._sendLock.acquire()
        try:
            self._queue.put(cb)
            if self._debugFile:
                self._debugFile.write(">>> %s" % msg)
            self._s.write(msg)
            # The file object is buffered; without a flush the command can
            # sit in the buffer and the wait below would never finish.
            self._s.flush()
        finally:
            self._sendLock.release()

        # Now wait till the answer is in...
        condition.acquire()
        try:
            while not result:
                condition.wait()
        finally:
            condition.release()

        # ...And handle the answer appropriately.
        assert len(result) == 1
        lines = result[0]
        for tp, text, _ in lines:
            if tp[0] in '45':
                raise ErrorReply("%s %s" % (tp, text))
            if tp not in expectedTypes:
                raise ProtocolError("Unexpected message type %r" % tp)
        return lines

    def authenticate(self, secret=""):
        """Send an authenticating secret to Tor.  The secret is hex-encoded
        and sent as the argument of an AUTHENTICATE command.
        """
        hexstr = binascii.b2a_hex(secret)
        self._sendAndRecv("AUTHENTICATE %s\r\n" % hexstr)

    def get_option(self, name):
        """Return the value of the configuration option named 'name'.

        'name' may be a single option name or a sequence of names.  The
        result is a list of (key, value) tuples, one per reply line; the
        value is None for a reply line that contains no "=".
        """
        if not isinstance(name, str):
            name = " ".join(name)
        lines = self._sendAndRecv("GETCONF %s\r\n" % name)
        r = []
        for _, line, _ in lines:
            try:
                key, val = line.split("=", 1)
            except ValueError:
                r.append((line, None))
            else:
                r.append((key, val))
        return r

    def set_option(self, key, value):
        """Set the value of the configuration option 'key' to the value 'value'.
        """
        self.set_options([(key, value)])

    def set_options(self, kvlist):
        """Given a list of (key,value) pairs, set them as configuration
        options.  Does nothing if the list is empty.
        """
        if not kvlist:
            return
        msg = " ".join(["%s=%s" % (k, _quote(v)) for k, v in kvlist])
        self._sendAndRecv("SETCONF %s\r\n" % msg)

    def get_info(self, name):
        """Return the value of the internal information field named 'name'.

        'name' may be a single field name or a sequence of names.  The
        result is a dictionary mapping each field name in the reply to its
        value: the data section of the reply line if it has one, otherwise
        the text after "=".  Raises ProtocolError if a reply line other
        than "OK" contains no "=".
        """
        if not isinstance(name, str):
            name = " ".join(name)
        lines = self._sendAndRecv("GETINFO %s\r\n" % name)
        d = {}
        for _, msg, more in lines:
            if msg == "OK":
                break
            try:
                k, rest = msg.split("=", 1)
            except ValueError:
                raise ProtocolError("Bad info line %r" % msg)
            if more:
                d[k] = more
            else:
                d[k] = rest
        return d

    def set_events(self, events):
        """Change the list of events that the event handler is interested
        in to those in 'events', which is a list of event names.
        Recognized event names are listed in section 3.3 of the control-spec

        Event codes and names from the old interface are translated to the
        new names; OBSOLETE_LOG and DEBUG_MSG are dropped from the list.
        """
        # Translate options supported by old interface.  None marks an old
        # event that is skipped.  The original tested 0x0008 four times, so
        # the numeric NOTICE/WARN/ERR codes could never match.
        translate = {
            0x0001: "CIRC", "CIRCSTATUS": "CIRC",
            0x0002: "STREAM", "STREAMSTATUS": "STREAM",
            0x0003: "ORCONN", "ORCONNSTATUS": "ORCONN",
            0x0004: "BW", "BANDWIDTH": "BW",
            0x0005: None, "OBSOLETE_LOG": None,
            0x0006: "NEWDESC", "NEWDESC": "NEWDESC",
            0x0007: None, "DEBUG_MSG": None,
            0x0008: "INFO", "INFO_MSG": "INFO",
            0x0009: "NOTICE", "NOTICE_MSG": "NOTICE",
            0x000A: "WARN", "WARN_MSG": "WARN",
            0x000B: "ERR", "ERR_MSG": "ERR",
        }
        evs = []
        for e in events:
            if e in translate:
                e = translate[e]
                if e is None:
                    continue
            evs.append(e)
        self._sendAndRecv("SETEVENTS %s\r\n" % " ".join(evs))

    def save_conf(self):
        """Flush all configuration changes to disk.
        """
        self._sendAndRecv("SAVECONF\r\n")

    def send_signal(self, sig):
        """Send the signal 'sig' to the Tor process; The allowed values for
        'sig' are listed in section 3.6 of control-spec.  The numeric
        codes 0x01, 0x02, 0x0A, 0x0C and 0x0F are translated to the names
        HUP, INT, USR1, USR2 and TERM; any other value is sent as given.
        """
        sig = {0x01: "HUP",
               0x02: "INT",
               0x0A: "USR1",
               0x0C: "USR2",
               0x0F: "TERM"}.get(sig, sig)
        self._sendAndRecv("SIGNAL %s\r\n" % sig)

    def map_address(self, kvList):
        """Send a MAPADDRESS command for the (key, value) pairs in 'kvList'.

        Returns a list of (key, value) tuples parsed from the reply lines,
        or an empty list (without contacting Tor) if 'kvList' is empty.
        Raises ProtocolError if a reply line contains no "=".
        """
        if not kvList:
            return []
        m = " ".join(["%s=%s" % (k, v) for k, v in kvList])
        lines = self._sendAndRecv("MAPADDRESS %s\r\n" % m)
        r = []
        for _, line, _ in lines:
            try:
                key, val = line.split("=", 1)
            except ValueError:
                raise ProtocolError("Bad address line %r" % line)
            r.append((key, val))
        return r

    def extend_circuit(self, circid, hops):
        """Tell Tor to extend the circuit identified by 'circid' through the
        servers named in the list 'hops'.  A 'circid' of None is sent as
        "0".  Returns the circuit ID reported in Tor's "EXTENDED" reply.
        """
        if circid is None:
            circid = "0"
        lines = self._sendAndRecv("EXTENDCIRCUIT %s %s\r\n"
                                  % (circid, ",".join(hops)))
        tp, msg, _ = lines[0]
        m = re.match(r'EXTENDED (\S*)', msg)
        if not m:
            raise ProtocolError("Bad extended line %r" % msg)
        return m.group(1)

    def redirect_stream(self, streamid, newtarget):
        """Tell Tor to change the target address of the stream identified by
        'streamid' to 'newtarget'.
        """
        self._sendAndRecv("REDIRECTSTREAM %s %s\r\n" % (streamid, newtarget))

    def attach_stream(self, streamid, circid):
        """Tell Tor to attach stream 'streamid' to circuit 'circid'."""
        # The original sent REDIRECTSTREAM here, copied from the method
        # above.
        self._sendAndRecv("ATTACHSTREAM %s %s\r\n" % (streamid, circid))

    def close_stream(self, streamid, reason=0, flags=()):
        """Close the stream 'streamid', giving 'reason'.  'flags' is a
        sequence of flag names, sent separated by spaces.
        """
        self._sendAndRecv("CLOSESTREAM %s %s %s\r\n"
                          % (streamid, reason, " ".join(flags)))

    def close_circuit(self, circid, reason=0, flags=()):
        """Close the circuit 'circid'.  'flags' is a sequence of flag names,
        sent separated by spaces.
        """
        # NOTE(review): 'reason' is sent as an extra argument after the
        # circuit ID -- confirm against the control spec that CLOSECIRCUIT
        # accepts it.
        self._sendAndRecv("CLOSECIRCUIT %s %s %s\r\n"
                          % (circid, reason, " ".join(flags)))

    def post_descriptor(self, desc):
        """Tell Tor about a new descriptor in 'desc'.  The descriptor is
        sent as the dot-escaped data section of a +POSTDESCRIPTOR command.
        """
        self._sendAndRecv("+POSTDESCRIPTOR\r\n%s" % _escape_dots(desc))
More information about the tor-commits
mailing list