[or-cvs] r18224: {} Rename soatstats.py to soatcli.py. I may want to create a ne (torflow/trunk/NetworkScanners)

mikeperry at seul.org mikeperry at seul.org
Thu Jan 22 12:17:38 UTC 2009


Author: mikeperry
Date: 2009-01-22 07:17:38 -0500 (Thu, 22 Jan 2009)
New Revision: 18224

Added:
   torflow/trunk/NetworkScanners/soatcli.py
Removed:
   torflow/trunk/NetworkScanners/soatstats.py
Log:

Rename soatstats.py to soatcli.py. I may want to create a new
script that just dumps stats instead of being interactive.



Copied: torflow/trunk/NetworkScanners/soatcli.py (from rev 18083, torflow/trunk/NetworkScanners/soatstats.py)
===================================================================
--- torflow/trunk/NetworkScanners/soatcli.py	                        (rev 0)
+++ torflow/trunk/NetworkScanners/soatcli.py	2009-01-22 12:17:38 UTC (rev 18224)
@@ -0,0 +1,418 @@
+#!/usr/bin/python
+#
+# 2008 Aleksei Gorny, mentored by Mike Perry
+
+import dircache
+import operator
+import os
+import pickle
+import sys
+import time
+
+import sets
+from sets import Set
+
+import libsoat
+from libsoat import *
+
+#
+# Displaying stats on the console
+#
+
+class StatsConsole:
+    ''' Class to display statistics from CLI'''
+    
+    def Listen(self):
+        while 1:
+            input = raw_input(">>>")
+            if input == 'e' or input == 'exit':
+                exit()
+            elif input == 's' or input == 'summary':
+                self.Summary()
+            elif input == 'h' or input == 'help' or len(input) > 6:
+                self.Help() 
+            else:
+                self.Reply(input)
+    
+    def Summary(self):
+        dh = DataHandler()
+        data = dh.getAll()
+        
+        nodeSet = Set([])
+        sshSet = Set([])
+        sslSet = Set([])
+        httpSet = Set([])
+        smtpSet = Set([])
+        popSet = Set([])
+        imapSet = Set([])
+        dnsSet = Set([])
+        dnsrebindSet = Set([])
+
+        total = len(data)
+        good = bad = inconclusive = 0
+        ssh = http = ssl = pop = imap = smtp = dns = dnsrebind = 0
+
+        for result in data:
+            nodeSet.add(result.exit_node)
+            
+            if result.status == 0:
+                good += 1
+            elif result.status == 1:
+                inconclusive += 1
+            elif result.status == 2:
+                bad += 1
+            
+            if result.__class__.__name__ == 'SSHTestResult':
+                sshSet.add(result.exit_node)
+                ssh += 1
+            elif result.__class__.__name__ == 'HttpTestResult':
+                httpSet.add(result.exit_node)
+                http += 1
+            elif result.__class__.__name__ == 'SSLTestResult':
+                sslSet.add(result.exit_node)
+                ssl += 1
+            elif result.__class__.__name__ == 'IMAPTestResult':
+                imapSet.add(result.exit_node)
+                imap += 1
+            elif result.__class__.__name__ == 'POPTestResult':
+                popSet.add(result.exit_node)
+                pop += 1
+            elif result.__class__.__name__ == 'SMTPTestResult':
+                smtpSet.add(result.exit_node)
+                smtp += 1
+            elif result.__class__.__name__ == 'DNSTestResult':
+                dnsSet.add(result.exit_node)
+                dns += 1
+            elif result.__class__.__name__ == 'DNSRebindTestResult':
+                dnsrebindSet.add(result.exit_node)
+                dnsrebind += 1
+
+        swidth = 25
+        nwidth = 10
+        width = swidth + nwidth
+
+        header_format = '%-*s%*s'
+        format = '%-*s%*i'
+
+        print '=' * width
+        print header_format % (swidth, 'Parameter', nwidth, 'Count')
+        print '-' * width
+
+        stats = [
+            ('Tests completed', total),
+            ('Nodes tested', len(nodeSet)),
+            ('Nodes SSL-tested', len(sslSet)),
+            ('Nodes HTTP-tested', len(httpSet)),
+            ('Nodes SSH-tested', len(sshSet)),
+            ('Nodes POP-tested', len(popSet)),
+            ('Nodes IMAP-tested', len(imapSet)),
+            ('Nodes SMTP-tested', len(smtpSet)),
+            ('Nodes DNS-tested', len(dnsSet)),
+            ('Nodes DNSRebind-tested', len(dnsrebindSet)),
+            ('Failed tests', bad),
+            ('Succeeded tests', good),
+            ('Inconclusive tests', inconclusive),
+            ('SSH tests', ssh),
+            ('HTTP tests', http),
+            ('SSL tests', ssl),
+            ('POP tests', pop),
+            ('IMAP tests', imap),
+            ('SMTP tests', smtp),
+            ('DNS tests', dns),
+            ('DNS rebind tests', dnsrebind)
+        ]
+
+        for (k,v) in stats:
+            print format % (swidth, k, nwidth, v)
+        print '=' * width
+
+    def Reply(self, input):
+
+        good = bad = inconclusive = False
+        protocols = []
+
+        if 'a' in input:
+            good = bad = inconclusive = True
+            protocols.extend(["ssh", "http", "ssl", "imap", "pop", "smtp"])
+        else:
+            good = 'g' in input
+            bad = 'b' in input
+            inconclusive = 'i' in input
+
+            if 's' in input:
+                protocols.append("ssh")
+            if 'h' in input:
+                protocols.append("http")
+            if 'l' in input:
+                protocols.append("ssl")
+            if 'p' in input:
+                protocols.append("imap")
+            if 'o' in input:
+                protocols.append("pop")
+            if 't' in input:
+                protocols.append("smtp")
+            if 'd' in input:
+                protocols.append("dns")
+            if 'r' in input:
+                protocols.append("dnsrebind")
+
+        dh = DataHandler()
+        data = dh.getAll()
+        filtered = dh.filterResults(data, protocols, good, bad, inconclusive)
+
+        nodewidth = 45
+        typewidth = 10
+        sitewidth = 30
+        timewidth = 30
+        statuswidth = 6
+        width = nodewidth + typewidth + sitewidth + timewidth + statuswidth
+
+        format = '%-*s%-*s%-*s%-*s%-*s'
+
+        print '=' * width 
+        print format % (nodewidth, 'Exit node', typewidth, 'Test type', sitewidth, 'Remote site', 
+                timewidth, 'Time', statuswidth, 'Status')
+        print '-' * width
+        for result in filtered:
+            print format % (nodewidth, `result.exit_node`, 
+                    typewidth, result.__class__.__name__[:-10],
+                    sitewidth, result.site, 
+                    timewidth, time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime(result.timestamp)), 
+                    statuswidth, `result.status`)
+        print '=' * width
+
+    def Help(self):
+        print ''
+        print 'Options:'
+        print '* summmary (s) - display a short summary about all tests done so far'
+        print '* exit (e) - terminate the program'
+        print '* help (h) - display this help text'
+        print '* all (a) - list all the results'
+        print '* (shlgbi) - display a filtered list of test results. Letters are optional and mean the following:'
+        print '  s - show ssh results'
+        print '  h - show http results'
+        print '  l - show ssl results'
+        print '  g - show good results'
+        print '  b - show bad results'
+        print '  i - show inconclusive results'
+        print '  p - show imap results'
+        print '  o - show pop results'
+        print '  t - show smtp results'
+        print '  d - show dns results'
+        print '  r - show dnsrebind results'
+        print ''
+
+#
+# Displaying stats in a graphical setting (first check if we have wx)
+#
+
+nowx = False
+try:
+    import wx
+    from wx.lib.mixins.listctrl import ListCtrlAutoWidthMixin, ColumnSorterMixin
+except:
+    nowx = True
+
+if not nowx:
+
+    class ListMixin(wx.ListCtrl, ListCtrlAutoWidthMixin, ColumnSorterMixin):
+        def __init__(self, parent, map):
+            wx.ListCtrl.__init__(self, parent, -1, style=wx.LC_REPORT)
+            ListCtrlAutoWidthMixin.__init__(self)
+            ColumnSorterMixin.__init__(self, len(map))
+            self.itemDataMap = map
+
+        def GetListCtrl(self):
+            return self
+
+    # menu item ids
+    ID_EXIT = 1
+
+    ID_SHOW_GOOD = 11
+    ID_SHOW_BAD = 12
+    ID_SHOW_UNSURE = 13
+
+    ID_SHOW_SSL = 21
+    ID_SHOW_HTTP = 22
+    ID_SHOW_SSH = 23
+    ID_SHOW_SMTP = 24
+    ID_SHOW_IMAP = 25
+    ID_SHOW_POP = 26
+    ID_SHOW_DNS = 27
+    ID_SHOW_DNSREBIND = 28
+
+    ID_NODE = 31
+
+    class MainFrame(wx.Frame):
+        ''' the main application window for displaying statistics with a GUI'''
+        def __init__(self):
+            wx.Frame.__init__(self, None, title="Soat test results", size=(900,500))
+         
+            # get the data
+
+            self.dataHandler = DataHandler()
+            self.dataList = self.dataHandler.getAll()
+            self.filteredList = self.dataList
+
+            # display it
+        
+            self.CreateStatusBar()
+            self.initMenuBar()
+            self.initContent()
+
+            self.Center()
+            self.Show()
+    
+        def initMenuBar(self):
+            fileMenu = wx.Menu()
+            fileMenu.Append(ID_EXIT, "E&xit", "Exit the program")
+        
+            viewMenu = wx.Menu()
+            self.showGood = viewMenu.Append(ID_SHOW_GOOD, 'Show &Good', 'Show sucessful test results', kind=wx.ITEM_CHECK)
+            self.showBad = viewMenu.Append(ID_SHOW_BAD, 'Show &Bad', 'Show unsucessful test results', kind=wx.ITEM_CHECK)
+            self.showUnsure = viewMenu.Append(ID_SHOW_UNSURE, 'Show &Inconclusive', 'Show inconclusive test results', kind=wx.ITEM_CHECK)
+            viewMenu.AppendSeparator()
+            self.showSSL = viewMenu.Append(ID_SHOW_SSL, 'Show SS&L', 'Show SSL test results', kind=wx.ITEM_CHECK)
+            self.showHTTP = viewMenu.Append(ID_SHOW_HTTP, 'Show &HTTP', 'Show HTTP test results', kind=wx.ITEM_CHECK)
+            self.showSSH = viewMenu.Append(ID_SHOW_SSH, 'Show &SSH', 'Show SSH test results', kind=wx.ITEM_CHECK)
+            viewMenu.AppendSeparator()
+            self.showSMTP = viewMenu.Append(ID_SHOW_SMTP, 'Show SMTP', 'Show SMTP test results', kind=wx.ITEM_CHECK)
+            self.showIMAP = viewMenu.Append(ID_SHOW_IMAP, 'Show IMAP', 'Show IMAP test results', kind=wx.ITEM_CHECK)
+            self.showPOP = viewMenu.Append(ID_SHOW_POP, 'Show POP', 'Show POP test results', kind=wx.ITEM_CHECK)
+            viewMenu.AppendSeparator()
+            self.showDNS = viewMenu.Append(ID_SHOW_DNS, 'Show DNS', 'Show DNS test results', kind=wx.ITEM_CHECK)
+            self.showDNSRebind = viewMenu.Append(ID_SHOW_DNSREBIND, 'Show DNSRebind', 'Show DNS rebind test results', kind=wx.ITEM_CHECK)
+            viewMenu.AppendSeparator()
+            viewMenu.Append(ID_NODE, '&Find node...', 'View test results for a given node [NOT IMPLEMENTED]')
+    
+            menuBar = wx.MenuBar()
+            menuBar.Append(fileMenu,"&File")
+            menuBar.Append(viewMenu,"&View")
+
+            self.SetMenuBar(menuBar)
+
+            wx.EVT_MENU(self, ID_EXIT, self.OnExit)
+
+            wx.EVT_MENU(self, ID_SHOW_GOOD, self.GenerateFilteredList)
+            wx.EVT_MENU(self, ID_SHOW_BAD, self.GenerateFilteredList)
+            wx.EVT_MENU(self, ID_SHOW_UNSURE, self.GenerateFilteredList)
+            viewMenu.Check(ID_SHOW_GOOD, True)
+            viewMenu.Check(ID_SHOW_BAD, True)
+            viewMenu.Check(ID_SHOW_UNSURE, True)
+            
+            for i in range(ID_SHOW_SSL, ID_SHOW_DNSREBIND + 1):
+                viewMenu.Check(i, True)
+                wx.EVT_MENU(self, i, self.GenerateFilteredList)
+
+        def initContent(self): 
+            base = wx.Panel(self, -1)
+            sizer = wx.GridBagSizer(0,0)
+
+            box = wx.StaticBox(base, -1, 'Summary')
+            boxSizer = wx.StaticBoxSizer(box, wx.HORIZONTAL)
+
+            total = wx.StaticText(base, -1, 'Total tests: ' + `len(self.filteredList)`)
+            boxSizer.Add(total, 0, wx.LEFT | wx.TOP | wx.BOTTOM, 10)
+
+            nodes = wx.StaticText(base, -1, 'Nodes scanned: ' + `len(Set([x.exit_node for x in self.filteredList]))`)
+            boxSizer.Add(nodes, 0, wx.LEFT | wx.TOP | wx.BOTTOM , 10)
+
+            bad = wx.StaticText(base, -1, 'Failed tests: ' + `len([x for x in self.filteredList if x.status == 2])`)
+            boxSizer.Add(bad, 0, wx.LEFT | wx.TOP | wx.BOTTOM, 10)
+
+            suspicious = wx.StaticText(base, -1, 'Inconclusive tests: ' + `len([x for x in self.filteredList if x.status == 1])`)
+            boxSizer.Add(suspicious, 0, wx.ALL, 10)
+
+            sizer.Add(boxSizer, (0,0), (1, 5), wx.EXPAND | wx.ALL, 15)
+
+            dataMap = {}
+            self.fillDataMap(dataMap)
+        
+            self.listCtrl = ListMixin(base, dataMap)
+            self.listCtrl.InsertColumn(0, 'exit node', width=380)
+            self.listCtrl.InsertColumn(1, 'type', width=70)
+            self.listCtrl.InsertColumn(2, 'site', width=180)
+            self.listCtrl.InsertColumn(3, 'time', width=180)
+            self.listCtrl.InsertColumn(4, 'status', wx.LIST_FORMAT_CENTER, width=50)
+
+            self.fillListCtrl(dataMap)
+        
+            sizer.Add(self.listCtrl, (1,0), (1,5), wx.EXPAND | wx.LEFT | wx.BOTTOM | wx.RIGHT, border=15)
+
+            sizer.AddGrowableCol(3)
+            sizer.AddGrowableRow(1)
+
+            base.SetSizerAndFit(sizer)
+
+        # make a nasty dictionary from the current self.filteredList object so columns would be sortable
+        def fillDataMap(self, dataMap):
+            for i in range(len(self.filteredList)):
+                dataMap.update([(i,(self.filteredList[i].exit_node, 
+                                self.filteredList[i].__class__.__name__[:-10],
+                                self.filteredList[i].site, 
+                                time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime(self.filteredList[i].timestamp)), 
+                                self.filteredList[i].status))])
+
+        # fill the result listing with data
+        def fillListCtrl(self, dataMap):
+            if self.listCtrl.GetItemCount() > 0:
+                self.listCtrl.DeleteAllItems()
+
+            for k, i in dataMap.items():
+                index = self.listCtrl.InsertStringItem(sys.maxint, `i[0]`)
+                self.listCtrl.SetStringItem(index, 1, i[1])
+                self.listCtrl.SetStringItem(index, 2, `i[2]`) 
+                self.listCtrl.SetStringItem(index, 3, i[3])
+                self.listCtrl.SetStringItem(index, 4, `i[4]`)
+                self.listCtrl.SetItemData(index,k)
+
+        def OnExit(self,e):
+            self.Close(True)
+
+        def GenerateFilteredList(self, e): 
+            protocols = []
+            if self.showSSH.IsChecked():
+                protocols.append("ssh") 
+            if self.showHTTP.IsChecked():
+                protocols.append("http")
+            if self.showSSL.IsChecked():
+                protocols.append("ssl")
+            if self.showIMAP.IsChecked():
+                protocols.append("imap")
+            if self.showPOP.IsChecked():
+                protocols.append("pop")
+            if self.showSMTP.IsChecked():
+                protocols.append("smtp")
+            if self.showDNS.IsChecked():
+                protocols.append("dns")
+            if self.showDNSRebind.IsChecked():
+                protocols.append("dnsrebind")
+
+            self.filteredList = list(self.dataHandler.filterResults(self.dataList, protocols, 
+                self.showGood.IsChecked(), self.showBad.IsChecked(), self.showUnsure.IsChecked()))
+
+            dataMap = {}
+            self.fillDataMap(dataMap)
+            self.fillListCtrl(dataMap)
+            self.listCtrl.RefreshItems(0, len(dataMap)) 
+
+if __name__ == "__main__":
+    if len(sys.argv) == 1:
+        console = StatsConsole()
+        console.Listen()
+    elif len(sys.argv) == 2 and sys.argv[1] == 'wx':
+        if nowx:
+            print 'wxpython doesn\'t seem to be installed on your system'
+            print 'you can use the console interface instead (see help)'
+        else:
+            app = wx.App(0)
+            MainFrame()
+            app.MainLoop()
+    else:
+        print ''
+        print 'This app displays results of tests carried out by soat.py (in a user-friendly way).'
+        print ''
+        print 'Usage:'
+        print 'python soatstats.py - app starts console-only'
+        print 'python soatstats.py wx - app starts with a wxpython gui'
+        print ''

