[or-cvs] [torflow/master 20/92] Added server-side test utility
mikeperry at torproject.org
mikeperry at torproject.org
Sat Aug 21 05:13:58 UTC 2010
Author: John M. Schanck <john at anomos.info>
Date: Sat, 10 Jul 2010 19:53:18 -0400
Subject: Added server-side test utility
Commit: 6863a34fdc25d4f8819981c912a21be1401ab70b
---
NetworkScanners/ExitAuthority/test/soat_tester.py | 118 +++++++++++++++++++++
1 files changed, 118 insertions(+), 0 deletions(-)
create mode 100644 NetworkScanners/ExitAuthority/test/soat_tester.py
diff --git a/NetworkScanners/ExitAuthority/test/soat_tester.py b/NetworkScanners/ExitAuthority/test/soat_tester.py
new file mode 100644
index 0000000..a827b32
--- /dev/null
+++ b/NetworkScanners/ExitAuthority/test/soat_tester.py
@@ -0,0 +1,118 @@
+import socket
+import SocketServer
+import time
+from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
+from OpenSSL import SSL, crypto
+
+
+DIRECT_RESP = """\
+<html>
+<head> <title>Direct Response</title> </head>
+<body>
+<center> It works! </center>
+</body>
+</html>
+"""
+
+TOR_RESP= """\
+<html>
+<head> <title>Tor Response</title> </head>
+<body>
+<center> Tamper tamper tamper</center>
+</body>
+</html>
+"""
+
+class SSLServer(HTTPServer):
+ def server_bind(self):
+ HTTPServer.server_bind(self)
+ pkey = crypto.PKey()
+ pkey.generate_key(crypto.TYPE_RSA, 1024)
+ x509 = crypto.X509()
+ x509.set_pubkey(pkey)
+ x509.get_subject().commonName = self.server_name
+ now = time.strftime("%Y%m%d%H%M%S")
+ x509.set_notBefore(now + "-1200")
+ x509.set_notAfter(now + "+1200")
+ ctx = SSL.Context(SSL.SSLv23_METHOD)
+ ctx.use_privatekey(pkey)
+ ctx.use_certificate(x509)
+ self.socket = SSL.Connection(ctx, self.socket)
+
+class Tester:
+ direct_ip = "127.0.0.1"
+ exit_ip = None # By default
+
+class HTTPTester(BaseHTTPRequestHandler, Tester):
+ def do_GET(self):
+ if self.client_address[0] == self.direct_ip:
+ self.direct_GET()
+ elif self.exit_ip is None:
+ # If no exit_ip is specified, then assume everyone except direct_ip is a Tor Exit (what a world that'd be!)
+ self.tor_GET()
+ else: # Only serve the Tor result to exit_ip
+ if self.client_address[0] == self.exit_ip:
+ self.tor_GET()
+ else: # Everyone else gets 404'd
+ self.send_error(401)
+
+ def direct_GET(self):
+ self.send_response(200)
+ self.send_header("Content-type", "text/html")
+ self.send_header("Content-Length", str(len(DIRECT_RESP)))
+ self.end_headers()
+ self.wfile.write(DIRECT_RESP)
+
+ def tor_GET(self):
+ self.send_response(200)
+ self.send_header("Content-type", "text/html")
+ self.send_header("Content-Length", str(len(TOR_RESP)))
+ self.end_headers()
+ self.wfile.write(TOR_RESP)
+
+
+class HTTPSTester(HTTPTester):
+
+ def setup(self):
+ self.connection = self.request
+ self.rfile = socket._fileobject(self.connection, "rb", self.rbufsize)
+ self.wfile = socket._fileobject(self.connection, "wb", self.wbufsize)
+
+def run_HTTPTester():
+ serv = HTTPServer(('', 80), HTTPTester)
+ print "Serving HTTP on port 80"
+ serv.serve_forever()
+
+def run_HTTPSTester():
+ serv = SSLServer(('', 443), HTTPSTester)
+ print "Serving HTTP on port 443"
+ serv.serve_forever()
+
+def usage(argv):
+ print "Usage: %s --exit=<exit ip> [options]" % argv[0]
+
+if __name__ == '__main__':
+ import sys
+ import getopt
+ try:
+ flags,rest = getopt.getopt(sys.argv[1:], "", ["exit=", "direct=", "test="])
+ except getopt.GetoptError,err:
+ print err
+ usage(sys.argv)
+
+ run = run_HTTPTester
+ for flag, val in flags:
+ if flag == "--exit":
+ Tester.exit_ip = val
+ elif flag == "--direct":
+ Tester.direct_ip = val
+ elif flag == "--test":
+ if val.lower() == "http":
+ run = run_HTTPTester
+ elif val.lower() == "https":
+ run = run_HTTPSTester
+
+ try:
+ run()
+ except KeyboardInterrupt:
+ print "Done"
--
1.7.1
More information about the tor-commits
mailing list