[tor-commits] [ooni-probe/master] Make the sniffer not run in a separate thread, but use a non blocking fdesc

art at torproject.org art at torproject.org
Sun Nov 25 14:47:21 UTC 2012


commit 615ce75c47aec249b6b5a4c0b58fdf7a93f09582
Author: Arturo Filastò <art at fuffa.org>
Date:   Sun Nov 25 10:08:47 2012 +0100

    Make the sniffer not run in a separate thread, but use a non blocking fdesc
    * Do some refactoring of scapy testing, following Factory creational pattern
      and a pub-sub pattern for the readers and writers (inspired by muxTCP).
    * Other misc refactoring
---
 ooni/oonicli.py          |    6 ++-
 ooni/templates/scapyt.py |   50 ++++++++---------
 ooni/utils/net.py        |   23 --------
 ooni/utils/txscapy.py    |  138 ++++++++++++++++++++++++++++++++--------------
 4 files changed, 126 insertions(+), 91 deletions(-)

diff --git a/ooni/oonicli.py b/ooni/oonicli.py
index 5c582b2..1a316b3 100644
--- a/ooni/oonicli.py
+++ b/ooni/oonicli.py
@@ -100,6 +100,7 @@ def runTest(cmd_line_options):
     classes = runner.findTestClassesFromFile(cmd_line_options['test'])
     test_cases, options = runner.loadTestsAndOptions(classes, cmd_line_options)
     if config.privacy.includepcap:
+        from ooni.utils.txscapy import ScapyFactory, ScapySniffer
         try:
             checkForRoot()
         except NotRootError:
@@ -108,7 +109,10 @@ def runTest(cmd_line_options):
             sys.exit(1)
 
         print "Starting sniffer"
-        net.capturePackets(config.reports.pcap)
+        config.scapyFactory = ScapyFactory(config.advanced.interface)
+
+        sniffer = ScapySniffer(config.reports.pcap)
+        config.scapyFactory.registerProtocol(sniffer)
 
     return runner.runTestCases(test_cases, options, cmd_line_options)
 
diff --git a/ooni/templates/scapyt.py b/ooni/templates/scapyt.py
index 11b4381..cb02300 100644
--- a/ooni/templates/scapyt.py
+++ b/ooni/templates/scapyt.py
@@ -16,7 +16,7 @@ from ooni.nettest import NetTestCase
 from ooni.utils import log
 from ooni import config
 
-from ooni.utils.txscapy import ScapyProtocol, getDefaultIface
+from ooni.utils.txscapy import ScapySender, getDefaultIface
 
 class BaseScapyTest(NetTestCase):
     """
@@ -66,20 +66,8 @@ class BaseScapyTest(NetTestCase):
         else:
             config.check_TCPerror_seqack = 0
 
-        if config.advanced.interface == 'auto':
-            self.interface = getDefaultIface()
-        else:
-            self.interface = config.advanced.interface
-
-    def reportSentPacket(self, packet):
-        if 'sent_packets' not in self.report:
-            self.report['sent_packets'] = []
-        self.report['sent_packets'].append(packet)
-
-    def reportReceivedPacket(self, packet):
-        if 'answered_packets' not in self.report:
-            self.report['answered_packets'] = []
-        self.report['answered_packets'].append(packet)
+        self.report['sent_packets'] = []
+        self.report['answered_packets'] = []
 
     def finishedSendReceive(self, packets):
         """
@@ -98,8 +86,8 @@ class BaseScapyTest(NetTestCase):
                 sent_packet.src = '127.0.0.1'
                 received_packet.dst = '127.0.0.1'
 
-            self.reportSentPacket(sent_packet)
-            self.reportReceivedPacket(received_packet)
+            self.report['sent_packets'].append(sent_packet)
+            self.report['answered_packets'].append(received_packet)
         return packets
 
     def sr(self, packets, *arg, **kw):