Deleted: torflow/trunk/NetworkScanners/soatstats.py
===================================================================
--- torflow/trunk/NetworkScanners/soatstats.py	2009-01-22 12:16:05 UTC (rev 18223)
+++ torflow/trunk/NetworkScanners/soatstats.py	2009-01-22 12:17:38 UTC (rev 18224)
@@ -1,598 +0,0 @@
-#!/usr/bin/python
-#
-# 2008 Aleksei Gorny, mentored by Mike Perry
-
-import dircache
-import operator
-import os
-import pickle
-import sys
-import time
-
-import sets
-from sets import Set
-
-#
-# Data storage
-#
-
-# data locations
-
-data_dir = './data/soat/'
-ssl_certs_dir = data_dir + 'ssl/certs/'
-http_tags_dir = data_dir + 'http/tags/'
-
-# constants
-
-TEST_SUCCESS = 0
-TEST_INCONCLUSIVE = 1
-TEST_FAILURE = 2
-
-# classes to use with pickle to dump test results into files
-
-class TestResult(object):
-    ''' Parent class for all test result classes '''
-    def __init__(self, exit_node, site, status):
-        self.exit_node = exit_node
-        self.site = site
-        self.timestamp = time.time()
-        self.status = status
-
-class SSLTestResult(TestResult):
-    ''' Represents the result of an openssl test '''
-    def __init__(self, exit_node, ssl_site, cert_file, status):
-        super(SSLTestResult, self).__init__(exit_node, ssl_site, status)
-        self.cert = cert_file
-
-class HttpTestResult(TestResult):
-    ''' Represents the result of a http test '''
-    def __init__(self, exit_node, website, tag_prints, status):
-        super(HttpTestResult, self).__init__(exit_node, website, status)
-        self.tag_prints = tag_prints
-
-class SSHTestResult(TestResult):
-    ''' Represents the result of an ssh test '''
-    def __init__(self, exit_node, ssh_site, status):
-        super(SSHTestResult, self).__init__(exit_node, ssh_site, status)
-
-class DNSTestResult(TestResult):
-    ''' Represents the result of a dns test '''
-    def __init__(self, exit_node, dns_site, status):
-        super(DNSTestResult, self).__init__(exit_node, dns_site, status)
-
-class DNSRebindTestResult(TestResult):
-    ''' Represents the result of a dns rebind test '''
-    def __init__(self, exit_node, dns_rebind_site, status):
-        super(DNSRebindTestResult, self).__init__(exit_node, dns_rebind_site, status)
-
-class SMTPTestResult(TestResult):
-    ''' Represents the result of an smtp test '''
-    def __init__(self, exit_node, smtp_site, status):
-        super(SMTPTestResult, self).__init__(exit_node, smtp_site, status)
-
-class IMAPTestResult(TestResult):
-    ''' Represents the result of an imap test '''
-    def __init__(self, exit_node, imap_site, status):
-        super(IMAPTestResult, self).__init__(exit_node, imap_site, status)
-
-class POPTestResult(TestResult):
-    ''' Represents the result of a pop test '''
-    def __init__(self, exit_node, pop_site, status):
-        super(POPTestResult, self).__init__(exit_node, pop_site, status)
-
-class DataHandler:
-    ''' Class for saving and managing test result data '''
-    def filterResults(self, results, protocols=[], show_good=False, 
-            show_bad=False, show_inconclusive=False):
-        ''' filter results based on protocol and success level ''' 
-
-        protocol_filters = []
-        status_filters = []
-
-        for protocol in protocols:
-            protocol_filters.append(lambda x, p=protocol: x.__class__.__name__.lower()[:-10].endswith(p))
-        if show_good:
-            status_filters.append(lambda x: x.status == TEST_SUCCESS)
-        if show_bad:
-            status_filters.append(lambda x: x.status == TEST_FAILURE)
-        if show_inconclusive:
-            status_filters.append(lambda x: x.status == TEST_INCONCLUSIVE)
-
-        if len(protocol_filters) == 0 or len(status_filters) == 0:
-            return []
-       
-        protocol_filter = lambda x: reduce(operator.__or__, [f(x) for f in protocol_filters])
-        status_filter = lambda x: reduce(operator.__or__, [f(x) for f in status_filters])
-
-        return [x for x in results if (protocol_filter(x) and status_filter(x))]
-        
-    def filterByNode(self, results, id):
-        ''' filter by node'''
-        return filter(lambda x: x.exit_node == id, results)
-
-    def getAll(self):
-        ''' get all available results'''
-        return self.__getResults(data_dir)
-
-    def getSsh(self):
-        ''' get results of ssh tests '''
-        return self.__getResults(data_dir + 'ssh/')
-        
-    def getHttp(self):
-        ''' get results of http tests '''
-        return self.__getResults(data_dir + 'http/')
-
-    def getSsl(self):
-        ''' get results of ssl tests '''
-        return self.__getResults(data_dir + 'ssl/')
-
-    def getSmtp(self):
-        ''' get results of smtp tests '''
-        return self.__getResults(data_dir + 'smtp/')
-
-    def getPop(self):
-        ''' get results of pop tests '''
-        return self.__getResults(data_dir + 'pop/')
-
-    def getImap(self):
-        ''' get results of imap tests '''
-        return self.__getResults(data_dir + 'imap/')
-
-    def getDns(self):
-        ''' get results of basic dns tests '''
-        return self.__getResults(data_dir + 'dns')
-
-    def getDnsRebind(self):
-        ''' get results of dns rebind tests '''
-        return self.__getResults(data_dir + 'dnsbrebind/')
-
-    def __getResults(self, dir):
-        ''' 
-        recursively traverse the directory tree starting with dir
-        gather test results from files ending with .result
-        '''
-        results = []
-
-        for root, dirs, files in os.walk(dir):
-            for file in files:
-                if file.endswith('result'):
-                    fh = open(os.path.join(root, file))
-                    result = pickle.load(fh)
-                    results.append(result)
-
-        return results
-
-    def safeFilename(self, str):
-        ''' 
-        remove characters illegal in some systems 
-        and trim the string to a reasonable length
-        '''
-        replaced = (str.replace('/','_').replace('\\','_').replace('?','_').replace(':','_').
-            replace('|','_').replace('*','_').replace('<','_').replace('>','_').replace('"',''))
-        return replaced[:200]
-
-    def saveResult(self, result):
-        ''' generic method for saving test results '''
-        address = ''
-        if result.__class__.__name__ == 'HttpTestResult':
-            address = self.safeFilename(result.site[7:])
-        elif result.__class__.__name__ == 'SSLTestResult':
-            address = self.safeFilename(result.site[8:])
-        elif 'TestResult' in result.__class__.__name__:
-            address = self.safeFilename(result.site)
-        else:
-            raise Exception, 'This doesn\'t seems to be a result instance.'
-
-        dir = data_dir + result.__class__.__name__[:-10].lower() + '/'
-        if result.status == TEST_SUCCESS:
-            dir += 'successful/'
-        if result.status == TEST_INCONCLUSIVE:
-            dir += 'inconclusive/'
-        if result.status == TEST_FAILURE:
-            dir += 'failed/'
-        
-        result_file = open(dir + `result.exit_node` + address + '.result', 'w')
-        pickle.dump(result, result_file)
-        result_file.close()
-    
-#
-# Displaying stats on the console
-#
-
-class StatsConsole:
-    ''' Class to display statistics from CLI'''
-    
-    def Listen(self):
-        while 1:
-            input = raw_input(">>>")
-            if input == 'e' or input == 'exit':
-                exit()
-            elif input == 's' or input == 'summary':
-                self.Summary()
-            elif input == 'h' or input == 'help' or len(input) > 6:
-                self.Help() 
-            else:
-                self.Reply(input)
-    
-    def Summary(self):
-        dh = DataHandler()
-        data = dh.getAll()
-        
-        nodeSet = Set([])
-        sshSet = Set([])
-        sslSet = Set([])
-        httpSet = Set([])
-        smtpSet = Set([])
-        popSet = Set([])
-        imapSet = Set([])
-        dnsSet = Set([])
-        dnsrebindSet = Set([])
-
-        total = len(data)
-        good = bad = inconclusive = 0
-        ssh = http = ssl = pop = imap = smtp = dns = dnsrebind = 0
-
-        for result in data:
-            nodeSet.add(result.exit_node)
-            
-            if result.status == 0:
-                good += 1
-            elif result.status == 1:
-                inconclusive += 1
-            elif result.status == 2:
-                bad += 1
-            
-            if result.__class__.__name__ == 'SSHTestResult':
-                sshSet.add(result.exit_node)
-                ssh += 1
-            elif result.__class__.__name__ == 'HttpTestResult':
-                httpSet.add(result.exit_node)
-                http += 1
-            elif result.__class__.__name__ == 'SSLTestResult':
-                sslSet.add(result.exit_node)
-                ssl += 1
-            elif result.__class__.__name__ == 'IMAPTestResult':
-                imapSet.add(result.exit_node)
-                imap += 1
-            elif result.__class__.__name__ == 'POPTestResult':
-                popSet.add(result.exit_node)
-                pop += 1
-            elif result.__class__.__name__ == 'SMTPTestResult':
-                smtpSet.add(result.exit_node)
-                smtp += 1
-            elif result.__class__.__name__ == 'DNSTestResult':
-                dnsSet.add(result.exit_node)
-                dns += 1
-            elif result.__class__.__name__ == 'DNSRebindTestResult':
-                dnsrebindSet.add(result.exit_node)
-                dnsrebind += 1
-
-        swidth = 25
-        nwidth = 10
-        width = swidth + nwidth
-
-        header_format = '%-*s%*s'
-        format = '%-*s%*i'
-
-        print '=' * width
-        print header_format % (swidth, 'Parameter', nwidth, 'Count')
-        print '-' * width
-
-        stats = [
-            ('Tests completed', total),
-            ('Nodes tested', len(nodeSet)),
-            ('Nodes SSL-tested', len(sslSet)),
-            ('Nodes HTTP-tested', len(httpSet)),
-            ('Nodes SSH-tested', len(sshSet)),
-            ('Nodes POP-tested', len(popSet)),
-            ('Nodes IMAP-tested', len(imapSet)),
-            ('Nodes SMTP-tested', len(smtpSet)),
-            ('Nodes DNS-tested', len(dnsSet)),
-            ('Nodes DNSRebind-tested', len(dnsrebindSet)),
-            ('Failed tests', bad),
-            ('Succeeded tests', good),
-            ('Inconclusive tests', inconclusive),
-            ('SSH tests', ssh),
-            ('HTTP tests', http),
-            ('SSL tests', ssl),
-            ('POP tests', pop),
-            ('IMAP tests', imap),
-            ('SMTP tests', smtp),
-            ('DNS tests', dns),
-            ('DNS rebind tests', dnsrebind)
-        ]
-
-        for (k,v) in stats:
-            print format % (swidth, k, nwidth, v)
-        print '=' * width
-
-    def Reply(self, input):
-
-        good = bad = inconclusive = False
-        protocols = []
-
-        if 'a' in input:
-            good = bad = inconclusive = True
-            protocols.extend(["ssh", "http", "ssl", "imap", "pop", "smtp"])
-        else:
-            good = 'g' in input
-            bad = 'b' in input
-            inconclusive = 'i' in input
-
-            if 's' in input:
-                protocols.append("ssh")
-            if 'h' in input:
-                protocols.append("http")
-            if 'l' in input:
-                protocols.append("ssl")
-            if 'p' in input:
-                protocols.append("imap")
-            if 'o' in input:
-                protocols.append("pop")
-            if 't' in input:
-                protocols.append("smtp")
-            if 'd' in input:
-                protocols.append("dns")
-            if 'r' in input:
-                protocols.append("dnsrebind")
-
-        dh = DataHandler()
-        data = dh.getAll()
-        filtered = dh.filterResults(data, protocols, good, bad, inconclusive)
-
-        nodewidth = 45
-        typewidth = 10
-        sitewidth = 30
-        timewidth = 30
-        statuswidth = 6
-        width = nodewidth + typewidth + sitewidth + timewidth + statuswidth
-
-        format = '%-*s%-*s%-*s%-*s%-*s'
-
-        print '=' * width 
-        print format % (nodewidth, 'Exit node', typewidth, 'Test type', sitewidth, 'Remote site', 
-                timewidth, 'Time', statuswidth, 'Status')
-        print '-' * width
-        for result in filtered:
-            print format % (nodewidth, `result.exit_node`, 
-                    typewidth, result.__class__.__name__[:-10],
-                    sitewidth, result.site, 
-                    timewidth, time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime(result.timestamp)), 
-                    statuswidth, `result.status`)
-        print '=' * width
-
-    def Help(self):
-        print ''
-        print 'Options:'
-        print '* summmary (s) - display a short summary about all tests done so far'
-        print '* exit (e) - terminate the program'
-        print '* help (h) - display this help text'
-        print '* all (a) - list all the results'
-        print '* (shlgbi) - display a filtered list of test results. Letters are optional and mean the following:'
-        print '  s - show ssh results'
-        print '  h - show http results'
-        print '  l - show ssl results'
-        print '  g - show good results'
-        print '  b - show bad results'
-        print '  i - show inconclusive results'
-        print '  p - show imap results'
-        print '  o - show pop results'
-        print '  t - show smtp results'
-        print '  d - show dns results'
-        print '  r - show dnsrebind results'
-        print ''
-
-#
-# Displaying stats in a graphical setting (first check if we have wx)
-#
-
-nowx = False
-try:
-    import wx
-    from wx.lib.mixins.listctrl import ListCtrlAutoWidthMixin, ColumnSorterMixin
-except:
-    nowx = True
-
-if not nowx:
-
-    class ListMixin(wx.ListCtrl, ListCtrlAutoWidthMixin, ColumnSorterMixin):
-        def __init__(self, parent, map):
-            wx.ListCtrl.__init__(self, parent, -1, style=wx.LC_REPORT)
-            ListCtrlAutoWidthMixin.__init__(self)
-            ColumnSorterMixin.__init__(self, len(map))
-            self.itemDataMap = map
-
-        def GetListCtrl(self):
-            return self
-
-    # menu item ids
-    ID_EXIT = 1
-
-    ID_SHOW_GOOD = 11
-    ID_SHOW_BAD = 12
-    ID_SHOW_UNSURE = 13
-
-    ID_SHOW_SSL = 21
-    ID_SHOW_HTTP = 22
-    ID_SHOW_SSH = 23
-    ID_SHOW_SMTP = 24
-    ID_SHOW_IMAP = 25
-    ID_SHOW_POP = 26
-    ID_SHOW_DNS = 27
-    ID_SHOW_DNSREBIND = 28
-
-    ID_NODE = 31
-
-    class MainFrame(wx.Frame):
-        ''' the main application window for displaying statistics with a GUI'''
-        def __init__(self):
-            wx.Frame.__init__(self, None, title="Soat test results", size=(900,500))
-         
-            # get the data
-
-            self.dataHandler = DataHandler()
-            self.dataList = self.dataHandler.getAll()
-            self.filteredList = self.dataList
-
-            # display it
-        
-            self.CreateStatusBar()
-            self.initMenuBar()
-            self.initContent()
-
-            self.Center()
-            self.Show()
-    
-        def initMenuBar(self):
-            fileMenu = wx.Menu()
-            fileMenu.Append(ID_EXIT, "E&xit", "Exit the program")
-        
-            viewMenu = wx.Menu()
-            self.showGood = viewMenu.Append(ID_SHOW_GOOD, 'Show &Good', 'Show sucessful test results', kind=wx.ITEM_CHECK)
-            self.showBad = viewMenu.Append(ID_SHOW_BAD, 'Show &Bad', 'Show unsucessful test results', kind=wx.ITEM_CHECK)
-            self.showUnsure = viewMenu.Append(ID_SHOW_UNSURE, 'Show &Inconclusive', 'Show inconclusive test results', kind=wx.ITEM_CHECK)
-            viewMenu.AppendSeparator()
-            self.showSSL = viewMenu.Append(ID_SHOW_SSL, 'Show SS&L', 'Show SSL test results', kind=wx.ITEM_CHECK)
-            self.showHTTP = viewMenu.Append(ID_SHOW_HTTP, 'Show &HTTP', 'Show HTTP test results', kind=wx.ITEM_CHECK)
-            self.showSSH = viewMenu.Append(ID_SHOW_SSH, 'Show &SSH', 'Show SSH test results', kind=wx.ITEM_CHECK)
-            viewMenu.AppendSeparator()
-            self.showSMTP = viewMenu.Append(ID_SHOW_SMTP, 'Show SMTP', 'Show SMTP test results', kind=wx.ITEM_CHECK)
-            self.showIMAP = viewMenu.Append(ID_SHOW_IMAP, 'Show IMAP', 'Show IMAP test results', kind=wx.ITEM_CHECK)
-            self.showPOP = viewMenu.Append(ID_SHOW_POP, 'Show POP', 'Show POP test results', kind=wx.ITEM_CHECK)
-            viewMenu.AppendSeparator()
-            self.showDNS = viewMenu.Append(ID_SHOW_DNS, 'Show DNS', 'Show DNS test results', kind=wx.ITEM_CHECK)
-            self.showDNSRebind = viewMenu.Append(ID_SHOW_DNSREBIND, 'Show DNSRebind', 'Show DNS rebind test results', kind=wx.ITEM_CHECK)
-            viewMenu.AppendSeparator()
-            viewMenu.Append(ID_NODE, '&Find node...', 'View test results for a given node [NOT IMPLEMENTED]')
-    
-            menuBar = wx.MenuBar()
-            menuBar.Append(fileMenu,"&File")
-            menuBar.Append(viewMenu,"&View")
-
-            self.SetMenuBar(menuBar)
-
-            wx.EVT_MENU(self, ID_EXIT, self.OnExit)
-
-            wx.EVT_MENU(self, ID_SHOW_GOOD, self.GenerateFilteredList)
-            wx.EVT_MENU(self, ID_SHOW_BAD, self.GenerateFilteredList)
-            wx.EVT_MENU(self, ID_SHOW_UNSURE, self.GenerateFilteredList)
-            viewMenu.Check(ID_SHOW_GOOD, True)
-            viewMenu.Check(ID_SHOW_BAD, True)
-            viewMenu.Check(ID_SHOW_UNSURE, True)
-            
-            for i in range(ID_SHOW_SSL, ID_SHOW_DNSREBIND + 1):
-                viewMenu.Check(i, True)
-                wx.EVT_MENU(self, i, self.GenerateFilteredList)
-
-        def initContent(self): 
-            base = wx.Panel(self, -1)
-            sizer = wx.GridBagSizer(0,0)
-
-            box = wx.StaticBox(base, -1, 'Summary')
-            boxSizer = wx.StaticBoxSizer(box, wx.HORIZONTAL)
-
-            total = wx.StaticText(base, -1, 'Total tests: ' + `len(self.filteredList)`)
-            boxSizer.Add(total, 0, wx.LEFT | wx.TOP | wx.BOTTOM, 10)
-
-            nodes = wx.StaticText(base, -1, 'Nodes scanned: ' + `len(Set([x.exit_node for x in self.filteredList]))`)
-            boxSizer.Add(nodes, 0, wx.LEFT | wx.TOP | wx.BOTTOM , 10)
-
-            bad = wx.StaticText(base, -1, 'Failed tests: ' + `len([x for x in self.filteredList if x.status == 2])`)
-            boxSizer.Add(bad, 0, wx.LEFT | wx.TOP | wx.BOTTOM, 10)
-
-            suspicious = wx.StaticText(base, -1, 'Inconclusive tests: ' + `len([x for x in self.filteredList if x.status == 1])`)
-            boxSizer.Add(suspicious, 0, wx.ALL, 10)
-
-            sizer.Add(boxSizer, (0,0), (1, 5), wx.EXPAND | wx.ALL, 15)
-
-            dataMap = {}
-            self.fillDataMap(dataMap)
-        
-            self.listCtrl = ListMixin(base, dataMap)
-            self.listCtrl.InsertColumn(0, 'exit node', width=380)
-            self.listCtrl.InsertColumn(1, 'type', width=70)
-            self.listCtrl.InsertColumn(2, 'site', width=180)
-            self.listCtrl.InsertColumn(3, 'time', width=180)
-            self.listCtrl.InsertColumn(4, 'status', wx.LIST_FORMAT_CENTER, width=50)
-
-            self.fillListCtrl(dataMap)
-        
-            sizer.Add(self.listCtrl, (1,0), (1,5), wx.EXPAND | wx.LEFT | wx.BOTTOM | wx.RIGHT, border=15)
-
-            sizer.AddGrowableCol(3)
-            sizer.AddGrowableRow(1)
-
-            base.SetSizerAndFit(sizer)
-
-        # make a nasty dictionary from the current self.filteredList object so columns would be sortable
-        def fillDataMap(self, dataMap):
-            for i in range(len(self.filteredList)):
-                dataMap.update([(i,(self.filteredList[i].exit_node, 
-                                self.filteredList[i].__class__.__name__[:-10],
-                                self.filteredList[i].site, 
-                                time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime(self.filteredList[i].timestamp)), 
-                                self.filteredList[i].status))])
-
-        # fill the result listing with data
-        def fillListCtrl(self, dataMap):
-            if self.listCtrl.GetItemCount() > 0:
-                self.listCtrl.DeleteAllItems()
-
-            for k, i in dataMap.items():
-                index = self.listCtrl.InsertStringItem(sys.maxint, `i[0]`)
-                self.listCtrl.SetStringItem(index, 1, i[1])
-                self.listCtrl.SetStringItem(index, 2, `i[2]`) 
-                self.listCtrl.SetStringItem(index, 3, i[3])
-                self.listCtrl.SetStringItem(index, 4, `i[4]`)
-                self.listCtrl.SetItemData(index,k)
-
-        def OnExit(self,e):
-            self.Close(True)
-
-        def GenerateFilteredList(self, e): 
-            protocols = []
-            if self.showSSH.IsChecked():
-                protocols.append("ssh") 
-            if self.showHTTP.IsChecked():
-                protocols.append("http")
-            if self.showSSL.IsChecked():
-                protocols.append("ssl")
-            if self.showIMAP.IsChecked():
-                protocols.append("imap")
-            if self.showPOP.IsChecked():
-                protocols.append("pop")
-            if self.showSMTP.IsChecked():
-                protocols.append("smtp")
-            if self.showDNS.IsChecked():
-                protocols.append("dns")
-            if self.showDNSRebind.IsChecked():
-                protocols.append("dnsrebind")
-
-            self.filteredList = list(self.dataHandler.filterResults(self.dataList, protocols, 
-                self.showGood.IsChecked(), self.showBad.IsChecked(), self.showUnsure.IsChecked()))
-
-            dataMap = {}
-            self.fillDataMap(dataMap)
-            self.fillListCtrl(dataMap)
-            self.listCtrl.RefreshItems(0, len(dataMap)) 
-
-if __name__ == "__main__":
-    if len(sys.argv) == 1:
-        console = StatsConsole()
-        console.Listen()
-    elif len(sys.argv) == 2 and sys.argv[1] == 'wx':
-        if nowx:
-            print 'wxpython doesn\'t seem to be installed on your system'
-            print 'you can use the console interface instead (see help)'
-        else:
-            app = wx.App(0)
-            MainFrame()
-            app.MainLoop()
-    else:
-        print ''
-        print 'This app displays results of tests carried out by soat.py (in a user-friendly way).'
-        print ''
-        print 'Usage:'
-        print 'python soatstats.py - app starts console-only'
-        print 'python soatstats.py wx - app starts with a wxpython gui'
-        print ''



More information about the tor-commits mailing list