[tor-commits] [depictor/master] Ingest historical bwauth statistics data
tom at torproject.org
tom at torproject.org
Wed May 3 17:52:58 UTC 2017
commit e7a7dce049904664f0c1d7ba99ff76779c23e9ea
Author: Tom Ritter <tom at ritter.vg>
Date: Fri Apr 14 23:42:30 2017 -0500
Ingest historical bwauth statistics data
Update the parseOldConsensuses.py script to ingest historical
bwauth data. Note that this script is generally run one time
and is not intended to be a well-maintained script for future use.
It needs care and feeding for each major run.
Additionally, create a mergeDatabases.py script. This script probably
does not do what we want; it is included mostly as a placeholder in
case we want to correct and use it later.
---
mergeDatabases.py | 66 ++++++++++++++++
parseOldConsensuses.py | 208 ++++++++++++++++++++++++++++++++++++++++++++-----
utility.py | 8 ++
3 files changed, 263 insertions(+), 19 deletions(-)
diff --git a/mergeDatabases.py b/mergeDatabases.py
new file mode 100755
index 0000000..ac025f5
--- /dev/null
+++ b/mergeDatabases.py
@@ -0,0 +1,66 @@
+#!/usr/bin/env python
+
+import os
+import sys
+import time
+import sqlite3
+import datetime
+import operator
+import traceback
+import subprocess
+
+if __name__ == '__main__':
+ if len(sys.argv) != 3:
+ print "Usage: ", sys.argv[0], "src.db dest.db"
+ print "\tMerge all the data from src into dest"
+ sys.exit(1)
+
+ if not os.path.isfile(sys.argv[1]):
+ print "Source is not a file"
+ sys.exit(1)
+ if not os.path.isfile(sys.argv[2]):
+ print "Dest is not a file"
+ sys.exit(1)
+
+ src = sqlite3.connect(sys.argv[1])
+ dst = sqlite3.connect(sys.argv[2])
+
+ s_tbls = src.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
+ for t in s_tbls:
+ t = t[0]
+ skip_table = False
+
+ d_tbl = dst.execute("SELECT name FROM sqlite_master WHERE type = 'table' and name = ?", (t,))
+ if not d_tbl.fetchone():
+ print "Skipping table", t, "which is in src but not in dst"
+ continue
+
+ s_cols = src.execute("PRAGMA table_info(" + t + ")")
+ d_cols = dst.execute("PRAGMA table_info(" + t + ")")
+ s_cols = s_cols.fetchall()
+ d_cols = d_cols.fetchall()
+ if len(s_cols) != len(d_cols):
+ print "Skipping table", t, "which has", len(s_cols), "columns in src and", len(d_cols)
+ continue
+ for i in range(len(s_cols)):
+ if s_cols[i] != d_cols[i]:
+ print "Skipping table", t, "because column", 1, "is", s_cols[i], "in src and", d_cols[i], "in dst"
+ skip_table = True
+
+ if skip_table:
+ continue
+
+ print "Merging table", t
+ merged = 0
+ s = src.execute("SELECT * FROM " + t)
+ for r in s.fetchall():
+ date = r[0]
+ has_value = False
+ for v in r[1:]:
+ if v:
+ has_value = True
+ if has_value:
+ merged += 1
+ dst.execute("INSERT OR REPLACE INTO " + t + " VALUES (" + ",".join("?" * len(r)) + ")", r)
+ dst.commit()
+ print "Inserted or updated", merged, "rows"
\ No newline at end of file
diff --git a/parseOldConsensuses.py b/parseOldConsensuses.py
index 834c386..defa2f5 100755
--- a/parseOldConsensuses.py
+++ b/parseOldConsensuses.py
@@ -18,6 +18,7 @@ import stem.util.conf
import stem.util.enum
from stem import Flag
+from stem.descriptor.reader import DescriptorReader
from stem.util.lru_cache import lru_cache
def get_dirauths_in_tables():
@@ -48,17 +49,23 @@ def get_dirauth_from_filename(filename):
return "tor26"
elif key == "0232AF901C31A04EE9848595AF9BB7620D4C5B2E" or key == "585769C78764D58426B8B52B6651A5A71137189A":
return "dannenberg"
- elif key == "27B6B5996C426270A5C95488AA5BCEB6BCC86956":
- return "turtles"
+ elif key == "27B6B5996C426270A5C95488AA5BCEB6BCC86956":
+ return "turtles"
else:
raise Exception("Unexpcected dirauth key: " + key + " " + filename)
def unix_time(dt):
    """Return the naive UTC datetime *dt* as milliseconds since the Unix epoch (float)."""
    epoch = datetime.datetime.utcfromtimestamp(0)
    elapsed = dt - epoch
    return elapsed.total_seconds() * 1000.0