@@ -107,8 +95,11 @@ class BaseScapyTest(NetTestCase):
         Wrapper around scapy.sendrecv.sr for sending and receiving of packets
         at layer 3.
         """
-        scapyProtocol = ScapyProtocol(interface=self.interface, *arg, **kw)
-        d = scapyProtocol.startSending(packets)
+        scapySender = ScapySender()
+
+        config.scapyFactory.registerProtocol(scapySender)
+
+        d = scapySender.startSending(packets)
         d.addCallback(self.finishedSendReceive)
         return d
 
@@ -123,12 +114,15 @@ class BaseScapyTest(NetTestCase):
                 return packets[0][0][1]
             except IndexError:
                 log.err("Got no response...")
-                return None
+                return packets
+
+        scapySender = ScapySender()
+        scapySender.expected_answers = 1
+
+        config.scapyFactory.registerProtocol(scapySender)
 
-        scapyProtocol = ScapyProtocol(interface=self.interface, *arg, **kw)
-        scapyProtocol.expected_answers = 1
         log.debug("Running sr1")
-        d = scapyProtocol.startSending(packets)
+        d = scapySender.startSending(packets)
         log.debug("Started to send")
         d.addCallback(self.finishedSendReceive)
         d.addCallback(done)
@@ -138,9 +132,13 @@ class BaseScapyTest(NetTestCase):
         """
         Wrapper around scapy.sendrecv.send for sending of packets at layer 3
         """
-        scapyProtocol = ScapyProtocol(interface=self.interface, *arg, **kw)
-        scapyProtocol.sendPackets(packets)
-        scapyProtocol.stopSending()
+        scapySender = ScapySender()
+
+        config.scapyFactory.registerProtocol(scapySender)
+
+        scapySender.sendPackets(packets)
+
+        scapySender.stopSending()
         for packet in packets:
             self.reportSentPacket(packet)
 
diff --git a/ooni/utils/net.py b/ooni/utils/net.py
index 649dc64..df98412 100644
--- a/ooni/utils/net.py
+++ b/ooni/utils/net.py
@@ -82,29 +82,6 @@ class BodyReceiver(protocol.Protocol):
     def connectionLost(self, reason):
         self.finished.callback(self.data)
 
-def capturePackets(pcap_filename):
-    from scapy.all import sniff
-    global stop_packet_capture
-    stop_packet_capture = False
-
-    def stopCapture():
-        # XXX this is a bit of a hack to stop capturing packets when we close
-        # the reactor. Ideally we would want to be able to do this
-        # programmatically, but this requires some work on implementing
-        # properly the sniff function with deferreds.
-        global stop_packet_capture
-        stop_packet_capture = True
-
-    def writePacketToPcap(pkt):
-        from scapy.all import utils
-        pcapwriter = txscapy.TXPcapWriter(pcap_filename, append=True)
-        pcapwriter.write(pkt)
-        if stop_packet_capture:
-            sys.exit(1)
-    d = threads.deferToThread(sniff, lfilter=writePacketToPcap)
-    reactor.addSystemEventTrigger('before', 'shutdown', stopCapture)
-    return d
-
 def getSystemResolver():
     """
     XXX implement a function that returns the resolver that is currently
diff --git a/ooni/utils/txscapy.py b/ooni/utils/txscapy.py
index 679eb42..8e479f8 100644
--- a/ooni/utils/txscapy.py
+++ b/ooni/utils/txscapy.py
@@ -32,7 +32,9 @@ try:
     config.pcap_dnet = True
 
 except ImportError, e:
-    log.err("pypcap or dnet not installed. Certain tests may not work.")
+    log.err("pypcap or dnet not installed. "
+            "Certain tests may not work.")
+
     config.pcap_dnet = False
     conf.use_pcap = False
     conf.use_dnet = False
@@ -59,6 +61,9 @@ def getNetworksFromRoutes():
 
     return networks
 
+class IfaceError(Exception):
+    pass
+
 def getDefaultIface():
     networks = getNetworksFromRoutes()
     for net in networks:
@@ -66,45 +71,100 @@ def getDefaultIface():
             return net.iface
     raise IfaceError
 
-class TXPcapWriter(PcapWriter):
-    def __init__(self, *arg, **kw):
-        PcapWriter.__init__(self, *arg, **kw)
-        fdesc.setNonBlocking(self.f)
+class ProtocolNotRegistered(Exception):
+    pass
 
-class ScapyProtocol(abstract.FileDescriptor):
+class ProtocolAlreadyRegistered(Exception):
+    pass
+
+class ScapyFactory(abstract.FileDescriptor):
+    """
+    Inspired by muxTCP scapyLink:
+    https://github.com/enki/muXTCP/blob/master/scapyLink.py
+    """
     def __init__(self, interface, super_socket=None, timeout=5):
         abstract.FileDescriptor.__init__(self, reactor)
+        if interface == 'auto':
+            interface = getDefaultIface()
         if not super_socket:
-            super_socket = conf.L3socket(iface=interface, promisc=True, filter='')
+            super_socket = conf.L3socket(iface=interface,
+                    promisc=True, filter='')
             #super_socket = conf.L2socket(iface=interface)
 
+        self.protocols = []
         fdesc._setCloseOnExec(super_socket.ins.fileno())
         self.super_socket = super_socket
 
