[or-cvs] r15468: adapt Blossom to use spiffy new TorCtl from Mike Perry (blossom/trunk)
goodell at seul.org
goodell at seul.org
Thu Jun 26 04:07:40 UTC 2008
Author: goodell
Date: 2008-06-26 00:07:38 -0400 (Thu, 26 Jun 2008)
New Revision: 15468
Removed:
blossom/trunk/TorCtl0.py
blossom/trunk/TorCtl1.py
Modified:
blossom/trunk/TorCtl.py
blossom/trunk/blossom.py
blossom/trunk/exit.py
Log:
adapt Blossom to use spiffy new TorCtl from Mike Perry
Modified: blossom/trunk/TorCtl.py
===================================================================
--- blossom/trunk/TorCtl.py 2008-06-26 03:17:14 UTC (rev 15467)
+++ blossom/trunk/TorCtl.py 2008-06-26 04:07:38 UTC (rev 15468)
@@ -1,536 +1,1092 @@
#!/usr/bin/python
# TorCtl.py -- Python module to interface with Tor Control interface.
-# Copyright 2005 Nick Mathewson -- See LICENSE for licensing information.
-#$Id: TorCtl.py,v 1.13 2005/11/19 19:42:31 nickm Exp $
+# Copyright 2005 Nick Mathewson
+# Copyright 2007 Mike Perry. See LICENSE file.
"""
-TorCtl -- Library to control Tor processes. See TorCtlDemo.py for example use.
+Library to control Tor processes.
+
+This library handles sending commands, parsing responses, and delivering
+events to and from the control port. The basic usage is to create a
+socket, wrap that in a TorCtl.Connection, and then add an EventHandler
+to that connection. A simple example with a DebugEventHandler (that just
+echoes the events back to stdout) is present in run_example().
+
+Note that the TorCtl.Connection is fully compatible with the more
+advanced EventHandlers in TorCtl.PathSupport (and of course any other
+custom event handlers that you may extend off of those).
+
+This package also contains a helper class for representing Routers, and
+classes and constants for each event.
+
"""
+__all__ = ["EVENT_TYPE", "TorCtlError", "TorCtlClosed", "ProtocolError",
+ "ErrorReply", "NetworkStatus", "ExitPolicyLine", "Router",
+ "RouterVersion", "Connection", "parse_ns_body",
+ "EventHandler", "DebugEventHandler", "NetworkStatusEvent",
+ "NewDescEvent", "CircuitEvent", "StreamEvent", "ORConnEvent",
+ "StreamBwEvent", "LogEvent", "AddrMapEvent", "BWEvent",
+ "UnknownEvent" ]
+
import os
import re
import struct
import sys
import threading
import Queue
+import datetime
+import traceback
+import socket
+import binascii
+import types
+import time
+from TorUtil import *
+# Types of "EVENT" message.
+EVENT_TYPE = Enum2(
+ CIRC="CIRC",
+ STREAM="STREAM",
+ ORCONN="ORCONN",
+ STREAM_BW="STREAM_BW",
+ BW="BW",
+ NS="NS",
+ NEWDESC="NEWDESC",
+ ADDRMAP="ADDRMAP",
+ DEBUG="DEBUG",
+ INFO="INFO",
+ NOTICE="NOTICE",
+ WARN="WARN",
+ ERR="ERR")
+
class TorCtlError(Exception):
- "Generic error raised by TorControl code."
- pass
+ "Generic error raised by TorControl code."
+ pass
class TorCtlClosed(TorCtlError):
- "Raised when the controller connection is closed by Tor (not by us.)"
- pass
+ "Raised when the controller connection is closed by Tor (not by us.)"
+ pass
class ProtocolError(TorCtlError):
- "Raised on violations in Tor controller protocol"
- pass
+ "Raised on violations in Tor controller protocol"
+ pass
class ErrorReply(TorCtlError):
- "Raised when Tor controller returns an error"
- pass
+ "Raised when Tor controller returns an error"
+ pass
-class EventHandler:
- """An 'EventHandler' wraps callbacks for the events Tor can return."""
- def __init__(self):
- """Create a new EventHandler."""
- from TorCtl0 import EVENT_TYPE
- self._map0 = {
- EVENT_TYPE.CIRCSTATUS : self.circ_status,
- EVENT_TYPE.STREAMSTATUS : self.stream_status,
- EVENT_TYPE.ORCONNSTATUS : self.or_conn_status,
- EVENT_TYPE.BANDWIDTH : self.bandwidth,
- EVENT_TYPE.NEWDESC : self.new_desc,
- EVENT_TYPE.DEBUG_MSG : self.msg,
- EVENT_TYPE.INFO_MSG : self.msg,
- EVENT_TYPE.NOTICE_MSG : self.msg,
- EVENT_TYPE.WARN_MSG : self.msg,
- EVENT_TYPE.ERR_MSG : self.msg,
- }
- self._map1 = {
- "CIRC" : self.circ_status,
- "STREAM" : self.stream_status,
- "ORCONN" : self.or_conn_status,
- "BW" : self.bandwidth,
- "DEBUG" : self.msg,
- "INFO" : self.msg,
- "NOTICE" : self.msg,
- "WARN" : self.msg,
- "ERR" : self.msg,
- "NEWDESC" : self.new_desc,
- "ADDRMAP" : self.address_mapped
- }
+class NetworkStatus:
+ "Filled in during NS events"
+ def __init__(self, nickname, idhash, orhash, updated, ip, orport, dirport, flags):
+ self.nickname = nickname
+ self.idhash = idhash
+ self.orhash = orhash
+ self.ip = ip
+ self.orport = int(orport)
+ self.dirport = int(dirport)
+ self.flags = flags
+ self.idhex = (self.idhash + "=").decode("base64").encode("hex").upper()
+ m = re.search(r"(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)", updated)
+ self.updated = datetime.datetime(*map(int, m.groups()))
- def handle0(self, evbody):
- """Dispatcher: called from Connection when an event is received."""
- evtype, args = self.decode0(evbody)
- self._map0.get(evtype, self.unknown_event)(evtype, *args)
+class NetworkStatusEvent:
+ def __init__(self, event_name, nslist):
+ self.event_name = event_name
+ self.arrived_at = 0
+ self.nslist = nslist # List of NetworkStatus objects
- def decode0(self, body):
- """Unpack an event message into a type/arguments-tuple tuple."""
- if len(body)<2:
- raise ProtocolError("EVENT body too short.")
- evtype, = struct.unpack("!H", body[:2])
- body = body[2:]
- if evtype == EVENT_TYPE.CIRCSTATUS:
- if len(body)<5:
- raise ProtocolError("CIRCUITSTATUS event too short.")
- status,ident = struct.unpack("!BL", body[:5])
- path = _unterminate(body[5:]).split(",")
- args = status, ident, path
- elif evtype == EVENT_TYPE.STREAMSTATUS:
- if len(body)<5:
- raise ProtocolError("STREAMSTATUS event too short.")
- status,ident = struct.unpack("!BL", body[:5])
- target = _unterminate(body[5:])
- args = status, ident, target
- elif evtype == EVENT_TYPE.ORCONNSTATUS:
- if len(body)<2:
- raise ProtocolError("ORCONNSTATUS event too short.")
- status = ord(body[0])
- target = _unterminate(body[1:])
- args = status, target
- elif evtype == EVENT_TYPE.BANDWIDTH:
- if len(body)<8:
- raise ProtocolError("BANDWIDTH event too short.")
- read, written = struct.unpack("!LL",body[:8])
- args = read, written
- elif evtype == EVENT_TYPE.OBSOLETE_LOG:
- args = (_unterminate(body),)
- elif evtype == EVENT_TYPE.NEWDESC:
- args = (_unterminate(body).split(","),)
- elif EVENT_TYPE.DEBUG_MSG <= evtype <= EVENT_TYPE.ERR_MSG:
- args = (EVENT_TYPE.nameOf[evtype], _unterminate(body))
- else:
- args = (body,)
+class NewDescEvent:
+ def __init__(self, event_name, idlist):
+ self.event_name = event_name
+ self.arrived_at = 0
+ self.idlist = idlist
- return evtype, args
+class CircuitEvent:
+ def __init__(self, event_name, circ_id, status, path, reason,
+ remote_reason):
+ self.event_name = event_name
+ self.arrived_at = 0
+ self.circ_id = circ_id
+ self.status = status
+ self.path = path
+ self.reason = reason
+ self.remote_reason = remote_reason
- def handle1(self, lines):
- """Dispatcher: called from Connection when an event is received."""
- for code, msg, data in lines:
- evtype, args = self.decode1(msg)
- self._map1.get(evtype, self.unknown_event)(evtype, *args)
+class StreamEvent:
+ def __init__(self, event_name, strm_id, status, circ_id, target_host,
+ target_port, reason, remote_reason, source, source_addr, purpose):
+ self.event_name = event_name
+ self.arrived_at = 0
+ self.strm_id = strm_id
+ self.status = status
+ self.circ_id = circ_id
+ self.target_host = target_host
+ self.target_port = int(target_port)
+ self.reason = reason
+ self.remote_reason = remote_reason
+ self.source = source
+ self.source_addr = source_addr
+ self.purpose = purpose
- def decode1(self, body):
- """Unpack an event message into a type/arguments-tuple tuple."""
- if " " in body:
- evtype,body = body.split(" ",1)
- else:
- evtype,body = body,""
- evtype = evtype.upper()
- if evtype == "CIRC":
- m = re.match(r"(\S+)\s+(\S+)(\s\S+)?", body)
- if not m:
- raise ProtocolError("CIRC event misformatted.")
- status,ident,path = m.groups()
- if path:
- path = path.strip().split(",")
- else:
- path = []
- args = status, ident, path
- elif evtype == "STREAM":
- m = re.match(r"(\S+)\s+(\S+)\s+(\S+)\s+(\S+)", body)
- if not m:
- raise ProtocolError("STREAM event misformatted.")
- ident,status,circ,target = m.groups()
- args = status, ident, target, circ
- elif evtype == "ORCONN":
- m = re.match(r"(\S+)\s+(\S+)", body)
- if not m:
- raise ProtocolError("ORCONN event misformatted.")
- target, status = m.groups()
- args = status, target
- elif evtype == "BW":
- m = re.match(r"(\d+)\s+(\d+)", body)
- if not m:
- raise ProtocolError("BANDWIDTH event misformatted.")
- read, written = map(long, m.groups())
- args = read, written
- elif evtype in ("DEBUG", "INFO", "NOTICE", "WARN", "ERR"):
- args = evtype, body
- elif evtype == "NEWDESC":
- args = ((" ".split(body)),)
- elif evtype == "ADDRMAP":
- m = re.match(r'(\S+)\s+(\S+)\s+(\"[^"]+\"|\w+)')
- if not m:
- raise ProtocolError("BANDWIDTH event misformatted.")
- fromaddr, toaddr, when = m.groups()
- if when.upper() == "NEVER":
- when = None
- else:
- when = time.localtime(
- time.strptime(when[1:-1], "%Y-%m-%d %H:%M:%S"))
- args = fromaddr, toaddr, when
- else:
- args = (body,)
+class ORConnEvent:
+ def __init__(self, event_name, status, endpoint, age, read_bytes,
+ wrote_bytes, reason, ncircs):
+ self.event_name = event_name
+ self.arrived_at = 0
+ self.status = status
+ self.endpoint = endpoint
+ self.age = age
+ self.read_bytes = read_bytes
+ self.wrote_bytes = wrote_bytes
+ self.reason = reason
+ self.ncircs = ncircs
- return evtype, args
+class StreamBwEvent:
+ def __init__(self, event_name, strm_id, read, written):
+ self.event_name = event_name
+ self.strm_id = int(strm_id)
+ self.bytes_read = int(read)
+ self.bytes_written = int(written)
- def unknown_event(self, eventtype, evtype, *args):
- """Called when we get an event type we don't recognize. This
- is almost alwyas an error.
- """
- raise NotImplemented
+class LogEvent:
+ def __init__(self, level, msg):
+ self.event_name = self.level = level
+ self.msg = msg
- def circ_status(self, eventtype, status, circID, path):
- """Called when a circuit status changes if listening to CIRCSTATUS
- events. 'status' is a member of CIRC_STATUS; circID is a numeric
- circuit ID, and 'path' is the circuit's path so far as a list of
- names.
- """
- raise NotImplemented
+class AddrMapEvent:
+ def __init__(self, event_name, from_addr, to_addr, when):
+ self.event_name = event_name
+ self.from_addr = from_addr
+ self.to_addr = to_addr
+ self.when = when
- def stream_status(self, eventtype, status, streamID, target, circID="0"):
- """Called when a stream status changes if listening to STREAMSTATUS
- events. 'status' is a member of STREAM_STATUS; streamID is a
- numeric stream ID, and 'target' is the destination of the stream.
- """
- raise NotImplemented
+class BWEvent:
+ def __init__(self, event_name, read, written):
+ self.event_name = event_name
+ self.read = read
+ self.written = written
- def or_conn_status(self, eventtype, status, target):
- """Called when an OR connection's status changes if listening to
- ORCONNSTATUS events. 'status' is a member of OR_CONN_STATUS; target
- is the OR in question.
- """
- raise NotImplemented
+class UnknownEvent:
+ def __init__(self, event_name, event_string):
+ self.event_name = event_name
+ self.event_string = event_string
- def bandwidth(self, eventtype, read, written):
- """Called once a second if listening to BANDWIDTH events. 'read' is
- the number of bytes read; 'written' is the number of bytes written.
- """
- raise NotImplemented
+class ExitPolicyLine:
+ """ Class to represent a line in a Router's exit policy in a way
+ that can be easily checked. """
+ def __init__(self, match, ip_mask, port_low, port_high):
+ self.match = match
+ if ip_mask == "*":
+ self.ip = 0
+ self.netmask = 0
+ else:
+ if not "/" in ip_mask:
+ self.netmask = 0xFFFFFFFF
+ ip = ip_mask
+ else:
+ ip, mask = ip_mask.split("/")
+ if re.match(r"\d+.\d+.\d+.\d+", mask):
+ self.netmask=struct.unpack(">I", socket.inet_aton(mask))[0]
+ else:
+ self.netmask = ~(2**(32 - int(mask)) - 1)
+ self.ip = struct.unpack(">I", socket.inet_aton(ip))[0]
+ self.ip &= self.netmask
+ if port_low == "*":
+ self.port_low,self.port_high = (0,65535)
+ else:
+ if not port_high:
+ port_high = port_low
+ self.port_low = int(port_low)
+ self.port_high = int(port_high)
+
+ def check(self, ip, port):
+ """Check to see if an ip and port is matched by this line.
+ Returns True if the line is an Accept, False if it is a Reject, and -1 if the line does not match. """
+ ip = struct.unpack(">I", socket.inet_aton(ip))[0]
+ if (ip & self.netmask) == self.ip:
+ if self.port_low <= port and port <= self.port_high:
+ return self.match
+ return -1
- def new_desc(self, eventtype, identities):
- """Called when Tor learns a new server descriptor if listenting to
- NEWDESC events.
- """
- raise NotImplemented
+class RouterVersion:
+ """ Represents a Router's version. Overloads all comparison operators
+ to check for newer, older, or equivalent versions. """
+ def __init__(self, version):
+ if version:
+ v = re.search("^(\d+).(\d+).(\d+).(\d+)", version).groups()
+ self.version = int(v[0])*0x1000000 + int(v[1])*0x10000 + int(v[2])*0x100 + int(v[3])
+ self.ver_string = version
+ else:
+ self.version = version
+ self.ver_string = "unknown"
- def msg(self, eventtype, severity, message):
- """Called when a log message of a given severity arrives if listening
- to INFO_MSG, NOTICE_MSG, WARN_MSG, or ERR_MSG events."""
- raise NotImplemented
+ def __lt__(self, other): return self.version < other.version
+ def __gt__(self, other): return self.version > other.version
+ def __ge__(self, other): return self.version >= other.version
+ def __le__(self, other): return self.version <= other.version
+ def __eq__(self, other): return self.version == other.version
+ def __ne__(self, other): return self.version != other.version
+ def __str__(self): return self.ver_string
- def address_mapped(self, eventtype, fromAddr, toAddr, expiry=None):
- """Called when Tor adds a mapping for an address if listening
- to ADDRESSMAPPED events.
- """
- raise NotImplemented
+class Router:
+ """
+ Class to represent a router from a descriptor. Can either be
+ created from the parsed fields, or can be built from a
+ descriptor+NetworkStatus
+ """
+ def __init__(self, idhex, name, bw, down, exitpolicy, flags, ip, version, os, uptime):
+ self.idhex = idhex
+ self.nickname = name
+ self.bw = bw
+ self.exitpolicy = exitpolicy
+ self.flags = flags
+ self.down = down
+ self.ip = struct.unpack(">I", socket.inet_aton(ip))[0]
+ self.version = RouterVersion(version)
+ self.os = os
+ self.list_rank = 0 # position in a sorted list of routers.
+ self.uptime = uptime
-class _ConnectionBase:
- def __init__(self):
- self._s = None
- self._handler = None
- self._handleFn = None
- self._sendLock = threading.RLock()
- self._queue = Queue.Queue()
- self._thread = None
- self._closedEx = None
- self._closed = 0
- self._closeHandler = None
- self._eventThread = None
- self._eventQueue = Queue.Queue()
+ def __str__(self):
+ s = self.idhex, self.nickname
+ return s.__str__()
- def set_event_handler(self, handler):
- """Cause future events from the Tor process to be sent to 'handler'.
- """
- raise NotImplemented
+ def build_from_desc(desc, ns):
+ """
+ Static method of Router that parses a descriptor string into this class.
+ 'desc' is a full descriptor as a list of lines.
+ 'ns' is a TorCtl.NetworkStatus instance for this router (needed for
+ the flags, the nickname, and the idhex string).
+ Returns a Router instance.
+ """
+ # XXX: Compile these regular expressions? This is an expensive process
+ # Use http://docs.python.org/lib/profile.html to verify this is
+ # the part of startup that is slow
+ exitpolicy = []
+ dead = not ("Running" in ns.flags)
+ bw_observed = 0
+ version = None
+ os = None
+ uptime = 0
+ ip = 0
+ router = "[none]"
- def set_close_handler(self, handler):
- """Call 'handler' when the Tor process has closed its connection or
- given us an exception. If we close normally, no arguments are
- provided; otherwise, it will be called with an exception as its
- argument.
- """
- self._closeHandler = handler
+ for line in desc:
+ rt = re.search(r"^router (\S+) (\S+)", line)
+ fp = re.search(r"^opt fingerprint (.+).*on (\S+)", line)
+ pl = re.search(r"^platform Tor (\S+).*on (\S+)", line)
+ ac = re.search(r"^accept (\S+):([^-]+)(?:-(\d+))?", line)
+ rj = re.search(r"^reject (\S+):([^-]+)(?:-(\d+))?", line)
+ bw = re.search(r"^bandwidth \d+ \d+ (\d+)", line)
+ up = re.search(r"^uptime (\d+)", line)
+ if re.search(r"^opt hibernating 1", line):
+ #dead = 1 # XXX: Technically this may be stale..
+ if ("Running" in ns.flags):
+ plog("INFO", "Hibernating router "+ns.nickname+" is running..")
+ if ac:
+ exitpolicy.append(ExitPolicyLine(True, *ac.groups()))
+ elif rj:
+ exitpolicy.append(ExitPolicyLine(False, *rj.groups()))
+ elif bw:
+ bw_observed = int(bw.group(1))
+ elif pl:
+ version, os = pl.groups()
+ elif up:
+ uptime = int(up.group(1))
+ elif rt:
+ router,ip = rt.groups()
+ if router != ns.nickname:
+ plog("NOTICE", "Got different names " + ns.nickname + " vs " +
+ router + " for " + ns.idhex)
+ if not bw_observed and not dead and ("Valid" in ns.flags):
+ plog("INFO", "No bandwidth for live router " + ns.nickname)
+ if not version or not os:
+ plog("INFO", "No version and/or OS for router " + ns.nickname)
+ return Router(ns.idhex, ns.nickname, bw_observed, dead, exitpolicy,
+ ns.flags, ip, version, os, uptime)
+ build_from_desc = Callable(build_from_desc)
- def close(self):
- """Shut down this controller connection"""
- self._sendLock.acquire()
- try:
- self._queue.put("CLOSE")
- self._eventQueue.put("CLOSE")
- self._s.close()
- self._s = None
- self._closed = 1
- finally:
- self._sendLock.release()
+ def update_to(self, new):
+ """ Somewhat hackish method to update this router to be a copy of
+ 'new' """
+ if self.idhex != new.idhex:
+ plog("ERROR", "Update of router "+self.nickname+"changes idhex!")
+ self.idhex = new.idhex
+ self.nickname = new.nickname
+ self.bw = new.bw
+ self.exitpolicy = new.exitpolicy
+ self.flags = new.flags
+ self.ip = new.ip
+ self.version = new.version
+ self.os = new.os
+ self.uptime = new.uptime
- def _read_reply(self):
- """DOCDOC"""
- raise NotImplementd
+ def will_exit_to(self, ip, port):
+ """ Check the entire exitpolicy to see if the router will allow
+ connections to 'ip':'port' """
+ for line in self.exitpolicy:
+ ret = line.check(ip, port)
+ if ret != -1:
+ return ret
+ plog("WARN", "No matching exit line for "+self.nickname)
+ return False
+
+class Connection:
+ """A Connection represents a connection to the Tor process via the
+ control port."""
+ def __init__(self, sock):
+ """Create a Connection to communicate with the Tor process over the
+ socket 'sock'.
+ """
+ self._handler = None
+ self._handleFn = None
+ self._sendLock = threading.RLock()
+ self._queue = Queue.Queue()
+ self._thread = None
+ self._closedEx = None
+ self._closed = 0
+ self._closeHandler = None
+ self._eventThread = None
+ self._eventQueue = Queue.Queue()
+ self._s = BufSock(sock)
+ self._debugFile = None
- def launch_thread(self, daemon=1):
- """Launch a background thread to handle messages from the Tor process."""
- assert self._thread is None
- t = threading.Thread(target=self._loop)
- if daemon:
- t.setDaemon(daemon)
- t.start()
- self._thread = t
- t = threading.Thread(target=self._eventLoop)
- if daemon:
- t.setDaemon(daemon)
- t.start()
- self._eventThread = t
- return self._thread
+ def set_close_handler(self, handler):
+ """Call 'handler' when the Tor process has closed its connection or
+ given us an exception. If we close normally, no arguments are
+ provided; otherwise, it will be called with an exception as its
+ argument.
+ """
+ self._closeHandler = handler
- def _loop(self):
- """Main subthread loop: Read commands from Tor, and handle them either
- as events or as responses to other commands.
- """
- while 1:
- ex = None
- try:
- isEvent, reply = self._read_reply()
- except:
- self._err(sys.exc_info())
- return
+ def close(self):
+ """Shut down this controller connection"""
+ self._sendLock.acquire()
+ try:
+ self._queue.put("CLOSE")
+ self._eventQueue.put((time.time(), "CLOSE"))
+ finally:
+ self._sendLock.release()
- if isEvent:
- if self._handler is not None:
- self._eventQueue.put(reply)
- else:
- cb = self._queue.get()
- cb(reply)
+ def launch_thread(self, daemon=1):
+ """Launch a background thread to handle messages from the Tor process."""
+ assert self._thread is None
+ t = threading.Thread(target=self._loop)
+ if daemon:
+ t.setDaemon(daemon)
+ t.start()
+ self._thread = t
+ t = threading.Thread(target=self._eventLoop)
+ if daemon:
+ t.setDaemon(daemon)
+ t.start()
+ self._eventThread = t
+ return self._thread
- def _err(self, (tp, ex, tb), fromEventLoop=0):
- """DOCDOC"""
- if self._s:
- try:
- self.close()
- except:
- pass
- self._sendLock.acquire()
- try:
- self._closedEx = ex
- self._closed = 1
- finally:
- self._sendLock.release()
- while 1:
- try:
- cb = self._queue.get(timeout=0)
- if cb != "CLOSE":
- cb("EXCEPTION")
- except Queue.Empty:
- break
- if self._closeHandler is not None:
- self._closeHandler(ex)
+ def _loop(self):
+ """Main subthread loop: Read commands from Tor, and handle them either
+ as events or as responses to other commands.
+ """
+ while 1:
+ try:
+ isEvent, reply = self._read_reply()
+ except:
+ self._err(sys.exc_info())
return
- def _eventLoop(self):
- """DOCDOC"""
- while 1:
- reply = self._eventQueue.get()
- if reply == "CLOSE":
- return
- try:
- self._handleFn(reply)
- except:
- self._err(sys.exc_info(), 1)
- return
+ if isEvent:
+ if self._handler is not None:
+ self._eventQueue.put((time.time(), reply))
+ else:
+ cb = self._queue.get() # atomic..
+ if cb == "CLOSE":
+ print "closing time"
+ self._s.close()
+ self._s = None
+ self._closed = 1
+ return
+ else:
+ cb(reply)
- def _sendImpl(self, sendFn, msg):
- """DOCDOC"""
- if self._thread is None:
- self.launch_thread(1)
- # This condition will get notified when we've got a result...
- condition = threading.Condition()
- # Here's where the result goes...
- result = []
+ def _err(self, (tp, ex, tb), fromEventLoop=0):
+ """DOCDOC"""
+ # silent death is bad :(
+ traceback.print_exception(tp, ex, tb)
+ if self._s:
+ try:
+ self.close()
+ except:
+ pass
+ self._sendLock.acquire()
+ try:
+ self._closedEx = ex
+ self._closed = 1
+ finally:
+ self._sendLock.release()
+ while 1:
+ try:
+ cb = self._queue.get(timeout=0)
+ if cb != "CLOSE":
+ cb("EXCEPTION")
+ except Queue.Empty:
+ break
+ if self._closeHandler is not None:
+ self._closeHandler(ex)
+ return
- if self._closedEx is not None:
- raise self._closedEx
- elif self._closed:
- raise TorCtl.TorCtlClosed()
+ def _eventLoop(self):
+ """DOCDOC"""
+ while 1:
+ (timestamp, reply) = self._eventQueue.get()
+ if reply[0][0] == "650" and reply[0][1] == "OK":
+ plog("DEBUG", "Ignoring incompatible syntactic sugar: 650 OK")
+ continue
+ if reply == "CLOSE":
+ return
+ try:
+ self._handleFn(timestamp, reply)
+ except:
+ for code, msg, data in reply:
+ plog("WARN", "No event for: "+str(code)+" "+str(msg))
+ self._err(sys.exc_info(), 1)
+ return
- def cb(reply,condition=condition,result=result):
- condition.acquire()
- try:
- result.append(reply)
- condition.notify()
- finally:
- condition.release()
+ def _sendImpl(self, sendFn, msg):
+ """DOCDOC"""
+ if self._thread is None:
+ self.launch_thread(1)
+ # This condition will get notified when we've got a result...
+ condition = threading.Condition()
+ # Here's where the result goes...
+ result = []
- # Sends a message to Tor...
- self._sendLock.acquire()
- try:
- self._queue.put(cb)
- sendFn(msg)
- finally:
- self._sendLock.release()
+ if self._closedEx is not None:
+ raise self._closedEx
+ elif self._closed:
+ raise TorCtlClosed()
- # Now wait till the answer is in...
- condition.acquire()
- try:
- while not result:
- condition.wait()
- finally:
- condition.release()
+ def cb(reply,condition=condition,result=result):
+ condition.acquire()
+ try:
+ result.append(reply)
+ condition.notify()
+ finally:
+ condition.release()
- # ...And handle the answer appropriately.
- assert len(result) == 1
- reply = result[0]
- if reply == "EXCEPTION":
- raise self._closedEx
+ # Sends a message to Tor...
+ self._sendLock.acquire() # ensure queue+sendmsg is atomic
+ try:
+ self._queue.put(cb)
+ sendFn(msg) # _doSend(msg)
+ finally:
+ self._sendLock.release()
- return reply
+ # Now wait till the answer is in...
+ condition.acquire()
+ try:
+ while not result:
+ condition.wait()
+ finally:
+ condition.release()
-class DebugEventHandler(EventHandler):
- """Trivial event handler: dumps all events to stdout."""
- def __init__(self, out=None):
- if out is None:
- out = sys.stdout
- self._out = out
+ # ...And handle the answer appropriately.
+ assert len(result) == 1
+ reply = result[0]
+ if reply == "EXCEPTION":
+ raise self._closedEx
- def handle0(self, body):
- evtype, args = self.decode0(body)
- print >>self._out,EVENT_TYPE.nameOf[evtype],args
+ return reply
- def handle1(self, lines):
- for code, msg, data in lines:
- print >>self._out, msg
-def detectVersion(s):
- """Helper: sends a trial command to Tor to tell whether it's running
- the first or second version of the control protocol.
+ def debug(self, f):
+ """DOCDOC"""
+ self._debugFile = f
+
+ def set_event_handler(self, handler):
+ """Cause future events from the Tor process to be sent to 'handler'.
"""
- s.sendall("\x00\x00\r\n")
- m = s.recv(4)
- v0len, v0type = struct.unpack("!HH", m)
- if v0type == '\x00\x00':
- s.recv(v0len)
- return 0
- if '\n' not in m:
+ self._handler = handler
+ self._handleFn = handler._handle1
+
+ def _read_reply(self):
+ lines = []
+ while 1:
+ line = self._s.readline().strip()
+ if self._debugFile:
+ self._debugFile.write(" %s\n" % line)
+ if len(line)<4:
+ raise ProtocolError("Badly formatted reply line: Too short")
+ code = line[:3]
+ tp = line[3]
+ s = line[4:]
+ if tp == "-":
+ lines.append((code, s, None))
+ elif tp == " ":
+ lines.append((code, s, None))
+ isEvent = (lines and lines[0][0][0] == '6')
+ return isEvent, lines
+ elif tp != "+":
+ raise ProtocolError("Badly formatted reply line: unknown type %r"%tp)
+ else:
+ more = []
while 1:
- c = s.recv(1)
- if c == '\n':
- break
- return 1
+ line = self._s.readline()
+ if self._debugFile:
+ self._debugFile.write("+++ %s" % line)
+ if line in (".\r\n", ".\n", "650 OK\n", "650 OK\r\n"):
+ break
+ more.append(line)
+ lines.append((code, s, unescape_dots("".join(more))))
+ isEvent = (lines and lines[0][0][0] == '6')
+ if isEvent: # Need "250 OK" if it's not an event. Otherwise, end
+ return (isEvent, lines)
-def parseHostAndPort(h):
- """Given a string of the form 'address:port' or 'address' or
- 'port' or '', return a two-tuple of (address, port)
+ # Notreached
+ raise TorCtlError()
+
+ def _doSend(self, msg):
+ if self._debugFile:
+ amsg = msg
+ lines = amsg.split("\n")
+ if len(lines) > 2:
+ amsg = "\n".join(lines[:2]) + "\n"
+ self._debugFile.write(">>> %s" % amsg)
+ self._s.write(msg)
+
+ def sendAndRecv(self, msg="", expectedTypes=("250", "251")):
+ """Helper: Send a command 'msg' to Tor, and wait for a command
+ in response. If the response type is in expectedTypes,
+ return a list of (tp,body,extra) tuples. If it is an
+ error, raise ErrorReply. Otherwise, raise ProtocolError.
"""
- host, port = "localhost", 9100
- if ":" in h:
- i = h.index(":")
- host = h[:i]
- try:
- port = int(h[i+1:])
- except ValueError:
- print "Bad hostname %r"%h
- sys.exit(1)
- elif h:
- try:
- port = int(h)
- except ValueError:
- host = h
+ if type(msg) == types.ListType:
+ msg = "".join(msg)
+ assert msg.endswith("\r\n")
- return host, port
+ lines = self._sendImpl(self._doSend, msg)
+ # print lines
+ for tp, msg, _ in lines:
+ if tp[0] in '45':
+ raise ErrorReply("%s %s"%(tp, msg))
+ if tp not in expectedTypes:
+ raise ProtocolError("Unexpectd message type %r"%tp)
-def get_connection(sock):
- """Given a socket attached to a Tor control port, detect the version of Tor
- and return an appropriate 'Connection' object."""
- v = detectVersion(sock)
- if v == 0:
- import TorCtl0
- return TorCtl0.Connection(sock)
+ return lines
+
+ def authenticate(self, secret=""):
+ """Send an authenticating secret to Tor. You'll need to call this
+ method before Tor can start.
+ """
+ #hexstr = binascii.b2a_hex(secret)
+ self.sendAndRecv("AUTHENTICATE \"%s\"\r\n"%secret)
+
+ def get_option(self, name):
+ """Get the value of the configuration option named 'name'. To
+ retrieve multiple values, pass a list for 'name' instead of
+ a string. Returns a list of (key,value) pairs.
+ Refer to section 3.3 of control-spec.txt for a list of valid names.
+ """
+ if not isinstance(name, str):
+ name = " ".join(name)
+ lines = self.sendAndRecv("GETCONF %s\r\n" % name)
+
+ r = []
+ for _,line,_ in lines:
+ try:
+ key, val = line.split("=", 1)
+ r.append((key,val))
+ except ValueError:
+ r.append((line, None))
+
+ return r
+
+ def set_option(self, key, value):
+ """Set the value of the configuration option 'key' to the value 'value'.
+ """
+ self.set_options([(key, value)])
+
+ def set_options(self, kvlist):
+ """Given a list of (key,value) pairs, set them as configuration
+ options.
+ """
+ if not kvlist:
+ return
+ msg = " ".join(["%s=%s"%(k,quote(v)) for k,v in kvlist])
+ self.sendAndRecv("SETCONF %s\r\n"%msg)
+
+ def reset_options(self, keylist):
+ """Reset the options listed in 'keylist' to their default values.
+
+ Tor started implementing this command in version 0.1.1.7-alpha;
+ previous versions wanted you to set configuration keys to "".
+ That no longer works.
+ """
+ self.sendAndRecv("RESETCONF %s\r\n"%(" ".join(keylist)))
+
+ def get_network_status(self, who="all"):
+ """Get the entire network status list. Returns a list of
+ TorCtl.NetworkStatus instances."""
+ return parse_ns_body(self.sendAndRecv("GETINFO ns/"+who+"\r\n")[0][2])
+
+ def get_router(self, ns):
+ """Fill in a Router class corresponding to a given NS class"""
+ desc = self.sendAndRecv("GETINFO desc/id/" + ns.idhex + "\r\n")[0][2].split("\n")
+ return Router.build_from_desc(desc, ns)
+
+
+ def read_routers(self, nslist):
+ """ Given a list of NetworkStatuses in 'nslist', this function will
+ return a list of new Router instances.
+ """
+ bad_key = 0
+ new = []
+ for ns in nslist:
+ try:
+ r = self.get_router(ns)
+ new.append(r)
+ except ErrorReply:
+ bad_key += 1
+ if "Running" in ns.flags:
+ plog("NOTICE", "Running router "+ns.nickname+"="
+ +ns.idhex+" has no descriptor")
+ except:
+ traceback.print_exception(*sys.exc_info())
+ continue
+
+ return new
+
+ def get_info(self, name):
+ """Return the value of the internal information field named 'name'.
+ Refer to section 3.9 of control-spec.txt for a list of valid names.
+ DOCDOC
+ """
+ if not isinstance(name, str):
+ name = " ".join(name)
+ lines = self.sendAndRecv("GETINFO %s\r\n"%name)
+ d = {}
+ for _,msg,more in lines:
+ if msg == "OK":
+ break
+ try:
+ k,rest = msg.split("=",1)
+ except ValueError:
+ raise ProtocolError("Bad info line %r",msg)
+ if more:
+ d[k] = more
+ else:
+ d[k] = rest
+ return d
+
+ def set_events(self, events, extended=False):
+ """Change the list of events that the event handler is interested
+ in to those in 'events', which is a list of event names.
+ Recognized event names are listed in section 3.3 of the control-spec
+ """
+ if extended:
+ plog ("DEBUG", "SETEVENTS EXTENDED %s\r\n" % " ".join(events))
+ self.sendAndRecv("SETEVENTS EXTENDED %s\r\n" % " ".join(events))
else:
- import TorCtl1
- return TorCtl1.Connection(sock)
+ self.sendAndRecv("SETEVENTS %s\r\n" % " ".join(events))
-def secret_to_key(secret, s2k_specifier):
- """Used to generate a hashed password string. DOCDOC."""
- c = ord(s2k_specifier[8])
- EXPBIAS = 6
- count = (16+(c&15)) << ((c>>4) + EXPBIAS)
+ def save_conf(self):
+ """Flush all configuration changes to disk.
+ """
+ self.sendAndRecv("SAVECONF\r\n")
- d = sha.new()
- tmp = s2k_specifier[:8]+secret
- slen = len(tmp)
- while count:
- if count > slen:
- d.update(tmp)
- count -= slen
- else:
- d.update(tmp[:count])
- count = 0
- return d.digest()
+ def send_signal(self, sig):
+   """Deliver a SIGNAL command for 'sig' to the Tor process.
+
+   'sig' may be one of the numeric codes in the table below, which is
+   translated to its name; any other value is sent as given. The
+   allowed values are listed in section 3.6 of control-spec.
+   """
+   names_by_code = {
+     0x01 : "HUP",
+     0x02 : "INT",
+     0x03 : "NEWNYM",
+     0x0A : "USR1",
+     0x0C : "USR2",
+     0x0F : "TERM",
+   }
+   if sig in names_by_code:
+     sig = names_by_code[sig]
+   self.sendAndRecv("SIGNAL %s\r\n"%sig)
-def urandom_rng(n):
- """Try to read some entropy from the platform entropy source."""
- f = open('/dev/urandom', 'rb')
- try:
- return f.read(n)
- finally:
- f.close()
+ def resolve(self, host):
+   """Launch a remote lookup request for 'host', which may be a
+   hostname or an IPv4 address.
+   """
+   # TODO: handle "mode=reverse"
+   command = "RESOLVE %s\r\n"%host
+   self.sendAndRecv(command)
-def s2k_gen(secret, rng=None):
+ def map_address(self, kvList):
+   """Send one MAPADDRESS command covering every tuple in kvList.
+
+   Each (k, v) tuple is sent as "k=v". Returns a list of (key, value)
+   tuples parsed from the "key=value" reply lines, or None when kvList
+   is empty (in which case nothing is sent).
+
+   Raises ProtocolError for a reply line that contains no "=".
+   """
+   if not kvList:
+     return
+   # The pairs must actually be interpolated; a bare "%s=%s" would be
+   # sent to Tor literally.
+   m = " ".join([ "%s=%s"%(k,v) for k,v in kvList])
+   lines = self.sendAndRecv("MAPADDRESS %s\r\n"%m)
+   r = []
+   for _,line,_ in lines:
+     try:
+       key, val = line.split("=", 1)
+     except ValueError:
+       # Report the reply line that failed to parse.
+       raise ProtocolError("Bad address line %r"%(line,))
+     r.append((key,val))
+   return r
+
+ def extend_circuit(self, circid, hops):
+   """Tell Tor to extend the circuit identified by 'circid' through the
+   servers named in the list 'hops'.
+
+   A 'circid' of None is sent as 0. Returns the circuit ID from Tor's
+   "EXTENDED" reply as an int.
+
+   Raises ProtocolError if the first reply line is not an EXTENDED line.
+   """
+   if circid is None:
+     # Must be an int: the command below is formatted with %d.
+     circid = 0
+   plog("DEBUG", "Extending circuit")
+   lines = self.sendAndRecv("EXTENDCIRCUIT %d %s\r\n"
+                 %(circid, ",".join(hops)))
+   _,msg,_ = lines[0]
+   m = re.match(r'EXTENDED (\S*)', msg)
+   if not m:
+     raise ProtocolError("Bad extended line %r"%(msg,))
+   plog("DEBUG", "Circuit extended")
+   return int(m.group(1))
+
+ def redirect_stream(self, streamid, newaddr, newport=""):
"""DOCDOC"""
- if rng is None:
- if hasattr(os, "urandom"):
- rng = os.urandom
- else:
- rng = urandom_rng
- spec = "%s%s"%(rng(8), chr(96))
- return "16:%s"%(
- binascii.b2a_hex(spec + secret_to_key(secret, spec)))
+ if newport:
+ self.sendAndRecv("REDIRECTSTREAM %d %s %s\r\n"%(streamid, newaddr, newport))
+ else:
+ self.sendAndRecv("REDIRECTSTREAM %d %s\r\n"%(streamid, newaddr))
-def s2k_check(secret, k):
+ def attach_stream(self, streamid, circid, hop=None):
+   """Attach the stream 'streamid' to the circuit 'circid' (both given
+   by ID). When a true 'hop' is supplied, ask Tor to use that hop of
+   the circuit as the exit node for this stream.
+   """
+   if not hop:
+     self.sendAndRecv("ATTACHSTREAM %d %d\r\n"%(streamid, circid))
+     plog("DEBUG", "Attaching stream: "+str(streamid)+" to circuit "+str(circid))
+     return
+   self.sendAndRecv("ATTACHSTREAM %d %d HOP=%d\r\n"%(streamid, circid, hop))
+   plog("DEBUG", "Attaching stream: "+str(streamid)+" to hop "+str(hop)+" of circuit "+str(circid))
+
+ def close_stream(self, streamid, reason=0, flags=()):
"""DOCDOC"""
- assert k[:3] == "16:"
+ self.sendAndRecv("CLOSESTREAM %d %s %s\r\n"
+ %(streamid, reason, "".join(flags)))
- k = binascii.a2b_hex(k[3:])
- return secret_to_key(secret, k[:9]) == k[9:]
+ def close_circuit(self, circid, reason=0, flags=()):
+   """Send CLOSECIRCUIT for 'circid', followed by 'reason' and the
+   concatenation of the strings in 'flags'.
+   """
+   flag_text = "".join(flags)
+   command = "CLOSECIRCUIT %d %s %s\r\n" % (circid, reason, flag_text)
+   self.sendAndRecv(command)
-def run_example(host,port):
- print "host is %s:%d"%(host,port)
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect((host,port))
- c = Connection(s)
- c.set_event_handler(DebugEventHandler())
- th = c.launchThread()
- c.authenticate()
- print "nick",`c.get_option("nickname")`
- print c.get_option("DirFetchPeriod\n")
- print `c.get_info("version")`
- #print `c.get_info("desc/name/moria1")`
- print `c.get_info("network-status")`
- print `c.get_info("addr-mappings/all")`
- print `c.get_info("addr-mappings/config")`
- print `c.get_info("addr-mappings/cache")`
- print `c.get_info("addr-mappings/control")`
- print `c.map_address([("0.0.0.0", "Foobar.com"),
- ("1.2.3.4", "foobaz.com"),
- ("frebnitz.com", "5.6.7.8"),
- (".", "abacinator.onion")])`
- print `c.extend_circuit(0,["moria1"])`
+ def post_descriptor(self, desc):
+   """Send 'desc', passed through escape_dots(), as the body of a
+   +POSTDESCRIPTOR command.
+   """
+   body = escape_dots(desc)
+   self.sendAndRecv("+POSTDESCRIPTOR\r\n%s"%body)
+
+def parse_ns_body(data):
+  """Turn the body of an NS event or command into a list of
+     NetworkStatus instances, one per "r " entry in 'data'."""
+  # Everything before the first "r " line is not a router entry.
+  entries = re.compile(r"^r ", re.M).split(data)[1:]
+  nslist = []
+  for entry in entries:
+    # The "s" line holds the space-separated flags of this entry.
+    flag_match = re.search(r"^s((?:\s\S*)+)", entry, re.M)
+    flags = flag_match.group(1).strip().split(" ")
+    # The text right after "r " holds the seven positional fields.
+    field_match = re.match(r"(\S+)\s(\S+)\s(\S+)\s(\S+\s\S+)\s(\S+)\s(\d+)\s(\d+)", entry)
+    fields = field_match.groups()
+    nslist.append(NetworkStatus(*(fields + (flags,))))
+  return nslist
+
+class EventHandler:
+  """An 'EventHandler' wraps callbacks for the events Tor can return.
+     Each event argument is an instance of the corresponding event
+     class. Subclasses override the *_event methods they care about;
+     the defaults raise NotImplementedError."""
+  def __init__(self):
+    """Create a new EventHandler."""
+    # Maps the event keyword sent by Tor to the callback that handles it.
+    self._map1 = {
+      "CIRC" : self.circ_status_event,
+      "STREAM" : self.stream_status_event,
+      "ORCONN" : self.or_conn_status_event,
+      "STREAM_BW" : self.stream_bw_event,
+      "BW" : self.bandwidth_event,
+      "DEBUG" : self.msg_event,
+      "INFO" : self.msg_event,
+      "NOTICE" : self.msg_event,
+      "WARN" : self.msg_event,
+      "ERR" : self.msg_event,
+      "NEWDESC" : self.new_desc_event,
+      "ADDRMAP" : self.address_mapped_event,
+      "NS" : self.ns_event
+      }
+
+  def _handle1(self, timestamp, lines):
+    """Dispatcher: called from Connection when an event is received.
+       Decodes every line, stamps it with 'timestamp', calls
+       heartbeat_event() and then the callback registered for its type."""
+    for code, msg, data in lines:
+      event = self._decode1(msg, data)
+      event.arrived_at = timestamp
+      self.heartbeat_event(event)
+      self._map1.get(event.event_name, self.unknown_event)(event)
+
+  def _decode1(self, body, data):
+    """Parse one event line into an instance of the matching event
+       class. 'body' is the text of the line; 'data' is only used for
+       NS events. Raises ProtocolError for a misformatted event."""
+    if " " in body:
+      evtype,body = body.split(" ",1)
+    else:
+      evtype,body = body,""
+    evtype = evtype.upper()
+    if evtype == "CIRC":
+      m = re.match(r"(\d+)\s+(\S+)(\s\S+)?(\s\S+)?(\s\S+)?", body)
+      if not m:
+        raise ProtocolError("CIRC event misformatted.")
+      ident,status,path,reason,remote = m.groups()
+      ident = int(ident)
+      if path:
+        if "REASON=" in path:
+          # No path was sent: the optional groups are shifted by one.
+          remote = reason
+          reason = path
+          path=[]
+        else:
+          path = path.strip().split(",")
+      else:
+        path = []
+      # Strip the leading " REASON=" / " REMOTE_REASON=" keywords.
+      if reason: reason = reason[8:]
+      if remote: remote = remote[15:]
+      event = CircuitEvent(evtype, ident, status, path, reason, remote)
+    elif evtype == "STREAM":
+      #plog("DEBUG", "STREAM: "+body)
+      m = re.match(r"(\S+)\s+(\S+)\s+(\S+)\s+(\S+):(\d+)(\sREASON=\S+)?(\sREMOTE_REASON=\S+)?(\sSOURCE=\S+)?(\sSOURCE_ADDR=\S+)?(\sPURPOSE=\S+)?", body)
+      if not m:
+        raise ProtocolError("STREAM event misformatted.")
+      ident,status,circ,target_host,target_port,reason,remote,source,source_addr,purpose = m.groups()
+      ident,circ = map(int, (ident,circ))
+      # Each slice drops the leading space, keyword and "=".
+      if reason: reason = reason[8:]
+      if remote: remote = remote[15:]
+      if source: source = source[8:]
+      if source_addr: source_addr = source_addr[13:]
+      if purpose: purpose = purpose[9:]
+      event = StreamEvent(evtype, ident, status, circ, target_host,
+               int(target_port), reason, remote, source, source_addr, purpose)
+    elif evtype == "ORCONN":
+      m = re.match(r"(\S+)\s+(\S+)(\sAGE=\S+)?(\sREAD=\S+)?(\sWRITTEN=\S+)?(\sREASON=\S+)?(\sNCIRCS=\S+)?", body)
+      if not m:
+        raise ProtocolError("ORCONN event misformatted.")
+      target, status, age, read, wrote, reason, ncircs = m.groups()
+
+      #plog("DEBUG", "ORCONN: "+body)
+      # Missing numeric fields default to 0.
+      if ncircs: ncircs = int(ncircs[8:])
+      else: ncircs = 0
+      if reason: reason = reason[8:]
+      if age: age = int(age[5:])
+      else: age = 0
+      if read: read = int(read[6:])
+      else: read = 0
+      if wrote: wrote = int(wrote[9:])
+      else: wrote = 0
+      event = ORConnEvent(evtype, status, target, age, read, wrote,
+                reason, ncircs)
+    elif evtype == "STREAM_BW":
+      m = re.match(r"(\d+)\s+(\d+)\s+(\d+)", body)
+      if not m:
+        raise ProtocolError("STREAM_BW event misformatted.")
+      event = StreamBwEvent(evtype, *m.groups())
+    elif evtype == "BW":
+      m = re.match(r"(\d+)\s+(\d+)", body)
+      if not m:
+        raise ProtocolError("BANDWIDTH event misformatted.")
+      read, written = map(long, m.groups())
+      event = BWEvent(evtype, read, written)
+    elif evtype in ("DEBUG", "INFO", "NOTICE", "WARN", "ERR"):
+      event = LogEvent(evtype, body)
+    elif evtype == "NEWDESC":
+      event = NewDescEvent(evtype, body.split(" "))
+    elif evtype == "ADDRMAP":
+      # TODO: Also parse errors and GMTExpiry
+      m = re.match(r'(\S+)\s+(\S+)\s+(\"[^"]+\"|\w+)', body)
+      if not m:
+        raise ProtocolError("ADDRMAP event misformatted.")
+      fromaddr, toaddr, when = m.groups()
+      if when.upper() == "NEVER":
+        when = None
+      else:
+        # Drop the surrounding quotes before parsing the timestamp.
+        when = time.strptime(when[1:-1], "%Y-%m-%d %H:%M:%S")
+      event = AddrMapEvent(evtype, fromaddr, toaddr, when)
+    elif evtype == "NS":
+      event = NetworkStatusEvent(evtype, parse_ns_body(data))
+    else:
+      event = UnknownEvent(evtype, body)
+
+    return event
+
+  def heartbeat_event(self, event):
+    """Called for every event, before its type-specific callback.
+       Convenience function for any cleanup/setup/reconfiguration you
+       may need to do.
+    """
+    pass
+
+  def unknown_event(self, event):
+    """Called when we get an event type we don't recognize. This
+       is almost always an error.
+    """
+    raise NotImplementedError()
+
+  def circ_status_event(self, event):
+    """Called when a circuit status changes if listening to CIRCSTATUS
+       events."""
+    raise NotImplementedError()
+
+  def stream_status_event(self, event):
+    """Called when a stream status changes if listening to STREAMSTATUS
+       events. """
+    raise NotImplementedError()
+
+  def stream_bw_event(self, event):
+    """Called for each STREAM_BW event."""
+    raise NotImplementedError()
+
+  def or_conn_status_event(self, event):
+    """Called when an OR connection's status changes if listening to
+       ORCONNSTATUS events."""
+    raise NotImplementedError()
+
+  def bandwidth_event(self, event):
+    """Called once a second if listening to BANDWIDTH events.
+    """
+    raise NotImplementedError()
+
+  def new_desc_event(self, event):
+    """Called when Tor learns a new server descriptor if listening to
+       NEWDESC events.
+    """
+    raise NotImplementedError()
+
+  def msg_event(self, event):
+    """Called when a log message of a given severity arrives if listening
+       to INFO_MSG, NOTICE_MSG, WARN_MSG, or ERR_MSG events."""
+    raise NotImplementedError()
+
+  def ns_event(self, event):
+    """Called for each NS event."""
+    raise NotImplementedError()
+
+  def address_mapped_event(self, event):
+    """Called when Tor adds a mapping for an address if listening
+       to ADDRESSMAPPED events.
+    """
+    raise NotImplementedError()
+
+
+
+class DebugEventHandler(EventHandler):
+ """Trivial debug event handler: reassembles all parsed events to stdout."""
+ # Each callback below prints one space-separated line per event.
+ def circ_status_event(self, circ_event): # CircuitEvent()
+ # Always print name, circuit ID and status; path and reasons only if set.
+ output = [circ_event.event_name, str(circ_event.circ_id),
+ circ_event.status]
+ if circ_event.path:
+ output.append(",".join(circ_event.path))
+ if circ_event.reason:
+ output.append("REASON=" + circ_event.reason)
+ if circ_event.remote_reason:
+ output.append("REMOTE_REASON=" + circ_event.remote_reason)
+ print " ".join(output)
+
+ def stream_status_event(self, strm_event):
+ # Host and port are printed as two separate fields.
+ output = [strm_event.event_name, str(strm_event.strm_id),
+ strm_event.status, str(strm_event.circ_id),
+ strm_event.target_host, str(strm_event.target_port)]
+ if strm_event.reason:
+ output.append("REASON=" + strm_event.reason)
+ if strm_event.remote_reason:
+ output.append("REMOTE_REASON=" + strm_event.remote_reason)
+ print " ".join(output)
+
+ def ns_event(self, ns_event):
+ # One output line per NetworkStatus entry carried by the event.
+ for ns in ns_event.nslist:
+ print " ".join((ns_event.event_name, ns.nickname, ns.idhash,
+ ns.updated.isoformat(), ns.ip, str(ns.orport),
+ str(ns.dirport), " ".join(ns.flags)))
+
+ def new_desc_event(self, newdesc_event):
+ print " ".join((newdesc_event.event_name, " ".join(newdesc_event.idlist)))
+
+ def or_conn_status_event(self, orconn_event):
+ # Fields that are zero or unset become empty strings; they are still
+ # joined below, so the output can contain consecutive spaces.
+ if orconn_event.age: age = "AGE="+str(orconn_event.age)
+ else: age = ""
+ if orconn_event.read_bytes: read = "READ="+str(orconn_event.read_bytes)
+ else: read = ""
+ if orconn_event.wrote_bytes: wrote = "WRITTEN="+str(orconn_event.wrote_bytes)
+ else: wrote = ""
+ if orconn_event.reason: reason = "REASON="+orconn_event.reason
+ else: reason = ""
+ if orconn_event.ncircs: ncircs = "NCIRCS="+str(orconn_event.ncircs)
+ else: ncircs = ""
+ print " ".join((orconn_event.event_name, orconn_event.endpoint,
+ orconn_event.status, age, read, wrote, reason, ncircs))
+
+ def msg_event(self, log_event):
+ print log_event.event_name+" "+log_event.msg
+
+ def bandwidth_event(self, bw_event):
+ print bw_event.event_name+" "+str(bw_event.read)+" "+str(bw_event.written)
+
+def parseHostAndPort(h):
+ """Given a string of the form 'address:port' or 'address' or
+ 'port' or '', return a two-tuple of (address, port)
+ """
+ host, port = "localhost", 9100
+ if ":" in h:
+ i = h.index(":")
+ host = h[:i]
try:
- print `c.extend_circuit(0,[""])`
- except ErrorReply:
- print "got error. good."
- #send_signal(s,1)
- #save_conf(s)
+ port = int(h[i+1:])
+ except ValueError:
+ print "Bad hostname %r"%h
+ sys.exit(1)
+ elif h:
+ try:
+ port = int(h)
+ except ValueError:
+ host = h
- #set_option(s,"1")
- #set_option(s,"bandwidthburstbytes 100000")
- #set_option(s,"runasdaemon 1")
- #set_events(s,[EVENT_TYPE.WARN])
- c.set_events([EVENT_TYPE.ORCONNSTATUS, EVENT_TYPE.STREAMSTATUS,
- EVENT_TYPE.CIRCSTATUS, EVENT_TYPE.INFO_MSG,
- EVENT_TYPE.BANDWIDTH])
+ return host, port
- th.join()
- return
+def run_example(host,port):
+ """ Example of basic TorCtl usage. See PathSupport for more advanced
+ usage.
+
+ Connects to the control port at host:port, installs a
+ DebugEventHandler, authenticates, prints a few GETCONF/GETINFO
+ results, tries two EXTENDCIRCUIT commands, subscribes to events and
+ then blocks until the connection thread exits.
+ """
+ print "host is %s:%d"%(host,port)
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect((host,port))
+ c = Connection(s)
+ c.set_event_handler(DebugEventHandler())
+ th = c.launch_thread()
+ # No secret is passed to authenticate() here.
+ c.authenticate()
+ print "nick",`c.get_option("nickname")`
+ print `c.get_info("version")`
+ #print `c.get_info("desc/name/moria1")`
+ print `c.get_info("network-status")`
+ print `c.get_info("addr-mappings/all")`
+ print `c.get_info("addr-mappings/config")`
+ print `c.get_info("addr-mappings/cache")`
+ print `c.get_info("addr-mappings/control")`
+ print `c.extend_circuit(0,["moria1"])`
+ # An empty hop name is expected to be rejected with an ErrorReply.
+ try:
+ print `c.extend_circuit(0,[""])`
+ except ErrorReply: # wtf?
+ print "got error. good."
+ except:
+ print "Strange error", sys.exc_info()[0]
+
+ #send_signal(s,1)
+ #save_conf(s)
+
+ #set_option(s,"1")
+ #set_option(s,"bandwidthburstbytes 100000")
+ #set_option(s,"runasdaemon 1")
+ #set_events(s,[EVENT_TYPE.WARN])
+# c.set_events([EVENT_TYPE.ORCONN], True)
+ # The second argument (True) selects the extended form of SETEVENTS.
+ c.set_events([EVENT_TYPE.STREAM, EVENT_TYPE.CIRC,
+ EVENT_TYPE.NS, EVENT_TYPE.NEWDESC,
+ EVENT_TYPE.ORCONN, EVENT_TYPE.BW], True)
+
+ # Block until the thread returned by launch_thread() finishes.
+ th.join()
+ return
+
if __name__ == '__main__':
- if len(sys.argv) > 2:
- print "Syntax: TorControl.py torhost:torport"
- sys.exit(0)
- else:
- sys.argv.append("localhost:9051")
- sh,sp = parseHostAndPort(sys.argv[1])
- run_example(sh,sp)
+ if len(sys.argv) > 2:
+ print "Syntax: TorControl.py torhost:torport"
+ sys.exit(0)
+ else:
+ sys.argv.append("localhost:9051")
+ sh,sp = parseHostAndPort(sys.argv[1])
+ run_example(sh,sp)
Deleted: blossom/trunk/TorCtl0.py
===================================================================
--- blossom/trunk/TorCtl0.py 2008-06-26 03:17:14 UTC (rev 15467)
+++ blossom/trunk/TorCtl0.py 2008-06-26 04:07:38 UTC (rev 15468)
@@ -1,446 +0,0 @@
-#!/usr/bin/python
-# TorCtl.py -- Python module to interface with Tor Control interface.
-# Copyright 2005 Nick Mathewson -- See LICENSE for licensing information.
-#$Id: TorCtl0.py,v 1.7 2005/11/19 19:42:31 nickm Exp $
-
-"""
-TorCtl0 -- Library to control Tor processes. See TorCtlDemo.py for example use.
-"""
-
-import binascii
-import os
-import sha
-import socket
-import struct
-import sys
-import TorCtl
-
-__all__ = [
- "MSG_TYPE", "EVENT_TYPE", "CIRC_STATUS", "STREAM_STATUS",
- "OR_CONN_STATUS", "SIGNAL", "ERR_CODES",
- "TorCtlError", "ProtocolError", "ErrorReply", "Connection", "EventHandler",
- ]
-
-class _Enum:
- # Helper: define an ordered dense name-to-number 1-1 mapping.
- def __init__(self, start, names):
- self.nameOf = {}
- idx = start
- for name in names:
- setattr(self,name,idx)
- self.nameOf[idx] = name
- idx += 1
-
-class _Enum2:
- # Helper: define an ordered sparse name-to-number 1-1 mapping.
- def __init__(self, **args):
- self.__dict__.update(args)
- self.nameOf = {}
- for k,v in args.items():
- self.nameOf[v] = k
-
-# Message types that client or server can send.
-MSG_TYPE = _Enum(0x0000,
- ["ERROR",
- "DONE",
- "SETCONF",
- "GETCONF",
- "CONFVALUE",
- "SETEVENTS",
- "EVENT",
- "AUTH",
- "SAVECONF",
- "SIGNAL",
- "MAPADDRESS",
- "GETINFO",
- "INFOVALUE",
- "EXTENDCIRCUIT",
- "ATTACHSTREAM",
- "POSTDESCRIPTOR",
- "FRAGMENTHEADER",
- "FRAGMENT",
- "REDIRECTSTREAM",
- "CLOSESTREAM",
- "CLOSECIRCUIT",
- ])
-
-# Make sure that the enumeration code is working.
-assert MSG_TYPE.SAVECONF == 0x0008
-assert MSG_TYPE.CLOSECIRCUIT == 0x0014
-
-# Types of "EVENT" message.
-EVENT_TYPE = _Enum(0x0001,
- ["CIRCSTATUS",
- "STREAMSTATUS",
- "ORCONNSTATUS",
- "BANDWIDTH",
- "OBSOLETE_LOG",
- "NEWDESC",
- "DEBUG_MSG",
- "INFO_MSG",
- "NOTICE_MSG",
- "WARN_MSG",
- "ERR_MSG",
- ])
-
-assert EVENT_TYPE.ERR_MSG == 0x000B
-assert EVENT_TYPE.OBSOLETE_LOG == 0x0005
-
-# Status codes for "CIRCSTATUS" events.
-CIRC_STATUS = _Enum(0x00,
- ["LAUNCHED",
- "BUILT",
- "EXTENDED",
- "FAILED",
- "CLOSED"])
-
-# Status codes for "STREAMSTATUS" events
-STREAM_STATUS = _Enum(0x00,
- ["SENT_CONNECT",
- "SENT_RESOLVE",
- "SUCCEEDED",
- "FAILED",
- "CLOSED",
- "NEW_CONNECT",
- "NEW_RESOLVE",
- "DETACHED"])
-
-# Status codes for "ORCONNSTATUS" events
-OR_CONN_STATUS = _Enum(0x00,
- ["LAUNCHED","CONNECTED","FAILED","CLOSED"])
-
-# Signal codes for "SIGNAL" events.
-SIGNAL = _Enum2(HUP=0x01,INT=0x02,USR1=0x0A,USR2=0x0C,TERM=0x0F)
-
-# Error codes for "ERROR" events.
-ERR_CODES = {
- 0x0000 : "Unspecified error",
- 0x0001 : "Internal error",
- 0x0002 : "Unrecognized message type",
- 0x0003 : "Syntax error",
- 0x0004 : "Unrecognized configuration key",
- 0x0005 : "Invalid configuration value",
- 0x0006 : "Unrecognized byte code",
- 0x0007 : "Unauthorized",
- 0x0008 : "Failed authentication attempt",
- 0x0009 : "Resource exhausted",
- 0x000A : "No such stream",
- 0x000B : "No such circuit",
- 0x000C : "No such OR"
-}
-
-def _unpack_singleton_msg(msg):
- """Helper: unpack a single packet. Return (None, minLength, body-so-far)
- on incomplete packet or (type,body,rest) on somplete packet
- """
- if len(msg) < 4:
- return None, 4, msg
- length,type = struct.unpack("!HH",msg)
- if len(msg) >= 4+length:
- return type,msg[4:4+length],msg[4+length:]
- else:
- return None,4+length,msg
-
-def _minLengthToPack(bytes):
- """Return the minimum number of bytes needed to pack the message 'smg'"""
- whole,left = divmod(bytes,65535)
- if left:
- return whole*(65535+4)+4+left
- else:
- return whole*(65535+4)
-
-def _unpack_msg(msg):
- "returns as for _unpack_singleton_msg"
- tp,body,rest = _unpack_singleton_msg(msg)
- if tp != MSG_TYPE.FRAGMENTHEADER:
- return tp, body, rest
-
- if len(body) < 6:
- raise ProtocolError("FRAGMENTHEADER message too short")
-
- realType,realLength = struct.unpack("!HL", body[:6])
-
- # Okay; could the message _possibly_ be here?
- minLength = _minLengthToPack(realLength+6)
- if len(msg) < minLength:
- return None, minLength, msg
-
- # Okay; optimistically try to build up the msg.
- soFar = [ body[6:] ]
- lenSoFarLen = len(body)-6
- while len(rest)>=4 and lenSoFar < realLength:
- ln, tp = struct.unpack("!HH", rest[:4])
- if tp != MSG_TYPE.FRAGMENT:
- raise ProtocolError("Missing FRAGMENT message")
- soFar.append(rest[4:4+ln])
- lenSoFar += ln
- if 4+ln > len(rest):
- rest = ""
- leftInPacket = 4+ln-len(rest)
- else:
- rest = rest[4+ln:]
- leftInPacket=0
-
- if lenSoFar == realLength:
- return realType, "".join(soFar), rest
- elif lenSoFar > realLength:
- raise ProtocolError("Bad fragmentation: message longer than declared")
- else:
- inOtherPackets = realLength-lenSoFar-leftInPacket
- minLength = _minLengthToPack(inOtherPackets)
- return None, len(msg)+leftInPacket+inOtherPackets, msg
-
-def _receive_singleton_msg(s):
- """Read a single packet from the socket s.
- """
- body = ""
- header = s.recv(4)
- if not header:
- raise TorCtl.TorCtlClosed()
- length,type = struct.unpack("!HH",header)
- if length:
- while length > len(body):
- more = s.recv(length-len(body))
- if not more:
- raise TorCtl.TorCtlClosed()
- body += more
- return length,type,body
-
-def _receive_message(s):
- """Read a single message (possibly multi-packet) from the socket s."""
- length, tp, body = _receive_singleton_msg(s)
- if tp != MSG_TYPE.FRAGMENTHEADER:
- return length, tp, body
- if length < 6:
- raise ProtocolError("FRAGMENTHEADER message too short")
- realType,realLength = struct.unpack("!HL", body[:6])
- data = [ body[6:] ]
- soFar = len(data[0])
- while 1:
- length, tp, body = _receive_singleton_msg(s)
- if tp != MSG_TYPE.FRAGMENT:
- raise ProtocolError("Missing FRAGMENT message")
- soFar += length
- data.append(body)
- if soFar == realLength:
- return realLength, realType, "".join(data)
- elif soFar > realLength:
- raise ProtocolError("FRAGMENT message too long!")
-
-def pack_message(type, body=""):
- """Given a message type and optional message body, generate a set of
- packets to send.
- """
- length = len(body)
- if length < 65536:
- reqheader = struct.pack("!HH", length, type)
- return "%s%s"%(reqheader,body)
-
- fragheader = struct.pack("!HHHL",
- 65535, MSG_TYPE.FRAGMENTHEADER, type, length)
- msgs = [ fragheader, body[:65535-6] ]
- body = body[65535-6:]
- while body:
- if len(body) > 65535:
- fl = 65535
- else:
- fl = len(body)
- fragheader = struct.pack("!HH", MSG_TYPE.FRAGMENT, fl)
- msgs.append(fragheader)
- msgs.append(body[:fl])
- body = body[fl:]
-
- return "".join(msgs)
-
-def _parseKV(body,sep=" ",term="\n"):
- """Helper: parse a key/value list of the form [key sep value term]* .
- Return a list of (k,v)."""
- res = []
- for line in body.split(term):
- if not line: continue
- k, v = line.split(sep,1)
- res.append((k,v))
- return res
-
-def _unterminate(s):
- """Strip trailing NUL characters from s."""
- if s[-1] == '\0':
- return s[:-1]
- else:
- return s
-
-class Connection(TorCtl._ConnectionBase):
- """A Connection represents a connection to the Tor process."""
- def __init__(self, sock):
- """Create a Connection to communicate with the Tor process over the
- socket 'sock'.
- """
- TorCtl._ConnectionBase.__init__(self)
- self._s = sock
-
- def set_event_handler(self, handler):
- """Cause future events from the Tor process to be sent to 'handler'.
- """
- self._handler = handler
- self._handleFn = handler.handle0
-
- def _doSend(self, (type, body)):
- """Helper: Deliver a command of type 'type' and body 'body' to Tor.
- """
- self._s.sendall(pack_message(type, body))
-
- def _read_reply(self):
- length, tp, body = _receive_message(self._s)
- return (tp == MSG_TYPE.EVENT), (tp, body)
-
- def _sendAndRecv(self, tp, msg="", expectedTypes=(MSG_TYPE.DONE,)):
- """Helper: Send a command of type 'tp' and body 'msg' to Tor,
- and wait for a command in response. If the response type is
- in expectedTypes, return a (tp,body) tuple. If it is an error,
- raise ErrorReply. Otherwise, raise ProtocolError.
- """
-
- tp, msg = self.sendImpl(self._doSend, (tp, msg))
- if tp in expectedTypes:
- return tp, msg
- elif tp == MSG_TYPE.ERROR:
- if len(msg)<2:
- raise ProtocolError("(Truncated error message)")
- errCode, = struct.unpack("!H", msg[:2])
- raise ErrorReply((errCode,
- ERR_CODES.get(errCode,"[unrecognized]"),
- msg[2:]))
- else:
- raise ProtocolError("Unexpectd message type 0x%04x"%tp)
-
- def authenticate(self, secret=""):
- """Send an authenticating secret to Tor. You'll need to call
- this method before other commands. You need to use a
- password if Tor expects one.
- """
- self._sendAndRecv(MSG_TYPE.AUTH,secret)
-
- def get_option(self,name):
- """Get the value of the configuration option named 'name'. To
- retrieve multiple values, pass a list for 'name' instead of
- a string. Returns a list of (key,value) pairs.
- """
- if not isinstance(name, str):
- name = "".join(["%s\n"%s for s in name])
- tp,body = self._sendAndRecv(MSG_TYPE.GETCONF,name,[MSG_TYPE.CONFVALUE])
- return _parseKV(body)
-
- def set_option(self,key,value):
- """Set the value of the configuration option 'key' to the
- value 'value'.
- """
- self.set_options([key, value])
-
- def set_options(self,kvlist):
- """Given a list of (key,value) pairs, set them as configuration
- options.
- """
- msg = "".join(["%s %s\n" for k,v in kvlist])
- self._sendAndRecv(MSG_TYPE.SETCONF,msg)
-
- def reset_options(self, keylist):
- msg = "".join(["%s\n" for k in keylist])
- self._sendAndRecv(MSG_TYPE.SETCONF,msg)
-
- def get_info(self,name):
- """Return the value of the internal information field named
- 'name'. To retrieve multiple values, pass a list for
- 'name' instead of a string. Returns a dictionary of
- key->value mappings.
- """
- if not isinstance(name, str):
- name = "".join(["%s\n"%s for s in name])
- tp, body = self._sendAndRecv(MSG_TYPE.GETINFO,name,[MSG_TYPE.INFOVALUE])
- kvs = body.split("\0")
- d = {}
- for i in xrange(0,len(kvs)-1,2):
- d[kvs[i]] = kvs[i+1]
- return d
-
- def set_events(self,events):
- """Change the list of events that the event handler is interested
- in to those in 'events', which is a list of EVENT_TYPE members
- or their corresponding strings.
- """
- evs = []
- for ev in events:
- if isinstance(ev, types.StringType):
- evs.append(getattr(EVENT_TYPE, ev.upper()))
- else:
- evs.append(ev)
- self._sendAndRecv(MSG_TYPE.SETEVENTS,
- "".join([struct.pack("!H", event) for event in events]))
-
- def save_conf(self):
- """Flush all configuration changes to disk.
- """
- self._sendAndRecv(MSG_TYPE.SAVECONF)
-
- def send_signal(self, sig):
- """Send the signal 'sig' to the Tor process; 'sig' must be a member of
- SIGNAL or a corresponding string.
- """
- try:
- sig = sig.upper()
- except AttributeError:
- pass
- sig = { "HUP" : 0x01, "RELOAD" : 0x01,
- "INT" : 0x02, "SHUTDOWN" : 0x02,
- "DUMP" : 0x0A, "USR1" : 0x0A,
- "USR2" : 0x0C, "DEBUG" : 0x0C,
- "TERM" : 0x0F, "HALT" : 0x0F
- }.get(sig,sig)
- self._sendAndRecv(MSG_TYPE.SIGNAL,struct.pack("B",sig))
-
- def map_address(self, kvList):
- """Given a list of (old-address,new-address), have Tor redirect
- streams from old-address to new-address. Old-address can be in a
- special "dont-care" form of "0.0.0.0" or ".".
- """
- msg = [ "%s %s\n"%(k,v) for k,v in kvList ]
- tp, body = self._sendAndRecv(MSG_TYPE.MAPADDRESS,"".join(msg))
- return _parseKV(body)
-
- def extend_circuit(self, circid, hops):
- """Tell Tor to extend the circuit identified by 'circid' through the
- servers named in the list "hops".
- """
- msg = struct.pack("!L",long(circid)) + ",".join(hops) + "\0"
- tp, body = self._sendAndRecv(MSG_TYPE.EXTENDCIRCUIT,msg)
- if len(body) != 4:
- raise ProtocolError("Extendcircuit reply too short or long")
- return struct.unpack("!L",body)[0]
-
- def redirect_stream(self, streamid, newtarget):
- """Tell Tor to change the target address of the stream identified by
- 'streamid' from its old value to 'newtarget'."""
- msg = struct.pack("!L",long(streamid)) + newtarget + "\0"
- self._sendAndRecv(MSG_TYPE.REDIRECTSTREAM,msg)
-
- def attach_stream(self, streamid, circid):
- """Tell Tor To attach stream 'streamid' to circuit 'circid'."""
- msg = struct.pack("!LL",long(streamid), long(circid))
- self._sendAndRecv(MSG_TYPE.ATTACHSTREAM,msg)
-
- def close_stream(self, streamid, reason=0, flags=()):
- """Close the stream 'streamid'. """
- msg = struct.pack("!LBB",long(streamid),reason,flags)
- self._sendAndRecv(MSG_TYPE.CLOSESTREAM,msg)
-
- def close_circuit(self, circid, flags=()):
- """Close the circuit 'circid'."""
- if "IFUNUSED" in flags:
- flags=1
- else:
- flags=0
- msg = struct.pack("!LB",long(circid),flags)
- self._sendAndRecv(MSG_TYPE.CLOSECIRCUIT,msg)
-
- def post_descriptor(self, descriptor):
- """Tell Tor about a new descriptor in 'descriptor'."""
- self._sendAndRecv(MSG_TYPE.POSTDESCRIPTOR,descriptor)
Deleted: blossom/trunk/TorCtl1.py
===================================================================
--- blossom/trunk/TorCtl1.py 2008-06-26 03:17:14 UTC (rev 15467)
+++ blossom/trunk/TorCtl1.py 2008-06-26 04:07:38 UTC (rev 15468)
@@ -1,338 +0,0 @@
-#!/usr/bin/python
-# TorCtl.py -- Python module to interface with Tor Control interface.
-# Copyright 2005 Nick Mathewson -- See LICENSE for licensing information.
-#$Id: TorCtl1.py,v 1.26 2006/01/10 04:05:31 goodell Exp $
-
-import binascii
-import os
-import re
-import socket
-import sys
-import types
-import TorCtl
-
-def _quote(s):
- return re.sub(r'([\r\n\\\"])', r'\\\1', s)
-
-def _escape_dots(s, translate_nl=1):
- if translate_nl:
- lines = re.split(r"\r?\n", s)
- else:
- lines = s.split("\r\n")
- if lines and not lines[-1]:
- del lines[-1]
- for i in xrange(len(lines)):
- if lines[i].startswith("."):
- lines[i] = "."+lines[i]
- lines.append(".\r\n")
- return "\r\n".join(lines)
-
-def _unescape_dots(s, translate_nl=1):
- lines = s.split("\r\n")
-
- for i in xrange(len(lines)):
- if lines[i].startswith("."):
- lines[i] = lines[i][1:]
-
- if lines and lines[-1]:
- lines.append("")
-
- if translate_nl:
- return "\n".join(lines)
- else:
- return "\r\n".join(lines)
-
-class _BufSock:
- def __init__(self, s):
- self._s = s
- self._buf = []
-
- def readline(self):
- if self._buf:
- idx = self._buf[0].find('\n')
- if idx >= 0:
- result = self._buf[0][:idx+1]
- self._buf[0] = self._buf[0][idx+1:]
- return result
-
- while 1:
- s = self._s.recv(128)
- if not s:
- raise TorCtl.TorCtlClosed()
- idx = s.find('\n')
- if idx >= 0:
- self._buf.append(s[:idx+1])
- result = "".join(self._buf)
- rest = s[idx+1:]
- if rest:
- self._buf = [ rest ]
- else:
- del self._buf[:]
- return result
- else:
- self._buf.append(s)
-
- def write(self, s):
- self._s.send(s)
-
- def close(self):
- self._s.close()
-
-def _read_reply(f,debugFile=None):
- lines = []
- while 1:
- line = f.readline().strip()
- if debugFile:
- debugFile.write(" %s\n" % line)
- if len(line)<4:
- raise TorCtl.ProtocolError("Badly formatted reply line: Too short")
- code = line[:3]
- tp = line[3]
- s = line[4:]
- if tp == "-":
- lines.append((code, s, None))
- elif tp == " ":
- lines.append((code, s, None))
- return lines
- elif tp != "+":
- raise TorCtl.ProtocolError("Badly formatted reply line: unknown type %r"%tp)
- else:
- more = []
- while 1:
- line = f.readline()
- if debugFile and tp != "+":
- debugFile.write(" %s" % line)
- if line in (".\r\n", ".\n"):
- break
- more.append(line)
- lines.append((code, s, _unescape_dots("".join(more))))
-
-class Connection(TorCtl._ConnectionBase):
- """A Connection represents a connection to the Tor process."""
- def __init__(self, sock):
- """Create a Connection to communicate with the Tor process over the
- socket 'sock'.
- """
- TorCtl._ConnectionBase.__init__(self)
- self._s = _BufSock(sock)
- self._debugFile = None
-
- def debug(self, f):
- """DOCDOC"""
- self._debugFile = f
-
- def set_event_handler(self, handler):
- """Cause future events from the Tor process to be sent to 'handler'.
- """
- self._handler = handler
- self._handleFn = handler.handle1
-
- def _read_reply(self):
- lines = _read_reply(self._s, self._debugFile)
- isEvent = (lines and lines[0][0][0] == '6')
- return isEvent, lines
-
- def _doSend(self, msg):
- if self._debugFile:
- amsg = msg
- lines = amsg.split("\n")
- if len(lines) > 2:
- amsg = "\n".join(lines[:2]) + "\n"
- self._debugFile.write(">>> %s" % amsg)
- self._s.write(msg)
-
- def _sendAndRecv(self, msg="", expectedTypes=("250", "251")):
- """Helper: Send a command 'msg' to Tor, and wait for a command
- in response. If the response type is in expectedTypes,
- return a list of (tp,body,extra) tuples. If it is an
- error, raise ErrorReply. Otherwise, raise TorCtl.ProtocolError.
- """
- if type(msg) == types.ListType:
- msg = "".join(msg)
- assert msg.endswith("\r\n")
-
- lines = self._sendImpl(self._doSend, msg)
- # print lines
- for tp, msg, _ in lines:
- if tp[0] in '45':
- raise TorCtl.ErrorReply("%s %s"%(tp, msg))
- if tp not in expectedTypes:
- raise TorCtl.ProtocolError("Unexpectd message type %r"%tp)
-
- return lines
-
- def authenticate(self, secret=""):
- """Send an authenticating secret to Tor. You'll need to call this
- method before Tor can start.
- """
- hexstr = binascii.b2a_hex(secret)
- self._sendAndRecv("AUTHENTICATE %s\r\n"%hexstr)
-
- def get_option(self, name):
- """Get the value of the configuration option named 'name'. To
- retrieve multiple values, pass a list for 'name' instead of
- a string. Returns a list of (key,value) pairs.
- Refer to section 3.3 of control-spec.txt for a list of valid names.
- """
- if not isinstance(name, str):
- name = " ".join(name)
- lines = self._sendAndRecv("GETCONF %s\r\n" % name)
-
- r = []
- for _,line,_ in lines:
- try:
- key, val = line.split("=", 1)
- r.append((key,val))
- except ValueError:
- r.append((line, None))
-
- return r
-
- def set_option(self, key, value):
- """Set the value of the configuration option 'key' to the value 'value'.
- """
- self.set_options([(key, value)])
-
- def set_options(self, kvlist):
- """Given a list of (key,value) pairs, set them as configuration
- options.
- """
- if not kvlist:
- return
- msg = " ".join(["%s=%s"%(k,_quote(v)) for k,v in kvlist])
- self._sendAndRecv("SETCONF %s\r\n"%msg)
-
- def reset_options(self, keylist):
- """Reset the options listed in 'keylist' to their default values.
-
- Tor started implementing this command in version 0.1.1.7-alpha;
- previous versions wanted you to set configuration keys to "".
- That no longer works.
- """
- self._sendAndRecv("RESETCONF %s\r\n"%(" ".join(keylist)))
-
- def get_info(self, name):
- """Return the value of the internal information field named 'name'.
- Refer to section 3.9 of control-spec.txt for a list of valid names.
- DOCDOC
- """
- if not isinstance(name, str):
- name = " ".join(name)
- lines = self._sendAndRecv("GETINFO %s\r\n"%name)
- d = {}
- for _,msg,more in lines:
- if msg == "OK":
- break
- try:
- k,rest = msg.split("=",1)
- except ValueError:
- raise TorCtl.ProtocolError("Bad info line %r",msg)
- if more:
- d[k] = more
- else:
- d[k] = rest
- return d
-
- def set_events(self, events):
- """Change the list of events that the event handler is interested
- in to those in 'events', which is a list of event names.
- Recognized event names are listed in section 3.3 of the control-spec
- """
- evs = []
-
- # Translate options supported by old interface.
- for e in events:
- if e == 0x0001 or e == "CIRCSTATUS":
- e = "CIRC"
- elif e == 0x0002 or e == "STREAMSTATUS":
- e = "STREAM"
- elif e == 0x0003 or e == "ORCONNSTATUS":
- e = "ORCONN"
- elif e == 0x0004 or e == "BANDWIDTH":
- e = "BW"
- elif e == 0x0005 or e == "OBSOLETE_LOG":
- coneinue
- elif e == 0x0006 or e == "NEWDESC":
- e = "NEWDESC"
- elif e == 0x0007 or e == "DEBUG_MSG":
- continue
- elif e == 0x0008 or e == "INFO_MSG":
- e = "INFO"
- elif e == 0x0008 or e == "NOTICE_MSG":
- e = "NOTICE"
- elif e == 0x0008 or e == "WARN_MSG":
- e = "WARN"
- elif e == 0x0008 or e == "ERR_MSG":
- e = "ERR"
- evs.append(e)
-
- self._sendAndRecv("SETEVENTS %s\r\n" % " ".join(evs))
-
- def save_conf(self):
- """Flush all configuration changes to disk.
- """
- self._sendAndRecv("SAVECONF\r\n")
-
- def send_signal(self, sig):
- """Send the signal 'sig' to the Tor process; The allowed values for
- 'sig' are listed in section 3.6 of control-spec.
- """
- sig = { 0x01 : "HUP",
- 0x02 : "INT",
- 0x0A : "USR1",
- 0x0C : "USR2",
- 0x0F : "TERM" }.get(sig,sig)
- self._sendAndRecv("SIGNAL %s\r\n"%sig)
-
- def map_address(self, kvList):
- if not kvList:
- return
- m = " ".join([ "%s=%s" for k,v in kvList])
- lines = self._sendAndRecv("MAPADDRESS %s\r\n"%m)
- r = []
- for _,line,_ in lines:
- try:
- key, val = line.split("=", 1)
- except ValueError:
- raise TorCtl.ProtocolError("Bad address line %r",v)
- r.append((key,val))
- return r
-
- def extend_circuit(self, circid, hops):
- """Tell Tor to extend the circuit identified by 'circid' through the
- servers named in the list 'hops'.
- """
- if circid is None:
- circid = "0"
- lines = self._sendAndRecv("EXTENDCIRCUIT %s %s\r\n"
- %(circid, ",".join(hops)))
- tp,msg,_ = lines[0]
- m = re.match(r'EXTENDED (\S*)', msg)
- if not m:
- raise TorCtl.ProtocolError("Bad extended line %r",msg)
- return m.group(1)
-
- def redirect_stream(self, streamid, newaddr, newport=""):
- """DOCDOC"""
- if newport:
- self._sendAndRecv("REDIRECTSTREAM %s %s %s\r\n"%(streamid, newaddr, newport))
- else:
- self._sendAndRecv("REDIRECTSTREAM %s %s\r\n"%(streamid, newaddr))
-
- def attach_stream(self, streamid, circid):
- """DOCDOC"""
- self._sendAndRecv("ATTACHSTREAM %s %s\r\n"%(streamid, circid))
-
- def close_stream(self, streamid, reason=0, flags=()):
- """DOCDOC"""
- self._sendAndRecv("CLOSESTREAM %s %s %s\r\n"
- %(streamid, reason, "".join(flags)))
-
- def close_circuit(self, circid, reason=0, flags=()):
- """DOCDOC"""
- self._sendAndRecv("CLOSECIRCUIT %s %s %s\r\n"
- %(circid, reason, "".join(flags)))
-
- def post_descriptor(self, desc):
- self._sendAndRecv("+POSTDESCRIPTOR\r\n%s"%_escape_dots(desc))
-
Modified: blossom/trunk/blossom.py
===================================================================
--- blossom/trunk/blossom.py 2008-06-26 03:17:14 UTC (rev 15467)
+++ blossom/trunk/blossom.py 2008-06-26 04:07:38 UTC (rev 15468)
@@ -46,6 +46,7 @@
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from SocketServer import ThreadingMixIn
from TorCtl import *
+from TorCtl import parseHostAndPort
# constants
@@ -105,16 +106,18 @@
BLOSSOM = []
NICK = ""
-WEB_STATUS = "serifos.exit"
+WEB_STATUS = "cassandra.eecs.harvard.edu"
BLOSSOM_ARGS = "blossom=lefkada&"
-POLICY_ARGS = "addr=1&textonly=1&ports"
-STATUS_ARGS = "addr=1&textonly=fingerprint"
+POLICY_ARGS = "t=1"
+STATUS_ARGS = "t=1"
-HTTP_PROXY = "localhost:8118"
+HTTP_PROXY = "localhost:3128"
DIR_SERVER = "0.0.0.0:9030"
TORCONTROL = "localhost:9051"
SERVER = "localhost:9052"
+AUTH_STRING = "kJIS7CqFHFCrMpzy"
+
IMG_SIZE = "width=18 height=12"
URL_FLAGS = "/flags"
URL_ICONS = "/icons"
@@ -587,7 +590,8 @@
threading.Thread.join(self, timeout)
class TorEventHandler(EventHandler):
- def circ_status(self, eventtype, circID, status, path):
+# def circ_status(self, eventtype, circID, status, path):
+ def circ_status_event(self, c):
"""Called when a circuit status changes if listening to CIRCSTATUS
events. 'status' is a member of CIRC_STATUS; circID is a numeric
circuit ID, and 'path' is the circuit's path so far as a list of
@@ -595,8 +599,13 @@
"""
global semaphore
- curr_time = int(time.time())
- text = "CIRC %s %s %s %s\n" % (curr_time, circID, status, ",".join(path))
+ eventtype = c.event_name
+ circID = c.circ_id
+ status = c.status
+ path = c.path
+ curr_time = int(time.time())
+ text = "CIRC %s %s %s %s\n" % (curr_time, circID, status, ",".join(path))
+
log_msg(2, "CIRC %s %s %s" % (circID, status, ",".join(path)))
process_line(text)
@@ -621,17 +630,24 @@
if pending_streams.has_key(circID):
for streamID in pending_streams[circID]:
if query_streams.has_key(streamID):
- self.stream_status("STREAM", "DETACHED", streamID, query_streams[streamID])
+ self.stream_status_event("STREAM", "DETACHED", streamID, query_streams[streamID])
- def stream_status(self, eventtype, status, streamID, target, circID="0"):
+# def stream_status(self, eventtype, status, streamID, target, circID="0"):
+ def stream_status_event(self, s):
"""Called when a stream status changes if listening to STREAMSTATUS
events. 'status' is a member of STREAM_STATUS; streamID is a
numeric stream ID, and 'target' is the destination of the stream.
"""
global conn
- curr_time = int(time.time())
- text = "STREAM %s %s %s %s %s\n" % (curr_time, status, streamID, target, circID)
+ eventtype = s.event_name
+ status = s.status
+ streamID = s.strm_id
+ target = "%s:%s" % (s.target_host, s.target_port)
+ circID = s.circ_id
+ curr_time = int(time.time())
+ text = "STREAM %s %s %s %s %s\n" % (curr_time, status, streamID, target, circID)
+
log_msg(2, "STREAM %s %s %s %s" % (status, streamID, target, circID))
process_line(text)
@@ -1063,7 +1079,7 @@
if re.search(r'^552 ', e):
pass
elif re.search(r'^555 ', e) and BLOSSOM:
- conn.authenticate("")
+ conn.authenticate(AUTH_STRING)
conn.set_option("__leavestreamsunattached", "1")
else:
log_msg(1, "unknown error: ignoring")
@@ -1170,8 +1186,8 @@
a = ""
min = 1<<16
for rtr in path:
- if bw.has_key(rtr) and bw[rtr.lower()] < min:
- min = bw[rtr.lower()]
+ if bw.has_key(rtr) and bw[rtr] < min:
+ min = bw[rtr]
if min >= 400:
r = "<img %s src=\"%s\">" % (IMG_SIZE, ICON_V3)
elif min >= 60:
@@ -1578,33 +1594,6 @@
log_msg(2, "*** search to %s" % dest)
- ## -- THESIS EXPERIMENT SECTION --
-
- explicit = ""
- m = re.match(r'([0-9a-zA-Z]+)-e', dest)
- if m:
- log_msg(1, "EXPLICIT")
- explicit = m.group(1)
-
- num_hops = 0
- query_server = 0
- m = re.match(r'([0-9]+)-([qs])', dest)
- if m:
- if m.group(2) == 'q':
- query_server = 1
- num_hops = int(m.group(1))
-
- # random nodeset 0
-
- # target_array = ['80708b47', '80708b6a', '815d4438', '81aad6c0', '825c46fb', '82c240a3', '82cb7f28', '84e34a28', '84fc98c2']
-
- # random nodeset 1
-
- # target_array = ['8004240b', '8006c09e', '80708b6a', '80723f0e', '80df0671', '824b5753', '8441f067', '8441f068', '8a17cce8', '8ff88baa']
- target_array = ['ithaca', 'ithaca', '80708b6a', '80723f0e', '80df0671', '824b5753', '8441f067', '8441f068', '8a17cce8', '8ff88baa']
-
- ## -- END THESIS EXPERIMENT SECTION --
-
while not num_hops and not explicit and not desc.has_key(dest) and not fail:
chosen_dir = ""
min = MAXINT
@@ -1789,57 +1778,73 @@
# standard line format
line = re.sub(r" +", " ", line)
- line = re.sub(r"\n", "", line)
item = line.split(" ")
code = item[0]
args = item[1:]
if code == "CIRC":
- if len(args) == 3:
- time, circID, status = args
- elif len(args) == 4:
- time, circID, status, c_path = args
+ lines = line.split("\n")
+ for line in lines:
+ args = line.split(" ")
+ if len(args) > 3:
+ time = args[1]
+ args = args[2:]
+ if len(args) == 2:
+ circID, status = args
+ elif len(args) == 3:
+ circID, status, c_path = args
- if status == "LAUNCHED":
- circuits[circID] = [[], 0, {}]
+ if status == "LAUNCHED":
+ circuits[circID] = [[], 0, {}]
- elif status == "EXTENDED":
- circuits[circID] = [c_path.split(","), 0, {}]
+ elif status == "EXTENDED":
+ circuits[circID] = [c_path.split(","), 0, {}]
- elif status == "BUILT":
- circuits[circID] = [c_path.split(","), 1, {}]
+ elif status == "BUILT":
+ circuits[circID] = [c_path.split(","), 1, {}]
- elif status == "FAILED" or status == "CLOSED":
- if circuits.has_key(circID):
- del circuits[circID]
+ elif status == "FAILED" or status == "CLOSED":
+ if circuits.has_key(circID):
+ del circuits[circID]
elif code == "STREAM":
- time, status, streamID, target, circID = args
+ lines = line.split("\n")
+ for line in lines:
+ args = line.split(" ")
+ if len(args) < 4:
+ break
+ if len(args) > 5:
+ args = args[1:]
+ if len(args) > 4:
+ time, status, streamID, target, circID = args
+ else:
+ status, streamID, target, circID = args
- if status == "NEW":
- streams[streamID] = target
+ if status == "NEW":
+ streams[streamID] = target
- if status == "SENTCONNECT" or status == "SUCCEEDED":
- if circuits.has_key(circID):
- if not circuits[circID][STREAMS].has_key(streamID):
- for circ in circuits:
- if circuits[circ][STREAMS].has_key(streamID):
- del circuits[circ][STREAMS][streamID]
- circuits[circID][STREAMS][streamID] = status
+ if status in ("SENTCONNECT", "REMAP", "SUCCEEDED"):
+ if circuits.has_key(circID):
+ log_msg(2, "circID %s streamID %s" % (circID, streamID))
+ if not circuits[circID][STREAMS].has_key(streamID):
+ for circ in circuits:
+ if circuits[circ][STREAMS].has_key(streamID):
+ del circuits[circ][STREAMS][streamID]
+ circuits[circID][STREAMS][streamID] = status
- if status == "DETACHED":
- if circuits.has_key(circID):
- if circuits[circID][STREAMS].has_key(streamID):
- del circuits[circID][STREAMS][streamID]
+ if status == "DETACHED":
+ if circuits.has_key(circID):
+ if circuits[circID][STREAMS].has_key(streamID):
+ del circuits[circID][STREAMS][streamID]
- if status == "CLOSED":
- closed_streams[streamID] = int(time)
+ if status == "CLOSED":
+ closed_streams[streamID] = int(time)
- if status == "FAILED":
- failed_streams[streamID] = int(time)
+ if status == "FAILED":
+ failed_streams[streamID] = int(time)
- elif code in ["DATA", "DATA+"]:
+ elif code in ["DATA", "DATA+"]:
if len(args) > 3:
name = re.sub(r"^\*", "", args[1])
cc[name] = args[0].lower()
@@ -1926,12 +1931,14 @@
log_msg(1, "CONNECTION FAILED: %s. Is Tor running? Is the ControlPort enabled?\n" % e)
try:
- conn = get_connection(s)
+ conn = Connection(s)
except:
return
th = conn.launch_thread(daemon)
- conn.authenticate("")
+ filename = "debuglog"
+
+ conn.authenticate(AUTH_STRING)
conn.set_event_handler(TorEventHandler())
conn.set_close_handler(TorCloseHandler)
@@ -1949,7 +1956,7 @@
else:
conn.set_option("__leavestreamsunattached", "0")
- conn.set_events(["CIRCSTATUS", "STREAMSTATUS"])
+ conn.set_events(["CIRC", "STREAM"])
return conn
@@ -2906,6 +2913,7 @@
unestablished[elt] = 1
try:
+ log_msg(3, "closing controller connection")
conn.close()
except:
pass
@@ -2913,7 +2921,7 @@
while 1:
try:
conn = getConnection()
- log_msg(2, "*** OPENING CONTROLLER CONNECTION: initialize")
+ log_msg(2, "*** OPENING CONTROLLER CONNECTION: initialize %s" % conn)
break
except socket.error, e:
err_code, err_msg = e
@@ -3112,7 +3120,7 @@
sh, sp = parseHostAndPort(TORCONTROL)
- BASE_URL = "http://%s/cgi-bin/exit.pl?" % WEB_STATUS
+ BASE_URL = "http://%s/cgi-bin/exit.py?" % WEB_STATUS
POLICY_URL = BASE_URL + POLICY_ARGS
STATUS_URL = BASE_URL + STATUS_ARGS
POLICY_URL_BLOSSOM = BASE_URL + BLOSSOM_ARGS + POLICY_ARGS
Modified: blossom/trunk/exit.py
===================================================================
--- blossom/trunk/exit.py 2008-06-26 03:17:14 UTC (rev 15467)
+++ blossom/trunk/exit.py 2008-06-26 04:07:38 UTC (rev 15468)
@@ -329,7 +329,6 @@
else:
extended_content += "<td><tt></tt></td>\n"
else:
- # b = int(rtr[r]["bytes"]/86400)
b = rtr[r]["bandwidth"]
bytes = b
suffix = " "
@@ -406,6 +405,14 @@
s_platform,
extended_content
)
+ rtr[r]["textentry"] = "%s %-20s %10d - %-15s %-60s %s\n" % (
+ addr[a]["cc"],
+ rtr[r]["nickname"],
+ rtr[r]["bandwidth"],
+ rtr[r]["address"],
+ addr[a]["netname"],
+ rtr[r]["fingerprint"]
+ )
# apply restrictive filter
@@ -484,9 +491,14 @@
count = 0
for r in router_set[c]:
if rtr.has_key(r):
- rows += rtr[r]["entry"]
+ if prompts.has_key("t"):
+ rows += rtr[r]["textentry"]
+ else:
+ rows += rtr[r]["entry"]
count += 1
- if count:
+ if prompts.has_key("t"):
+ output += rows
+ elif count:
if f:
f = 0
else:
@@ -540,10 +552,18 @@
""" % rows
total += count
-# output as HTML
+if prompts.has_key("t"):
+ # output as text
-print """Content-type: text/html
+ print """Content-type: text/plain
+%s
+""" % (output)
+else:
+ # output as HTML
+
+ print """Content-type: text/html
+
<!doctype html public "-//W3C//DTD HTML 4.01//EN"
"http://www.w3.org/TR/html4/strict.dtd">
<html>
More information about the tor-commits
mailing list