def ut_to_datetime(ut):
    """Convert a millisecond Unix timestamp (the unit unix_time returns) to a naive UTC datetime."""
    seconds = ut / 1000
    return datetime.datetime.utcfromtimestamp(seconds)
+
def ut_to_datetime_format(ut):
    """Render a millisecond Unix timestamp as a 'YYYY-MM-DD-HH-MM-SS' UTC string."""
    when = ut_to_datetime(ut)
    return when.strftime("%Y-%m-%d-%H-%M-%S")
+
def get_time_from_filename(filename):
voteTime = filename.split('-')
- if len(voteTime) < 9:
+ if len(voteTime) < 7:
raise Exception("Strange filename: " + filename)
v = [int(x) for x in filename.split('-')[0:6]]
@@ -66,26 +73,30 @@ def get_time_from_filename(filename):
voteTime = unix_time(voteTime)
return voteTime
-def main(dir):
- dirAuths = get_dirauths_in_tables()
- dbc = sqlite3.connect(os.path.join('data', 'historical.db'))
-
+def dirauth_relay_votes(directory, dirAuths, dbc):
dirauth_columns = ""
dirauth_columns_questions = ""
for d in dirAuths:
dirauth_columns += d + "_known integer, " + d + "_running integer, " + d + "_bwauth integer, "
dirauth_columns_questions += ",?,?,?"
+ dbc.execute("CREATE TABLE IF NOT EXISTS vote_data(date integer, " + dirauth_columns + "PRIMARY KEY(date ASC))")
+ dbc.commit()
+
votes = {}
- for root, dirs, files in os.walk(dir):
+ for root, dirs, files in os.walk(directory):
for f in files:
- filepath = os.path.join(root, f)
- print filepath
+ filepath = os.path.join(root, f)
+ print filepath
if '"' in f:
raise Exception("Potentially malicious filename")
- elif "votes-" in f and ".tar" in f:
- continue
+ elif "votes-" in f and ".tar" in f:
+ continue
+ elif "consensuses-" in f and ".tar" in f:
+ continue
+ elif "-vote-" not in f:
+ continue
voteTime = get_time_from_filename(f)
if voteTime not in votes:
@@ -104,11 +115,8 @@ def main(dir):
votes[voteTime][dirauth]['running'] = int(subprocess.check_output('egrep "^s " "' + filepath + '" | grep " Running" | wc -l', shell=True))
votes[voteTime][dirauth]['bwlines'] = int(subprocess.check_output('grep Measured= "' + filepath + '" | wc -l', shell=True))
- dbc.execute("CREATE TABLE IF NOT EXISTS vote_data(date integer, " + dirauth_columns + "PRIMARY KEY(date ASC))")
- dbc.commit()
-
for t in votes:
- print t
+ print ut_to_datetime(t)
print "\t", len(votes[t])
for d in votes[t]:
print "\t", d, votes[t][d]['bwlines'], votes[t][d]['running']
@@ -127,13 +135,175 @@ def main(dir):
dbc.execute("INSERT OR REPLACE INTO vote_data VALUES (?" + dirauth_columns_questions + ")", insertValues)
dbc.commit()
# Per-dirauth statistics stored in bwauth_data, in table column order.
_BWAUTH_CATEGORIES = ('above', 'shared', 'exclusive', 'below', 'unmeasured')


