[tor-commits] [depictor/master] Ingest historical bwauth statistics data

tom at torproject.org tom at torproject.org
Wed May 3 17:52:58 UTC 2017

commit e7a7dce049904664f0c1d7ba99ff76779c23e9ea
Author: Tom Ritter <tom at ritter.vg>
Date:   Fri Apr 14 23:42:30 2017 -0500

    Ingest historical bwauth statistics data
    Update the parseOldConsensuses.py script to ingest historical
    bwauth data. Note that this script is generally run one time
    and is not intended to be a well-maintained script for future use.
    It needs care and feeding for each major run.
    Additionally, create a mergeDatabases.py script. This script
    probably does not do what we want; it is included mostly as a
    placeholder for future development, in case we want to correct
    and use it later.
 mergeDatabases.py      |  66 ++++++++++++++++
 parseOldConsensuses.py | 208 ++++++++++++++++++++++++++++++++++++++++++++-----
 utility.py             |   8 ++
 3 files changed, 263 insertions(+), 19 deletions(-)

diff --git a/mergeDatabases.py b/mergeDatabases.py
new file mode 100755
index 0000000..ac025f5
--- /dev/null
+++ b/mergeDatabases.py
@@ -0,0 +1,66 @@
+#!/usr/bin/env python
+import os
+import sys
+import time
+import sqlite3
+import datetime
+import operator
+import traceback
+import subprocess
+if __name__ == '__main__':
+	if len(sys.argv) != 3:
+		print "Usage: ", sys.argv[0], "src.db dest.db"
+		print "\tMerge all the data from src into dest"
+		sys.exit(1)
+	if not os.path.isfile(sys.argv[1]):
+		print "Source is not a file"
+		sys.exit(1)
+	if not os.path.isfile(sys.argv[2]):
+		print "Dest is not a file"
+		sys.exit(1)
+	src = sqlite3.connect(sys.argv[1])
+	dst = sqlite3.connect(sys.argv[2])
+	s_tbls = src.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
+	for t in s_tbls:
+		t = t[0]
+		skip_table = False
+		d_tbl = dst.execute("SELECT name FROM sqlite_master WHERE type = 'table' and name = ?", (t,))
+		if not d_tbl.fetchone():
+			print "Skipping table", t, "which is in src but not in dst"
+			continue
+		s_cols = src.execute("PRAGMA table_info(" + t + ")")
+		d_cols = dst.execute("PRAGMA table_info(" + t + ")")
+		s_cols = s_cols.fetchall()
+		d_cols = d_cols.fetchall()
+		if len(s_cols) != len(d_cols):
+			print "Skipping table", t, "which has", len(s_cols), "columns in src and", len(d_cols)
+			continue
+		for i in range(len(s_cols)):
+			if s_cols[i] != d_cols[i]:
+				print "Skipping table", t, "because column", 1, "is", s_cols[i], "in src and", d_cols[i], "in dst"
+				skip_table = True
+		if skip_table:
+			continue
+		print "Merging table", t
+		merged = 0
+		s = src.execute("SELECT * FROM " + t)
+		for r in s.fetchall():
+			date = r[0]
+			has_value = False
+			for v in r[1:]:
+				if v:
+					has_value = True
+			if has_value:
+				merged += 1
+				dst.execute("INSERT OR REPLACE INTO " + t + " VALUES (" + ",".join("?" * len(r)) + ")", r)
+				dst.commit()
+		print "Inserted or updated", merged, "rows"
\ No newline at end of file
diff --git a/parseOldConsensuses.py b/parseOldConsensuses.py
index 834c386..defa2f5 100755
--- a/parseOldConsensuses.py
+++ b/parseOldConsensuses.py
@@ -18,6 +18,7 @@ import stem.util.conf
 import stem.util.enum
 from stem import Flag
+from stem.descriptor.reader import DescriptorReader
 from stem.util.lru_cache import lru_cache
 def get_dirauths_in_tables():
@@ -48,17 +49,23 @@ def get_dirauth_from_filename(filename):
 		return "tor26"
 	elif key == "0232AF901C31A04EE9848595AF9BB7620D4C5B2E" or key == "585769C78764D58426B8B52B6651A5A71137189A":
 		return "dannenberg"
-        elif key == "27B6B5996C426270A5C95488AA5BCEB6BCC86956":
-                return "turtles"
+	elif key == "27B6B5996C426270A5C95488AA5BCEB6BCC86956":
+		return "turtles"
 		raise Exception("Unexpcected dirauth key: " + key + " " + filename)
 def unix_time(dt):
     return (dt - datetime.datetime.utcfromtimestamp(0)).total_seconds() * 1000.0
+def ut_to_datetime(ut):
+	return datetime.datetime.utcfromtimestamp(ut / 1000)
+def ut_to_datetime_format(ut):
+	return ut_to_datetime(ut).strftime("%Y-%m-%d-%H-%M-%S")
 def get_time_from_filename(filename):
 	voteTime = filename.split('-')
-	if len(voteTime) < 9:
+	if len(voteTime) < 7:
 		raise Exception("Strange filename: " + filename)
 	v = [int(x) for x in filename.split('-')[0:6]]
