[tor-commits] [ooni-probe/master] Refactoring of the ORG queue
art at torproject.org
art at torproject.org
Fri Apr 29 09:42:21 UTC 2016
commit 814b2879843a04450f1822dbdd42909906e876df
Author: Arturo Filastò <arturo at filasto.net>
Date: Fri Feb 5 13:33:27 2016 +0100
Refactoring of the ORG queue
* Do not start Tor if we already have a Tor state
* Fix failure to import GeoIP
---
ooni/director.py | 2 +-
ooni/geoip.py | 1 +
ooni/oonicli.py | 166 ++++++++++++++++++++++---------------------------------
3 files changed, 67 insertions(+), 102 deletions(-)
diff --git a/ooni/director.py b/ooni/director.py
index 9bac49d..f633048 100644
--- a/ooni/director.py
+++ b/ooni/director.py
@@ -130,7 +130,7 @@ class Director(object):
if start_tor:
if check_incoherences:
yield config.check_tor()
- if config.advanced.start_tor:
+ if config.advanced.start_tor and config.tor_state is None:
yield self.startTor()
elif config.tor.control_port and config.tor_state is None:
log.msg("Connecting to Tor Control Port...")
diff --git a/ooni/geoip.py b/ooni/geoip.py
index cb66675..11800b6 100644
--- a/ooni/geoip.py
+++ b/ooni/geoip.py
@@ -1,3 +1,4 @@
+from __future__ import absolute_import
import re
import os
import random
diff --git a/ooni/oonicli.py b/ooni/oonicli.py
index 47aacfb..fc5eca0 100644
--- a/ooni/oonicli.py
+++ b/ooni/oonicli.py
@@ -226,8 +226,8 @@ def setupCollector(global_options, net_test_loader):
return collector
-def createDeck(global_options,url=None,filename=None):
- log.msg("Creating deck for: %s" %(url or filename,) )
+def createDeck(global_options, url=None, filename=None):
+ log.msg("Creating deck for: %s" % (url or filename,))
deck = Deck(no_collector=global_options['no-collector'])
deck.bouncer = global_options['bouncer']
@@ -239,9 +239,9 @@ def createDeck(global_options,url=None,filename=None):
log.debug("No test deck detected")
test_file = nettest_to_path(global_options['test_file'], True)
if url is not None:
- args = ('-u',url)
+ args = ('-u', url)
elif filename is not None:
- args = ('-f',filename)
+ args = ('-f', filename)
else:
args = tuple()
if any(global_options['subargs']):
@@ -270,48 +270,13 @@ def createDeck(global_options,url=None,filename=None):
sys.exit(5)
return deck
-def runWithDirector(logging=True, start_tor=True, check_incoherences=True):
- """
- Instance the director, parse command line options and start an ooniprobe
- test!
- """
-
- global_options = setupGlobalOptions(logging, start_tor, check_incoherences)
-
- director = Director()
- if global_options['list']:
- print "# Installed nettests"
- for net_test_id, net_test in director.getNetTests().items():
- print "* %s (%s/%s)" % (net_test['name'],
- net_test['category'],
- net_test['id'])
- print " %s" % net_test['description']
-
- sys.exit(0)
-
- elif global_options['printdeck']:
- del global_options['printdeck']
- print "# Copy and paste the lines below into a test deck to run the specified test with the specified arguments"
- print yaml.safe_dump([{'options': global_options}]).strip()
-
- sys.exit(0)
-
- if global_options.get('annotations') is not None:
- annotations = setupAnnotations(global_options)
-
- if global_options['no-collector']:
- log.msg("Not reporting using a collector")
- global_options['collector'] = None
- start_tor = False
- else:
- start_tor = True
-
- if global_options['collector']:
- start_tor |= True
- deck = createDeck(global_options)
+def runTestWithDirector(director, global_options, url=None, filename=None,
+ start_tor=True, check_incoherences=True):
+ deck = createDeck(global_options, url=url, filename=filename)
start_tor |= deck.requiresTor
+
d = director.start(start_tor=start_tor,
check_incoherences=check_incoherences)
@@ -341,18 +306,59 @@ def runWithDirector(logging=True, start_tor=True, check_incoherences=True):
test_details['annotations'] = global_options['annotations']
director.startNetTest(net_test_loader,
- global_options['reportfile'],
- collector)
+ global_options['reportfile'],
+ collector)
return director.allTestsDone
-
d.addCallback(setup_nettest)
d.addCallback(post_director_start)
d.addErrback(director_startup_handled_failures)
d.addErrback(director_startup_other_failures)
return d
-
+def runWithDirector(logging=True, start_tor=True, check_incoherences=True):
+ """
+ Instance the director, parse command line options and start an ooniprobe
+ test!
+ """
+
+ global_options = setupGlobalOptions(logging, start_tor, check_incoherences)
+
+ director = Director()
+ if global_options['list']:
+ print "# Installed nettests"
+ for net_test_id, net_test in director.getNetTests().items():
+ print "* %s (%s/%s)" % (net_test['name'],
+ net_test['category'],
+ net_test['id'])
+ print " %s" % net_test['description']
+
+ sys.exit(0)
+
+ elif global_options['printdeck']:
+ del global_options['printdeck']
+ print "# Copy and paste the lines below into a test deck to run the specified test with the specified arguments"
+ print yaml.safe_dump([{'options': global_options}]).strip()
+
+ sys.exit(0)
+
+ if global_options.get('annotations') is not None:
+ annotations = setupAnnotations(global_options)
+
+ if global_options['no-collector']:
+ log.msg("Not reporting using a collector")
+ global_options['collector'] = None
+ start_tor = False
+ else:
+ start_tor = True
+
+ if global_options['collector']:
+ start_tor |= True
+
+ return runTestWithDirector(director=director,
+ global_options=global_options,
+ check_incoherences=check_incoherences)
+
# this variant version of runWithDirector splits the process in two,
# allowing a single director instance to be reused with multiple decks.
@@ -373,7 +379,6 @@ def runWithDaemonDirector(logging=True, start_tor=True, check_incoherences=True)
sys.exit(7)
-
global_options = setupGlobalOptions(logging, start_tor, check_incoherences)
director = Director()
@@ -388,80 +393,40 @@ def runWithDaemonDirector(logging=True, start_tor=True, check_incoherences=True)
else:
start_tor = True
-
- def run_test(global_options, url=None, filename=None):
- assert url is not None or filename is not None
-
- deck = createDeck(global_options, url=url, filename=filename)
-
- d = director.start(start_tor=True,
- check_incoherences=check_incoherences)
-
- def setup_nettest(_):
- try:
- return deck.setup()
- except errors.UnableToLoadDeckInput as error:
- return defer.failure.Failure(error)
-
-
-
- # Wait until director has started up (including bootstrapping Tor)
- # before adding tests
- def post_director_start(_):
- for net_test_loader in deck.netTestLoaders:
- # Decks can specify different collectors
- # for each net test, so that each NetTest
- # may be paired with a test_helper and its collector
- # However, a user can override this behavior by
- # specifying a collector from the command-line (-c).
- # If a collector is not specified in the deck, or the
- # deck is a singleton, the default collector set in
- # ooniprobe.conf will be used
-
- collector = setupCollector(global_options, net_test_loader)
-
- test_details = net_test_loader.testDetails
- test_details['annotations'] = global_options['annotations']
-
- director.startNetTest(net_test_loader,
- global_options['reportfile'],
- collector)
- return director.allTestsDone
-
- d.addCallback(setup_nettest)
- d.addCallback(post_director_start)
- d.addErrback(director_startup_handled_failures)
- d.addErrback(director_startup_other_failures)
- return d
-
finished = defer.Deferred()
@defer.inlineCallbacks
- def readmsg(_,channel, queue_object, consumer_tag, counter):
+ def readmsg(_, channel, queue_object, consumer_tag, counter):
# Wait for a message and decode it.
if counter >= lifetime:
+ log.msg("Counter")
queue_object.close(LifetimeExceeded())
yield channel.basic_cancel(consumer_tag=consumer_tag)
finished.callback(None)
else:
log.msg("Waiting for message")
-
+
try:
ch, method, properties, body = yield queue_object.get()
log.msg("Got message")
data = json.loads(body)
counter += 1
- log.msg("Received %d/%d: %s" %(counter, lifetime, data['url'],))
+ log.msg("Received %d/%d: %s" % (counter, lifetime, data['url'],))
# acknowledge the message
ch.basic_ack(delivery_tag=method.delivery_tag)
- d = run_test(global_options, url=data['url'].encode('utf8'))
+ d = runTestWithDirector(director=director,
+ global_options=global_options,
+ url=data['url'].encode('utf8'),
+ check_incoherences=check_incoherences)
# When the test has been completed, go back to waiting for a message.
d.addCallback(readmsg, channel, queue_object, consumer_tag, counter+1)
except exceptions.AMQPError,v:
+ log.msg("Error")
+ log.exception(v)
finished.errback(v)
@@ -474,7 +439,7 @@ def runWithDaemonDirector(logging=True, start_tor=True, check_incoherences=True)
queue_object, consumer_tag = yield channel.basic_consume(
queue=name,
no_ack=False)
- readmsg(None,channel,queue_object,consumer_tag, 0)
+ readmsg(None, channel, queue_object, consumer_tag, 0)
@@ -484,10 +449,9 @@ def runWithDaemonDirector(logging=True, start_tor=True, check_incoherences=True)
urlargs = dict(urlparse.parse_qsl(urlp.query))
# random lifetime requests counter
- lifetime = random.randint(820,1032)
+ lifetime = random.randint(820, 1032)
# AMQP connection details are sent through the cmdline parameter '-Q'
-
creds = pika.PlainCredentials(urlp.username or 'guest',
urlp.password or 'guest')
parameters = pika.ConnectionParameters(urlp.hostname,
More information about the tor-commits mailing list