def _find_bwauth_documents(directory, dirAuths, dbc):
    """Locate the consensus and vote files under *directory*.

    Vote times already present in ``bwauth_data`` are skipped, and vote times
    that have no consensus with the same time are dropped with a message.

    :returns: ``(votes, consensuses)``; *votes* maps a vote time to
      ``{dirauth: vote path}``, *consensuses* maps a time to a consensus path
    :raises Exception: on a filename containing a double quote, or on a vote
      from a dirauth that is not in *dirAuths*
    """
    processed = set(row[0] for row in dbc.execute("SELECT date FROM bwauth_data"))
    votes = {}
    consensuses = {}
    for root, _dirs, files in os.walk(directory):
        for f in files:
            filepath = os.path.join(root, f)

            # Same guard as dirauth_relay_votes, which hands paths to a shell
            # inside double quotes.
            if '"' in f:
                raise Exception("Potentially malicious filename")
            if ".tar" in f and ("votes-" in f or "consensuses-" in f):
                # Skip the vote/consensus tar archives themselves.
                continue

            if "-consensus" in f:
                consensusTime = get_time_from_filename(f)
                if consensusTime in consensuses:
                    print("Found two consensuses with the same time: %s"
                          % ut_to_datetime(consensusTime))
                else:
                    consensuses[consensusTime] = filepath
            elif "-vote-" in f:
                voteTime = get_time_from_filename(f)
                if voteTime in processed:
                    print("Skipping %s because we already processed it" % f)
                    continue

                dirauth = get_dirauth_from_filename(f)
                if dirauth not in dirAuths:
                    raise Exception("Found a dirauth I don't know about (probably spelling): " + dirauth)
                authVotes = votes.setdefault(voteTime, {})
                if dirauth in authVotes:
                    print("Found two votes for dirauth %s: %s and %s"
                          % (dirauth, filepath, authVotes[dirauth]))
                else:
                    authVotes[dirauth] = filepath

    # Every vote time must have a consensus to compare against.
    for voteTime in [t for t in votes if t not in consensuses]:
        print("Have votes for time %s but no consensus!" % ut_to_datetime(voteTime))
        del votes[voteTime]
    return votes, consensuses


def _read_consensus_weights(path):
    """Return ``{fingerprint: bandwidth}`` for a consensus file.

    Relays the consensus flags as unmeasured map to the string "Unmeasured".
    """
    routers = {}
    with DescriptorReader(path) as reader:
        reader.register_skip_listener(my_listener)
        for relay in reader:
            routers[relay.fingerprint] = "Unmeasured" if relay.is_unmeasured else relay.bandwidth
    return routers


def _read_measured_bandwidths(path):
    """Return ``{fingerprint: Measured value}`` for the relays a vote measured (truthy values only)."""
    measured = {}
    with DescriptorReader(path) as reader:
        reader.register_skip_listener(my_listener)
        for relay in reader:
            if relay.measured:
                measured[relay.fingerprint] = relay.measured
    return measured


def _compare_bwauth_measurements(consensusRouters, bwauthVotes):
    """Classify each dirauth's measurements against the consensus weights.

    :param consensusRouters: ``{fingerprint: consensus bandwidth or "Unmeasured"}``
    :param bwauthVotes: ``{dirauth: {fingerprint: measured bandwidth}}``
    :returns: ``{dirauth: {category: count}}`` over _BWAUTH_CATEGORIES, or an
      empty dict when the consensus lists no relays
    """
    if not consensusRouters:
        return {}
    results = dict((d, dict.fromkeys(_BWAUTH_CATEGORIES, 0)) for d in bwauthVotes)
    for fingerprint, weight in consensusRouters.items():
        if weight == "Unmeasured":
            continue
        # Number of dirauths whose measurement equals the consensus weight;
        # decides between 'exclusive' and 'shared' below.
        matching = sum(1 for m in bwauthVotes.values()
                       if fingerprint in m and m[fingerprint] == weight)
        for d, measured in bwauthVotes.items():
            if fingerprint not in measured:
                category = 'unmeasured'
            elif weight < measured[fingerprint]:
                category = 'above'
            elif weight > measured[fingerprint]:
                category = 'below'
            elif matching == 1:
                category = 'exclusive'
            else:
                category = 'shared'
            results[d][category] += 1
    return results