@@ -66,26 +73,30 @@ def get_time_from_filename(filename):
 	voteTime = unix_time(voteTime)
 	return voteTime
-def main(dir):
-	dirAuths = get_dirauths_in_tables()
-	dbc = sqlite3.connect(os.path.join('data', 'historical.db'))
+def dirauth_relay_votes(directory, dirAuths, dbc):
 	dirauth_columns = ""
 	dirauth_columns_questions = ""
 	for d in dirAuths:
 		dirauth_columns += d + "_known integer, " + d + "_running integer, " + d + "_bwauth integer, "
 		dirauth_columns_questions += ",?,?,?"
+	dbc.execute("CREATE TABLE IF NOT EXISTS vote_data(date integer, " + dirauth_columns + "PRIMARY KEY(date ASC))")
+	dbc.commit()
 	votes = {}
-	for root, dirs, files in os.walk(dir):
+	for root, dirs, files in os.walk(directory):
 		for f in files:
-                        filepath = os.path.join(root, f)
-                        print filepath
+			filepath = os.path.join(root, f)
+			print filepath
 			if '"' in f:
 				raise Exception("Potentially malicious filename")
-                        elif "votes-" in f and ".tar" in f:
-                                continue
+			elif "votes-" in f and ".tar" in f:
+				continue
+			elif "consensuses-" in f and ".tar" in f:
+				continue
+			elif "-vote-" not in f:
+				continue
 			voteTime = get_time_from_filename(f)
 			if voteTime not in votes:
@@ -104,11 +115,8 @@ def main(dir):
 			votes[voteTime][dirauth]['running'] = int(subprocess.check_output('egrep "^s " "' + filepath + '" | grep " Running" | wc -l', shell=True))
 			votes[voteTime][dirauth]['bwlines'] = int(subprocess.check_output('grep Measured= "' + filepath + '" | wc -l', shell=True))
-	dbc.execute("CREATE TABLE IF NOT EXISTS vote_data(date integer, " + dirauth_columns + "PRIMARY KEY(date ASC))")
-	dbc.commit()
 	for t in votes:
-		print t
+		print ut_to_datetime(t)
 		print "\t", len(votes[t])
 		for d in votes[t]:
 			print "\t", d, votes[t][d]['bwlines'], votes[t][d]['running']
@@ -127,13 +135,175 @@ def main(dir):
 		dbc.execute("INSERT OR REPLACE INTO vote_data VALUES (?" + dirauth_columns_questions + ")", insertValues)
