[or-cvs] r18490: {torflow} Deprecate soatcli. Improve snakeinspector for html diffing. (torflow/trunk/NetworkScanners)
mikeperry at seul.org
Wed Feb 11 15:23:39 UTC 2009
Author: mikeperry
Date: 2009-02-11 10:23:39 -0500 (Wed, 11 Feb 2009)
New Revision: 18490
Removed:
torflow/trunk/NetworkScanners/soatcli.py
Modified:
torflow/trunk/NetworkScanners/libsoat.py
torflow/trunk/NetworkScanners/snakeinspector.py
torflow/trunk/NetworkScanners/soat.py
Log:
Deprecate soatcli. Improve snakeinspector for html diffing.
Add unique filename enforcement.
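
For readers skimming the diff: the unique filename enforcement boils down to inserting a counter before the file extension until an unused name is found. A minimal standalone sketch of that scheme (the patch itself adds it as DataHandler.uniqueFilename below):

import os

def unique_filename(afile):
    # Return afile unchanged if it does not exist yet; otherwise insert a
    # counter before the extension (foo.result -> foo.1.result, foo.2.result,
    # ...) until an unused name is found.
    if not os.path.exists(afile):
        return afile
    prefix, suffix = os.path.splitext(afile)
    i = 1
    while os.path.exists(prefix + "." + str(i) + suffix):
        i += 1
    return prefix + "." + str(i) + suffix
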
Modified: torflow/trunk/NetworkScanners/libsoat.py
===================================================================
--- torflow/trunk/NetworkScanners/libsoat.py 2009-02-11 13:48:09 UTC (rev 18489)
+++ torflow/trunk/NetworkScanners/libsoat.py 2009-02-11 15:23:39 UTC (rev 18490)
@@ -74,7 +74,8 @@
self.reason = reason
self.false_positive=False
self.false_positive_reason="None"
- self.verbose=False
+ self.verbose=0
+ self.filename=None
def _rebase(self, filename, new_data_root):
if not filename: return filename
@@ -83,7 +84,8 @@
return os.path.normpath(os.path.join(new_data_root, *split_file[1:]))
def rebase(self, new_data_root):
- pass
+ if 'filename' in self.__dict__: # XXX: Kill this...
+ self.filename = self._rebase(self.filename, new_data_root)
def mark_false_positive(self, reason):
self.false_positive=True
@@ -147,15 +149,14 @@
if self.verbose:
for cert in ssl_domain.cert_map.iterkeys():
ret += "\nCert for "+ssl_domain.cert_map[cert]+":\n"
- ret += cert
+ if self.verbose > 1: ret += cert
ret += self._dump_cert(cert)
if self.exit_cert:
- # XXX: Kill the first part of this clause after restart:
- if 'exit_ip' in self.__dict__ and self.exit_ip:
+ if self.exit_ip:
ret += "\nExit node's cert for "+self.exit_ip+":\n"
else:
ret += "\nExit node's cert:\n"
- ret += self.exit_cert
+ if self.verbose > 1: ret += self.exit_cert
ret += self._dump_cert(self.exit_cert)
return ret
@@ -332,29 +333,35 @@
ret = TestResult.__str__(self)
if self.verbose:
soup = old_soup = tor_soup = None
- if self.content and self.content_old:
+ if self.content:
content = open(self.content).read().decode('ascii', 'ignore')
+ soup = FullyStrainedSoup(content)
+
+ if self.content_old:
content_old = open(self.content_old).read().decode('ascii', 'ignore')
- soup = FullyStrainedSoup(content)
old_soup = FullyStrainedSoup(content_old)
- tags = map(str, soup.findAll())
- old_tags = map(str, old_soup.findAll())
- diff = difflib.unified_diff(old_tags, tags, "Non-Tor1", "Non-Tor2",
- lineterm="")
- for line in diff:
- ret+=line+"\n"
- if self.content and self.content_exit:
- content = open(self.content).read().decode('ascii', 'ignore')
+
+ if self.content_exit:
content_exit = open(self.content_exit).read().decode('ascii', 'ignore')
- soup = FullyStrainedSoup(content)
tor_soup = FullyStrainedSoup(content_exit)
- tags = map(str, soup.findAll())
- tor_tags = map(str, tor_soup.findAll())
- diff = difflib.unified_diff(tags, tor_tags, "Non-Tor", "Exit",
- lineterm="")
- for line in diff:
- ret+=line+"\n"
+ if self.verbose > 1:
+ if self.content and self.content_old:
+ tags = map(str, soup.findAll())
+ old_tags = map(str, old_soup.findAll())
+ diff = difflib.unified_diff(old_tags, tags, "Non-Tor1", "Non-Tor2",
+ lineterm="")
+ for line in diff:
+ ret+=line+"\n"
+
+ if self.content and self.content_exit:
+ tags = map(str, soup.findAll())
+ tor_tags = map(str, tor_soup.findAll())
+ diff = difflib.unified_diff(tags, tor_tags, "Non-Tor", "Exit",
+ lineterm="")
+ for line in diff:
+ ret+=line+"\n"
+
if soup and tor_soup and old_soup:
old_vs_new = SoupDiffer(old_soup, soup)
new_vs_old = SoupDiffer(soup, old_soup)
@@ -369,15 +376,28 @@
old_vs_new.changed_attributes_by_tag(),
new_vs_old.changed_attributes_by_tag())
- changed_content = bool(old_vs_new.changed_content() or old_vs_new.changed_content())
-
- ret += "\nTor changed tags:\n"
- ret += new_vs_tor.more_changed_tags(changed_tags)
- ret += "\nTor changed attrs:\n"
- ret += new_vs_tor.more_changed_attrs(changed_attributes)
- if not changed_content:
+ changed_content = bool(new_vs_old.changed_content() or old_vs_new.changed_content())
+
+ more_tags = new_vs_tor.more_changed_tags(changed_tags)
+ more_attrs = new_vs_tor.more_changed_attrs(changed_attributes)
+ more_content = new_vs_tor.changed_content()
+
+ if more_tags:
+ ret += "\nTor changed tags:\n"
+ ret += more_tags
+ if more_attrs:
+ ret += "\nTor changed attrs:\n"
+ ret += more_attrs
+ if not changed_content and more_content:
ret += "\nChanged Content:\n"
- ret += "\n".join(new_vs_tor.changed_content())+"\n"
+ ret += "\n".join(more_content)+"\n"
+ if (changed_content or not more_content) and not more_tags and not more_attrs:
+ ret += "\nSoupDiffer claims false positive.\n"
+ jsdiff = JSSoupDiffer(old_soup)
+ jsdiff.prune_differences(soup)
+ jsdifferences = jsdiff.show_differences(tor_soup)
+ if not jsdifferences: jsdifferences = "None."
+ ret += "Javascript Differences: "+jsdifferences+"\n"
else:
if self.content:
ret += " "+self.content+"\n"
@@ -512,15 +532,17 @@
fh = open(file, 'r')
return pickle.load(fh)
- def uniqueFilename(self, afile):
+ def uniqueFilename(afile):
if not os.path.exists(afile):
return afile
+ (prefix,suffix)=os.path.splitext(afile)
i=1
- while os.path.exists(afile+"."+str(i)):
+ while os.path.exists(prefix+"."+str(i)+suffix):
i+=1
- return afile+"."+str(i)
+ return prefix+"."+str(i)+suffix
+ uniqueFilename = Callable(uniqueFilename)
- def safeFilename(self, unsafe_file):
+ def safeFilename(unsafe_file):
'''
remove characters illegal in some systems
and trim the string to a reasonable length
@@ -528,16 +550,16 @@
unsafe_file = unsafe_file.decode('ascii', 'ignore')
safe_file = re.sub(unsafe_filechars, "_", unsafe_file)
return str(safe_file[:200])
+ safeFilename = Callable(safeFilename)
- def resultFilename(self, result):
- # XXX: Check existance and make a secondary name if exists.
+ def __resultFilename(self, result):
address = ''
if result.__class__.__name__ == 'HtmlTestResult' or result.__class__.__name__ == 'HttpTestResult':
- address = self.safeFilename(result.site[7:])
+ address = DataHandler.safeFilename(result.site[7:])
elif result.__class__.__name__ == 'SSLTestResult':
- address = self.safeFilename(result.site[8:])
+ address = DataHandler.safeFilename(result.site[8:])
elif 'TestResult' in result.__class__.__name__:
- address = self.safeFilename(result.site)
+ address = DataHandler.safeFilename(result.site)
else:
raise Exception, 'This doesn\'t seems to be a result instance.'
@@ -551,11 +573,12 @@
elif result.status == TEST_FAILURE:
rdir += 'failed/'
- return str((rdir+address+'.'+result.exit_node[1:]+".result").decode('ascii', 'ignore'))
+ return DataHandler.uniqueFilename(str((rdir+address+'.'+result.exit_node[1:]+".result").decode('ascii', 'ignore')))
def saveResult(self, result):
''' generic method for saving test results '''
- result_file = open(self.resultFilename(result), 'w')
+ result.filename = self.__resultFilename(result)
+ result_file = open(result.filename, 'w')
pickle.dump(result, result_file)
result_file.close()
@@ -805,6 +828,31 @@
return True
return False
+ def _difference_printer(self, other_cnts):
+ ret = ""
+ missing = []
+ miscount = []
+ new = []
+ for node in self.ast_cnts.iterkeys():
+ if not self.ast_cnts[node]: continue # pruned difference
+ if node not in other_cnts:
+ missing.append(str(node))
+ elif self.ast_cnts[node] != other_cnts[node]:
+ miscount.append(str(node))
+ for node in other_cnts.iterkeys():
+ if node not in self.ast_cnts:
+ new.append(str(node))
+ if missing:
+ ret += "\nMissing: "
+ for node in missing: ret += node
+ if new:
+ ret += "\nNew: "
+ for node in new: ret += node
+ if miscount:
+ ret += "\nMiscount: "
+ for node in miscount: ret += node
+ return ret
+
def prune_differences(self, other_string):
if not HAVE_PYPY: return
other_cnts = self._count_ast_elements(other_string)
@@ -817,6 +865,14 @@
other_cnts = self._count_ast_elements(other_string)
return self._difference_checker(other_cnts)
+ def show_differences(self, other_string):
+ ret = ""
+ if not HAVE_PYPY:
+ return "PyPy import not present. Not diffing javascript"
+ other_cnts = self._count_ast_elements(other_string)
+ return self._difference_printer(other_cnts)
+
+
class JSSoupDiffer(JSDiffer):
def _add_cnts(tag_cnts, ast_cnts):
ret_cnts = {}
@@ -855,14 +911,4 @@
ast_cnts = JSSoupDiffer._add_cnts(tag_cnts, ast_cnts)
return ast_cnts
- def prune_differences(self, other_soup):
- if not HAVE_PYPY: return
- other_cnts = self._count_ast_elements(other_soup)
- self._difference_pruner(other_cnts)
- def contains_differences(self, other_soup):
- if not HAVE_PYPY:
- plog("NOTICE", "PyPy import not present. Not diffing javascript")
- return False
- other_cnts = self._count_ast_elements(other_soup)
- return self._difference_checker(other_cnts)
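
A side note on the safeFilename/uniqueFilename hunks above: both methods lose their self argument and are wrapped in Callable so they can be invoked directly off the DataHandler class. Callable is not shown in this diff; it is assumed to be the classic pre-staticmethod Python 2 recipe, roughly:

class Callable:
    # Assumed definition (not part of this diff): store the wrapped function
    # as the instance's __call__ so the wrapper can be called like a plain
    # function, without an implicit self.
    def __init__(self, anycallable):
        self.__call__ = anycallable

class DataHandler:
    def safeFilename(unsafe_file):
        # simplified body for illustration; the real method strips unsafe
        # characters with a regex and truncates to 200 characters
        return unsafe_file.replace("/", "_")[:200]
    safeFilename = Callable(safeFilename)

# no DataHandler instance needed:
print DataHandler.safeFilename("example.com/some:page")
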
Modified: torflow/trunk/NetworkScanners/snakeinspector.py
===================================================================
--- torflow/trunk/NetworkScanners/snakeinspector.py 2009-02-11 13:48:09 UTC (rev 18489)
+++ torflow/trunk/NetworkScanners/snakeinspector.py 2009-02-11 15:23:39 UTC (rev 18490)
@@ -36,7 +36,7 @@
node=None
reason=None
result=None
- verbose=False
+ verbose=0
proto=None
resultfilter=None
for o,a in opts:
@@ -49,7 +49,7 @@
elif o == '-r' or o == '--reason':
reason = a
elif o == '-v' or o == '--verbose':
- verbose = True
+ verbose += 1
elif o == '-t' or o == '--resultfilter':
resultfilter = a
elif o == '-p' or o == '--proto':
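
With --verbose now an integer, repeating -v raises the detail level: level 1 prints the summary views, level 2 additionally dumps raw certificates and full unified tag diffs (see the libsoat.py changes above). A small sketch of the counted-flag idiom, using a hypothetical stripped-down option set rather than snakeinspector's full getopt string:

import getopt

def parse_verbosity(argv):
    # each -v / --verbose occurrence bumps the level by one
    opts, args = getopt.getopt(argv, "v", ["verbose"])
    verbose = 0
    for o, a in opts:
        if o == '-v' or o == '--verbose':
            verbose += 1
    return verbose

# parse_verbosity(["-v", "-v"]) == 2
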
Modified: torflow/trunk/NetworkScanners/soat.py
===================================================================
--- torflow/trunk/NetworkScanners/soat.py 2009-02-11 13:48:09 UTC (rev 18489)
+++ torflow/trunk/NetworkScanners/soat.py 2009-02-11 15:23:39 UTC (rev 18490)
@@ -207,7 +207,7 @@
# Save this new result file in false positive dir
# and remove old one
try:
- os.unlink(self.datahandler.resultFilename(r))
+ os.unlink(r.filename)
except:
pass
r.mark_false_positive(reason)
@@ -513,7 +513,7 @@
''' check whether a http connection to a given address is molested '''
# an address representation acceptable for a filename
- address_file = self.datahandler.safeFilename(address[7:])
+ address_file = DataHandler.safeFilename(address[7:])
content_prefix = http_content_dir+address_file
# Keep a copy of the cookie jar before mods for refetch or
@@ -730,15 +730,14 @@
def _check_http_worker(self, address, http_ret):
(mime_type,pcontent,psha1sum,content,sha1sum,content_new,sha1sum_new,exit_node) = http_ret
- address_file = self.datahandler.safeFilename(address[7:])
+ address_file = DataHandler.safeFilename(address[7:])
content_prefix = http_content_dir+address_file
failed_prefix = http_failed_dir+address_file
# compare the new and old content
# if they match, means the node has been changing the content
if sha1sum.hexdigest() == sha1sum_new.hexdigest():
- # XXX: Check for existence of this file before overwriting
- exit_content_file = open(failed_prefix+'.'+exit_node[1:]+'.content', 'w')
+ exit_content_file = open(DataHandler.uniqueFilename(failed_prefix+'.'+exit_node[1:]+'.content'), 'w')
exit_content_file.write(pcontent)
exit_content_file.close()
@@ -752,8 +751,7 @@
self.register_exit_failure(address, exit_node)
return TEST_FAILURE
- # XXX: Check for existence of this file before overwriting
- exit_content_file = open(failed_prefix+'.'+exit_node[1:]+'.dyn-content','w')
+ exit_content_file = open(DataHandler.uniqueFilename(failed_prefix+'.'+exit_node[1:]+'.dyn-content'),'w')
exit_content_file.write(pcontent)
exit_content_file.close()
@@ -891,12 +889,11 @@
else: self.successes[address]=1
return TEST_SUCCESS
else:
- address_file = self.datahandler.safeFilename(address[7:])
+ address_file = DataHandler.safeFilename(address[7:])
content_prefix = http_content_dir+address_file
failed_prefix = http_failed_dir+address_file
- # XXX: Check for existence of this file before overwriting
- exit_content_file = open(failed_prefix+'.'+exit_node[1:]+'.dyn-content',
+ exit_content_file = open(DataHandler.uniqueFilename(failed_prefix+'.'+exit_node[1:]+'.dyn-content'),
'w')
exit_content_file.write(tor_js)
exit_content_file.close()
@@ -932,7 +929,7 @@
return self._check_http_worker(address, http_ret)
# an address representation acceptable for a filename
- address_file = self.datahandler.safeFilename(address[7:])
+ address_file = DataHandler.safeFilename(address[7:])
content_prefix = http_content_dir+address_file
failed_prefix = http_failed_dir+address_file
@@ -971,8 +968,7 @@
# compare the new and old content
# if they match, means the node has been changing the content
if str(orig_soup) == str(new_soup):
- # XXX: Check for existence of this file before overwriting
- exit_content_file = open(failed_prefix+'.'+exit_node[1:]+'.content', 'w')
+ exit_content_file = open(DataHandler.uniqueFilename(failed_prefix+'.'+exit_node[1:]+'.content'), 'w')
exit_content_file.write(tor_html)
exit_content_file.close()
@@ -1004,7 +1000,7 @@
old_vs_new.changed_attributes_by_tag(),
new_vs_old.changed_attributes_by_tag())
- changed_content = bool(old_vs_new.changed_content() or old_vs_new.changed_content())
+ changed_content = bool(new_vs_old.changed_content() or old_vs_new.changed_content())
# Verify all of our changed tags are present here
if new_vs_tor.has_more_changed_tags(changed_tags) or \
@@ -1013,11 +1009,14 @@
false_positive = False
else:
false_positive = True
+
+ plog("INFO", "SoupDiffer predicts false_positive="+str(false_positive))
if false_positive:
jsdiff = JSSoupDiffer(orig_soup)
jsdiff.prune_differences(new_soup)
false_positive = not jsdiff.contains_differences(tor_soup)
+ plog("INFO", "JSSoupDiffer predicts false_positive="+str(false_positive))
if false_positive:
plog("NOTICE", "False positive detected for dynamic change at "+address+" via "+exit_node)
@@ -1028,8 +1027,7 @@
else: self.successes[address]=1
return TEST_SUCCESS
- # XXX: Check for existence of this file before overwriting
- exit_content_file = open(failed_prefix+'.'+exit_node[1:]+'.dyn-content','w')
+ exit_content_file = open(DataHandler.uniqueFilename(failed_prefix+'.'+exit_node[1:]+'.dyn-content'),'w')
exit_content_file.write(tor_html)
exit_content_file.close()
@@ -1124,7 +1122,7 @@
plog('INFO', 'Conducting an ssl test with destination ' + address)
# an address representation acceptable for a filename
- address_file = self.datahandler.safeFilename(address[8:])
+ address_file = DataHandler.safeFilename(address[8:])
ssl_file_name = ssl_certs_dir + address_file + '.ssl'
# load the original cert and compare
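
The html test hunks above also spell out the two-stage false-positive decision that the new INFO logging makes visible: SoupDiffer first asks whether the Tor-fetched page differs more than the site already differs between two direct fetches, and only if it does not is JSSoupDiffer consulted for JavaScript AST differences. An outline, with names taken from the diff where visible (has_more_changed_attrs and the final content check are assumed from context, since that part of the condition falls outside the hunk):

from libsoat import *   # SoupDiffer, JSSoupDiffer, plog (assumed exported)

def predict_false_positive(orig_soup, new_soup, tor_soup,
                           changed_tags, changed_attributes, changed_content):
    # changed_* describe how much the site varies between the two non-Tor
    # fetches; they are computed earlier in the html check (not shown here).
    new_vs_tor = SoupDiffer(new_soup, tor_soup)

    # Stage 1: does the Tor copy change more than the site changes on its own?
    false_positive = not (new_vs_tor.has_more_changed_tags(changed_tags) or
                          new_vs_tor.has_more_changed_attrs(changed_attributes) or
                          (not changed_content and new_vs_tor.changed_content()))
    plog("INFO", "SoupDiffer predicts false_positive="+str(false_positive))

    # Stage 2: only if stage 1 says "benign", compare JavaScript AST node
    # counts, after pruning differences the site itself exhibits.
    if false_positive:
        jsdiff = JSSoupDiffer(orig_soup)
        jsdiff.prune_differences(new_soup)
        false_positive = not jsdiff.contains_differences(tor_soup)
        plog("INFO", "JSSoupDiffer predicts false_positive="+str(false_positive))
    return false_positive
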
Deleted: torflow/trunk/NetworkScanners/soatcli.py
===================================================================
--- torflow/trunk/NetworkScanners/soatcli.py 2009-02-11 13:48:09 UTC (rev 18489)
+++ torflow/trunk/NetworkScanners/soatcli.py 2009-02-11 15:23:39 UTC (rev 18490)
@@ -1,418 +0,0 @@
-#!/usr/bin/python
-#
-# 2008 Aleksei Gorny, mentored by Mike Perry
-
-import dircache
-import operator
-import os
-import pickle
-import sys
-import time
-
-import sets
-from sets import Set
-
-import libsoat
-from libsoat import *
-
-#
-# Displaying stats on the console
-#
-
-class StatsConsole:
- ''' Class to display statistics from CLI'''
-
- def Listen(self):
- while 1:
- input = raw_input(">>>")
- if input == 'e' or input == 'exit':
- exit()
- elif input == 's' or input == 'summary':
- self.Summary()
- elif input == 'h' or input == 'help' or len(input) > 6:
- self.Help()
- else:
- self.Reply(input)
-
- def Summary(self):
- dh = DataHandler()
- data = dh.getAll()
-
- nodeSet = Set([])
- sshSet = Set([])
- sslSet = Set([])
- httpSet = Set([])
- smtpSet = Set([])
- popSet = Set([])
- imapSet = Set([])
- dnsSet = Set([])
- dnsrebindSet = Set([])
-
- total = len(data)
- good = bad = inconclusive = 0
- ssh = http = ssl = pop = imap = smtp = dns = dnsrebind = 0
-
- for result in data:
- nodeSet.add(result.exit_node)
-
- if result.status == 0:
- good += 1
- elif result.status == 1:
- inconclusive += 1
- elif result.status == 2:
- bad += 1
-
- if result.__class__.__name__ == 'SSHTestResult':
- sshSet.add(result.exit_node)
- ssh += 1
- elif result.__class__.__name__ == 'HttpTestResult' or result.__class__.__name__ == 'HtmlTestResult':
- httpSet.add(result.exit_node)
- http += 1
- elif result.__class__.__name__ == 'SSLTestResult':
- sslSet.add(result.exit_node)
- ssl += 1
- elif result.__class__.__name__ == 'IMAPTestResult':
- imapSet.add(result.exit_node)
- imap += 1
- elif result.__class__.__name__ == 'POPTestResult':
- popSet.add(result.exit_node)
- pop += 1
- elif result.__class__.__name__ == 'SMTPTestResult':
- smtpSet.add(result.exit_node)
- smtp += 1
- elif result.__class__.__name__ == 'DNSTestResult':
- dnsSet.add(result.exit_node)
- dns += 1
- elif result.__class__.__name__ == 'DNSRebindTestResult':
- dnsrebindSet.add(result.exit_node)
- dnsrebind += 1
-
- swidth = 25
- nwidth = 10
- width = swidth + nwidth
-
- header_format = '%-*s%*s'
- format = '%-*s%*i'
-
- print '=' * width
- print header_format % (swidth, 'Parameter', nwidth, 'Count')
- print '-' * width
-
- stats = [
- ('Tests completed', total),
- ('Nodes tested', len(nodeSet)),
- ('Nodes SSL-tested', len(sslSet)),
- ('Nodes HTTP-tested', len(httpSet)),
- ('Nodes SSH-tested', len(sshSet)),
- ('Nodes POP-tested', len(popSet)),
- ('Nodes IMAP-tested', len(imapSet)),
- ('Nodes SMTP-tested', len(smtpSet)),
- ('Nodes DNS-tested', len(dnsSet)),
- ('Nodes DNSRebind-tested', len(dnsrebindSet)),
- ('Failed tests', bad),
- ('Succeeded tests', good),
- ('Inconclusive tests', inconclusive),
- ('SSH tests', ssh),
- ('HTTP tests', http),
- ('SSL tests', ssl),
- ('POP tests', pop),
- ('IMAP tests', imap),
- ('SMTP tests', smtp),
- ('DNS tests', dns),
- ('DNS rebind tests', dnsrebind)
- ]
-
- for (k,v) in stats:
- print format % (swidth, k, nwidth, v)
- print '=' * width
-
- def Reply(self, input):
-
- good = bad = inconclusive = False
- protocols = []
-
- if 'a' in input:
- good = bad = inconclusive = True
- protocols.extend(["ssh", "http", "ssl", "imap", "pop", "smtp"])
- else:
- good = 'g' in input
- bad = 'b' in input
- inconclusive = 'i' in input
-
- if 's' in input:
- protocols.append("ssh")
- if 'h' in input:
- protocols.append("http")
- if 'l' in input:
- protocols.append("ssl")
- if 'p' in input:
- protocols.append("imap")
- if 'o' in input:
- protocols.append("pop")
- if 't' in input:
- protocols.append("smtp")
- if 'd' in input:
- protocols.append("dns")
- if 'r' in input:
- protocols.append("dnsrebind")
-
- dh = DataHandler()
- data = dh.getAll()
- filtered = dh.filterResults(data, protocols, good, bad, inconclusive)
-
- nodewidth = 45
- typewidth = 10
- sitewidth = 30
- timewidth = 30
- statuswidth = 6
- width = nodewidth + typewidth + sitewidth + timewidth + statuswidth
-
- format = '%-*s%-*s%-*s%-*s%-*s'
-
- print '=' * width
- print format % (nodewidth, 'Exit node', typewidth, 'Test type', sitewidth, 'Remote site',
- timewidth, 'Time', statuswidth, 'Status')
- print '-' * width
- for result in filtered:
- print format % (nodewidth, `result.exit_node`,
- typewidth, result.__class__.__name__[:-10],
- sitewidth, result.site,
- timewidth, time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime(result.timestamp)),
- statuswidth, `result.status`)
- print '=' * width
-
- def Help(self):
- print ''
- print 'Options:'
- print '* summmary (s) - display a short summary about all tests done so far'
- print '* exit (e) - terminate the program'
- print '* help (h) - display this help text'
- print '* all (a) - list all the results'
- print '* (shlgbi) - display a filtered list of test results. Letters are optional and mean the following:'
- print ' s - show ssh results'
- print ' h - show http results'
- print ' l - show ssl results'
- print ' g - show good results'
- print ' b - show bad results'
- print ' i - show inconclusive results'
- print ' p - show imap results'
- print ' o - show pop results'
- print ' t - show smtp results'
- print ' d - show dns results'
- print ' r - show dnsrebind results'
- print ''
-
-#
-# Displaying stats in a graphical setting (first check if we have wx)
-#
-
-nowx = False
-try:
- import wx
- from wx.lib.mixins.listctrl import ListCtrlAutoWidthMixin, ColumnSorterMixin
-except:
- nowx = True
-
-if not nowx:
-
- class ListMixin(wx.ListCtrl, ListCtrlAutoWidthMixin, ColumnSorterMixin):
- def __init__(self, parent, map):
- wx.ListCtrl.__init__(self, parent, -1, style=wx.LC_REPORT)
- ListCtrlAutoWidthMixin.__init__(self)
- ColumnSorterMixin.__init__(self, len(map))
- self.itemDataMap = map
-
- def GetListCtrl(self):
- return self
-
- # menu item ids
- ID_EXIT = 1
-
- ID_SHOW_GOOD = 11
- ID_SHOW_BAD = 12
- ID_SHOW_UNSURE = 13
-
- ID_SHOW_SSL = 21
- ID_SHOW_HTTP = 22
- ID_SHOW_SSH = 23
- ID_SHOW_SMTP = 24
- ID_SHOW_IMAP = 25
- ID_SHOW_POP = 26
- ID_SHOW_DNS = 27
- ID_SHOW_DNSREBIND = 28
-
- ID_NODE = 31
-
- class MainFrame(wx.Frame):
- ''' the main application window for displaying statistics with a GUI'''
- def __init__(self):
- wx.Frame.__init__(self, None, title="Soat test results", size=(900,500))
-
- # get the data
-
- self.dataHandler = DataHandler()
- self.dataList = self.dataHandler.getAll()
- self.filteredList = self.dataList
-
- # display it
-
- self.CreateStatusBar()
- self.initMenuBar()
- self.initContent()
-
- self.Center()
- self.Show()
-
- def initMenuBar(self):
- fileMenu = wx.Menu()
- fileMenu.Append(ID_EXIT, "E&xit", "Exit the program")
-
- viewMenu = wx.Menu()
- self.showGood = viewMenu.Append(ID_SHOW_GOOD, 'Show &Good', 'Show sucessful test results', kind=wx.ITEM_CHECK)
- self.showBad = viewMenu.Append(ID_SHOW_BAD, 'Show &Bad', 'Show unsucessful test results', kind=wx.ITEM_CHECK)
- self.showUnsure = viewMenu.Append(ID_SHOW_UNSURE, 'Show &Inconclusive', 'Show inconclusive test results', kind=wx.ITEM_CHECK)
- viewMenu.AppendSeparator()
- self.showSSL = viewMenu.Append(ID_SHOW_SSL, 'Show SS&L', 'Show SSL test results', kind=wx.ITEM_CHECK)
- self.showHTTP = viewMenu.Append(ID_SHOW_HTTP, 'Show &HTTP', 'Show HTTP test results', kind=wx.ITEM_CHECK)
- self.showSSH = viewMenu.Append(ID_SHOW_SSH, 'Show &SSH', 'Show SSH test results', kind=wx.ITEM_CHECK)
- viewMenu.AppendSeparator()
- self.showSMTP = viewMenu.Append(ID_SHOW_SMTP, 'Show SMTP', 'Show SMTP test results', kind=wx.ITEM_CHECK)
- self.showIMAP = viewMenu.Append(ID_SHOW_IMAP, 'Show IMAP', 'Show IMAP test results', kind=wx.ITEM_CHECK)
- self.showPOP = viewMenu.Append(ID_SHOW_POP, 'Show POP', 'Show POP test results', kind=wx.ITEM_CHECK)
- viewMenu.AppendSeparator()
- self.showDNS = viewMenu.Append(ID_SHOW_DNS, 'Show DNS', 'Show DNS test results', kind=wx.ITEM_CHECK)
- self.showDNSRebind = viewMenu.Append(ID_SHOW_DNSREBIND, 'Show DNSRebind', 'Show DNS rebind test results', kind=wx.ITEM_CHECK)
- viewMenu.AppendSeparator()
- viewMenu.Append(ID_NODE, '&Find node...', 'View test results for a given node [NOT IMPLEMENTED]')
-
- menuBar = wx.MenuBar()
- menuBar.Append(fileMenu,"&File")
- menuBar.Append(viewMenu,"&View")
-
- self.SetMenuBar(menuBar)
-
- wx.EVT_MENU(self, ID_EXIT, self.OnExit)
-
- wx.EVT_MENU(self, ID_SHOW_GOOD, self.GenerateFilteredList)
- wx.EVT_MENU(self, ID_SHOW_BAD, self.GenerateFilteredList)
- wx.EVT_MENU(self, ID_SHOW_UNSURE, self.GenerateFilteredList)
- viewMenu.Check(ID_SHOW_GOOD, True)
- viewMenu.Check(ID_SHOW_BAD, True)
- viewMenu.Check(ID_SHOW_UNSURE, True)
-
- for i in range(ID_SHOW_SSL, ID_SHOW_DNSREBIND + 1):
- viewMenu.Check(i, True)
- wx.EVT_MENU(self, i, self.GenerateFilteredList)
-
- def initContent(self):
- base = wx.Panel(self, -1)
- sizer = wx.GridBagSizer(0,0)
-
- box = wx.StaticBox(base, -1, 'Summary')
- boxSizer = wx.StaticBoxSizer(box, wx.HORIZONTAL)
-
- total = wx.StaticText(base, -1, 'Total tests: ' + `len(self.filteredList)`)
- boxSizer.Add(total, 0, wx.LEFT | wx.TOP | wx.BOTTOM, 10)
-
- nodes = wx.StaticText(base, -1, 'Nodes scanned: ' + `len(Set([x.exit_node for x in self.filteredList]))`)
- boxSizer.Add(nodes, 0, wx.LEFT | wx.TOP | wx.BOTTOM , 10)
-
- bad = wx.StaticText(base, -1, 'Failed tests: ' + `len([x for x in self.filteredList if x.status == 2])`)
- boxSizer.Add(bad, 0, wx.LEFT | wx.TOP | wx.BOTTOM, 10)
-
- suspicious = wx.StaticText(base, -1, 'Inconclusive tests: ' + `len([x for x in self.filteredList if x.status == 1])`)
- boxSizer.Add(suspicious, 0, wx.ALL, 10)
-
- sizer.Add(boxSizer, (0,0), (1, 5), wx.EXPAND | wx.ALL, 15)
-
- dataMap = {}
- self.fillDataMap(dataMap)
-
- self.listCtrl = ListMixin(base, dataMap)
- self.listCtrl.InsertColumn(0, 'exit node', width=380)
- self.listCtrl.InsertColumn(1, 'type', width=70)
- self.listCtrl.InsertColumn(2, 'site', width=180)
- self.listCtrl.InsertColumn(3, 'time', width=180)
- self.listCtrl.InsertColumn(4, 'status', wx.LIST_FORMAT_CENTER, width=50)
-
- self.fillListCtrl(dataMap)
-
- sizer.Add(self.listCtrl, (1,0), (1,5), wx.EXPAND | wx.LEFT | wx.BOTTOM | wx.RIGHT, border=15)
-
- sizer.AddGrowableCol(3)
- sizer.AddGrowableRow(1)
-
- base.SetSizerAndFit(sizer)
-
- # make a nasty dictionary from the current self.filteredList object so columns would be sortable
- def fillDataMap(self, dataMap):
- for i in range(len(self.filteredList)):
- dataMap.update([(i,(self.filteredList[i].exit_node,
- self.filteredList[i].__class__.__name__[:-10],
- self.filteredList[i].site,
- time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime(self.filteredList[i].timestamp)),
- self.filteredList[i].status))])
-
- # fill the result listing with data
- def fillListCtrl(self, dataMap):
- if self.listCtrl.GetItemCount() > 0:
- self.listCtrl.DeleteAllItems()
-
- for k, i in dataMap.items():
- index = self.listCtrl.InsertStringItem(sys.maxint, `i[0]`)
- self.listCtrl.SetStringItem(index, 1, i[1])
- self.listCtrl.SetStringItem(index, 2, `i[2]`)
- self.listCtrl.SetStringItem(index, 3, i[3])
- self.listCtrl.SetStringItem(index, 4, `i[4]`)
- self.listCtrl.SetItemData(index,k)
-
- def OnExit(self,e):
- self.Close(True)
-
- def GenerateFilteredList(self, e):
- protocols = []
- if self.showSSH.IsChecked():
- protocols.append("ssh")
- if self.showHTTP.IsChecked():
- protocols.append("http")
- if self.showSSL.IsChecked():
- protocols.append("ssl")
- if self.showIMAP.IsChecked():
- protocols.append("imap")
- if self.showPOP.IsChecked():
- protocols.append("pop")
- if self.showSMTP.IsChecked():
- protocols.append("smtp")
- if self.showDNS.IsChecked():
- protocols.append("dns")
- if self.showDNSRebind.IsChecked():
- protocols.append("dnsrebind")
-
- self.filteredList = list(self.dataHandler.filterResults(self.dataList, protocols,
- self.showGood.IsChecked(), self.showBad.IsChecked(), self.showUnsure.IsChecked()))
-
- dataMap = {}
- self.fillDataMap(dataMap)
- self.fillListCtrl(dataMap)
- self.listCtrl.RefreshItems(0, len(dataMap))
-
-if __name__ == "__main__":
- if len(sys.argv) == 1:
- console = StatsConsole()
- console.Listen()
- elif len(sys.argv) == 2 and sys.argv[1] == 'wx':
- if nowx:
- print 'wxpython doesn\'t seem to be installed on your system'
- print 'you can use the console interface instead (see help)'
- else:
- app = wx.App(0)
- MainFrame()
- app.MainLoop()
- else:
- print ''
- print 'This app displays results of tests carried out by soat.py (in a user-friendly way).'
- print ''
- print 'Usage:'
- print 'python soatstats.py - app starts console-only'
- print 'python soatstats.py wx - app starts with a wxpython gui'
- print ''