def bwauth_measurements(directory, dirAuths, dbc):
    """Ingest per-bwauth measurement statistics from historical votes.

    Walks *directory* for consensus and vote files. It pairs each vote with the
    consensus of the same time. For every dirauth whose vote carries Measured=
    values, it counts how those values compare with each relay's consensus
    bandwidth:

    * ``above``      - the dirauth measured more than the consensus value
    * ``below``      - the dirauth measured less than the consensus value
    * ``exclusive``  - it matched the consensus and no other dirauth did
    * ``shared``     - it matched the consensus along with another dirauth
    * ``unmeasured`` - this dirauth did not measure the relay

    Relays flagged unmeasured in the consensus are ignored. One row per vote
    time is written to ``bwauth_data``; dirauths without measurements get
    NULLs. Vote times already in the table are skipped.

    :param directory: root directory searched recursively
    :param dirAuths: dirauth nicknames, in table column order
    :param dbc: open sqlite3 connection; committed once per vote time
    :raises Exception: on a filename containing a double quote, or a vote
      from a dirauth not in *dirAuths*
    """
    # Create the table before scanning: the scan looks up vote times that were
    # already ingested, which fails with "no such table" on a fresh database.
    columns = "".join("%s_%s integer, " % (d, c)
                      for d in dirAuths for c in _BWAUTH_CATEGORIES)
    dbc.execute("CREATE TABLE IF NOT EXISTS bwauth_data(date integer, " + columns + "PRIMARY KEY(date ASC))")
    dbc.commit()
    placeholders = ",?" * (len(_BWAUTH_CATEGORIES) * len(dirAuths))

    votes, consensuses = _find_bwauth_documents(directory, dirAuths, dbc)

    for reviewed, voteTime in enumerate(sorted(votes), 1):
        print("Reviewing %s (%d/%d)" % (consensuses[voteTime], reviewed, len(votes)))
        consensusRouters = _read_consensus_weights(consensuses[voteTime])

        bwauthVotes = {}
        for dirauth, path in votes[voteTime].items():
            measured = _read_measured_bandwidths(path)
            # A vote with no Measured= values contributes no statistics.
            if measured:
                bwauthVotes[dirauth] = measured

        results = _compare_bwauth_measurements(consensusRouters, bwauthVotes)

        insertValues = [voteTime]
        for d in dirAuths:
            counts = results.get(d)
            if counts is None:
                insertValues.extend([None] * len(_BWAUTH_CATEGORIES))
            else:
                insertValues.extend(counts[c] for c in _BWAUTH_CATEGORIES)
        dbc.execute("INSERT OR REPLACE INTO bwauth_data VALUES (?" + placeholders + ")", insertValues)
        dbc.commit()
+
def my_listener(path, exception):
    """Skip listener registered on stem's DescriptorReader for unreadable files.

    Prints "Skipped!", then the path, then the exception, each on its own line.
    """
    for line in ("Skipped!", path, exception):
        print(line)
+
+
def main(itype, directory):
    """Run one ingestion pass over *directory* into data/historical.db.

    :param itype: "dirauth_relay_votes" or "bwauth_measurements"; any other
      value prints "Unknown ingestion type" and touches nothing
    :param directory: directory tree of historical documents to ingest
    """
    ingesters = {
        "dirauth_relay_votes": dirauth_relay_votes,
        "bwauth_measurements": bwauth_measurements,
    }
    ingest = ingesters.get(itype)
    if ingest is None:
        # Check before connecting so a typo does not create/open the database.
        print("Unknown ingestion type")
        return

    dirAuths = get_dirauths_in_tables()
    dbc = sqlite3.connect(os.path.join('data', 'historical.db'))
    try:
        ingest(directory, dirAuths, dbc)
    finally:
        dbc.close()
if __name__ == '__main__':
try:
- if len(sys.argv) != 2:
- print "Usage: ", sys.argv[0], "vote-directory"
+ if len(sys.argv) != 3:
+ print "Usage: ", sys.argv[0], "ingestion-type vote-directory"
else:
- main(sys.argv[1])
+ main(sys.argv[1], sys.argv[2])
except:
msg = "%s failed with:\n\n%s" % (sys.argv[0], traceback.format_exc())
print "Error: %s" % msg
diff --git a/utility.py b/utility.py
index 6cebe6b..aeb7b8d 100644
--- a/utility.py
+++ b/utility.py
@@ -84,3 +84,11 @@ def _get_documents(label, resource):
def unix_time(dt):
    """Return the naive UTC datetime *dt* as milliseconds since the Unix epoch (float)."""
    epoch = datetime.datetime.utcfromtimestamp(0)
    elapsed = dt - epoch
    return elapsed.total_seconds() * 1000.0
def ut_to_datetime(ut):
    """Convert a millisecond Unix timestamp (the unit unix_time returns) to a naive UTC datetime."""
    seconds = ut / 1000
    return datetime.datetime.utcfromtimestamp(seconds)
+
def ut_to_datetime_format(ut):
    """Render a millisecond Unix timestamp using consensus_datetime_format."""
    when = ut_to_datetime(ut)
    return consensus_datetime_format(when)
+
def consensus_datetime_format(dt):
    """Format *dt* as 'YYYY-MM-DD-HH-MM-SS'."""
    return format(dt, "%Y-%m-%d-%H-%M-%S")
More information about the tor-commits
mailing list