+def bwauth_measurements(directory, dirAuths, dbc):
+	#Find all the consensuses and votesrm
+	votes = {}
+	consensuses = {}
+	for root, dirs, files in os.walk(directory):
+		for f in files:
+			filepath = os.path.join(root, f)
+			if '"' in f:
+				raise Exception("Potentially malicious filename")
+			elif "votes-" in f and ".tar" in f:
+				continue
+			elif "consensuses-" in f and ".tar" in f:
+				continue
+			if "-consensus" in f:
+				consensusTime = get_time_from_filename(f)
+				if consensusTime not in consensuses:
+					consensuses[consensusTime] = filepath
+				else:
+					print "Found two consensuses with the same time:", ut_to_datetime(consensusTime)
+				#print "Consensus:", filepath
+			elif "-vote-" in f:
+				voteTime = get_time_from_filename(f)
+				# Test to see if we already processed this one
+				cur = dbc.cursor()
+				cur.execute("SELECT * FROM bwauth_data WHERE date = ?", (voteTime,))
+				if cur.fetchone():
+					print "Skipping", f, "because we already processed it"
+					continue
+				elif voteTime not in votes:
+					votes[voteTime] = {}
+				dirauth = get_dirauth_from_filename(f)
+				if dirauth not in dirAuths:
+					raise Exception("Found a dirauth I don't know about (probably spelling): " + dirauth)
+				elif dirauth not in votes[voteTime]:
+					votes[voteTime][dirauth] = filepath
+				else:
+					print "Found two votes for dirauth " + dirauth + ":", filepath, "and", votes[voteTime][dirauth]
+				#print "Vote:", dirauth, filepath
+	#Make sure we have a consensus for each vote
+	to_del = []
+	for v in votes:
+		if v not in consensuses:
+			print "Have votes for time", ut_to_datetime(v), "but no consensus!"
+			to_del.append(v)
+			#sys.exit(1)
+	for i in to_del:
+		del votes[i]
+	#Make the table
+	bwauth_columns = ""
+	bwauth_columns_questions = ""
+	for d in dirAuths:
+		bwauth_columns += d + "_above integer, " + d + "_shared integer, " + d + "_exclusive integer, " + d + "_below integer, " + d + "_unmeasured integer, "
+		bwauth_columns_questions += ",?,?,?,?,?"
+	dbc.execute("CREATE TABLE IF NOT EXISTS bwauth_data(date integer, " + bwauth_columns + "PRIMARY KEY(date ASC))")
+	dbc.commit()
+	reviewed = 0
+	for v in votes:
+		reviewed += 1
+		print "Reviewing", consensuses[v], "(" + str(reviewed) + "/" + str(len(votes)) + ")"
+		#Get the consensus data
+		consensusRouters = {}
+		with DescriptorReader(consensuses[v]) as reader:
+			reader.register_skip_listener(my_listener)
+			for relay in reader:
+				consensusRouters[relay.fingerprint] = "Unmeasured" if relay.is_unmeasured else relay.bandwidth
+		#The vote data
+		bwauthVotes = {}
+		for d in votes[v]:
+			if d not in bwauthVotes:
+				bwauthVotes[d] = {}
+			measured_something = False
+			with DescriptorReader(votes[v][d]) as reader:
+				reader.register_skip_listener(my_listener)
+				for relay in reader:
+					if relay.measured:
+						bwauthVotes[d][relay.fingerprint] = relay.measured
+						measured_something = True
+			if not measured_something:
+				del bwauthVotes[d]
+		#Now match them up and store the data
+		thisConsensusResults = {}
+		for r in consensusRouters:
+			for d in bwauthVotes:
+				had_any_value = False
+				if d not in thisConsensusResults:
+					thisConsensusResults[d] = {'unmeasured' : 0, 'above' : 0, 'below' : 0, 'exclusive' : 0 , 'shared' : 0}
+				if consensusRouters[r] == "Unmeasured":
+					continue
+				elif r not in bwauthVotes[d]:
+					had_any_value = True
+					thisConsensusResults[d]['unmeasured'] += 1
+				elif consensusRouters[r] < bwauthVotes[d][r]:
+					had_any_value = True
+					thisConsensusResults[d]['above'] += 1
+				elif consensusRouters[r] > bwauthVotes[d][r]:
+					had_any_value = True
+					thisConsensusResults[d]['below'] += 1
+				elif consensusRouters[r] == bwauthVotes[d][r] and \
+					 1 == len([1 for d_i in bwauthVotes if d_i in bwauthVotes and r in bwauthVotes[d_i] and bwauthVotes[d_i][r] == consensusRouters[r]]):
+					had_any_value = True
+				 	thisConsensusResults[d]['exclusive'] += 1
+			 	elif consensusRouters[r] == bwauthVotes[d][r] and \
+					 1 != len([1 for d_i in bwauthVotes if d_i in bwauthVotes and r in bwauthVotes[d_i] and bwauthVotes[d_i][r] == consensusRouters[r] ]):
+				 	had_any_value = True
+				 	thisConsensusResults[d]['shared'] += 1
+			 	else:
+			 		print "What case am I in???"
+			 		sys.exit(1)
+			 	if not had_any_value:
+		 			del thisConsensusResults[d]
+ 		insertValues = [v]
+ 		for d in dirAuths: 
+ 			if d in thisConsensusResults:
+				insertValues.append(thisConsensusResults[d]['above'])
+				insertValues.append(thisConsensusResults[d]['shared'])
+				insertValues.append(thisConsensusResults[d]['exclusive'])
+				insertValues.append(thisConsensusResults[d]['below'])
+				insertValues.append(thisConsensusResults[d]['unmeasured'])
+			else:
+				insertValues.append(None)
+				insertValues.append(None)
+				insertValues.append(None)
+				insertValues.append(None)
+				insertValues.append(None)
+		dbc.execute("INSERT OR REPLACE INTO bwauth_data VALUES (?" + bwauth_columns_questions + ")", insertValues)
+		dbc.commit()
+def my_listener(path, exception):
+	print "Skipped!"
+	print path
+	print exception
+def main(itype, directory):
+	dirAuths = get_dirauths_in_tables()
+	dbc = sqlite3.connect(os.path.join('data', 'historical.db'))
+	if itype == "dirauth_relay_votes":
+		dirauth_relay_votes(directory, dirAuths, dbc)
+	elif itype == "bwauth_measurements":
+		bwauth_measurements(directory, dirAuths, dbc)
+	else:
+		print "Unknown ingestion type"
 if __name__ == '__main__':
-		if len(sys.argv) != 2:
-			print "Usage: ", sys.argv[0], "vote-directory"
+		if len(sys.argv) != 3:
+			print "Usage: ", sys.argv[0], "ingestion-type vote-directory"
-			main(sys.argv[1])
+			main(sys.argv[1], sys.argv[2])
 		msg = "%s failed with:\n\n%s" % (sys.argv[0], traceback.format_exc())
 		print "Error: %s" % msg
diff --git a/utility.py b/utility.py
index 6cebe6b..aeb7b8d 100644
--- a/utility.py
+++ b/utility.py
@@ -84,3 +84,11 @@ def _get_documents(label, resource):
 def unix_time(dt):
     return (dt - datetime.datetime.utcfromtimestamp(0)).total_seconds() * 1000.0
+def ut_to_datetime(ut):
+	return datetime.datetime.utcfromtimestamp(ut / 1000)
+def ut_to_datetime_format(ut):
+	return consensus_datetime_format(ut_to_datetime(ut))
+def consensus_datetime_format(dt):
+	return dt.strftime("%Y-%m-%d-%H-%M-%S")

More information about the tor-commits mailing list