[tor-commits] [metrics-web/master] Create new clients module from metrics-task #8462.
karsten at torproject.org
karsten at torproject.org
Sun Aug 16 10:55:26 UTC 2015
commit 14840ed2db075bbc1d0991b974becc3826a50969
Author: Karsten Loesing <karsten.loesing at gmx.net>
Date: Fri Aug 14 13:46:12 2015 +0200
Create new clients module from metrics-task #8462.
---
detector/.gitignore | 2 -
detector/country_info.py | 252 ---------
detector/detector.py | 437 ---------------
detector/detector.sh | 6 -
modules/clients/.gitignore | 2 +
modules/clients/build.xml | 44 ++
modules/clients/country_info.py | 252 +++++++++
modules/clients/detector.py | 437 +++++++++++++++
modules/clients/init-userstats.sql | 575 ++++++++++++++++++++
modules/clients/merge-clients.R | 19 +
.../src/org/torproject/metrics/clients/Main.java | 465 ++++++++++++++++
modules/clients/test-userstats.sql | 478 ++++++++++++++++
modules/clients/userstats-detector.R | 18 +
shared/bin/80-run-clients-stats.sh | 30 +
shared/bin/99-copy-stats-files.sh | 1 +
15 files changed, 2321 insertions(+), 697 deletions(-)
diff --git a/detector/.gitignore b/detector/.gitignore
deleted file mode 100644
index 29a7166..0000000
--- a/detector/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-*.csv
-
diff --git a/detector/country_info.py b/detector/country_info.py
deleted file mode 100644
index e23728e..0000000
--- a/detector/country_info.py
+++ /dev/null
@@ -1,252 +0,0 @@
-# -*- coding: utf-8 -*-
-
-countries = {
- "ad" : "Andorra",
- "ae" : "the United Arab Emirates",
- "af" : "Afghanistan",
- "ag" : "Antigua and Barbuda",
- "ai" : "Anguilla",
- "al" : "Albania",
- "am" : "Armenia",
- "an" : "the Netherlands Antilles",
- "ao" : "Angola",
- "aq" : "Antarctica",
- "ar" : "Argentina",
- "as" : "American Samoa",
- "at" : "Austria",
- "au" : "Australia",
- "aw" : "Aruba",
- "ax" : "the Aland Islands",
- "az" : "Azerbaijan",
- "ba" : "Bosnia and Herzegovina",
- "bb" : "Barbados",
- "bd" : "Bangladesh",
- "be" : "Belgium",
- "bf" : "Burkina Faso",
- "bg" : "Bulgaria",
- "bh" : "Bahrain",
- "bi" : "Burundi",
- "bj" : "Benin",
- "bl" : "Saint Bartelemey",
- "bm" : "Bermuda",
- "bn" : "Brunei",
- "bo" : "Bolivia",
- "br" : "Brazil",
- "bs" : "the Bahamas",
- "bt" : "Bhutan",
- "bv" : "the Bouvet Island",
- "bw" : "Botswana",
- "by" : "Belarus",
- "bz" : "Belize",
- "ca" : "Canada",
- "cc" : "the Cocos (Keeling) Islands",
- "cd" : "the Democratic Republic of the Congo",
- "cf" : "Central African Republic",
- "cg" : "Congo",
- "ch" : "Switzerland",
- "ci" : u"Côte d'Ivoire",
- "ck" : "the Cook Islands",
- "cl" : "Chile",
- "cm" : "Cameroon",
- "cn" : "China",
- "co" : "Colombia",
- "cr" : "Costa Rica",
- "cu" : "Cuba",
- "cv" : "Cape Verde",
- "cx" : "the Christmas Island",
- "cy" : "Cyprus",
- "cz" : "the Czech Republic",
- "de" : "Germany",
- "dj" : "Djibouti",
- "dk" : "Denmark",
- "dm" : "Dominica",
- "do" : "the Dominican Republic",
- "dz" : "Algeria",
- "ec" : "Ecuador",
- "ee" : "Estonia",
- "eg" : "Egypt",
- "eh" : "the Western Sahara",
- "er" : "Eritrea",
- "es" : "Spain",
- "et" : "Ethiopia",
- "fi" : "Finland",
- "fj" : "Fiji",
- "fk" : "the Falkland Islands (Malvinas)",
- "fm" : "the Federated States of Micronesia",
- "fo" : "the Faroe Islands",
- "fr" : "France",
- "fx" : "Metropolitan France",
- "ga" : "Gabon",
- "gb" : "the United Kingdom",
- "gd" : "Grenada",
- "ge" : "Georgia",
- "gf" : "French Guiana",
- "gg" : "Guernsey",
- "gh" : "Ghana",
- "gi" : "Gibraltar",
- "gl" : "Greenland",
- "gm" : "Gambia",
- "gn" : "Guinea",
- "gp" : "Guadeloupe",
- "gq" : "Equatorial Guinea",
- "gr" : "Greece",
- "gs" : "South Georgia and the South Sandwich Islands",
- "gt" : "Guatemala",
- "gu" : "Guam",
- "gw" : "Guinea-Bissau",
- "gy" : "Guyana",
- "hk" : "Hong Kong",
- "hm" : "Heard Island and McDonald Islands",
- "hn" : "Honduras",
- "hr" : "Croatia",
- "ht" : "Haiti",
- "hu" : "Hungary",
- "id" : "Indonesia",
- "ie" : "Ireland",
- "il" : "Israel",
- "im" : "the Isle of Man",
- "in" : "India",
- "io" : "the British Indian Ocean Territory",
- "iq" : "Iraq",
- "ir" : "Iran",
- "is" : "Iceland",
- "it" : "Italy",
- "je" : "Jersey",
- "jm" : "Jamaica",
- "jo" : "Jordan",
- "jp" : "Japan",
- "ke" : "Kenya",
- "kg" : "Kyrgyzstan",
- "kh" : "Cambodia",
- "ki" : "Kiribati",
- "km" : "Comoros",
- "kn" : "Saint Kitts and Nevis",
- "kp" : "North Korea",
- "kr" : "the Republic of Korea",
- "kw" : "Kuwait",
- "ky" : "the Cayman Islands",
- "kz" : "Kazakhstan",
- "la" : "Laos",
- "lb" : "Lebanon",
- "lc" : "Saint Lucia",
- "li" : "Liechtenstein",
- "lk" : "Sri Lanka",
- "lr" : "Liberia",
- "ls" : "Lesotho",
- "lt" : "Lithuania",
- "lu" : "Luxembourg",
- "lv" : "Latvia",
- "ly" : "Libya",
- "ma" : "Morocco",
- "mc" : "Monaco",
- "md" : "the Republic of Moldova",
- "me" : "Montenegro",
- "mf" : "Saint Martin",
- "mg" : "Madagascar",
- "mh" : "the Marshall Islands",
- "mk" : "Macedonia",
- "ml" : "Mali",
- "mm" : "Burma",
- "mn" : "Mongolia",
- "mo" : "Macau",
- "mp" : "the Northern Mariana Islands",
- "mq" : "Martinique",
- "mr" : "Mauritania",
- "ms" : "Montserrat",
- "mt" : "Malta",
- "mu" : "Mauritius",
- "mv" : "the Maldives",
- "mw" : "Malawi",
- "mx" : "Mexico",
- "my" : "Malaysia",
- "mz" : "Mozambique",
- "na" : "Namibia",
- "nc" : "New Caledonia",
- "ne" : "Niger",
- "nf" : "Norfolk Island",
- "ng" : "Nigeria",
- "ni" : "Nicaragua",
- "nl" : "the Netherlands",
- "no" : "Norway",
- "np" : "Nepal",
- "nr" : "Nauru",
- "nu" : "Niue",
- "nz" : "New Zealand",
- "om" : "Oman",
- "pa" : "Panama",
- "pe" : "Peru",
- "pf" : "French Polynesia",
- "pg" : "Papua New Guinea",
- "ph" : "the Philippines",
- "pk" : "Pakistan",
- "pl" : "Poland",
- "pm" : "Saint Pierre and Miquelon",
- "pn" : "the Pitcairn Islands",
- "pr" : "Puerto Rico",
- "ps" : "the Palestinian Territory",
- "pt" : "Portugal",
- "pw" : "Palau",
- "py" : "Paraguay",
- "qa" : "Qatar",
- "re" : "Reunion",
- "ro" : "Romania",
- "rs" : "Serbia",
- "ru" : "Russia",
- "rw" : "Rwanda",
- "sa" : "Saudi Arabia",
- "sb" : "the Solomon Islands",
- "sc" : "the Seychelles",
- "sd" : "Sudan",
- "se" : "Sweden",
- "sg" : "Singapore",
- "sh" : "Saint Helena",
- "si" : "Slovenia",
- "sj" : "Svalbard and Jan Mayen",
- "sk" : "Slovakia",
- "sl" : "Sierra Leone",
- "sm" : "San Marino",
- "sn" : "Senegal",
- "so" : "Somalia",
- "sr" : "Suriname",
- "ss" : "South Sudan",
- "st" : u"São Tomé and PrÃncipe",
- "sv" : "El Salvador",
- "sy" : "the Syrian Arab Republic",
- "sz" : "Swaziland",
- "tc" : "Turks and Caicos Islands",
- "td" : "Chad",
- "tf" : "the French Southern Territories",
- "tg" : "Togo",
- "th" : "Thailand",
- "tj" : "Tajikistan",
- "tk" : "Tokelau",
- "tl" : "East Timor",
- "tm" : "Turkmenistan",
- "tn" : "Tunisia",
- "to" : "Tonga",
- "tr" : "Turkey",
- "tt" : "Trinidad and Tobago",
- "tv" : "Tuvalu",
- "tw" : "Taiwan",
- "tz" : "the United Republic of Tanzania",
- "ua" : "Ukraine",
- "ug" : "Uganda",
- "um" : "the United States Minor Outlying Islands",
- "us" : "the United States",
- "uy" : "Uruguay",
- "uz" : "Uzbekistan",
- "va" : "Vatican City",
- "vc" : "Saint Vincent and the Grenadines",
- "ve" : "Venezuela",
- "vg" : "the British Virgin Islands",
- "vi" : "the United States Virgin Islands",
- "vn" : "Vietnam",
- "vu" : "Vanuatu",
- "wf" : "Wallis and Futuna",
- "ws" : "Samoa",
- "ye" : "Yemen",
- "yt" : "Mayotte",
- "za" : "South Africa",
- "zm" : "Zambia",
- "zw" : "Zimbabwe"
- }
diff --git a/detector/detector.py b/detector/detector.py
deleted file mode 100644
index 611f25b..0000000
--- a/detector/detector.py
+++ /dev/null
@@ -1,437 +0,0 @@
-## Copyright (c) 2011 George Danezis <gdane at microsoft.com>
-##
-## All rights reserved.
-##
-## Redistribution and use in source and binary forms, with or without
-## modification, are permitted (subject to the limitations in the
-## disclaimer below) provided that the following conditions are met:
-##
-## * Redistributions of source code must retain the above copyright
-## notice, this list of conditions and the following disclaimer.
-##
-## * Redistributions in binary form must reproduce the above copyright
-## notice, this list of conditions and the following disclaimer in the
-## documentation and/or other materials provided with the
-## distribution.
-##
-## * Neither the name of <Owner Organization> nor the names of its
-## contributors may be used to endorse or promote products derived
-## from this software without specific prior written permission.
-##
-## NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE
-## GRANTED BY THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT
-## HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
-## WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
-## MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-## DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-## LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-## CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-## SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
-## BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
-## WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
-## OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
-## IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-##
-## (Clear BSD license: http://labs.metacarta.com/license-explanation.html#license)
-
-## This script reads a .csv file of the number of Tor users and finds
-## anomalies that might be indicative of censorship.
-
-# Dep: matplotlib
-from pylab import *
-import matplotlib
-
-# Dep: numpy
-import numpy
-
-# Dep: scipy
-import scipy.stats
-from scipy.stats.distributions import norm
-from scipy.stats.distributions import poisson
-
-# Std lib
-from datetime import date
-from datetime import timedelta
-import os.path
-
-# Country code -> Country names
-import country_info
-
-# write utf8 to file
-import codecs
-
-days = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
-
-def get_country_name_from_cc(country_code):
- if (country_code.lower() in country_info.countries):
- return country_info.countries[country_code.lower()]
- return country_code # if we didn't find the cc in our map
-
-"""
-Represents a .csv file containing information on the number of
-connecting Tor users per country.
-
-'store': Dictionary with (<country code>, <counter>) as key, and the number of users as value.
- <country code> can also be "date"...
-'all_dates': List of the data intervals (with default timedelta: 1 day).
-'country_codes': List of all relevant country codes.
-'MAX_INDEX': Length of store, number of country codes etc.
-'date_min': The oldest date found in the .csv.
-'date_min': The latest date found in the .csv.
-"""
-class torstatstore:
- def __init__(self, file_name):
- f = file(file_name)
- country_codes = f.readline()
- country_codes = country_codes.strip().split(",")
-
- store = {}
- MAX_INDEX = 0
- for i, line in enumerate(f):
- MAX_INDEX += 1
- line_parsed = line.strip().split(",")
- for j, (ccode, val) in enumerate(zip(country_codes,line_parsed)):
- processed_val = None
- if ccode == "date":
- try:
- year, month, day = int(val[:4]), int(val[5:7]), int(val[8:10])
- processed_val = date(year, month, day)
- except Exception, e:
- print "Parsing error (ignoring line %s):" % j
- print "%s" % val,e
- break
-
- elif val != "NA":
- processed_val = int(val)
- store[(ccode, i)] = processed_val
-
- # min and max
- date_min = store[("date", 0)]
- date_max = store[("date", i)]
-
- all_dates = []
- d = date_min
- dt = timedelta(days=1)
- while d <= date_max:
- all_dates += [d]
- d = d + dt
-
- # Save for later
- self.store = store
- self.all_dates = all_dates
- self.country_codes = country_codes
- self.MAX_INDEX = MAX_INDEX
- self.date_min = date_min
- self.date_max = date_max
-
- """Return a list representing a time series of 'ccode' with respect
- to the number of connected users.
- """
- def get_country_series(self, ccode):
- assert ccode in self.country_codes
- series = {}
- for d in self.all_dates:
- series[d] = None
- for i in range(self.MAX_INDEX):
- series[self.store[("date", i)]] = self.store[(ccode, i)]
- sx = []
- for d in self.all_dates:
- sx += [series[d]]
- return sx
-
- """Return an ordered list containing tuples of the form (<number of
- users>, <country code>). The list is ordered with respect to the
- number of users for each country.
- """
- def get_largest(self, number):
- exclude = set(["all", "??", "date"])
- l = [(self.store[(c, self.MAX_INDEX-1)], c) for c in self.country_codes if c not in exclude]
- l.sort()
- l.reverse()
- return l[:number]
-
- """Return a dictionary, with <country code> as key, and the time
- series of the country code as the value.
- """
- def get_largest_locations(self, number):
- l = self.get_largest(number)
- res = {}
- for _, ccode in l[:number]:
- res[ccode] = self.get_country_series(ccode)
- return res
-
-"""Return a list containing lists (?) where each such list contains
-the difference in users for a time delta of 'days'
-"""
-def n_day_rel(series, days):
- rel = []
- for i, v in enumerate(series):
- if series[i] is None:
- rel += [None]
- continue
-
- if i - days < 0 or series[i-days] is None or series[i-days] == 0:
- rel += [None]
- else:
- rel += [ float(series[i]) / series[i-days]]
- return rel
-
-# Main model: computes the expected min / max range of number of users
-def make_tendencies_minmax(l, INTERVAL = 1):
- lminus1 = dict([(ccode, n_day_rel(l[ccode], INTERVAL)) for ccode in l])
- c = lminus1[lminus1.keys()[0]]
- dists = []
- minx = []
- maxx = []
- for i in range(len(c)):
- vals = [lminus1[ccode][i] for ccode in lminus1.keys() if lminus1[ccode][i] != None]
- if len(vals) < 8:
- dists += [None]
- minx += [None]
- maxx += [None]
- else:
- vals.sort()
- median = vals[len(vals)/2]
- q1 = vals[len(vals)/4]
- q2 = vals[(3*len(vals))/4]
- qd = q2 - q1
- vals = [v for v in vals if median - qd*4 < v and v < median + qd*4]
- if len(vals) < 8:
- dists += [None]
- minx += [None]
- maxx += [None]
- continue
- mu, signma = norm.fit(vals)
- dists += [(mu, signma)]
- maxx += [norm.ppf(0.9999, mu, signma)]
- minx += [norm.ppf(1 - 0.9999, mu, signma)]
- ## print minx[-1], maxx[-1]
- return minx, maxx
-
-# Makes pretty plots
-def raw_plot(series, minc, maxc, labels, xtitle):
- assert len(xtitle) == 3
- fname, stitle, slegend = xtitle
-
- font = {'family' : 'Bitstream Vera Sans',
- 'weight' : 'normal',
- 'size' : 8}
- matplotlib.rc('font', **font)
-
- ylim( (-max(series)*0.1, max(series)*1.1) )
- plot(labels, series, linewidth=1.0, label="Users")
-
- wherefill = []
- for mm,mx in zip(minc, maxc):
- wherefill += [not (mm == None and mx == None)]
- assert mm < mx or (mm == None and mx == None)
-
- fill_between(labels, minc, maxc, where=wherefill, color="gray", label="Prediction")
-
- vdown = []
- vup = []
- for i,v in enumerate(series):
- if minc[i] != None and v < minc[i]:
- vdown += [v]
- vup += [None]
- elif maxc[i] != None and v > maxc[i]:
- vdown += [None]
- vup += [v]
- else:
- vup += [None]
- vdown += [None]
-
- plot(labels, vdown, 'o', ms=10, lw=2, alpha=0.5, mfc='orange', label="Downturns")
- plot(labels, vup, 'o', ms=10, lw=2, alpha=0.5, mfc='green', label="Upturns")
-
- legend(loc=2)
-
- xlabel('Time (days)')
- ylabel('Users')
- title(stitle)
- grid(True)
- F = gcf()
-
- F.set_size_inches(10,5)
- F.savefig(fname, format="png", dpi = (150))
- close()
-
-def absolute_plot(series, minc, maxc, labels,INTERVAL, xtitle):
- in_minc = []
- in_maxc = []
- for i, v in enumerate(series):
- if i > 0 and i - INTERVAL >= 0 and series[i] != None and series[i-INTERVAL] != None and series[i-INTERVAL] != 0 and minc[i]!= None and maxc[i]!= None:
- in_minc += [minc[i] * poisson.ppf(1-0.9999, series[i-INTERVAL])]
- in_maxc += [maxc[i] * poisson.ppf(0.9999, series[i-INTERVAL])]
- if not in_minc[-1] < in_maxc[-1]:
- print in_minc[-1], in_maxc[-1], series[i-INTERVAL], minc[i], maxc[i]
- assert in_minc[-1] < in_maxc[-1]
- else:
- in_minc += [None]
- in_maxc += [None]
- raw_plot(series, in_minc, in_maxc, labels, xtitle)
-
-"""Return the number of downscores and upscores of a time series
-'series', given tendencies 'minc' and 'maxc' for the time interval
-'INTERVAL'.
-
-If 'scoring_interval' is specifed we only consider upscore/downscore
-that happened in the latest 'scoring_interval' days.
-"""
-def censor_score(series, minc, maxc, INTERVAL, scoring_interval=None):
- upscore = 0
- downscore = 0
-
- if scoring_interval is None:
- scoring_interval = len(series)
- assert(len(series) >= scoring_interval)
-
- for i, v in enumerate(series):
- if i > 0 and i - INTERVAL >= 0 and series[i] != None and series[i-INTERVAL] != None and series[i-INTERVAL] != 0 and minc[i]!= None and maxc[i]!= None:
- in_minc = minc[i] * poisson.ppf(1-0.9999, series[i-INTERVAL])
- in_maxc = maxc[i] * poisson.ppf(0.9999, series[i-INTERVAL])
- if (i >= (len(series) - scoring_interval)):
- downscore += 1 if minc[i] != None and v < in_minc else 0
- upscore += 1 if maxc[i] != None and v > in_maxc else 0
-
- return downscore, upscore
-
-def plot_target(tss, TARGET, xtitle, minx, maxx, DAYS=365, INTERV = 7):
- ctarget = tss.get_country_series(TARGET)
- c = n_day_rel(ctarget, INTERV)
- absolute_plot(ctarget[-DAYS:], minx[-DAYS:], maxx[-DAYS:], tss.all_dates[-DAYS:],INTERV, xtitle = xtitle)
-
-def write_censorship_report_prologue(report_file, dates, notification_period):
- if (notification_period == 1):
- date_str = "%s" % (dates[-1]) # no need for date range if it's just one day
- else:
- date_str = "%s to %s" % (dates[-notification_period], dates[-1])
-
- prologue = "=======================\n"
- prologue += "Automatic Censorship Report for %s\n" % (date_str)
- prologue += "=======================\n\n"
- report_file.write(prologue)
-
-## Make a league table of censorship + nice graphs
-def plot_all(tss, minx, maxx, INTERV, DAYS=None, rdir="img"):
- rdir = os.path.realpath(rdir)
- if not os.path.exists(rdir) or not os.path.isdir(rdir):
- print "ERROR: %s does not exist or is not a directory." % rdir
- return
-
- summary_file = file(os.path.join(rdir, "summary.txt"), "w")
-
- if DAYS == None:
- DAYS = 6*31
-
- s = tss.get_largest(200)
- scores = []
- for num, li in s:
- print ".",
- ds,us = censor_score(tss.get_country_series(li)[-DAYS:], minx[-DAYS:], maxx[-DAYS:], INTERV)
- # print ds, us
- scores += [(ds,num, us, li)]
- scores.sort()
- scores.reverse()
- s = "\n=======================\n"
- s+= "Report for %s to %s\n" % (tss.all_dates[-DAYS], tss.all_dates[-1])
- s+= "=======================\n"
- print s
- summary_file.write(s)
- for a,nx, b,c in scores:
- if a > 0:
- s = "%s -- down: %2d (up: %2d affected: %s)" % (c, a, b, nx)
- print s
- summary_file.write(s + "\n")
- xtitle = (os.path.join(rdir, "%03d-%s-censor.png" % (a,c)), "Tor report for %s -- down: %2d (up: %2d affected: %s)" % (c, a, b, nx),"")
- plot_target(tss, c,xtitle, minx, maxx, DAYS, INTERV)
- summary_file.close()
-
-"""Write a CSV report on the minimum/maximum users of each country per date."""
-def write_all(tss, minc, maxc, RANGES_FILE, INTERVAL=7):
- ranges_file = file(RANGES_FILE, "w")
- ranges_file.write("date,country,minusers,maxusers\n")
- exclude = set(["all", "??", "date"])
- for c in tss.country_codes:
- if c in exclude:
- continue
- series = tss.get_country_series(c)
- for i, v in enumerate(series):
- if i > 0 and i - INTERVAL >= 0 and series[i] != None and series[i-INTERVAL] != None and series[i-INTERVAL] != 0 and minc[i]!= None and maxc[i]!= None:
- minv = minc[i] * poisson.ppf(1-0.9999, series[i-INTERVAL])
- maxv = maxc[i] * poisson.ppf(0.9999, series[i-INTERVAL])
- if not minv < maxv:
- print minv, maxv, series[i-INTERVAL], minc[i], maxc[i]
- assert minv < maxv
- ranges_file.write("%s,%s,%s,%s\n" % (tss.all_dates[i], c, minv, maxv))
- ranges_file.close()
-
-"""Return a URL that points to a graph in metrics.tpo that displays
-the number of direct Tor users in country 'country_code', for a
-'period'-days period.
-
-Let's hope that the metrics.tpo URL scheme doesn't change often.
-"""
-def get_tor_usage_graph_url_for_cc_and_date(country_code, dates, period):
- url = "https://metrics.torproject.org/users.html?graph=userstats-relay-country&start=%s&end=%s&country=%s&events=on#userstats-relay-country\n" % \
- (dates[-period], dates[-1], country_code)
- return url
-
-"""Write a file containing a short censorship report over the last
-'notification_period' days.
-"""
-def write_ml_report(tss, minx, maxx, INTERV, DAYS, notification_period=None):
- if notification_period is None:
- notification_period = DAYS
-
- report_file = codecs.open('short_censorship_report.txt', 'w', 'utf-8')
- file_prologue_written = False
-
- s = tss.get_largest(None) # no restrictions, get 'em all.
- scores = []
- for num, li in s:
- ds,us = censor_score(tss.get_country_series(li)[-DAYS:], minx[-DAYS:], maxx[-DAYS:], INTERV, notification_period)
- scores += [(ds,num, us, li)]
- scores.sort()
- scores.reverse()
-
- for downscores,users_n,upscores,country_code in scores:
- if (downscores > 0) or (upscores > 0):
- if not file_prologue_written:
- write_censorship_report_prologue(report_file, tss.all_dates, notification_period)
- file_prologue_written = True
-
- if ((upscores > 0) and (downscores == 0)):
- s = "We detected an unusual spike of Tor users in %s (%d upscores, %d users):\n" % \
- (get_country_name_from_cc(country_code), upscores, users_n)
- else:
- s = "We detected %d potential censorship events in %s (users: %d, upscores: %d):\n" % \
- (downscores, get_country_name_from_cc(country_code), users_n, upscores)
-
- # Also give out a link for the appropriate usage graph for a 90-days period.
- s += get_tor_usage_graph_url_for_cc_and_date(country_code, tss.all_dates, 90)
-
- report_file.write(s + "\n")
-
- report_file.close()
-
-# INTERV is the time interval to model connection rates;
-# consider maximum DAYS days back.
-def detect(CSV_FILE = "userstats-detector.csv",
- RANGES_FILE = "userstats-ranges.csv", GRAPH_DIR = "img",
- INTERV = 7, DAYS = 6 * 31, REPORT = True):
- tss = torstatstore(CSV_FILE)
- l = tss.get_largest_locations(50)
- minx, maxx = make_tendencies_minmax(l, INTERV)
- #plot_all(tss, minx, maxx, INTERV, DAYS, rdir=GRAPH_DIR)
- write_all(tss, minx, maxx, RANGES_FILE, INTERV)
-
- if REPORT:
- # Make our short report; only consider events of the last day
- write_ml_report(tss, minx, maxx, INTERV, DAYS, 1)
-
-def main():
- detect()
-
-if __name__ == "__main__":
- main()
diff --git a/detector/detector.sh b/detector/detector.sh
deleted file mode 100755
index 56f6886..0000000
--- a/detector/detector.sh
+++ /dev/null
@@ -1,6 +0,0 @@
-#!/bin/bash
-wget -qO direct-users.csv --no-check-certificate https://metrics.torproject.org/csv/direct-users.csv
-wget -qO userstats-detector.csv --no-check-certificate https://metrics.torproject.org/csv/userstats-detector.csv
-python detector.py
-cat short_censorship_report.txt | mail -E -s 'Possible censorship events' tor-censorship-events at lists.torproject.org
-
diff --git a/modules/clients/.gitignore b/modules/clients/.gitignore
new file mode 100644
index 0000000..29a7166
--- /dev/null
+++ b/modules/clients/.gitignore
@@ -0,0 +1,2 @@
+*.csv
+
diff --git a/modules/clients/build.xml b/modules/clients/build.xml
new file mode 100644
index 0000000..f90e138
--- /dev/null
+++ b/modules/clients/build.xml
@@ -0,0 +1,44 @@
+<project default="run" name="clients" basedir="."> <!-- default target "run" compiles, then executes the clients module -->
+
+ <property name="sources" value="src"/> <!-- Java source root -->
+ <property name="classes" value="classes"/> <!-- output directory for compiled classes -->
+ <path id="classpath"> <!-- shared by the compile and run targets -->
+ <pathelement path="${classes}"/>
+ <fileset dir="/usr/share/java"> <!-- system jars with pinned versions; NOTE(review): path and versions look host-specific, confirm on the build machine -->
+ <include name="commons-codec-1.6.jar"/>
+ <include name="commons-compress-1.4.1.jar"/>
+ <include name="commons-lang-2.6.jar"/>
+ </fileset>
+ <fileset dir="../../deps/metrics-lib"> <!-- descriptor.jar, presumably produced by the metrics-lib target below -->
+ <include name="descriptor.jar"/>
+ </fileset>
+ </path>
+
+ <target name="metrics-lib"> <!-- runs the default target of the metrics-lib build file -->
+ <ant dir="../../deps/metrics-lib"/>
+ </target>
+
+ <target name="compile" depends="metrics-lib"> <!-- builds metrics-lib first, then compiles ${sources} into ${classes} -->
+ <mkdir dir="${classes}"/>
+ <javac destdir="${classes}"
+ srcdir="${sources}"
+ source="1.6"
+ target="1.6"
+ debug="true"
+ deprecation="true"
+ optimize="false"
+ failonerror="true"
+ includeantruntime="false"> <!-- Java 1.6 source/target; Ant's own jars are kept off the classpath -->
+ <classpath refid="classpath"/>
+ </javac>
+ </target>
+
+ <target name="run" depends="compile"> <!-- compiles, then runs the module's Main class -->
+ <java fork="true"
+ maxmemory="2g"
+ classname="org.torproject.metrics.clients.Main"> <!-- separate JVM with a 2g maximum heap -->
+ <classpath refid="classpath"/>
+ </java>
+ </target>
+</project>
+
diff --git a/modules/clients/country_info.py b/modules/clients/country_info.py
new file mode 100644
index 0000000..e23728e
--- /dev/null
+++ b/modules/clients/country_info.py
@@ -0,0 +1,252 @@
+# -*- coding: utf-8 -*-
+
+countries = { # lowercase two-letter country code -> English display name; non-ASCII names must be u"" literals
+ "ad" : "Andorra",
+ "ae" : "the United Arab Emirates",
+ "af" : "Afghanistan",
+ "ag" : "Antigua and Barbuda",
+ "ai" : "Anguilla",
+ "al" : "Albania",
+ "am" : "Armenia",
+ "an" : "the Netherlands Antilles",
+ "ao" : "Angola",
+ "aq" : "Antarctica",
+ "ar" : "Argentina",
+ "as" : "American Samoa",
+ "at" : "Austria",
+ "au" : "Australia",
+ "aw" : "Aruba",
+ "ax" : "the Aland Islands",
+ "az" : "Azerbaijan",
+ "ba" : "Bosnia and Herzegovina",
+ "bb" : "Barbados",
+ "bd" : "Bangladesh",
+ "be" : "Belgium",
+ "bf" : "Burkina Faso",
+ "bg" : "Bulgaria",
+ "bh" : "Bahrain",
+ "bi" : "Burundi",
+ "bj" : "Benin",
+ "bl" : u"Saint Barthélemy",
+ "bm" : "Bermuda",
+ "bn" : "Brunei",
+ "bo" : "Bolivia",
+ "br" : "Brazil",
+ "bs" : "the Bahamas",
+ "bt" : "Bhutan",
+ "bv" : "the Bouvet Island",
+ "bw" : "Botswana",
+ "by" : "Belarus",
+ "bz" : "Belize",
+ "ca" : "Canada",
+ "cc" : "the Cocos (Keeling) Islands",
+ "cd" : "the Democratic Republic of the Congo",
+ "cf" : "Central African Republic",
+ "cg" : "Congo",
+ "ch" : "Switzerland",
+ "ci" : u"Côte d'Ivoire",
+ "ck" : "the Cook Islands",
+ "cl" : "Chile",
+ "cm" : "Cameroon",
+ "cn" : "China",
+ "co" : "Colombia",
+ "cr" : "Costa Rica",
+ "cu" : "Cuba",
+ "cv" : "Cape Verde",
+ "cx" : "the Christmas Island",
+ "cy" : "Cyprus",
+ "cz" : "the Czech Republic",
+ "de" : "Germany",
+ "dj" : "Djibouti",
+ "dk" : "Denmark",
+ "dm" : "Dominica",
+ "do" : "the Dominican Republic",
+ "dz" : "Algeria",
+ "ec" : "Ecuador",
+ "ee" : "Estonia",
+ "eg" : "Egypt",
+ "eh" : "the Western Sahara",
+ "er" : "Eritrea",
+ "es" : "Spain",
+ "et" : "Ethiopia",
+ "fi" : "Finland",
+ "fj" : "Fiji",
+ "fk" : "the Falkland Islands (Malvinas)",
+ "fm" : "the Federated States of Micronesia",
+ "fo" : "the Faroe Islands",
+ "fr" : "France",
+ "fx" : "Metropolitan France",
+ "ga" : "Gabon",
+ "gb" : "the United Kingdom",
+ "gd" : "Grenada",
+ "ge" : "Georgia",
+ "gf" : "French Guiana",
+ "gg" : "Guernsey",
+ "gh" : "Ghana",
+ "gi" : "Gibraltar",
+ "gl" : "Greenland",
+ "gm" : "Gambia",
+ "gn" : "Guinea",
+ "gp" : "Guadeloupe",
+ "gq" : "Equatorial Guinea",
+ "gr" : "Greece",
+ "gs" : "South Georgia and the South Sandwich Islands",
+ "gt" : "Guatemala",
+ "gu" : "Guam",
+ "gw" : "Guinea-Bissau",
+ "gy" : "Guyana",
+ "hk" : "Hong Kong",
+ "hm" : "Heard Island and McDonald Islands",
+ "hn" : "Honduras",
+ "hr" : "Croatia",
+ "ht" : "Haiti",
+ "hu" : "Hungary",
+ "id" : "Indonesia",
+ "ie" : "Ireland",
+ "il" : "Israel",
+ "im" : "the Isle of Man",
+ "in" : "India",
+ "io" : "the British Indian Ocean Territory",
+ "iq" : "Iraq",
+ "ir" : "Iran",
+ "is" : "Iceland",
+ "it" : "Italy",
+ "je" : "Jersey",
+ "jm" : "Jamaica",
+ "jo" : "Jordan",
+ "jp" : "Japan",
+ "ke" : "Kenya",
+ "kg" : "Kyrgyzstan",
+ "kh" : "Cambodia",
+ "ki" : "Kiribati",
+ "km" : "Comoros",
+ "kn" : "Saint Kitts and Nevis",
+ "kp" : "North Korea",
+ "kr" : "the Republic of Korea",
+ "kw" : "Kuwait",
+ "ky" : "the Cayman Islands",
+ "kz" : "Kazakhstan",
+ "la" : "Laos",
+ "lb" : "Lebanon",
+ "lc" : "Saint Lucia",
+ "li" : "Liechtenstein",
+ "lk" : "Sri Lanka",
+ "lr" : "Liberia",
+ "ls" : "Lesotho",
+ "lt" : "Lithuania",
+ "lu" : "Luxembourg",
+ "lv" : "Latvia",
+ "ly" : "Libya",
+ "ma" : "Morocco",
+ "mc" : "Monaco",
+ "md" : "the Republic of Moldova",
+ "me" : "Montenegro",
+ "mf" : "Saint Martin",
+ "mg" : "Madagascar",
+ "mh" : "the Marshall Islands",
+ "mk" : "Macedonia",
+ "ml" : "Mali",
+ "mm" : "Burma",
+ "mn" : "Mongolia",
+ "mo" : "Macau",
+ "mp" : "the Northern Mariana Islands",
+ "mq" : "Martinique",
+ "mr" : "Mauritania",
+ "ms" : "Montserrat",
+ "mt" : "Malta",
+ "mu" : "Mauritius",
+ "mv" : "the Maldives",
+ "mw" : "Malawi",
+ "mx" : "Mexico",
+ "my" : "Malaysia",
+ "mz" : "Mozambique",
+ "na" : "Namibia",
+ "nc" : "New Caledonia",
+ "ne" : "Niger",
+ "nf" : "Norfolk Island",
+ "ng" : "Nigeria",
+ "ni" : "Nicaragua",
+ "nl" : "the Netherlands",
+ "no" : "Norway",
+ "np" : "Nepal",
+ "nr" : "Nauru",
+ "nu" : "Niue",
+ "nz" : "New Zealand",
+ "om" : "Oman",
+ "pa" : "Panama",
+ "pe" : "Peru",
+ "pf" : "French Polynesia",
+ "pg" : "Papua New Guinea",
+ "ph" : "the Philippines",
+ "pk" : "Pakistan",
+ "pl" : "Poland",
+ "pm" : "Saint Pierre and Miquelon",
+ "pn" : "the Pitcairn Islands",
+ "pr" : "Puerto Rico",
+ "ps" : "the Palestinian Territory",
+ "pt" : "Portugal",
+ "pw" : "Palau",
+ "py" : "Paraguay",
+ "qa" : "Qatar",
+ "re" : "Reunion",
+ "ro" : "Romania",
+ "rs" : "Serbia",
+ "ru" : "Russia",
+ "rw" : "Rwanda",
+ "sa" : "Saudi Arabia",
+ "sb" : "the Solomon Islands",
+ "sc" : "the Seychelles",
+ "sd" : "Sudan",
+ "se" : "Sweden",
+ "sg" : "Singapore",
+ "sh" : "Saint Helena",
+ "si" : "Slovenia",
+ "sj" : "Svalbard and Jan Mayen",
+ "sk" : "Slovakia",
+ "sl" : "Sierra Leone",
+ "sm" : "San Marino",
+ "sn" : "Senegal",
+ "so" : "Somalia",
+ "sr" : "Suriname",
+ "ss" : "South Sudan",
+ "st" : u"São Tomé and Príncipe",
+ "sv" : "El Salvador",
+ "sy" : "the Syrian Arab Republic",
+ "sz" : "Swaziland",
+ "tc" : "Turks and Caicos Islands",
+ "td" : "Chad",
+ "tf" : "the French Southern Territories",
+ "tg" : "Togo",
+ "th" : "Thailand",
+ "tj" : "Tajikistan",
+ "tk" : "Tokelau",
+ "tl" : "East Timor",
+ "tm" : "Turkmenistan",
+ "tn" : "Tunisia",
+ "to" : "Tonga",
+ "tr" : "Turkey",
+ "tt" : "Trinidad and Tobago",
+ "tv" : "Tuvalu",
+ "tw" : "Taiwan",
+ "tz" : "the United Republic of Tanzania",
+ "ua" : "Ukraine",
+ "ug" : "Uganda",
+ "um" : "the United States Minor Outlying Islands",
+ "us" : "the United States",
+ "uy" : "Uruguay",
+ "uz" : "Uzbekistan",
+ "va" : "Vatican City",
+ "vc" : "Saint Vincent and the Grenadines",
+ "ve" : "Venezuela",
+ "vg" : "the British Virgin Islands",
+ "vi" : "the United States Virgin Islands",
+ "vn" : "Vietnam",
+ "vu" : "Vanuatu",
+ "wf" : "Wallis and Futuna",
+ "ws" : "Samoa",
+ "ye" : "Yemen",
+ "yt" : "Mayotte",
+ "za" : "South Africa",
+ "zm" : "Zambia",
+ "zw" : "Zimbabwe"
+ }
diff --git a/modules/clients/detector.py b/modules/clients/detector.py
new file mode 100644
index 0000000..611f25b
--- /dev/null
+++ b/modules/clients/detector.py
@@ -0,0 +1,437 @@
+## Copyright (c) 2011 George Danezis <gdane at microsoft.com>
+##
+## All rights reserved.
+##
+## Redistribution and use in source and binary forms, with or without
+## modification, are permitted (subject to the limitations in the
+## disclaimer below) provided that the following conditions are met:
+##
+## * Redistributions of source code must retain the above copyright
+## notice, this list of conditions and the following disclaimer.
+##
+## * Redistributions in binary form must reproduce the above copyright
+## notice, this list of conditions and the following disclaimer in the
+## documentation and/or other materials provided with the
+## distribution.
+##
+## * Neither the name of <Owner Organization> nor the names of its
+## contributors may be used to endorse or promote products derived
+## from this software without specific prior written permission.
+##
+## NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE
+## GRANTED BY THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT
+## HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
+## WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+## MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+## DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+## LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+## CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+## SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+## BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+## WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
+## OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
+## IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+##
+## (Clear BSD license: http://labs.metacarta.com/license-explanation.html#license)
+
+## This script reads a .csv file of the number of Tor users and finds
+## anomalies that might be indicative of censorship.
+
+# Dep: matplotlib
+from pylab import *
+import matplotlib
+
+# Dep: numpy
+import numpy
+
+# Dep: scipy
+import scipy.stats
+from scipy.stats.distributions import norm
+from scipy.stats.distributions import poisson
+
+# Std lib
+from datetime import date
+from datetime import timedelta
+import os.path
+
+# Country code -> Country names
+import country_info
+
+# write utf8 to file
+import codecs
+
+days = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
+
+def get_country_name_from_cc(country_code):
+ if (country_code.lower() in country_info.countries):
+ return country_info.countries[country_code.lower()]
+ return country_code # if we didn't find the cc in our map
+
+"""
+Represents a .csv file containing information on the number of
+connecting Tor users per country.
+
+'store': Dictionary with (<country code>, <counter>) as key, and the number of users as value.
+ <country code> can also be "date"...
+'all_dates': List of the data intervals (with default timedelta: 1 day).
+'country_codes': List of all relevant country codes.
+'MAX_INDEX': Length of store, number of country codes etc.
+'date_min': The oldest date found in the .csv.
+'date_min': The latest date found in the .csv.
+"""
+class torstatstore:
+ def __init__(self, file_name):
+ f = file(file_name)
+ country_codes = f.readline()
+ country_codes = country_codes.strip().split(",")
+
+ store = {}
+ MAX_INDEX = 0
+ for i, line in enumerate(f):
+ MAX_INDEX += 1
+ line_parsed = line.strip().split(",")
+ for j, (ccode, val) in enumerate(zip(country_codes,line_parsed)):
+ processed_val = None
+ if ccode == "date":
+ try:
+ year, month, day = int(val[:4]), int(val[5:7]), int(val[8:10])
+ processed_val = date(year, month, day)
+ except Exception, e:
+ print "Parsing error (ignoring line %s):" % j
+ print "%s" % val,e
+ break
+
+ elif val != "NA":
+ processed_val = int(val)
+ store[(ccode, i)] = processed_val
+
+ # min and max
+ date_min = store[("date", 0)]
+ date_max = store[("date", i)]
+
+ all_dates = []
+ d = date_min
+ dt = timedelta(days=1)
+ while d <= date_max:
+ all_dates += [d]
+ d = d + dt
+
+ # Save for later
+ self.store = store
+ self.all_dates = all_dates
+ self.country_codes = country_codes
+ self.MAX_INDEX = MAX_INDEX
+ self.date_min = date_min
+ self.date_max = date_max
+
+ """Return a list representing a time series of 'ccode' with respect
+ to the number of connected users.
+ """
+ def get_country_series(self, ccode):
+ assert ccode in self.country_codes
+ series = {}
+ for d in self.all_dates:
+ series[d] = None
+ for i in range(self.MAX_INDEX):
+ series[self.store[("date", i)]] = self.store[(ccode, i)]
+ sx = []
+ for d in self.all_dates:
+ sx += [series[d]]
+ return sx
+
+ """Return an ordered list containing tuples of the form (<number of
+ users>, <country code>). The list is ordered with respect to the
+ number of users for each country.
+ """
+ def get_largest(self, number):
+ exclude = set(["all", "??", "date"])
+ l = [(self.store[(c, self.MAX_INDEX-1)], c) for c in self.country_codes if c not in exclude]
+ l.sort()
+ l.reverse()
+ return l[:number]
+
+ """Return a dictionary, with <country code> as key, and the time
+ series of the country code as the value.
+ """
+ def get_largest_locations(self, number):
+ l = self.get_largest(number)
+ res = {}
+ for _, ccode in l[:number]:
+ res[ccode] = self.get_country_series(ccode)
+ return res
+
+"""Return a list containing lists (?) where each such list contains
+the difference in users for a time delta of 'days'
+"""
+def n_day_rel(series, days):
+ rel = []
+ for i, v in enumerate(series):
+ if series[i] is None:
+ rel += [None]
+ continue
+
+ if i - days < 0 or series[i-days] is None or series[i-days] == 0:
+ rel += [None]
+ else:
+ rel += [ float(series[i]) / series[i-days]]
+ return rel
+
+# Main model: computes the expected min / max range of number of users
+def make_tendencies_minmax(l, INTERVAL = 1):
+ lminus1 = dict([(ccode, n_day_rel(l[ccode], INTERVAL)) for ccode in l])
+ c = lminus1[lminus1.keys()[0]]
+ dists = []
+ minx = []
+ maxx = []
+ for i in range(len(c)):
+ vals = [lminus1[ccode][i] for ccode in lminus1.keys() if lminus1[ccode][i] != None]
+ if len(vals) < 8:
+ dists += [None]
+ minx += [None]
+ maxx += [None]
+ else:
+ vals.sort()
+ median = vals[len(vals)/2]
+ q1 = vals[len(vals)/4]
+ q2 = vals[(3*len(vals))/4]
+ qd = q2 - q1
+ vals = [v for v in vals if median - qd*4 < v and v < median + qd*4]
+ if len(vals) < 8:
+ dists += [None]
+ minx += [None]
+ maxx += [None]
+ continue
+ mu, signma = norm.fit(vals)
+ dists += [(mu, signma)]
+ maxx += [norm.ppf(0.9999, mu, signma)]
+ minx += [norm.ppf(1 - 0.9999, mu, signma)]
+ ## print minx[-1], maxx[-1]
+ return minx, maxx
+
+# Makes pretty plots
+def raw_plot(series, minc, maxc, labels, xtitle):
+ assert len(xtitle) == 3
+ fname, stitle, slegend = xtitle
+
+ font = {'family' : 'Bitstream Vera Sans',
+ 'weight' : 'normal',
+ 'size' : 8}
+ matplotlib.rc('font', **font)
+
+ ylim( (-max(series)*0.1, max(series)*1.1) )
+ plot(labels, series, linewidth=1.0, label="Users")
+
+ wherefill = []
+ for mm,mx in zip(minc, maxc):
+ wherefill += [not (mm == None and mx == None)]
+ assert mm < mx or (mm == None and mx == None)
+
+ fill_between(labels, minc, maxc, where=wherefill, color="gray", label="Prediction")
+
+ vdown = []
+ vup = []
+ for i,v in enumerate(series):
+ if minc[i] != None and v < minc[i]:
+ vdown += [v]
+ vup += [None]
+ elif maxc[i] != None and v > maxc[i]:
+ vdown += [None]
+ vup += [v]
+ else:
+ vup += [None]
+ vdown += [None]
+
+ plot(labels, vdown, 'o', ms=10, lw=2, alpha=0.5, mfc='orange', label="Downturns")
+ plot(labels, vup, 'o', ms=10, lw=2, alpha=0.5, mfc='green', label="Upturns")
+
+ legend(loc=2)
+
+ xlabel('Time (days)')
+ ylabel('Users')
+ title(stitle)
+ grid(True)
+ F = gcf()
+
+ F.set_size_inches(10,5)
+ F.savefig(fname, format="png", dpi = (150))
+ close()
+
+def absolute_plot(series, minc, maxc, labels,INTERVAL, xtitle):
+ in_minc = []
+ in_maxc = []
+ for i, v in enumerate(series):
+ if i > 0 and i - INTERVAL >= 0 and series[i] != None and series[i-INTERVAL] != None and series[i-INTERVAL] != 0 and minc[i]!= None and maxc[i]!= None:
+ in_minc += [minc[i] * poisson.ppf(1-0.9999, series[i-INTERVAL])]
+ in_maxc += [maxc[i] * poisson.ppf(0.9999, series[i-INTERVAL])]
+ if not in_minc[-1] < in_maxc[-1]:
+ print in_minc[-1], in_maxc[-1], series[i-INTERVAL], minc[i], maxc[i]
+ assert in_minc[-1] < in_maxc[-1]
+ else:
+ in_minc += [None]
+ in_maxc += [None]
+ raw_plot(series, in_minc, in_maxc, labels, xtitle)
+
+"""Return the number of downscores and upscores of a time series
+'series', given tendencies 'minc' and 'maxc' for the time interval
+'INTERVAL'.
+
+If 'scoring_interval' is specifed we only consider upscore/downscore
+that happened in the latest 'scoring_interval' days.
+"""
+def censor_score(series, minc, maxc, INTERVAL, scoring_interval=None):
+ upscore = 0
+ downscore = 0
+
+ if scoring_interval is None:
+ scoring_interval = len(series)
+ assert(len(series) >= scoring_interval)
+
+ for i, v in enumerate(series):
+ if i > 0 and i - INTERVAL >= 0 and series[i] != None and series[i-INTERVAL] != None and series[i-INTERVAL] != 0 and minc[i]!= None and maxc[i]!= None:
+ in_minc = minc[i] * poisson.ppf(1-0.9999, series[i-INTERVAL])
+ in_maxc = maxc[i] * poisson.ppf(0.9999, series[i-INTERVAL])
+ if (i >= (len(series) - scoring_interval)):
+ downscore += 1 if minc[i] != None and v < in_minc else 0
+ upscore += 1 if maxc[i] != None and v > in_maxc else 0
+
+ return downscore, upscore
+
+def plot_target(tss, TARGET, xtitle, minx, maxx, DAYS=365, INTERV = 7):
+ ctarget = tss.get_country_series(TARGET)
+ c = n_day_rel(ctarget, INTERV)
+ absolute_plot(ctarget[-DAYS:], minx[-DAYS:], maxx[-DAYS:], tss.all_dates[-DAYS:],INTERV, xtitle = xtitle)
+
+def write_censorship_report_prologue(report_file, dates, notification_period):
+ if (notification_period == 1):
+ date_str = "%s" % (dates[-1]) # no need for date range if it's just one day
+ else:
+ date_str = "%s to %s" % (dates[-notification_period], dates[-1])
+
+ prologue = "=======================\n"
+ prologue += "Automatic Censorship Report for %s\n" % (date_str)
+ prologue += "=======================\n\n"
+ report_file.write(prologue)
+
+## Make a league table of censorship + nice graphs
+def plot_all(tss, minx, maxx, INTERV, DAYS=None, rdir="img"):
+ rdir = os.path.realpath(rdir)
+ if not os.path.exists(rdir) or not os.path.isdir(rdir):
+ print "ERROR: %s does not exist or is not a directory." % rdir
+ return
+
+ summary_file = file(os.path.join(rdir, "summary.txt"), "w")
+
+ if DAYS == None:
+ DAYS = 6*31
+
+ s = tss.get_largest(200)
+ scores = []
+ for num, li in s:
+ print ".",
+ ds,us = censor_score(tss.get_country_series(li)[-DAYS:], minx[-DAYS:], maxx[-DAYS:], INTERV)
+ # print ds, us
+ scores += [(ds,num, us, li)]
+ scores.sort()
+ scores.reverse()
+ s = "\n=======================\n"
+ s+= "Report for %s to %s\n" % (tss.all_dates[-DAYS], tss.all_dates[-1])
+ s+= "=======================\n"
+ print s
+ summary_file.write(s)
+ for a,nx, b,c in scores:
+ if a > 0:
+ s = "%s -- down: %2d (up: %2d affected: %s)" % (c, a, b, nx)
+ print s
+ summary_file.write(s + "\n")
+ xtitle = (os.path.join(rdir, "%03d-%s-censor.png" % (a,c)), "Tor report for %s -- down: %2d (up: %2d affected: %s)" % (c, a, b, nx),"")
+ plot_target(tss, c,xtitle, minx, maxx, DAYS, INTERV)
+ summary_file.close()
+
+"""Write a CSV report on the minimum/maximum users of each country per date."""
+def write_all(tss, minc, maxc, RANGES_FILE, INTERVAL=7):
+ ranges_file = file(RANGES_FILE, "w")
+ ranges_file.write("date,country,minusers,maxusers\n")
+ exclude = set(["all", "??", "date"])
+ for c in tss.country_codes:
+ if c in exclude:
+ continue
+ series = tss.get_country_series(c)
+ for i, v in enumerate(series):
+ if i > 0 and i - INTERVAL >= 0 and series[i] != None and series[i-INTERVAL] != None and series[i-INTERVAL] != 0 and minc[i]!= None and maxc[i]!= None:
+ minv = minc[i] * poisson.ppf(1-0.9999, series[i-INTERVAL])
+ maxv = maxc[i] * poisson.ppf(0.9999, series[i-INTERVAL])
+ if not minv < maxv:
+ print minv, maxv, series[i-INTERVAL], minc[i], maxc[i]
+ assert minv < maxv
+ ranges_file.write("%s,%s,%s,%s\n" % (tss.all_dates[i], c, minv, maxv))
+ ranges_file.close()
+
+"""Return a URL that points to a graph in metrics.tpo that displays
+the number of direct Tor users in country 'country_code', for a
+'period'-days period.
+
+Let's hope that the metrics.tpo URL scheme doesn't change often.
+"""
+def get_tor_usage_graph_url_for_cc_and_date(country_code, dates, period):
+ url = "https://metrics.torproject.org/users.html?graph=userstats-relay-country&start=%s&end=%s&country=%s&events=on#userstats-relay-country\n" % \
+ (dates[-period], dates[-1], country_code)
+ return url
+
+"""Write a file containing a short censorship report over the last
+'notification_period' days.
+"""
+def write_ml_report(tss, minx, maxx, INTERV, DAYS, notification_period=None):
+ if notification_period is None:
+ notification_period = DAYS
+
+ report_file = codecs.open('short_censorship_report.txt', 'w', 'utf-8')
+ file_prologue_written = False
+
+ s = tss.get_largest(None) # no restrictions, get 'em all.
+ scores = []
+ for num, li in s:
+ ds,us = censor_score(tss.get_country_series(li)[-DAYS:], minx[-DAYS:], maxx[-DAYS:], INTERV, notification_period)
+ scores += [(ds,num, us, li)]
+ scores.sort()
+ scores.reverse()
+
+ for downscores,users_n,upscores,country_code in scores:
+ if (downscores > 0) or (upscores > 0):
+ if not file_prologue_written:
+ write_censorship_report_prologue(report_file, tss.all_dates, notification_period)
+ file_prologue_written = True
+
+ if ((upscores > 0) and (downscores == 0)):
+ s = "We detected an unusual spike of Tor users in %s (%d upscores, %d users):\n" % \
+ (get_country_name_from_cc(country_code), upscores, users_n)
+ else:
+ s = "We detected %d potential censorship events in %s (users: %d, upscores: %d):\n" % \
+ (downscores, get_country_name_from_cc(country_code), users_n, upscores)
+
+ # Also give out a link for the appropriate usage graph for a 90-days period.
+ s += get_tor_usage_graph_url_for_cc_and_date(country_code, tss.all_dates, 90)
+
+ report_file.write(s + "\n")
+
+ report_file.close()
+
+# INTERV is the time interval to model connection rates;
+# consider maximum DAYS days back.
+def detect(CSV_FILE = "userstats-detector.csv",
+ RANGES_FILE = "userstats-ranges.csv", GRAPH_DIR = "img",
+ INTERV = 7, DAYS = 6 * 31, REPORT = True):
+ tss = torstatstore(CSV_FILE)
+ l = tss.get_largest_locations(50)
+ minx, maxx = make_tendencies_minmax(l, INTERV)
+ #plot_all(tss, minx, maxx, INTERV, DAYS, rdir=GRAPH_DIR)
+ write_all(tss, minx, maxx, RANGES_FILE, INTERV)
+
+ if REPORT:
+ # Make our short report; only consider events of the last day
+ write_ml_report(tss, minx, maxx, INTERV, DAYS, 1)
+
+def main():
+ detect()
+
+if __name__ == "__main__":
+ main()
diff --git a/modules/clients/init-userstats.sql b/modules/clients/init-userstats.sql
new file mode 100644
index 0000000..7c5df3d
--- /dev/null
+++ b/modules/clients/init-userstats.sql
@@ -0,0 +1,575 @@
+-- Copyright 2013 The Tor Project
+-- See LICENSE for licensing information
+
+-- Use enum types for dimensions that may only change if we write new code
+-- to support them. For example, if there's a new node type beyond relay
+-- and bridge, we'll have to write code to support it. This is in
+-- contrast to dimensions like country, transport, or version which don't
+-- have their possible values hard-coded anywhere.
+-- node: which kind of Tor node published an observation.
+CREATE TYPE node AS ENUM ('relay', 'bridge');
+-- metric: which kind of observation a row holds; the meaning of each value
+-- is explained next to the metric column of the imported table.
+CREATE TYPE metric AS ENUM ('responses', 'bytes', 'status');
+
+-- All new data first goes into the imported table. The import tool
+-- should do some trivial checks for invalid or duplicate data, but
+-- ultimately, we're going to do these checks in the database. For
+-- example, the import tool could avoid importing data from the same
+-- descriptor more than once, but it's fine to import the same history
+-- string from distinct descriptors multiple times. The import tool must,
+-- however, make sure that stats_end is not greater than 00:00:00 of the
+-- day following stats_start. There are no constraints set on this table,
+-- because importing data should be really, really fast. Once the newly
+-- imported data is successfully processed, the imported table is emptied.
+CREATE TABLE imported (
+
+ -- The 40-character upper-case hex string identifies a descriptor
+ -- uniquely and is used to join metrics (responses, bytes, status)
+ -- published by the same node (relay or bridge).
+ fingerprint CHARACTER(40) NOT NULL,
+
+ -- The node type is used to decide the statistics that this entry will
+ -- be part of.
+ node node NOT NULL,
+
+ -- The metric of this entry describes the stored observation type.
+ -- We'll want to store different metrics published by a node:
+ -- - 'responses' are the number of v3 network status consensus requests
+ -- that the node responded to;
+ -- - 'bytes' are the number of bytes that the node wrote when answering
+ -- directory requests;
+ -- - 'status' are the intervals when the node was listed as running in
+ -- the network status published by either the directory authorities or
+ -- bridge authority.
+ metric metric NOT NULL,
+
+ -- The two-letter lower-case country code that the observation in this
+ -- entry can be attributed to; can be '??' if no country information is
+ -- known for this entry, or '' (empty string) if this entry summarizes
+ -- observations for all countries.
+ country CHARACTER VARYING(2) NOT NULL,
+
+ -- The pluggable transport name that the observation in this entry can
+ -- be attributed to; can be '<OR>' if no pluggable transport was used,
+ -- '<??>' if an unknown pluggable transport was used, or '' (empty
+ -- string) if this entry summarizes observations for all transports.
+ transport CHARACTER VARYING(20) NOT NULL,
+
+ -- The IP address version that the observation in this entry can be
+ -- attributed to; can be 'v4' or 'v6' or '' (empty string) if this entry
+ -- summarizes observations for all IP address versions.
+ version CHARACTER VARYING(2) NOT NULL,
+
+ -- The interval start of this observation.
+ stats_start TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+
+ -- The interval end of this observation. This timestamp must be greater
+ -- than stats_start and must not be greater than 00:00:00 of the day
+ -- following stats_start, which the import tool must ensure (no
+ -- constraint checks it here).
+ stats_end TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+
+ -- Finally, the observed value, e.g., a number of responses or bytes,
+ -- depending on metric. NOTE(review): what val holds for 'status' rows is
+ -- not visible here; confirm against the import tool.
+ val DOUBLE PRECISION NOT NULL
+);
+
+-- After importing new data into the imported table, they are merged into
+-- the merged table using the merge() function. The merged table contains
+-- the same data as the imported table, except:
+-- (1) there are no duplicate or overlapping entries in the merged table
+-- with respect to stats_start and stats_end and the same fingerprint,
+-- node, metric, country, transport, and version columns;
+-- (2) all adjacent intervals (one ending exactly when the next one
+-- starts) with the same fingerprint, node, metric, country, transport,
+-- version, and stats_start date are compressed into a single entry whose
+-- val is the sum of the compressed entries' values.
+CREATE TABLE merged (
+
+ -- The unique key that is only used when merging newly imported data
+ -- into this table.
+ id SERIAL PRIMARY KEY,
+
+ -- All other columns have the same meaning as in the imported table.
+ fingerprint CHARACTER(40) NOT NULL,
+ node node NOT NULL,
+ metric metric NOT NULL,
+ country CHARACTER VARYING(2) NOT NULL,
+ transport CHARACTER VARYING(20) NOT NULL,
+ version CHARACTER VARYING(2) NOT NULL,
+ stats_start TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+ stats_end TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+ val DOUBLE PRECISION NOT NULL
+);
+
+-- After merging new data into the merged table, they are aggregated to
+-- daily user number estimates using the aggregate() function. Only dates
+-- with new data in the imported table will be recomputed in the
+-- aggregated table. The aggregated components follow the algorithm
+-- proposed in Tor Tech Report 2012-10-001.
+-- There is no primary key; aggregate() replaces all rows of a recomputed
+-- date by deleting them and inserting fresh ones.
+CREATE TABLE aggregated (
+
+ -- The date of these aggregated observations.
+ date DATE NOT NULL,
+
+ -- The node, country, transport, and version columns all have the same
+ -- meaning as in the imported table.
+ node node NOT NULL,
+ country CHARACTER VARYING(2) NOT NULL DEFAULT '',
+ transport CHARACTER VARYING(20) NOT NULL DEFAULT '',
+ version CHARACTER VARYING(2) NOT NULL DEFAULT '',
+
+ -- Total number of reported responses, possibly broken down by country,
+ -- transport, or version if either of them is not ''. See r(R) in the
+ -- tech report.
+ rrx DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+ -- Total number of seconds of nodes reporting responses, possibly broken
+ -- down by country, transport, or version if either of them is not ''.
+ -- This would be referred to as n(R) in the tech report, though it's not
+ -- used there.
+ nrx DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+ -- Total number of reported bytes. See h(H) in the tech report.
+ hh DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+ -- Total number of seconds of nodes in the status. See n(N) in the tech
+ -- report.
+ nn DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+ -- Number of reported bytes of nodes that reported both responses and
+ -- bytes. See h(R intersect H) in the tech report.
+ hrh DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+ -- Number of seconds of nodes reporting bytes. See n(H) in the tech
+ -- report.
+ nh DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+ -- Number of seconds of nodes reporting responses but no bytes. See
+ -- n(R \ H) in the tech report.
+ nrh DOUBLE PRECISION NOT NULL DEFAULT 0
+);
+
+-- PL/pgSQL is required by the merge() and aggregate() functions below.
+-- NOTE(review): PostgreSQL 9.0 and later install plpgsql by default, where
+-- this statement fails with "language already exists"; consider
+-- CREATE OR REPLACE LANGUAGE (9.0+) if older servers need not be supported.
+CREATE LANGUAGE plpgsql;
+
+-- Merge new entries from the imported table into the merged table, and
+-- compress them while doing so. This function first executes a query to
+-- match all entries in the imported table with adjacent or even
+-- overlapping entries in the merged table. It then loops over query
+-- results and either inserts or updates entries in the merged table. The
+-- idea is to leave query optimization to the database and only touch
+-- as few entries as possible while running this function.
+-- The function returns nothing, does not modify the imported table, and
+-- reports progress via RAISE NOTICE messages prefixed with timeofday().
+CREATE OR REPLACE FUNCTION merge() RETURNS VOID AS $$
+DECLARE
+
+ -- The current record that we're handling in the loop body.
+ cur RECORD;
+
+ -- Various information about the last record we processed, so that we
+ -- can merge the current record with the last one if possible.
+ last_fingerprint CHARACTER(40) := NULL;
+ last_node node;
+ last_metric metric;
+ last_country CHARACTER VARYING(2);
+ last_transport CHARACTER VARYING(20);
+ last_version CHARACTER VARYING(2);
+ last_start TIMESTAMP WITHOUT TIME ZONE;
+ last_end TIMESTAMP WITHOUT TIME ZONE;
+ last_id INTEGER;
+ last_val DOUBLE PRECISION;
+
+ -- Interval end and value of the last record before updating them in the
+ -- last loop step. In a few edge cases, we may update an entry and
+ -- learn in the next loop step that the updated entry overlaps with the
+ -- subsequent entry. In these cases we'll have to undo the update,
+ -- which is why we're storing the updated values.
+ undo_end TIMESTAMP WITHOUT TIME ZONE;
+ undo_val DOUBLE PRECISION;
+
+BEGIN
+ RAISE NOTICE '% Starting to merge.', timeofday();
+
+ -- TODO Maybe we'll have to materialize a merged_part table that only
+ -- contains dates IN (SELECT DISTINCT DATE(stats_start) FROM imported)
+ -- and use that in the query below.
+
+ -- Loop over results from a query that joins new entries in the imported
+ -- table with existing entries in the merged table.
+ FOR cur IN SELECT DISTINCT
+
+ -- Select id, interval start and end, and value of the existing entry
+ -- in merged; all these fields may be null if the imported entry is
+ -- not adjacent to an existing one.
+ merged.id AS merged_id,
+ merged.stats_start AS merged_start,
+ merged.stats_end AS merged_end,
+ merged.val AS merged_val,
+
+ -- Select interval start and end and value of the newly imported
+ -- entry.
+ imported.stats_start AS imported_start,
+ imported.stats_end AS imported_end,
+ imported.val AS imported_val,
+
+ -- Select columns that define the group of entries that can be merged
+ -- in the merged table.
+ imported.fingerprint AS fingerprint,
+ imported.node AS node,
+ imported.metric AS metric,
+ imported.country AS country,
+ imported.transport AS transport,
+ imported.version AS version
+
+ -- Select these columns from all entries in the imported table, plus
+ -- do an outer join on the merged table to find adjacent entries that
+ -- we might want to merge the new entries with. It's possible that we
+ -- handle the same imported entry twice, if it starts directly after
+ -- one existing entry and ends directly before another existing entry.
+ FROM imported LEFT JOIN merged
+
+ -- First two join conditions are to find adjacent intervals. In fact,
+ -- we also include overlapping intervals here, so that we can skip the
+ -- overlapping entry in the imported table.
+ ON imported.stats_end >= merged.stats_start AND
+ imported.stats_start <= merged.stats_end AND
+
+ -- Further join conditions are same date, fingerprint, node, etc.,
+ -- so that we don't merge entries that don't belong together.
+ DATE(imported.stats_start) = DATE(merged.stats_start) AND
+ imported.fingerprint = merged.fingerprint AND
+ imported.node = merged.node AND
+ imported.metric = merged.metric AND
+ imported.country = merged.country AND
+ imported.transport = merged.transport AND
+ imported.version = merged.version
+
+ -- Ordering is key, or our approach to merge subsequent entries is
+ -- going to break.
+ ORDER BY imported.fingerprint, imported.node, imported.metric,
+ imported.country, imported.transport, imported.version,
+ imported.stats_start, merged.stats_start, imported.stats_end
+
+ -- Now go through the results one by one.
+ LOOP
+
+ -- Log that we're done with the query and about to start merging.
+ IF last_fingerprint IS NULL THEN
+ RAISE NOTICE '% Query returned, now merging entries.', timeofday();
+ END IF;
+
+ -- If we're processing the very first entry or if we have reached a
+ -- new group of entries that belong together, (re-)set last_*
+ -- variables.
+ IF last_fingerprint IS NULL OR
+ DATE(cur.imported_start) <> DATE(last_start) OR
+ cur.fingerprint <> last_fingerprint OR
+ cur.node <> last_node OR
+ cur.metric <> last_metric OR
+ cur.country <> last_country OR
+ cur.transport <> last_transport OR
+ cur.version <> last_version THEN
+ last_id := -1;
+ last_start := '1970-01-01 00:00:00';
+ last_end := '1970-01-01 00:00:00';
+ last_val := -1;
+ END IF;
+
+ -- Remember all fields that determine the group of which entries
+ -- belong together.
+ last_fingerprint := cur.fingerprint;
+ last_node := cur.node;
+ last_metric := cur.metric;
+ last_country := cur.country;
+ last_transport := cur.transport;
+ last_version := cur.version;
+
+ -- If the existing entry that we're currently looking at starts before
+ -- the previous entry ends, we have created two overlapping entries in
+ -- the last iteration, and that is not allowed. Undo the previous
+ -- change.
+ IF cur.merged_start IS NOT NULL AND
+ cur.merged_start < last_end AND
+ undo_end IS NOT NULL AND undo_val IS NOT NULL THEN
+ UPDATE merged SET stats_end = undo_end, val = undo_val
+ WHERE id = last_id;
+ undo_end := NULL;
+ undo_val := NULL;
+
+ -- If there is no adjacent entry to the one we're about to merge,
+ -- insert it as new entry.
+ ELSIF cur.merged_end IS NULL THEN
+ IF cur.imported_start > last_end THEN
+ last_start := cur.imported_start;
+ last_end := cur.imported_end;
+ last_val := cur.imported_val;
+ INSERT INTO merged (fingerprint, node, metric, country, transport,
+ version, stats_start, stats_end, val)
+ VALUES (last_fingerprint, last_node, last_metric, last_country,
+ last_transport, last_version, last_start, last_end,
+ last_val)
+ RETURNING id INTO last_id;
+
+ -- If there was no adjacent entry before starting to merge, but
+ -- there is now one ending right before the new entry starts, merge
+ -- the new entry into the existing one.
+ ELSIF cur.imported_start = last_end THEN
+ last_val := last_val + cur.imported_val;
+ last_end := cur.imported_end;
+ UPDATE merged SET stats_end = last_end, val = last_val
+ WHERE id = last_id;
+ END IF;
+
+ -- There's no risk of this entry overlapping with the next.
+ undo_end := NULL;
+ undo_val := NULL;
+
+ -- If the new entry ends right when an existing entry starts, but
+ -- there's a gap between when the previously processed entry ends and
+ -- when the new entry starts, merge the new entry with the existing
+ -- entry we're currently looking at.
+ ELSIF cur.imported_end = cur.merged_start THEN
+ IF cur.imported_start > last_end THEN
+ last_id := cur.merged_id;
+ last_start := cur.imported_start;
+ last_end := cur.merged_end;
+ last_val := cur.imported_val + cur.merged_val;
+ UPDATE merged SET stats_start = last_start, val = last_val
+ WHERE id = last_id;
+
+ -- If the new entry ends right when an existing entry starts and
+ -- there's no gap between when the previously processed entry ends
+ -- and when the new entry starts, merge the new entry with the other
+ -- two entries. This happens by deleting the previous entry and
+ -- expanding the subsequent entry to cover all three entries.
+ ELSIF cur.imported_start = last_end THEN
+ DELETE FROM merged WHERE id = last_id;
+ last_id := cur.merged_id;
+ last_end := cur.merged_end;
+ last_val := last_val + cur.merged_val;
+ UPDATE merged SET stats_start = last_start, val = last_val
+ WHERE id = last_id;
+ END IF;
+
+ -- There's no risk of this entry overlapping with the next.
+ undo_end := NULL;
+ undo_val := NULL;
+
+ -- If the new entry starts right when an existing entry ends, but
+ -- there's a gap between the previously processed entry and the
+ -- existing one, extend the existing entry. There's a special case
+ -- when this operation is wrong and must be undone, which is when the
+ -- newly added entry overlaps with the subsequent entry. That's why
+ -- we have to store the old interval end and value, so that this
+ -- operation can be undone in the next loop iteration.
+ ELSIF cur.imported_start = cur.merged_end THEN
+ IF last_end < cur.imported_start THEN
+ undo_end := cur.merged_end;
+ undo_val := cur.merged_val;
+ last_id := cur.merged_id;
+ last_start := cur.merged_start;
+ last_end := cur.imported_end;
+ last_val := cur.merged_val + cur.imported_val;
+ UPDATE merged SET stats_end = last_end, val = last_val
+ WHERE id = last_id;
+
+ -- If the new entry starts right when an existing entry ends and
+ -- there's no gap between the previously processed entry and the
+ -- existing entry, extend the existing entry. This is very similar
+ -- to the previous case. The same reasoning about possibly having
+ -- to undo this operation applies.
+ ELSE
+ undo_end := cur.merged_end;
+ undo_val := last_val;
+ last_end := cur.imported_end;
+ last_val := last_val + cur.imported_val;
+ UPDATE merged SET stats_end = last_end, val = last_val
+ WHERE id = last_id;
+ END IF;
+
+ -- If none of the cases above applies, there must have been an overlap
+ -- between the new entry and an existing one. Skip the new entry.
+ ELSE
+ last_id := cur.merged_id;
+ last_start := cur.merged_start;
+ last_end := cur.merged_end;
+ last_val := cur.merged_val;
+ undo_end := NULL;
+ undo_val := NULL;
+ END IF;
+ END LOOP;
+
+ -- That's it, we're done merging.
+ RAISE NOTICE '% Finishing merge.', timeofday();
+ RETURN;
+END;
+$$ LANGUAGE plpgsql;
+
+-- Aggregate user estimates for all dates that have updated entries in the
+-- merged table. This function first creates a temporary table with
+-- new or updated observations, then removes all existing estimates for
+-- the dates to be updated, and finally inserts newly computed aggregates
+-- for these dates.
+--
+-- Both temporary tables are created with ON COMMIT DROP, so that calling
+-- this function again in a later transaction of the same session does not
+-- fail because the tables still exist.
+CREATE OR REPLACE FUNCTION aggregate() RETURNS VOID AS $$
+BEGIN
+  RAISE NOTICE '% Starting aggregate step.', timeofday();
+
+  -- Create a new temporary table containing all relevant information
+  -- needed to update the aggregated table. In this table, we sum up all
+  -- observations of a given type by reporting node. This query is
+  -- (temporarily) materialized, because we need to combine its entries
+  -- multiple times in various ways. A (non-materialized) view would have
+  -- meant to re-compute this query multiple times.
+  CREATE TEMPORARY TABLE update ON COMMIT DROP AS
+    SELECT fingerprint, node, metric, country, transport, version,
+           DATE(stats_start) AS date, SUM(val) AS val,
+           SUM(CAST(EXTRACT(EPOCH FROM stats_end - stats_start)
+               AS DOUBLE PRECISION)) AS seconds
+    FROM merged
+    WHERE DATE(stats_start) IN (
+      SELECT DISTINCT DATE(stats_start) FROM imported)
+    GROUP BY fingerprint, node, metric, country, transport, version,
+             DATE(stats_start);
+
+  -- Delete all entries from the aggregated table that we're about to
+  -- re-compute.
+  DELETE FROM aggregated WHERE date IN (SELECT DISTINCT date FROM update);
+
+  -- Insert partly empty results for all existing combinations of date,
+  -- node ('relay' or 'bridge'), country, transport, and version. Only
+  -- the rrx and nrx fields will contain number and seconds of reported
+  -- responses for the given combination of date, node, etc., while the
+  -- other fields will be updated below.
+  INSERT INTO aggregated (date, node, country, transport, version, rrx,
+      nrx)
+    SELECT date, node, country, transport, version, SUM(val) AS rrx,
+           SUM(seconds) AS nrx
+    FROM update WHERE metric = 'responses'
+    GROUP BY date, node, country, transport, version;
+
+  -- Create another temporary table with only those entries that aren't
+  -- broken down by any dimension. This table is much smaller, so the
+  -- following operations are much faster. It holds at most one row per
+  -- (fingerprint, node, metric, date), because update is grouped by
+  -- those columns plus the three empty dimension columns.
+  CREATE TEMPORARY TABLE update_no_dimensions ON COMMIT DROP AS
+    SELECT fingerprint, node, metric, date, val, seconds FROM update
+    WHERE country = ''
+    AND transport = ''
+    AND version = '';
+
+  -- Update results in the aggregated table by setting aggregates based
+  -- on reported directory bytes. These aggregates are only based on
+  -- date and node, so that the same values are set for all combinations
+  -- of country, transport, and version.
+  UPDATE aggregated
+    SET hh = aggregated_bytes.hh, nh = aggregated_bytes.nh
+    FROM (
+      SELECT date, node, SUM(val) AS hh, SUM(seconds) AS nh
+      FROM update_no_dimensions
+      WHERE metric = 'bytes'
+      GROUP BY date, node
+    ) aggregated_bytes
+    WHERE aggregated.date = aggregated_bytes.date
+    AND aggregated.node = aggregated_bytes.node;
+
+  -- Update results based on nodes being contained in the network status.
+  UPDATE aggregated
+    SET nn = aggregated_status.nn
+    FROM (
+      SELECT date, node, SUM(seconds) AS nn
+      FROM update_no_dimensions
+      WHERE metric = 'status'
+      GROUP BY date, node
+    ) aggregated_status
+    WHERE aggregated.date = aggregated_status.date
+    AND aggregated.node = aggregated_status.node;
+
+  -- Update results based on nodes reporting both bytes and responses.
+  -- This is deliberately an inner join: only nodes with both a 'bytes'
+  -- and a 'responses' entry on the same date contribute. Each node's
+  -- written bytes are scaled down to the time covered by both
+  -- statistics.
+  UPDATE aggregated
+    SET hrh = aggregated_bytes_responses.hrh
+    FROM (
+      SELECT bytes.date, bytes.node,
+             SUM((LEAST(bytes.seconds, responses.seconds)
+                 * bytes.val) / bytes.seconds) AS hrh
+      FROM update_no_dimensions bytes
+      JOIN update_no_dimensions responses
+        ON bytes.date = responses.date
+        AND bytes.fingerprint = responses.fingerprint
+        AND bytes.node = responses.node
+        AND responses.metric = 'responses'
+      WHERE bytes.metric = 'bytes'
+      GROUP BY bytes.date, bytes.node
+    ) aggregated_bytes_responses
+    WHERE aggregated.date = aggregated_bytes_responses.date
+    AND aggregated.node = aggregated_bytes_responses.node;
+
+  -- Update results based on nodes reporting responses but no bytes.
+  -- The metric filter on the bytes side must live in the ON clause:
+  -- putting it into WHERE would discard the NULL-extended rows of nodes
+  -- that reported no bytes at all, which are exactly the rows this
+  -- aggregate (and the COALESCE below) is meant to account for.
+  UPDATE aggregated
+    SET nrh = aggregated_responses_bytes.nrh
+    FROM (
+      SELECT responses.date, responses.node,
+             SUM(GREATEST(0, responses.seconds
+                             - COALESCE(bytes.seconds, 0))) AS nrh
+      FROM update_no_dimensions responses
+      LEFT JOIN update_no_dimensions bytes
+        ON responses.date = bytes.date
+        AND responses.fingerprint = bytes.fingerprint
+        AND responses.node = bytes.node
+        AND bytes.metric = 'bytes'
+      WHERE responses.metric = 'responses'
+      GROUP BY responses.date, responses.node
+    ) aggregated_responses_bytes
+    WHERE aggregated.date = aggregated_responses_bytes.date
+    AND aggregated.node = aggregated_responses_bytes.node;
+
+  -- We're done aggregating new data.
+  RAISE NOTICE '% Finishing aggregate step.', timeofday();
+  RETURN;
+END;
+$$ LANGUAGE plpgsql;
+
+-- View over the aggregated table that turns its raw sums into user
+-- number estimates, following the method described in Tor Tech Report
+-- 2012-10-001. Rows cover both relay and bridge statistics and may be
+-- broken down by country, transport, or IP version.
+CREATE OR REPLACE VIEW estimated AS SELECT
+
+  -- Date that this estimate refers to.
+  est.date,
+
+  -- Node type: either 'relay' or 'bridge'.
+  est.node,
+
+  -- Two-letter lower-case country code. '??' stands for users that could
+  -- not be resolved to a country, and '' (empty string) for all users
+  -- regardless of country.
+  est.country,
+
+  -- Pluggable transport name. '<OR>' stands for users connecting without
+  -- a pluggable transport, '<??>' for unknown transports, and ''
+  -- (empty string) for all users regardless of transport.
+  est.transport,
+
+  -- IP address version: 'v4', 'v6', or '' (empty string) for all users
+  -- regardless of IP address version.
+  est.version,
+
+  -- Estimated fraction of nodes reporting directory requests, as an
+  -- integer percentage. Observed requests are extrapolated to the whole
+  -- network using this fraction, so values closer to 100 mean a more
+  -- precise estimate.
+  (est.frac * 100)::INTEGER AS frac,
+
+  -- Estimated number of users, obtained by dividing extrapolated
+  -- requests by 10.
+  (est.rrx / (est.frac * 10))::INTEGER AS users
+
+  -- The fraction formula is computed once in this subquery and then
+  -- referenced by name above.
+  FROM (
+    SELECT date, node, country, transport, version, rrx, nrx,
+           (hrh * nh + hh * nrh) / (hh * nn) AS frac
+    FROM aggregated
+    WHERE hh * nn > 0.0
+  ) est
+
+  -- Keep only estimates for which at least 10% of nodes reported
+  -- directory request statistics.
+  WHERE est.frac >= 0.1 AND est.frac <= 1.0
+
+  -- Newest dates first, then by node, version, transport, and country.
+  ORDER BY date DESC, node, version, transport, country;
+
diff --git a/modules/clients/merge-clients.R b/modules/clients/merge-clients.R
new file mode 100644
index 0000000..cce7e9d
--- /dev/null
+++ b/modules/clients/merge-clients.R
@@ -0,0 +1,19 @@
+# Merge the relay user-number ranges (userstats-ranges.csv) and the user
+# estimates (userstats.csv) into one wide table, clients.csv, with one
+# row per date, node, country, transport, and version.
+
+# library() rather than require(): fail immediately with a clear error
+# if reshape is missing, instead of failing later at melt().
+library(reshape)
+
+# Ranges are relay-only and have no transport/version breakdown. The
+# "maxusers" column becomes "upper"; every other non-id column becomes
+# "lower". Values are rounded down.
+r <- read.csv("userstats-ranges.csv", stringsAsFactors = FALSE)
+r <- melt(r, id.vars = c("date", "country"))
+r <- data.frame(date = r$date, node = "relay", country = r$country,
+  transport = "", version = "",
+  variable = ifelse(r$variable == "maxusers", "upper", "lower"),
+  value = floor(r$value))
+
+# Estimates keep "frac" as is; every other non-id column is relabeled
+# as "clients".
+u <- read.csv("userstats.csv", stringsAsFactors = FALSE)
+u <- melt(u, id.vars = c("date", "node", "country", "transport",
+  "version"))
+u <- data.frame(date = u$date, node = u$node, country = u$country,
+  transport = u$transport, version = u$version,
+  variable = ifelse(u$variable == "frac", "frac", "clients"),
+  value = u$value)
+
+# Stack both long tables, spread variables into columns, sort, and write.
+# Named "clients" rather than "c" to avoid shadowing base::c().
+clients <- rbind(r, u)
+clients <- cast(clients,
+  date + node + country + transport + version ~ variable)
+clients <- clients[order(as.Date(clients$date), clients$node,
+  clients$country, clients$transport, clients$version), ]
+write.csv(clients, "clients.csv", quote = FALSE, row.names = FALSE,
+  na = "")
+
diff --git a/modules/clients/src/org/torproject/metrics/clients/Main.java b/modules/clients/src/org/torproject/metrics/clients/Main.java
new file mode 100644
index 0000000..2e6712b
--- /dev/null
+++ b/modules/clients/src/org/torproject/metrics/clients/Main.java
@@ -0,0 +1,465 @@
+/* Copyright 2013 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.metrics.clients;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TimeZone;
+import java.util.TreeMap;
+
+import org.torproject.descriptor.BandwidthHistory;
+import org.torproject.descriptor.BridgeNetworkStatus;
+import org.torproject.descriptor.Descriptor;
+import org.torproject.descriptor.DescriptorFile;
+import org.torproject.descriptor.DescriptorReader;
+import org.torproject.descriptor.DescriptorSourceFactory;
+import org.torproject.descriptor.ExtraInfoDescriptor;
+import org.torproject.descriptor.NetworkStatusEntry;
+import org.torproject.descriptor.RelayNetworkStatusConsensus;
+
+/*
+ * Reads relay and bridge descriptors and writes SQL scripts that COPY
+ * per-node observations (directory responses, written directory bytes,
+ * and Running status) into the "imported" table, then call merge() and
+ * aggregate() and truncate "imported" again, all in one transaction.
+ */
+public class Main {
+
+  /* Parses command-line arguments, processes relay and bridge
+   * descriptors, and finalizes all output files that were opened. */
+  public static void main(String[] args) throws Exception {
+    parseArgs(args);
+    parseRelayDescriptors();
+    parseBridgeDescriptors();
+    closeOutputFiles();
+  }
+
+  /* Write all output to out/userstats.sql (the default without
+   * arguments). */
+  private static boolean writeToSingleFile = true;
+
+  /* Only used if not writing to a single file: split output by
+   * statistics date (--stats-date) if true, or by the hour in which the
+   * descriptor was published (--desc-hour) if false. */
+  private static boolean byStatsDateNotByDescHour = false;
+
+  /* Sets the output mode from the command line; prints usage and exits
+   * with status 1 on any unrecognized argument list. */
+  private static void parseArgs(String[] args) {
+    if (args.length == 0) {
+      writeToSingleFile = true;
+    } else if (args.length == 1 && args[0].equals("--stats-date")) {
+      writeToSingleFile = false;
+      byStatsDateNotByDescHour = true;
+    } else if (args.length == 1 && args[0].equals("--desc-hour")) {
+      writeToSingleFile = false;
+      byStatsDateNotByDescHour = false;
+    } else {
+      System.err.println("Usage: java " + Main.class.getName()
+          + " [ --stats-date | --desc-hour ]");
+      System.exit(1);
+    }
+  }
+
+  private static final long ONE_HOUR_MILLIS = 60L * 60L * 1000L,
+      ONE_DAY_MILLIS = 24L * ONE_HOUR_MILLIS,
+      ONE_WEEK_MILLIS = 7L * ONE_DAY_MILLIS;
+
+  /* Reads relay descriptors from ../../shared/in/recent/relay-descriptors
+   * and dispatches extra-info descriptors and consensuses. The file
+   * status/relay-descriptors is passed as the reader's exclude-files
+   * history (presumably so that already processed files are skipped in
+   * later runs -- see metrics-lib). */
+  private static void parseRelayDescriptors() throws Exception {
+    DescriptorReader descriptorReader =
+        DescriptorSourceFactory.createDescriptorReader();
+    descriptorReader.setExcludeFiles(new File(
+        "status/relay-descriptors"));
+    descriptorReader.addDirectory(new File(
+        "../../shared/in/recent/relay-descriptors"));
+    Iterator<DescriptorFile> descriptorFiles =
+        descriptorReader.readDescriptors();
+    while (descriptorFiles.hasNext()) {
+      DescriptorFile descriptorFile = descriptorFiles.next();
+      for (Descriptor descriptor : descriptorFile.getDescriptors()) {
+        if (descriptor instanceof ExtraInfoDescriptor) {
+          parseRelayExtraInfoDescriptor((ExtraInfoDescriptor) descriptor);
+        } else if (descriptor instanceof RelayNetworkStatusConsensus) {
+          parseRelayNetworkStatusConsensus(
+              (RelayNetworkStatusConsensus) descriptor);
+        }
+      }
+    }
+  }
+
+  /* Extracts directory request counts and the directory write history
+   * from a relay extra-info descriptor. */
+  private static void parseRelayExtraInfoDescriptor(
+      ExtraInfoDescriptor descriptor) throws IOException {
+    long publishedMillis = descriptor.getPublishedMillis();
+    String fingerprint = descriptor.getFingerprint().toUpperCase();
+    long dirreqStatsEndMillis = descriptor.getDirreqStatsEndMillis();
+    long dirreqStatsIntervalLengthMillis =
+        descriptor.getDirreqStatsIntervalLength() * 1000L;
+    SortedMap<String, Integer> requests = descriptor.getDirreqV3Reqs();
+    BandwidthHistory dirreqWriteHistory =
+        descriptor.getDirreqWriteHistory();
+    parseRelayDirreqV3Reqs(fingerprint, publishedMillis,
+        dirreqStatsEndMillis, dirreqStatsIntervalLengthMillis, requests);
+    parseRelayDirreqWriteHistory(fingerprint, publishedMillis,
+        dirreqWriteHistory);
+  }
+
+  /* Writes per-country and total "responses" lines for a relay, split
+   * at the UTC day boundary and scaled by the fraction of the one-day
+   * statistics interval that falls on each side of it. */
+  private static void parseRelayDirreqV3Reqs(String fingerprint,
+      long publishedMillis, long dirreqStatsEndMillis,
+      long dirreqStatsIntervalLengthMillis,
+      SortedMap<String, Integer> requests) throws IOException {
+    if (requests == null
+        || publishedMillis - dirreqStatsEndMillis > ONE_WEEK_MILLIS
+        || dirreqStatsIntervalLengthMillis != ONE_DAY_MILLIS) {
+      /* Cut off all observations that are one week older than
+       * the descriptor publication time, or we'll have to update
+       * weeks of aggregate values every hour. Statistics whose interval
+       * is not exactly one day are skipped as well. */
+      return;
+    }
+    long statsStartMillis = dirreqStatsEndMillis
+        - dirreqStatsIntervalLengthMillis;
+    long utcBreakMillis = (dirreqStatsEndMillis / ONE_DAY_MILLIS)
+        * ONE_DAY_MILLIS;
+    for (int i = 0; i < 2; i++) {
+      long fromMillis = i == 0 ? statsStartMillis : utcBreakMillis;
+      long toMillis = i == 0 ? utcBreakMillis : dirreqStatsEndMillis;
+      if (fromMillis >= toMillis) {
+        continue;
+      }
+      double intervalFraction = ((double) (toMillis - fromMillis))
+          / ((double) dirreqStatsIntervalLengthMillis);
+      double sum = 0.0;
+      for (Map.Entry<String, Integer> e : requests.entrySet()) {
+        String country = e.getKey();
+        /* NOTE(review): subtracting 4 presumably compensates for Tor
+         * rounding reported counts up to multiples of 8 -- confirm. */
+        double reqs = ((double) e.getValue()) - 4.0;
+        sum += reqs;
+        writeOutputLine(fingerprint, "relay", "responses", country,
+            "", "", fromMillis, toMillis, reqs * intervalFraction,
+            publishedMillis);
+      }
+      writeOutputLine(fingerprint, "relay", "responses", "", "",
+          "", fromMillis, toMillis, sum * intervalFraction,
+          publishedMillis);
+    }
+  }
+
+  /* Writes "bytes" lines for a relay's directory write history. */
+  private static void parseRelayDirreqWriteHistory(String fingerprint,
+      long publishedMillis, BandwidthHistory dirreqWriteHistory)
+      throws IOException {
+    parseDirreqWriteHistory(fingerprint, "relay", publishedMillis,
+        dirreqWriteHistory);
+  }
+
+  /* Writes one "status" line per relay with the Running flag, covering
+   * the consensus's valid-after to fresh-until interval. */
+  private static void parseRelayNetworkStatusConsensus(
+      RelayNetworkStatusConsensus consensus) throws IOException {
+    long fromMillis = consensus.getValidAfterMillis();
+    long toMillis = consensus.getFreshUntilMillis();
+    for (NetworkStatusEntry statusEntry :
+        consensus.getStatusEntries().values()) {
+      String fingerprint = statusEntry.getFingerprint().toUpperCase();
+      if (statusEntry.getFlags().contains("Running")) {
+        writeOutputLine(fingerprint, "relay", "status", "", "", "",
+            fromMillis, toMillis, 0.0, fromMillis);
+      }
+    }
+  }
+
+  /* Reads bridge descriptors from
+   * ../../shared/in/recent/bridge-descriptors and dispatches extra-info
+   * descriptors and bridge network statuses. status/bridge-descriptors
+   * is passed as the reader's exclude-files history. */
+  private static void parseBridgeDescriptors() throws Exception {
+    DescriptorReader descriptorReader =
+        DescriptorSourceFactory.createDescriptorReader();
+    descriptorReader.setExcludeFiles(new File(
+        "status/bridge-descriptors"));
+    descriptorReader.addDirectory(new File(
+        "../../shared/in/recent/bridge-descriptors"));
+    Iterator<DescriptorFile> descriptorFiles =
+        descriptorReader.readDescriptors();
+    while (descriptorFiles.hasNext()) {
+      DescriptorFile descriptorFile = descriptorFiles.next();
+      for (Descriptor descriptor : descriptorFile.getDescriptors()) {
+        if (descriptor instanceof ExtraInfoDescriptor) {
+          parseBridgeExtraInfoDescriptor(
+              (ExtraInfoDescriptor) descriptor);
+        } else if (descriptor instanceof BridgeNetworkStatus) {
+          parseBridgeNetworkStatus((BridgeNetworkStatus) descriptor);
+        }
+      }
+    }
+  }
+
+  /* Extracts directory responses (broken down by country, transport, and
+   * IP version) and the directory write history from a bridge
+   * extra-info descriptor. */
+  private static void parseBridgeExtraInfoDescriptor(
+      ExtraInfoDescriptor descriptor) throws IOException {
+    String fingerprint = descriptor.getFingerprint().toUpperCase();
+    long publishedMillis = descriptor.getPublishedMillis();
+    long dirreqStatsEndMillis = descriptor.getDirreqStatsEndMillis();
+    long dirreqStatsIntervalLengthMillis =
+        descriptor.getDirreqStatsIntervalLength() * 1000L;
+    parseBridgeDirreqV3Resp(fingerprint, publishedMillis,
+        dirreqStatsEndMillis, dirreqStatsIntervalLengthMillis,
+        descriptor.getDirreqV3Resp(),
+        descriptor.getBridgeIps(),
+        descriptor.getBridgeIpTransports(),
+        descriptor.getBridgeIpVersions());
+
+    parseBridgeDirreqWriteHistory(fingerprint, publishedMillis,
+        descriptor.getDirreqWriteHistory());
+  }
+
+  /* Writes total and per-category "responses" lines for a bridge based
+   * on its "ok" response count, split at the UTC day boundary. Nothing
+   * is written if the statistics are missing, too old, not one day
+   * long, lack an "ok" count, or have a non-positive adjusted count. */
+  private static void parseBridgeDirreqV3Resp(String fingerprint,
+      long publishedMillis, long dirreqStatsEndMillis,
+      long dirreqStatsIntervalLengthMillis,
+      SortedMap<String, Integer> responses,
+      SortedMap<String, Integer> bridgeIps,
+      SortedMap<String, Integer> bridgeIpTransports,
+      SortedMap<String, Integer> bridgeIpVersions) throws IOException {
+    if (responses == null
+        || publishedMillis - dirreqStatsEndMillis > ONE_WEEK_MILLIS
+        || dirreqStatsIntervalLengthMillis != ONE_DAY_MILLIS) {
+      /* Cut off all observations that are one week older than
+       * the descriptor publication time, or we'll have to update
+       * weeks of aggregate values every hour. */
+      return;
+    }
+    Integer okResponses = responses.get("ok");
+    if (okResponses == null) {
+      /* Without an "ok" count there is nothing to attribute; skip this
+       * descriptor rather than aborting the whole run with a
+       * NullPointerException when unboxing. */
+      return;
+    }
+    long statsStartMillis = dirreqStatsEndMillis
+        - dirreqStatsIntervalLengthMillis;
+    long utcBreakMillis = (dirreqStatsEndMillis / ONE_DAY_MILLIS)
+        * ONE_DAY_MILLIS;
+    double resp = okResponses.doubleValue() - 4.0;
+    if (resp > 0.0) {
+      for (int i = 0; i < 2; i++) {
+        long fromMillis = i == 0 ? statsStartMillis : utcBreakMillis;
+        long toMillis = i == 0 ? utcBreakMillis : dirreqStatsEndMillis;
+        if (fromMillis >= toMillis) {
+          continue;
+        }
+        double intervalFraction = ((double) (toMillis - fromMillis))
+            / ((double) dirreqStatsIntervalLengthMillis);
+        writeOutputLine(fingerprint, "bridge", "responses", "", "",
+            "", fromMillis, toMillis, resp * intervalFraction,
+            publishedMillis);
+        parseBridgeRespByCategory(fingerprint, fromMillis, toMillis, resp,
+            dirreqStatsIntervalLengthMillis, "country", bridgeIps,
+            publishedMillis);
+        parseBridgeRespByCategory(fingerprint, fromMillis, toMillis, resp,
+            dirreqStatsIntervalLengthMillis, "transport",
+            bridgeIpTransports, publishedMillis);
+        parseBridgeRespByCategory(fingerprint, fromMillis, toMillis, resp,
+            dirreqStatsIntervalLengthMillis, "version", bridgeIpVersions,
+            publishedMillis);
+      }
+    }
+  }
+
+  /* Distributes a bridge's responses across the keys of one category
+   * ("country", "transport", or "version") proportionally to their
+   * reported frequencies (each reduced by 4; entries below 4 are
+   * ignored). If no usable frequencies remain, all responses go to a
+   * default key: "??", "<OR>", or "v4" respectively. */
+  private static void parseBridgeRespByCategory(String fingerprint,
+      long fromMillis, long toMillis, double resp,
+      long dirreqStatsIntervalLengthMillis, String category,
+      SortedMap<String, Integer> frequencies, long publishedMillis)
+      throws IOException {
+    double total = 0.0;
+    SortedMap<String, Double> frequenciesCopy =
+        new TreeMap<String, Double>();
+    if (frequencies != null) {
+      for (Map.Entry<String, Integer> e : frequencies.entrySet()) {
+        if (e.getValue() < 4.0) {
+          continue;
+        }
+        double r = ((double) e.getValue()) - 4.0;
+        frequenciesCopy.put(e.getKey(), r);
+        total += r;
+      }
+    }
+    /* If we're not told any frequencies, or at least none of them are
+     * greater than 4, put in a default that we'll attribute all responses
+     * to. */
+    if (total == 0) {
+      if (category.equals("country")) {
+        frequenciesCopy.put("??", 4.0);
+      } else if (category.equals("transport")) {
+        frequenciesCopy.put("<OR>", 4.0);
+      } else if (category.equals("version")) {
+        frequenciesCopy.put("v4", 4.0);
+      }
+      total = 4.0;
+    }
+    /* The interval fraction does not depend on the entry; compute it
+     * once. */
+    double intervalFraction = ((double) (toMillis - fromMillis))
+        / ((double) dirreqStatsIntervalLengthMillis);
+    for (Map.Entry<String, Double> e : frequenciesCopy.entrySet()) {
+      double val = resp * intervalFraction * e.getValue() / total;
+      if (category.equals("country")) {
+        writeOutputLine(fingerprint, "bridge", "responses", e.getKey(),
+            "", "", fromMillis, toMillis, val, publishedMillis);
+      } else if (category.equals("transport")) {
+        writeOutputLine(fingerprint, "bridge", "responses", "",
+            e.getKey(), "", fromMillis, toMillis, val, publishedMillis);
+      } else if (category.equals("version")) {
+        writeOutputLine(fingerprint, "bridge", "responses", "", "",
+            e.getKey(), fromMillis, toMillis, val, publishedMillis);
+      }
+    }
+  }
+
+  /* Writes "bytes" lines for a bridge's directory write history. */
+  private static void parseBridgeDirreqWriteHistory(String fingerprint,
+      long publishedMillis, BandwidthHistory dirreqWriteHistory)
+      throws IOException {
+    parseDirreqWriteHistory(fingerprint, "bridge", publishedMillis,
+        dirreqWriteHistory);
+  }
+
+  /* Shared by relays and bridges: writes one "bytes" line per write
+   * history interval. An interval that crosses a UTC day boundary is
+   * split into two lines, each with bytes scaled by the fraction of the
+   * interval it covers. Histories ending more than a week before the
+   * descriptor's publication are skipped. */
+  private static void parseDirreqWriteHistory(String fingerprint,
+      String node, long publishedMillis,
+      BandwidthHistory dirreqWriteHistory) throws IOException {
+    if (dirreqWriteHistory == null
+        || publishedMillis - dirreqWriteHistory.getHistoryEndMillis()
+        > ONE_WEEK_MILLIS) {
+      /* Cut off all observations that are one week older than
+       * the descriptor publication time, or we'll have to update
+       * weeks of aggregate values every hour. */
+      return;
+    }
+    long intervalLengthMillis =
+        dirreqWriteHistory.getIntervalLength() * 1000L;
+    for (Map.Entry<Long, Long> e :
+        dirreqWriteHistory.getBandwidthValues().entrySet()) {
+      long intervalEndMillis = e.getKey();
+      long intervalStartMillis = intervalEndMillis - intervalLengthMillis;
+      double writtenBytes = (double) e.getValue();
+      if (intervalStartMillis / ONE_DAY_MILLIS
+          < intervalEndMillis / ONE_DAY_MILLIS) {
+        long utcBreakMillis = (intervalEndMillis / ONE_DAY_MILLIS)
+            * ONE_DAY_MILLIS;
+        double beforeFraction =
+            ((double) (utcBreakMillis - intervalStartMillis))
+            / ((double) intervalLengthMillis);
+        double afterFraction =
+            ((double) (intervalEndMillis - utcBreakMillis))
+            / ((double) intervalLengthMillis);
+        writeOutputLine(fingerprint, node, "bytes", "", "", "",
+            intervalStartMillis, utcBreakMillis,
+            writtenBytes * beforeFraction, publishedMillis);
+        writeOutputLine(fingerprint, node, "bytes", "", "", "",
+            utcBreakMillis, intervalEndMillis,
+            writtenBytes * afterFraction, publishedMillis);
+      } else {
+        writeOutputLine(fingerprint, node, "bytes", "", "", "",
+            intervalStartMillis, intervalEndMillis, writtenBytes,
+            publishedMillis);
+      }
+    }
+  }
+
+  /* Writes one "status" line per bridge with the Running flag, covering
+   * the full hour in which the bridge network status was published. */
+  private static void parseBridgeNetworkStatus(BridgeNetworkStatus status)
+      throws IOException {
+    long publishedMillis = status.getPublishedMillis();
+    long fromMillis = (publishedMillis / ONE_HOUR_MILLIS)
+        * ONE_HOUR_MILLIS;
+    long toMillis = fromMillis + ONE_HOUR_MILLIS;
+    for (NetworkStatusEntry statusEntry :
+        status.getStatusEntries().values()) {
+      String fingerprint = statusEntry.getFingerprint().toUpperCase();
+      if (statusEntry.getFlags().contains("Running")) {
+        writeOutputLine(fingerprint, "bridge", "status", "", "", "",
+            fromMillis, toMillis, 0.0, publishedMillis);
+      }
+    }
+  }
+
+  /* Output writers by file name; opened lazily by getOutputFile(). */
+  private static Map<String, BufferedWriter> openOutputFiles =
+      new HashMap<String, BufferedWriter>();
+
+  /* Appends one tab-separated COPY row to the matching output file.
+   * Rows with fromMillis after toMillis are dropped. The value is
+   * formatted with Locale.US so that the decimal separator is always a
+   * '.', which is what PostgreSQL's COPY expects for DOUBLE PRECISION,
+   * regardless of the JVM's default locale. */
+  private static void writeOutputLine(String fingerprint, String node,
+      String metric, String country, String transport, String version,
+      long fromMillis, long toMillis, double val, long publishedMillis)
+      throws IOException {
+    if (fromMillis > toMillis) {
+      return;
+    }
+    String fromDateTime = formatDateTimeMillis(fromMillis);
+    String toDateTime = formatDateTimeMillis(toMillis);
+    BufferedWriter bw = getOutputFile(fromDateTime, publishedMillis);
+    bw.write(String.format(Locale.US,
+        "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%.1f\n",
+        fingerprint, node, metric, country, transport, version,
+        fromDateTime, toDateTime, val));
+  }
+
+  /* Lazily created, non-lenient UTC formatter; not thread-safe, which
+   * is fine for this single-threaded program. */
+  private static SimpleDateFormat dateTimeFormat = null;
+
+  /* Formats a timestamp as "yyyy-MM-dd HH:mm:ss" in UTC. */
+  private static String formatDateTimeMillis(long millis) {
+    if (dateTimeFormat == null) {
+      dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+      dateTimeFormat.setLenient(false);
+      dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+    }
+    return dateTimeFormat.format(millis);
+  }
+
+  /* Returns (opening if needed) the writer for a row, choosing the file
+   * name according to the output mode selected in parseArgs(). */
+  private static BufferedWriter getOutputFile(String fromDateTime,
+      long publishedMillis) throws IOException {
+    String outputFileName;
+    if (writeToSingleFile) {
+      outputFileName = "out/userstats.sql";
+    } else if (byStatsDateNotByDescHour) {
+      outputFileName = "out/userstats-" + fromDateTime.substring(0, 10)
+          + ".sql";
+    } else {
+      String publishedHourDateTime = formatDateTimeMillis(
+          (publishedMillis / ONE_HOUR_MILLIS) * ONE_HOUR_MILLIS);
+      outputFileName = "out/userstats-"
+          + publishedHourDateTime.substring(0, 10) + "-"
+          + publishedHourDateTime.substring(11, 13) + ".sql";
+    }
+    BufferedWriter bw = openOutputFiles.get(outputFileName);
+    if (bw == null) {
+      bw = openOutputFile(outputFileName);
+      openOutputFiles.put(outputFileName, bw);
+    }
+    return bw;
+  }
+
+  /* Creates the output file (and its parent directory) and writes the
+   * SQL preamble: start a transaction, lock "imported", begin COPY. */
+  private static BufferedWriter openOutputFile(String outputFileName)
+      throws IOException {
+    File outputFile = new File(outputFileName);
+    outputFile.getParentFile().mkdirs();
+    BufferedWriter bw = new BufferedWriter(new FileWriter(
+        outputFileName));
+    bw.write("BEGIN;\n");
+    bw.write("LOCK TABLE imported NOWAIT;\n");
+    bw.write("COPY imported (fingerprint, node, metric, country, "
+        + "transport, version, stats_start, stats_end, val) FROM "
+        + "stdin;\n");
+    return bw;
+  }
+
+  /* Ends COPY in every open file, appends the merge/aggregate/truncate
+   * statements and COMMIT, and closes the file. Every writer is closed
+   * even if writing to another one fails; the first failure is
+   * rethrown afterwards. A file whose trailer could not be written lacks
+   * COMMIT, so its transaction is never committed. */
+  private static void closeOutputFiles() throws IOException {
+    IOException firstFailure = null;
+    for (BufferedWriter bw : openOutputFiles.values()) {
+      try {
+        try {
+          bw.write("\\.\n");
+          bw.write("SELECT merge();\n");
+          bw.write("SELECT aggregate();\n");
+          bw.write("TRUNCATE imported;\n");
+          bw.write("COMMIT;\n");
+        } finally {
+          bw.close();
+        }
+      } catch (IOException e) {
+        if (firstFailure == null) {
+          firstFailure = e;
+        }
+      }
+    }
+    if (firstFailure != null) {
+      throw firstFailure;
+    }
+  }
+}
+
diff --git a/modules/clients/test-userstats.sql b/modules/clients/test-userstats.sql
new file mode 100644
index 0000000..66f8b82
--- /dev/null
+++ b/modules/clients/test-userstats.sql
@@ -0,0 +1,478 @@
+-- pgTAP tests for the userstats schema and its merge() function. All
+-- statements run in one transaction that is rolled back at the end, so
+-- the database is left unchanged.
+BEGIN;
+-- NOTE(review): presumably pgTAP is installed in the tap schema.
+SET search_path TO tap, public;
+-- Must equal the number of pgTAP assertions in this script.
+SELECT plan(152);
+-- Hide NOTICE output such as messages about implicitly created sequences
+-- and indexes; reset again before the final ROLLBACK.
+SET client_min_messages = warning;
+
+-- Make sure enums are as expected.
+SELECT has_enum('node');
+SELECT enum_has_labels('node', ARRAY['relay', 'bridge']);
+SELECT has_enum('metric');
+SELECT enum_has_labels('metric', ARRAY['responses', 'bytes', 'status']);
+
+-- Make sure that the imported table is exactly as the importer expects
+-- it.
+-- These are the nine columns named in the importer's COPY statement, all
+-- NOT NULL.
+SELECT has_table('imported');
+SELECT has_column('imported', 'fingerprint');
+SELECT col_type_is('imported', 'fingerprint', 'CHARACTER(40)');
+SELECT col_not_null('imported', 'fingerprint');
+SELECT has_column('imported', 'node');
+SELECT col_type_is('imported', 'node', 'node');
+SELECT col_not_null('imported', 'node');
+SELECT has_column('imported', 'metric');
+SELECT col_type_is('imported', 'metric', 'metric');
+SELECT col_not_null('imported', 'metric');
+SELECT has_column('imported', 'country');
+SELECT col_type_is('imported', 'country', 'CHARACTER VARYING(2)');
+SELECT col_not_null('imported', 'country');
+SELECT has_column('imported', 'transport');
+SELECT col_type_is('imported', 'transport', 'CHARACTER VARYING(20)');
+SELECT col_not_null('imported', 'transport');
+SELECT has_column('imported', 'version');
+SELECT col_type_is('imported', 'version', 'CHARACTER VARYING(2)');
+SELECT col_not_null('imported', 'version');
+SELECT has_column('imported', 'stats_start');
+SELECT col_type_is('imported', 'stats_start',
+ 'TIMESTAMP WITHOUT TIME ZONE');
+SELECT col_not_null('imported', 'stats_start');
+SELECT has_column('imported', 'stats_end');
+SELECT col_type_is('imported', 'stats_end',
+ 'TIMESTAMP WITHOUT TIME ZONE');
+SELECT col_not_null('imported', 'stats_end');
+SELECT has_column('imported', 'val');
+SELECT col_type_is('imported', 'val', 'DOUBLE PRECISION');
+SELECT col_not_null('imported', 'val');
+-- imported is a plain staging table without a primary key.
+SELECT hasnt_pk('imported');
+
+-- Make sure that the internally-used merged table is exactly as merge()
+-- expects it.
+-- merged has the same nine columns as imported plus an integer id primary
+-- key.
+SELECT has_table('merged');
+SELECT has_column('merged', 'id');
+SELECT col_type_is('merged', 'id', 'INTEGER');
+SELECT col_is_pk('merged', 'id');
+SELECT has_column('merged', 'fingerprint');
+SELECT col_type_is('merged', 'fingerprint', 'CHARACTER(40)');
+SELECT col_not_null('merged', 'fingerprint');
+SELECT has_column('merged', 'node');
+SELECT col_type_is('merged', 'node', 'node');
+SELECT col_not_null('merged', 'node');
+SELECT has_column('merged', 'metric');
+SELECT col_type_is('merged', 'metric', 'metric');
+SELECT col_not_null('merged', 'metric');
+SELECT has_column('merged', 'country');
+SELECT col_type_is('merged', 'country', 'CHARACTER VARYING(2)');
+SELECT col_not_null('merged', 'country');
+SELECT has_column('merged', 'transport');
+SELECT col_type_is('merged', 'transport', 'CHARACTER VARYING(20)');
+SELECT col_not_null('merged', 'transport');
+SELECT has_column('merged', 'version');
+SELECT col_type_is('merged', 'version', 'CHARACTER VARYING(2)');
+SELECT col_not_null('merged', 'version');
+SELECT has_column('merged', 'stats_start');
+SELECT col_type_is('merged', 'stats_start',
+ 'TIMESTAMP WITHOUT TIME ZONE');
+SELECT col_not_null('merged', 'stats_start');
+SELECT has_column('merged', 'stats_end');
+SELECT col_type_is('merged', 'stats_end',
+ 'TIMESTAMP WITHOUT TIME ZONE');
+SELECT col_not_null('merged', 'stats_end');
+SELECT has_column('merged', 'val');
+SELECT col_type_is('merged', 'val', 'DOUBLE PRECISION');
+SELECT col_not_null('merged', 'val');
+
+-- Make sure that the internally-used aggregated table is exactly as
+-- aggregate() expects it.
+-- date and node are required. Every other column is NOT NULL with a
+-- default: '' for country, transport and version, and 0 for the numeric
+-- columns rrx, nrx, hh, nn, hrh, nh and nrh. Their meaning is defined by
+-- aggregate(), which is not tested here.
+SELECT has_table('aggregated');
+SELECT has_column('aggregated', 'date');
+SELECT col_type_is('aggregated', 'date', 'DATE');
+SELECT col_not_null('aggregated', 'date');
+SELECT has_column('aggregated', 'node');
+SELECT col_type_is('aggregated', 'node', 'node');
+SELECT col_not_null('aggregated', 'node');
+SELECT has_column('aggregated', 'country');
+SELECT col_type_is('aggregated', 'country', 'CHARACTER VARYING(2)');
+SELECT col_not_null('aggregated', 'country');
+SELECT col_default_is('aggregated', 'country', '');
+SELECT has_column('aggregated', 'transport');
+SELECT col_type_is('aggregated', 'transport', 'CHARACTER VARYING(20)');
+SELECT col_not_null('aggregated', 'transport');
+SELECT col_default_is('aggregated', 'transport', '');
+SELECT has_column('aggregated', 'version');
+SELECT col_type_is('aggregated', 'version', 'CHARACTER VARYING(2)');
+SELECT col_not_null('aggregated', 'version');
+SELECT col_default_is('aggregated', 'version', '');
+SELECT has_column('aggregated', 'rrx');
+SELECT col_type_is('aggregated', 'rrx', 'DOUBLE PRECISION');
+SELECT col_not_null('aggregated', 'rrx');
+SELECT col_default_is('aggregated', 'rrx', 0);
+SELECT has_column('aggregated', 'nrx');
+SELECT col_type_is('aggregated', 'nrx', 'DOUBLE PRECISION');
+SELECT col_not_null('aggregated', 'nrx');
+SELECT col_default_is('aggregated', 'nrx', 0);
+SELECT has_column('aggregated', 'hh');
+SELECT col_type_is('aggregated', 'hh', 'DOUBLE PRECISION');
+SELECT col_not_null('aggregated', 'hh');
+SELECT col_default_is('aggregated', 'hh', 0);
+SELECT has_column('aggregated', 'nn');
+SELECT col_type_is('aggregated', 'nn', 'DOUBLE PRECISION');
+SELECT col_not_null('aggregated', 'nn');
+SELECT col_default_is('aggregated', 'nn', 0);
+SELECT has_column('aggregated', 'hrh');
+SELECT col_type_is('aggregated', 'hrh', 'DOUBLE PRECISION');
+SELECT col_not_null('aggregated', 'hrh');
+SELECT col_default_is('aggregated', 'hrh', 0);
+SELECT has_column('aggregated', 'nh');
+SELECT col_type_is('aggregated', 'nh', 'DOUBLE PRECISION');
+SELECT col_not_null('aggregated', 'nh');
+SELECT col_default_is('aggregated', 'nh', 0);
+SELECT has_column('aggregated', 'nrh');
+SELECT col_type_is('aggregated', 'nrh', 'DOUBLE PRECISION');
+SELECT col_not_null('aggregated', 'nrh');
+SELECT col_default_is('aggregated', 'nrh', 0);
+
+-- Create temporary tables that hide the actual tables, so that we don't
+-- have to care about existing data, not even in a transaction that we're
+-- going to roll back. Temporary tables are looked up before the schemas in
+-- search_path. From here on, the unqualified names imported, merged and
+-- aggregated therefore refer to these copies, so the schema checks above
+-- had to run first. NOTE(review): this assumes merge() and aggregate() also
+-- use the unqualified names; confirm.
+-- Notices about implicitly created sequences and indexes (e.g. for the
+-- SERIAL PRIMARY KEY of merged) are already suppressed by the
+-- SET client_min_messages = warning at the top of this script.
+CREATE TEMPORARY TABLE imported (
+ fingerprint CHARACTER(40) NOT NULL,
+ node node NOT NULL,
+ metric metric NOT NULL,
+ country CHARACTER VARYING(2) NOT NULL,
+ transport CHARACTER VARYING(20) NOT NULL,
+ version CHARACTER VARYING(2) NOT NULL,
+ stats_start TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+ stats_end TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+ val DOUBLE PRECISION NOT NULL
+);
+CREATE TEMPORARY TABLE merged (
+ id SERIAL PRIMARY KEY,
+ fingerprint CHARACTER(40) NOT NULL,
+ node node NOT NULL,
+ metric metric NOT NULL,
+ country CHARACTER VARYING(2) NOT NULL,
+ transport CHARACTER VARYING(20) NOT NULL,
+ version CHARACTER VARYING(2) NOT NULL,
+ stats_start TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+ stats_end TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+ val DOUBLE PRECISION NOT NULL
+);
+CREATE TEMPORARY TABLE aggregated (
+ date DATE NOT NULL,
+ node node NOT NULL,
+ country CHARACTER VARYING(2) NOT NULL DEFAULT '',
+ transport CHARACTER VARYING(20) NOT NULL DEFAULT '',
+ version CHARACTER VARYING(2) NOT NULL DEFAULT '',
+ rrx DOUBLE PRECISION NOT NULL DEFAULT 0,
+ nrx DOUBLE PRECISION NOT NULL DEFAULT 0,
+ hh DOUBLE PRECISION NOT NULL DEFAULT 0,
+ nn DOUBLE PRECISION NOT NULL DEFAULT 0,
+ hrh DOUBLE PRECISION NOT NULL DEFAULT 0,
+ nh DOUBLE PRECISION NOT NULL DEFAULT 0,
+ nrh DOUBLE PRECISION NOT NULL DEFAULT 0
+);
+
+-- Test merging newly imported data.
+-- new_imported($1, $2) and new_merged($1, $2) each insert one row for the
+-- same fixed relay fingerprint. The row has metric status, empty country,
+-- transport and version, val 0, stats_start $1 and stats_end $2. The two
+-- statements differ only in the target table. Each case below fills the
+-- tables, calls merge(), compares the timestamps left in merged, and
+-- empties both tables again.
+PREPARE new_imported(TIMESTAMP WITHOUT TIME ZONE,
+ TIMESTAMP WITHOUT TIME ZONE) AS INSERT INTO imported
+ (fingerprint, node, metric, country, transport, version, stats_start,
+ stats_end, val) VALUES ('1234567890123456789012345678901234567890',
+ 'relay', 'status', '', '', '', $1, $2, 0);
+PREPARE new_merged(TIMESTAMP WITHOUT TIME ZONE,
+ TIMESTAMP WITHOUT TIME ZONE) AS INSERT INTO merged
+ (fingerprint, node, metric, country, transport, version, stats_start,
+ stats_end, val) VALUES ('1234567890123456789012345678901234567890',
+ 'relay', 'status', '', '', '', $1, $2, 0);
+
+-- Cases with an empty merged table, so merge() only has to reconcile rows
+-- within imported. Disjoint rows are kept separately, contiguous rows are
+-- joined, and overlapping rows are reduced to a single row.
+EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 15:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 15:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should insert new entry into empty table as is');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 14:00:00');
+EXECUTE new_imported('2013-04-11 16:00:00', '2013-04-11 17:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 14:00:00'::TIMESTAMP WITHOUT TIME ZONE),
+ ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should insert two non-contiguous entries');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 15:00:00');
+EXECUTE new_imported('2013-04-11 15:00:00', '2013-04-11 17:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should merge two contiguous entries');
+DELETE FROM imported;
+DELETE FROM merged;
+
+-- 14:00-17:00 overlaps 13:00-16:00; only the row ending at 16:00 is kept.
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 16:00:00');
+EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 17:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should skip entry that starts before and ends after the start of ' ||
+ 'another new entry');
+DELETE FROM imported;
+DELETE FROM merged;
+
+-- Same start time: only the row ending at 15:00 is kept.
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 15:00:00');
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 16:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 15:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should skip entry that starts at and ends after the start of ' ||
+ 'another new entry');
+DELETE FROM imported;
+DELETE FROM merged;
+
+-- 14:00-15:00 lies inside 13:00-16:00; only the enclosing row is kept.
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 16:00:00');
+EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 15:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should skip entry that starts after another new entry starts and ' ||
+ 'ends before that entry ends');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 16:00:00');
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 16:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should skip entry that has same start and end as another new entry');
+DELETE FROM imported;
+DELETE FROM merged;
+
+-- Same end time: exactly one row ending at 16:00 remains.
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 16:00:00');
+EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 16:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should skip entry that starts before and ends at the end of ' ||
+ 'another new entry');
+DELETE FROM imported;
+DELETE FROM merged;
+
+-- Cases with pre-existing rows in merged. A new row that is disjoint from
+-- all existing rows is inserted. A new row that only touches an existing
+-- row is joined with it. A new row that overlaps any existing row is
+-- skipped.
+EXECUTE new_merged('2013-04-11 16:00:00', '2013-04-11 17:00:00');
+EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 15:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 15:00:00'::TIMESTAMP WITHOUT TIME ZONE),
+ ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should insert entry that ends before existing entry starts');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 15:00:00', '2013-04-11 16:00:00');
+EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 15:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should merge entry that ends when existing entry starts');
+DELETE FROM imported;
+DELETE FROM merged;
+
+-- This case checks stats_start: the existing row must still start at
+-- 14:00, i.e. it was not extended backwards by the overlapping new row.
+EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 15:00:00');
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 14:30:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_start FROM merged',
+ $$VALUES ('2013-04-11 14:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should skip entry that starts before but ends after existing entry ' ||
+ 'starts');
+DELETE FROM imported;
+DELETE FROM merged;
+
+-- Overlap wins over adjacency: 13:00-15:00 touches 11:00-13:00 but
+-- overlaps 14:00-16:00, so it is dropped and both existing rows stay
+-- unchanged.
+EXECUTE new_merged('2013-04-11 11:00:00', '2013-04-11 13:00:00');
+EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 16:00:00');
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 15:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 13:00:00'::TIMESTAMP WITHOUT TIME ZONE),
+ ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should skip entry that starts when existing entry ends but ' ||
+ 'ends before another entry starts');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 17:00:00');
+EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 15:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should skip entry that starts when existing entry starts');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 17:00:00');
+EXECUTE new_imported('2013-04-11 15:00:00', '2013-04-11 16:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should skip entry that starts after and ends before existing entry');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 17:00:00');
+EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 17:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should skip entry that is already contained');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 17:00:00');
+EXECUTE new_imported('2013-04-11 16:00:00', '2013-04-11 17:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should skip entry that ends when existing entry ends');
+DELETE FROM imported;
+DELETE FROM merged;
+
+-- The existing row is not extended forwards either: 16:00-18:00 overlaps
+-- it and is dropped, so stats_end stays at 17:00.
+EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 17:00:00');
+EXECUTE new_imported('2013-04-11 16:00:00', '2013-04-11 18:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should skip entry that starts before but ends after existing entry ' ||
+ 'ends');
+DELETE FROM imported;
+DELETE FROM merged;
+
+-- Overlap wins over adjacency again: 16:00-18:00 touches 18:00-19:00 but
+-- overlaps 14:00-17:00.
+EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 17:00:00');
+EXECUTE new_merged('2013-04-11 18:00:00', '2013-04-11 19:00:00');
+EXECUTE new_imported('2013-04-11 16:00:00', '2013-04-11 18:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE),
+ ('2013-04-11 19:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should skip entry that starts before existing entry ends and ends ' ||
+ 'when another entry starts');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 11:00:00', '2013-04-11 13:00:00');
+EXECUTE new_merged('2013-04-11 15:00:00', '2013-04-11 17:00:00');
+EXECUTE new_imported('2013-04-11 12:00:00', '2013-04-11 16:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 13:00:00'::TIMESTAMP WITHOUT TIME ZONE),
+ ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should skip entry that starts before existing entry ends and ends ' ||
+ 'after another entry starts');
+DELETE FROM imported;
+DELETE FROM merged;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 15:00:00');
+EXECUTE new_imported('2013-04-11 15:00:00', '2013-04-11 16:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should merge entry that ends when existing entry starts');
+DELETE FROM imported;
+DELETE FROM merged;
+
+-- More cases with pre-existing rows: a disjoint new row after an existing
+-- row is inserted, and a new row that encloses one or more existing rows
+-- is skipped, leaving the existing rows unchanged.
+EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 15:00:00');
+EXECUTE new_imported('2013-04-11 16:00:00', '2013-04-11 17:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 15:00:00'::TIMESTAMP WITHOUT TIME ZONE),
+ ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should insert entry that starts after existing entry ends');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 15:00:00', '2013-04-11 16:00:00');
+EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 17:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should skip entry that starts before existing entry starts and ' ||
+ 'ends after that entry ends');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 13:00:00', '2013-04-11 14:00:00');
+EXECUTE new_merged('2013-04-11 15:00:00', '2013-04-11 16:00:00');
+EXECUTE new_imported('2013-04-11 12:00:00', '2013-04-11 17:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 14:00:00'::TIMESTAMP WITHOUT TIME ZONE),
+ ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should skip entry that starts before and ends after multiple ' ||
+ 'existing entries');
+DELETE FROM imported;
+DELETE FROM merged;
+
+-- Contiguous rows that meet at midnight are kept as two rows instead of
+-- being joined. NOTE(review): presumably this keeps merged rows from
+-- spanning two calendar days; confirm against merge().
+EXECUTE new_imported('2013-04-11 23:00:00', '2013-04-12 00:00:00');
+EXECUTE new_imported('2013-04-12 00:00:00', '2013-04-12 01:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-12 00:00:00'::TIMESTAMP WITHOUT TIME ZONE),
+ ('2013-04-12 01:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should insert two contiguous entries that end and start at midnight');
+DELETE FROM imported;
+DELETE FROM merged;
+
+-- Rows of different fingerprints never affect each other, whether their
+-- intervals are identical or overlapping. The second row is inserted
+-- directly because the prepared statements use a fixed fingerprint.
+EXECUTE new_imported('2013-04-11 12:00:00', '2013-04-11 17:00:00');
+INSERT INTO imported (fingerprint, node, metric, country, transport,
+ version, stats_start, stats_end, val) VALUES
+ ('9876543210987654321098765432109876543210', 'relay', 'status', '', '',
+ '', '2013-04-11 12:00:00', '2013-04-11 17:00:00', 0);
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE),
+ ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should import two entries with different fingerprints and same ' ||
+ 'start and end');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 15:00:00');
+INSERT INTO imported (fingerprint, node, metric, country, transport,
+ version, stats_start, stats_end, val) VALUES
+ ('9876543210987654321098765432109876543210', 'relay', 'status', '', '',
+ '', '2013-04-11 14:00:00', '2013-04-11 16:00:00', 0);
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+ $$VALUES ('2013-04-11 15:00:00'::TIMESTAMP WITHOUT TIME ZONE),
+ ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+ 'Should import two entries with overlapping starts and ends and ' ||
+ 'different fingerprints');
+DELETE FROM imported;
+DELETE FROM merged;
+
+-- TODO Test aggregating imported and merged data.
+
+-- Make sure that the results view has the exact definition as expected
+-- for the .csv export.
+-- The view is exported with SELECT * as userstats.csv. Its date, node,
+-- country, transport, version and users columns are read by name in
+-- userstats-detector.R.
+SELECT has_view('estimated');
+SELECT has_column('estimated', 'date');
+SELECT col_type_is('estimated', 'date', 'DATE');
+SELECT has_column('estimated', 'node');
+SELECT col_type_is('estimated', 'node', 'node');
+SELECT has_column('estimated', 'country');
+SELECT col_type_is('estimated', 'country', 'CHARACTER VARYING(2)');
+SELECT has_column('estimated', 'transport');
+SELECT col_type_is('estimated', 'transport', 'CHARACTER VARYING(20)');
+SELECT has_column('estimated', 'version');
+SELECT col_type_is('estimated', 'version', 'CHARACTER VARYING(2)');
+SELECT has_column('estimated', 'frac');
+SELECT col_type_is('estimated', 'frac', 'INTEGER');
+SELECT has_column('estimated', 'users');
+SELECT col_type_is('estimated', 'users', 'INTEGER');
+
+-- TODO Test that frac and users are computed correctly in the view.
+
+-- Finish tests.
+-- finish() reports a failure if the number of tests run differs from the
+-- count given to plan() at the top.
+SELECT * FROM finish();
+RESET client_min_messages;
+-- Discard the temporary tables and everything the tests wrote.
+ROLLBACK;
+
diff --git a/modules/clients/userstats-detector.R b/modules/clients/userstats-detector.R
new file mode 100644
index 0000000..c3a9041
--- /dev/null
+++ b/modules/clients/userstats-detector.R
@@ -0,0 +1,18 @@
+library("reshape")
+# Write the wide per-country relay user table that the censorship detector
+# reads.
+#
+# Reads the long-format estimates exported from the `estimated` view. Keeps
+# only relay rows that have a country but no transport or version
+# breakdown, and adds a per-date total over all countries. It then floors
+# the user counts and pivots to one row per date, with one column per
+# country plus an "all" column.
+#
+# path:  CSV file to write.
+# input: long-format CSV to read (defaults to "userstats.csv").
+# Returns the pivoted table invisibly.
+export_userstats_detector <- function(path, input = "userstats.csv") {
+  estimates <- read.csv(input, stringsAsFactors = FALSE)
+  estimates <- estimates[estimates$country != "" &
+                           estimates$transport == "" &
+                           estimates$version == "" &
+                           estimates$node == "relay", ]
+  users <- data.frame(country = estimates$country, date = estimates$date,
+                      users = estimates$users, stringsAsFactors = FALSE)
+  # Per-date sum over all countries, tagged with the placeholder code "zy"
+  # and renamed to "all" after the pivot. NOTE(review): the placeholder
+  # presumably keeps the total after the real country codes in the column
+  # order; confirm.
+  totals <- aggregate(list(users = users$users),
+                      by = list(date = users$date), sum)
+  users <- rbind(users, data.frame(country = "zy", totals,
+                                   stringsAsFactors = FALSE))
+  users <- data.frame(date = users$date, country = users$country,
+                      users = floor(users$users))
+  wide <- cast(users, date ~ country, value = "users")
+  names(wide)[names(wide) == "zy"] <- "all"
+  write.csv(wide, path, quote = FALSE, row.names = FALSE)
+  invisible(wide)
+}
+export_userstats_detector("userstats-detector.csv")
+
diff --git a/shared/bin/80-run-clients-stats.sh b/shared/bin/80-run-clients-stats.sh
new file mode 100755
index 0000000..325570c
--- /dev/null
+++ b/shared/bin/80-run-clients-stats.sh
@@ -0,0 +1,30 @@
+#!/bin/sh
+set -e
+
+cd modules/clients/
+
+echo `date` "Parsing descriptors."
+ant | grep "\[java\]"
+
+for i in $(ls out/*.sql)
+do
+ echo `date` "Importing $i."
+ psql -f $i userstats
+done
+
+echo `date` "Exporting results."
+psql -c 'COPY (SELECT * FROM estimated) TO STDOUT WITH CSV HEADER;' userstats > userstats.csv
+
+echo `date` "Running censorship detector."
+R --slave -f userstats-detector.R > /dev/null 2>&1
+python detector.py
+
+echo `date` "Merging censorship detector results."
+R --slave -f merge-clients.R > /dev/null 2>&1
+mkdir -p stats/
+cp clients.csv stats/
+
+echo `date` "Terminating."
+
+cd ../../
+
diff --git a/shared/bin/99-copy-stats-files.sh b/shared/bin/99-copy-stats-files.sh
index 5493cf8..4a30f24 100755
--- a/shared/bin/99-copy-stats-files.sh
+++ b/shared/bin/99-copy-stats-files.sh
@@ -3,4 +3,5 @@ mkdir -p shared/stats
cp -a modules/legacy/stats/*.csv shared/stats/
cp -a modules/advbwdist/stats/advbwdist.csv shared/stats/
cp -a modules/hidserv/stats/hidserv.csv shared/stats/
+cp -a modules/clients/stats/clients.csv shared/stats/
More information about the tor-commits
mailing list