-        self.interface = interface
-        self.timeout = timeout
+    def writeSomeData(self, data):
+        """
+        XXX we actually want to use this, but this requires overriding doWrite
+        or writeSequence.
+        """
+        pass
 
-        # This dict is used to store the unique hashes that allow scapy to
-        # match up request with answer
-        self.hr_sent_packets = {}
+    def send(self, packet):
+        """
+        Write a scapy packet to the wire.
+        """
+        return self.super_socket.send(packet)
 
-        # These are the packets we have received as answer to the ones we sent
-        self.answered_packets = []
+    def fileno(self):
+        return self.super_socket.ins.fileno()
 
-        # These are the packets we send
-        self.sent_packets = []
+    def doRead(self):
+        packet = self.super_socket.recv(MTU)
+        for protocol in self.protocols:
+            protocol.packetReceived(packet)
+
+    def registerProtocol(self, protocol):
+        if not self.connected:
+            self.startReading()
+
+        if protocol not in self.protocols:
+            protocol.factory = self
+            self.protocols.append(protocol)
+        else:
+            raise ProtocolAlreadyRegistered
+
+    def unRegisterProtocol(self, protocol):
+        if protocol in self.protocols:
+            self.protocols.remove(protocol)
+            if len(self.protocols) == 0:
+                self.loseConnection()
+        else:
+            raise ProtocolNotRegistered
+
+class ScapyProtocol(object):
+    factory = None
+
+    def packetReceived(self, packet):
+        """
+        When you register a protocol, this method will be called with argument
+        the packet it received.
 
-        # This deferred will fire when we have finished sending a receiving packets.
-        self.d = defer.Deferred()
-        # Should we look for multiple answers for the same sent packet?
-        self.multi = False
+        Every protocol that is registered will have this method called.
+        """
+        raise NotImplementedError
 
-        # When 0 we stop when all the packets we have sent have received an
-        # answer
-        self.expected_answers = 0
+class ScapySender(ScapyProtocol):
+    timeout = 5
+    # This dict is used to store the unique hashes that allow scapy to
+    # match up request with answer
+    hr_sent_packets = {}
 
-    def fileno(self):
-        return self.super_socket.ins.fileno()
+    # These are the packets we have received as answer to the ones we sent
+    answered_packets = []
+
+    # These are the packets we send
+    sent_packets = []
+
+    # This deferred will fire when we have finished sending a receiving packets.
+    # Should we look for multiple answers for the same sent packet?
+    multi = False
+
+    # When 0 we stop when all the packets we have sent have received an
+    # answer
+    expected_answers = 0
 
     def processPacket(self, packet):
         """
@@ -131,11 +191,10 @@ class ScapyProtocol(abstract.FileDescriptor):
             log.debug("Got the number of expected answers")
             self.stopSending()
 
-    def doRead(self):
+    def packetReceived(self, packet):
         timeout = time.time() - self._start_time
         if self.timeout and time.time() - self._start_time > self.timeout:
             self.stopSending()
-        packet = self.super_socket.recv(MTU)
         if packet:
             self.processPacket(packet)
             # A string that has the same value for the request than for the
@@ -146,18 +205,9 @@ class ScapyProtocol(abstract.FileDescriptor):
                 self.processAnswer(packet, answer_hr)
 
     def stopSending(self):
-        self.stopReading()
-        self.super_socket.close()
-        if hasattr(self, "d"):
-            result = (self.answered_packets, self.sent_packets)
-            self.d.callback(result)
-            del self.d
-
-    def write(self, packet):
-        """
-        Write a scapy packet to the wire.
-        """
-        return self.super_socket.send(packet)
+        result = (self.answered_packets, self.sent_packets)
+        self.d.callback(result)
+        self.factory.unRegisterProtocol(self)
 
     def sendPackets(self, packets):
         if not isinstance(packets, Gen):
@@ -169,12 +219,18 @@ class ScapyProtocol(abstract.FileDescriptor):
             else:
                 self.hr_sent_packets[hashret] = [packet]
             self.sent_packets.append(packet)
-            self.write(packet)
+            self.factory.send(packet)
 
     def startSending(self, packets):
         self._start_time = time.time()
-        self.startReading()
+        self.d = defer.Deferred()
         self.sendPackets(packets)
         return self.d
 
+class ScapySniffer(ScapyProtocol):
+    def __init__(self, pcap_filename, *arg, **kw):
+        self.pcapwriter = PcapWriter(pcap_filename, *arg, **kw)
+
+    def packetReceived(self, packet):
+        self.pcapwriter.write(packet)
 



More information about the tor-commits mailing list