[tor-commits] [ooni-probe/master] Make the sniffer not run in a separate thread, but use a non blocking fdesc
art at torproject.org
art at torproject.org
Sun Nov 25 14:47:21 UTC 2012
commit 615ce75c47aec249b6b5a4c0b58fdf7a93f09582
Author: Arturo Filastò <art at fuffa.org>
Date: Sun Nov 25 10:08:47 2012 +0100
Make the sniffer not run in a separate thread, but use a non blocking fdesc
* Do some refactoring of scapy testing, following the Factory creational
pattern and a pub-sub pattern for the readers and writers (inspired by muxTCP).
* Other misc refactoring
---
ooni/oonicli.py | 6 ++-
ooni/templates/scapyt.py | 50 ++++++++---------
ooni/utils/net.py | 23 --------
ooni/utils/txscapy.py | 138 ++++++++++++++++++++++++++++++++--------------
4 files changed, 126 insertions(+), 91 deletions(-)
diff --git a/ooni/oonicli.py b/ooni/oonicli.py
index 5c582b2..1a316b3 100644
--- a/ooni/oonicli.py
+++ b/ooni/oonicli.py
@@ -100,6 +100,7 @@ def runTest(cmd_line_options):
classes = runner.findTestClassesFromFile(cmd_line_options['test'])
test_cases, options = runner.loadTestsAndOptions(classes, cmd_line_options)
if config.privacy.includepcap:
+ from ooni.utils.txscapy import ScapyFactory, ScapySniffer
try:
checkForRoot()
except NotRootError:
@@ -108,7 +109,10 @@ def runTest(cmd_line_options):
sys.exit(1)
print "Starting sniffer"
- net.capturePackets(config.reports.pcap)
+ config.scapyFactory = ScapyFactory(config.advanced.interface)
+
+ sniffer = ScapySniffer(config.reports.pcap)
+ config.scapyFactory.registerProtocol(sniffer)
return runner.runTestCases(test_cases, options, cmd_line_options)
diff --git a/ooni/templates/scapyt.py b/ooni/templates/scapyt.py
index 11b4381..cb02300 100644
--- a/ooni/templates/scapyt.py
+++ b/ooni/templates/scapyt.py
@@ -16,7 +16,7 @@ from ooni.nettest import NetTestCase
from ooni.utils import log
from ooni import config
-from ooni.utils.txscapy import ScapyProtocol, getDefaultIface
+from ooni.utils.txscapy import ScapySender, getDefaultIface
class BaseScapyTest(NetTestCase):
"""
@@ -66,20 +66,8 @@ class BaseScapyTest(NetTestCase):
else:
config.check_TCPerror_seqack = 0
- if config.advanced.interface == 'auto':
- self.interface = getDefaultIface()
- else:
- self.interface = config.advanced.interface
-
- def reportSentPacket(self, packet):
- if 'sent_packets' not in self.report:
- self.report['sent_packets'] = []
- self.report['sent_packets'].append(packet)
-
- def reportReceivedPacket(self, packet):
- if 'answered_packets' not in self.report:
- self.report['answered_packets'] = []
- self.report['answered_packets'].append(packet)
+ self.report['sent_packets'] = []
+ self.report['answered_packets'] = []
def finishedSendReceive(self, packets):
"""
@@ -98,8 +86,8 @@ class BaseScapyTest(NetTestCase):
sent_packet.src = '127.0.0.1'
received_packet.dst = '127.0.0.1'
- self.reportSentPacket(sent_packet)
- self.reportReceivedPacket(received_packet)
+ self.report['sent_packets'].append(sent_packet)
+ self.report['answered_packets'].append(received_packet)
return packets
def sr(self, packets, *arg, **kw):
@@ -107,8 +95,11 @@ class BaseScapyTest(NetTestCase):
Wrapper around scapy.sendrecv.sr for sending and receiving of packets
at layer 3.
"""
- scapyProtocol = ScapyProtocol(interface=self.interface, *arg, **kw)
- d = scapyProtocol.startSending(packets)
+ scapySender = ScapySender()
+
+ config.scapyFactory.registerProtocol(scapySender)
+
+ d = scapySender.startSending(packets)
d.addCallback(self.finishedSendReceive)
return d
@@ -123,12 +114,15 @@ class BaseScapyTest(NetTestCase):
return packets[0][0][1]
except IndexError:
log.err("Got no response...")
- return None
+ return packets
+
+ scapySender = ScapySender()
+ scapySender.expected_answers = 1
+
+ config.scapyFactory.registerProtocol(scapySender)
- scapyProtocol = ScapyProtocol(interface=self.interface, *arg, **kw)
- scapyProtocol.expected_answers = 1
log.debug("Running sr1")
- d = scapyProtocol.startSending(packets)
+ d = scapySender.startSending(packets)
log.debug("Started to send")
d.addCallback(self.finishedSendReceive)
d.addCallback(done)
@@ -138,9 +132,13 @@ class BaseScapyTest(NetTestCase):
"""
Wrapper around scapy.sendrecv.send for sending of packets at layer 3
"""
- scapyProtocol = ScapyProtocol(interface=self.interface, *arg, **kw)
- scapyProtocol.sendPackets(packets)
- scapyProtocol.stopSending()
+ scapySender = ScapySender()
+
+ config.scapyFactory.registerProtocol(scapySender)
+
+ scapySender.sendPackets(packets)
+
+ scapySender.stopSending()
for packet in packets:
self.reportSentPacket(packet)
diff --git a/ooni/utils/net.py b/ooni/utils/net.py
index 649dc64..df98412 100644
--- a/ooni/utils/net.py
+++ b/ooni/utils/net.py
@@ -82,29 +82,6 @@ class BodyReceiver(protocol.Protocol):
def connectionLost(self, reason):
self.finished.callback(self.data)
-def capturePackets(pcap_filename):
- from scapy.all import sniff
- global stop_packet_capture
- stop_packet_capture = False
-
- def stopCapture():
- # XXX this is a bit of a hack to stop capturing packets when we close
- # the reactor. Ideally we would want to be able to do this
- # programmatically, but this requires some work on implementing
- # properly the sniff function with deferreds.
- global stop_packet_capture
- stop_packet_capture = True
-
- def writePacketToPcap(pkt):
- from scapy.all import utils
- pcapwriter = txscapy.TXPcapWriter(pcap_filename, append=True)
- pcapwriter.write(pkt)
- if stop_packet_capture:
- sys.exit(1)
- d = threads.deferToThread(sniff, lfilter=writePacketToPcap)
- reactor.addSystemEventTrigger('before', 'shutdown', stopCapture)
- return d
-
def getSystemResolver():
"""
XXX implement a function that returns the resolver that is currently
diff --git a/ooni/utils/txscapy.py b/ooni/utils/txscapy.py
index 679eb42..8e479f8 100644
--- a/ooni/utils/txscapy.py
+++ b/ooni/utils/txscapy.py
@@ -32,7 +32,9 @@ try:
config.pcap_dnet = True
except ImportError, e:
- log.err("pypcap or dnet not installed. Certain tests may not work.")
+ log.err("pypcap or dnet not installed. "
+ "Certain tests may not work.")
+
config.pcap_dnet = False
conf.use_pcap = False
conf.use_dnet = False
@@ -59,6 +61,9 @@ def getNetworksFromRoutes():
return networks
+class IfaceError(Exception):
+ pass
+
def getDefaultIface():
networks = getNetworksFromRoutes()
for net in networks:
@@ -66,45 +71,100 @@ def getDefaultIface():
return net.iface
raise IfaceError
-class TXPcapWriter(PcapWriter):
- def __init__(self, *arg, **kw):
- PcapWriter.__init__(self, *arg, **kw)
- fdesc.setNonBlocking(self.f)
+class ProtocolNotRegistered(Exception):
+ pass
-class ScapyProtocol(abstract.FileDescriptor):
+class ProtocolAlreadyRegistered(Exception):
+ pass
+
+class ScapyFactory(abstract.FileDescriptor):
+ """
+ Inspired by muxTCP scapyLink:
+ https://github.com/enki/muXTCP/blob/master/scapyLink.py
+ """
def __init__(self, interface, super_socket=None, timeout=5):
abstract.FileDescriptor.__init__(self, reactor)
+ if interface == 'auto':
+ interface = getDefaultIface()
if not super_socket:
- super_socket = conf.L3socket(iface=interface, promisc=True, filter='')
+ super_socket = conf.L3socket(iface=interface,
+ promisc=True, filter='')
#super_socket = conf.L2socket(iface=interface)
+ self.protocols = []
fdesc._setCloseOnExec(super_socket.ins.fileno())
self.super_socket = super_socket
- self.interface = interface
- self.timeout = timeout
+ def writeSomeData(self, data):
+ """
+ XXX we actually want to use this, but this requires overriding doWrite
+ or writeSequence.
+ """
+ pass
- # This dict is used to store the unique hashes that allow scapy to
- # match up request with answer
- self.hr_sent_packets = {}
+ def send(self, packet):
+ """
+ Write a scapy packet to the wire.
+ """
+ return self.super_socket.send(packet)
- # These are the packets we have received as answer to the ones we sent
- self.answered_packets = []
+ def fileno(self):
+ return self.super_socket.ins.fileno()
- # These are the packets we send
- self.sent_packets = []
+ def doRead(self):
+ packet = self.super_socket.recv(MTU)
+ for protocol in self.protocols:
+ protocol.packetReceived(packet)
+
+ def registerProtocol(self, protocol):
+ if not self.connected:
+ self.startReading()
+
+ if protocol not in self.protocols:
+ protocol.factory = self
+ self.protocols.append(protocol)
+ else:
+ raise ProtocolAlreadyRegistered
+
+ def unRegisterProtocol(self, protocol):
+ if protocol in self.protocols:
+ self.protocols.remove(protocol)
+ if len(self.protocols) == 0:
+ self.loseConnection()
+ else:
+ raise ProtocolNotRegistered
+
+class ScapyProtocol(object):
+ factory = None
+
+ def packetReceived(self, packet):
+ """
+ When you register a protocol, this method will be called with argument
+ the packet it received.
- # This deferred will fire when we have finished sending a receiving packets.
- self.d = defer.Deferred()
- # Should we look for multiple answers for the same sent packet?
- self.multi = False
+ Every protocol that is registered will have this method called.
+ """
+ raise NotImplementedError
- # When 0 we stop when all the packets we have sent have received an
- # answer
- self.expected_answers = 0
+class ScapySender(ScapyProtocol):
+ timeout = 5
+ # This dict is used to store the unique hashes that allow scapy to
+ # match up request with answer
+ hr_sent_packets = {}
- def fileno(self):
- return self.super_socket.ins.fileno()
+ # These are the packets we have received as answer to the ones we sent
+ answered_packets = []
+
+ # These are the packets we send
+ sent_packets = []
+
+ # This deferred will fire when we have finished sending and receiving packets.
+ # Should we look for multiple answers for the same sent packet?
+ multi = False
+
+ # When 0 we stop when all the packets we have sent have received an
+ # answer
+ expected_answers = 0
def processPacket(self, packet):
"""
@@ -131,11 +191,10 @@ class ScapyProtocol(abstract.FileDescriptor):
log.debug("Got the number of expected answers")
self.stopSending()
- def doRead(self):
+ def packetReceived(self, packet):
timeout = time.time() - self._start_time
if self.timeout and time.time() - self._start_time > self.timeout:
self.stopSending()
- packet = self.super_socket.recv(MTU)
if packet:
self.processPacket(packet)
# A string that has the same value for the request than for the
@@ -146,18 +205,9 @@ class ScapyProtocol(abstract.FileDescriptor):
self.processAnswer(packet, answer_hr)
def stopSending(self):
- self.stopReading()
- self.super_socket.close()
- if hasattr(self, "d"):
- result = (self.answered_packets, self.sent_packets)
- self.d.callback(result)
- del self.d
-
- def write(self, packet):
- """
- Write a scapy packet to the wire.
- """
- return self.super_socket.send(packet)
+ result = (self.answered_packets, self.sent_packets)
+ self.d.callback(result)
+ self.factory.unRegisterProtocol(self)
def sendPackets(self, packets):
if not isinstance(packets, Gen):
@@ -169,12 +219,18 @@ class ScapyProtocol(abstract.FileDescriptor):
else:
self.hr_sent_packets[hashret] = [packet]
self.sent_packets.append(packet)
- self.write(packet)
+ self.factory.send(packet)
def startSending(self, packets):
self._start_time = time.time()
- self.startReading()
+ self.d = defer.Deferred()
self.sendPackets(packets)
return self.d
+class ScapySniffer(ScapyProtocol):
+ def __init__(self, pcap_filename, *arg, **kw):
+ self.pcapwriter = PcapWriter(pcap_filename, *arg, **kw)
+
+ def packetReceived(self, packet):
+ self.pcapwriter.write(packet)
More information about the tor-commits
mailing list