[tor-commits] [metrics-web/master] Create new clients module from metrics-task #8462.

karsten at torproject.org
Sun Aug 16 10:55:26 UTC 2015


commit 14840ed2db075bbc1d0991b974becc3826a50969
Author: Karsten Loesing <karsten.loesing at gmx.net>
Date:   Fri Aug 14 13:46:12 2015 +0200

    Create new clients module from metrics-task #8462.
---
 detector/.gitignore                                |    2 -
 detector/country_info.py                           |  252 ---------
 detector/detector.py                               |  437 ---------------
 detector/detector.sh                               |    6 -
 modules/clients/.gitignore                         |    2 +
 modules/clients/build.xml                          |   44 ++
 modules/clients/country_info.py                    |  252 +++++++++
 modules/clients/detector.py                        |  437 +++++++++++++++
 modules/clients/init-userstats.sql                 |  575 ++++++++++++++++++++
 modules/clients/merge-clients.R                    |   19 +
 .../src/org/torproject/metrics/clients/Main.java   |  465 ++++++++++++++++
 modules/clients/test-userstats.sql                 |  478 ++++++++++++++++
 modules/clients/userstats-detector.R               |   18 +
 shared/bin/80-run-clients-stats.sh                 |   30 +
 shared/bin/99-copy-stats-files.sh                  |    1 +
 15 files changed, 2321 insertions(+), 697 deletions(-)

diff --git a/detector/.gitignore b/detector/.gitignore
deleted file mode 100644
index 29a7166..0000000
--- a/detector/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-*.csv
-
diff --git a/detector/country_info.py b/detector/country_info.py
deleted file mode 100644
index e23728e..0000000
--- a/detector/country_info.py
+++ /dev/null
@@ -1,252 +0,0 @@
-# -*- coding: utf-8 -*-
-
-countries = {
-    "ad" : "Andorra",
-    "ae" : "the United Arab Emirates",
-    "af" : "Afghanistan",
-    "ag" : "Antigua and Barbuda",
-    "ai" : "Anguilla",
-    "al" : "Albania",
-    "am" : "Armenia",
-    "an" : "the Netherlands Antilles",
-    "ao" : "Angola",
-    "aq" : "Antarctica",
-    "ar" : "Argentina",
-    "as" : "American Samoa",
-    "at" : "Austria",
-    "au" : "Australia",
-    "aw" : "Aruba",
-    "ax" : "the Aland Islands",
-    "az" : "Azerbaijan",
-    "ba" : "Bosnia and Herzegovina",
-    "bb" : "Barbados",
-    "bd" : "Bangladesh",
-    "be" : "Belgium",
-    "bf" : "Burkina Faso",
-    "bg" : "Bulgaria",
-    "bh" : "Bahrain",
-    "bi" : "Burundi",
-    "bj" : "Benin",
-    "bl" : "Saint Bartelemey",
-    "bm" : "Bermuda",
-    "bn" : "Brunei",
-    "bo" : "Bolivia",
-    "br" : "Brazil",
-    "bs" : "the Bahamas",
-    "bt" : "Bhutan",
-    "bv" : "the Bouvet Island",
-    "bw" : "Botswana",
-    "by" : "Belarus",
-    "bz" : "Belize",
-    "ca" : "Canada",
-    "cc" : "the Cocos (Keeling) Islands",
-    "cd" : "the Democratic Republic of the Congo",
-    "cf" : "Central African Republic",
-    "cg" : "Congo",
-    "ch" : "Switzerland",
-    "ci" :  u"Côte d'Ivoire",
-    "ck" : "the Cook Islands",
-    "cl" : "Chile",
-    "cm" : "Cameroon",
-    "cn" : "China",
-    "co" : "Colombia",
-    "cr" : "Costa Rica",
-    "cu" : "Cuba",
-    "cv" : "Cape Verde",
-    "cx" : "the Christmas Island",
-    "cy" : "Cyprus",
-    "cz" : "the Czech Republic",
-    "de" : "Germany",
-    "dj" : "Djibouti",
-    "dk" : "Denmark",
-    "dm" : "Dominica",
-    "do" : "the Dominican Republic",
-    "dz" : "Algeria",
-    "ec" : "Ecuador",
-    "ee" : "Estonia",
-    "eg" : "Egypt",
-    "eh" : "the Western Sahara",
-    "er" : "Eritrea",
-    "es" : "Spain",
-    "et" : "Ethiopia",
-    "fi" : "Finland",
-    "fj" : "Fiji",
-    "fk" : "the Falkland Islands (Malvinas)",
-    "fm" : "the Federated States of Micronesia",
-    "fo" : "the Faroe Islands",
-    "fr" : "France",
-    "fx" : "Metropolitan France",
-    "ga" : "Gabon",
-    "gb" : "the United Kingdom",
-    "gd" : "Grenada",
-    "ge" : "Georgia",
-    "gf" : "French Guiana",
-    "gg" : "Guernsey",
-    "gh" : "Ghana",
-    "gi" : "Gibraltar",
-    "gl" : "Greenland",
-    "gm" : "Gambia",
-    "gn" : "Guinea",
-    "gp" : "Guadeloupe",
-    "gq" : "Equatorial Guinea",
-    "gr" : "Greece",
-    "gs" : "South Georgia and the South Sandwich Islands",
-    "gt" : "Guatemala",
-    "gu" : "Guam",
-    "gw" : "Guinea-Bissau",
-    "gy" : "Guyana",
-    "hk" : "Hong Kong",
-    "hm" : "Heard Island and McDonald Islands",
-    "hn" : "Honduras",
-    "hr" : "Croatia",
-    "ht" : "Haiti",
-    "hu" : "Hungary",
-    "id" : "Indonesia",
-    "ie" : "Ireland",
-    "il" : "Israel",
-    "im" : "the Isle of Man",
-    "in" : "India",
-    "io" : "the British Indian Ocean Territory",
-    "iq" : "Iraq",
-    "ir" : "Iran",
-    "is" : "Iceland",
-    "it" : "Italy",
-    "je" : "Jersey",
-    "jm" : "Jamaica",
-    "jo" : "Jordan",
-    "jp" : "Japan",
-    "ke" : "Kenya",
-    "kg" : "Kyrgyzstan",
-    "kh" : "Cambodia",
-    "ki" : "Kiribati",
-    "km" : "Comoros",
-    "kn" : "Saint Kitts and Nevis",
-    "kp" : "North Korea",
-    "kr" : "the Republic of Korea",
-    "kw" : "Kuwait",
-    "ky" : "the Cayman Islands",
-    "kz" : "Kazakhstan",
-    "la" : "Laos",
-    "lb" : "Lebanon",
-    "lc" : "Saint Lucia",
-    "li" : "Liechtenstein",
-    "lk" : "Sri Lanka",
-    "lr" : "Liberia",
-    "ls" : "Lesotho",
-    "lt" : "Lithuania",
-    "lu" : "Luxembourg",
-    "lv" : "Latvia",
-    "ly" : "Libya",
-    "ma" : "Morocco",
-    "mc" : "Monaco",
-    "md" : "the Republic of Moldova",
-    "me" : "Montenegro",
-    "mf" : "Saint Martin",
-    "mg" : "Madagascar",
-    "mh" : "the Marshall Islands",
-    "mk" : "Macedonia",
-    "ml" : "Mali",
-    "mm" : "Burma",
-    "mn" : "Mongolia",
-    "mo" : "Macau",
-    "mp" : "the Northern Mariana Islands",
-    "mq" : "Martinique",
-    "mr" : "Mauritania",
-    "ms" : "Montserrat",
-    "mt" : "Malta",
-    "mu" : "Mauritius",
-    "mv" : "the Maldives",
-    "mw" : "Malawi",
-    "mx" : "Mexico",
-    "my" : "Malaysia",
-    "mz" : "Mozambique",
-    "na" : "Namibia",
-    "nc" : "New Caledonia",
-    "ne" : "Niger",
-    "nf" : "Norfolk Island",
-    "ng" : "Nigeria",
-    "ni" : "Nicaragua",
-    "nl" : "the Netherlands",
-    "no" : "Norway",
-    "np" : "Nepal",
-    "nr" : "Nauru",
-    "nu" : "Niue",
-    "nz" : "New Zealand",
-    "om" : "Oman",
-    "pa" : "Panama",
-    "pe" : "Peru",
-    "pf" : "French Polynesia",
-    "pg" : "Papua New Guinea",
-    "ph" : "the Philippines",
-    "pk" : "Pakistan",
-    "pl" : "Poland",
-    "pm" : "Saint Pierre and Miquelon",
-    "pn" : "the Pitcairn Islands",
-    "pr" : "Puerto Rico",
-    "ps" : "the Palestinian Territory",
-    "pt" : "Portugal",
-    "pw" : "Palau",
-    "py" : "Paraguay",
-    "qa" : "Qatar",
-    "re" : "Reunion",
-    "ro" : "Romania",
-    "rs" : "Serbia",
-    "ru" : "Russia",
-    "rw" : "Rwanda",
-    "sa" : "Saudi Arabia",
-    "sb" : "the Solomon Islands",
-    "sc" : "the Seychelles",
-    "sd" : "Sudan",
-    "se" : "Sweden",
-    "sg" : "Singapore",
-    "sh" : "Saint Helena",
-    "si" : "Slovenia",
-    "sj" : "Svalbard and Jan Mayen",
-    "sk" : "Slovakia",
-    "sl" : "Sierra Leone",
-    "sm" : "San Marino",
-    "sn" : "Senegal",
-    "so" : "Somalia",
-    "sr" : "Suriname",
-    "ss" : "South Sudan",
-    "st" : u"São Tomé and Príncipe",
-    "sv" : "El Salvador",
-    "sy" : "the Syrian Arab Republic",
-    "sz" : "Swaziland",
-    "tc" : "Turks and Caicos Islands",
-    "td" : "Chad",
-    "tf" : "the French Southern Territories",
-    "tg" : "Togo",
-    "th" : "Thailand",
-    "tj" : "Tajikistan",
-    "tk" : "Tokelau",
-    "tl" : "East Timor",
-    "tm" : "Turkmenistan",
-    "tn" : "Tunisia",
-    "to" : "Tonga",
-    "tr" : "Turkey",
-    "tt" : "Trinidad and Tobago",
-    "tv" : "Tuvalu",
-    "tw" : "Taiwan",
-    "tz" : "the United Republic of Tanzania",
-    "ua" : "Ukraine",
-    "ug" : "Uganda",
-    "um" : "the United States Minor Outlying Islands",
-    "us" : "the United States",
-    "uy" : "Uruguay",
-    "uz" : "Uzbekistan",
-    "va" : "Vatican City",
-    "vc" : "Saint Vincent and the Grenadines",
-    "ve" : "Venezuela",
-    "vg" : "the British Virgin Islands",
-    "vi" : "the United States Virgin Islands",
-    "vn" : "Vietnam",
-    "vu" : "Vanuatu",
-    "wf" : "Wallis and Futuna",
-    "ws" : "Samoa",
-    "ye" : "Yemen",
-    "yt" : "Mayotte",
-    "za" : "South Africa",
-    "zm" : "Zambia",
-    "zw" : "Zimbabwe"
-    }
diff --git a/detector/detector.py b/detector/detector.py
deleted file mode 100644
index 611f25b..0000000
--- a/detector/detector.py
+++ /dev/null
@@ -1,437 +0,0 @@
-##  Copyright (c) 2011 George Danezis <gdane at microsoft.com>
-##
-##  All rights reserved.
-##
-##  Redistribution and use in source and binary forms, with or without
-##  modification, are permitted (subject to the limitations in the
-##  disclaimer below) provided that the following conditions are met:
-##
-##   * Redistributions of source code must retain the above copyright
-##     notice, this list of conditions and the following disclaimer.
-##
-##   * Redistributions in binary form must reproduce the above copyright
-##     notice, this list of conditions and the following disclaimer in the
-##     documentation and/or other materials provided with the
-##     distribution.
-##
-##   * Neither the name of <Owner Organization> nor the names of its
-##     contributors may be used to endorse or promote products derived
-##     from this software without specific prior written permission.
-##
-##  NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE
-##  GRANTED BY THIS LICENSE.  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT
-##  HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
-##  WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
-##  MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-##  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-##  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-##  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-##  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
-##  BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
-##  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
-##  OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
-##  IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-##
-##  (Clear BSD license: http://labs.metacarta.com/license-explanation.html#license)
-
-##  This script reads a .csv file of the number of Tor users and finds
-##  anomalies that might be indicative of censorship.
-
-# Dep: matplotlib
-from pylab import *
-import matplotlib
-
-# Dep: numpy
-import numpy
-
-# Dep: scipy
-import scipy.stats
-from scipy.stats.distributions import norm
-from scipy.stats.distributions import poisson
-
-# Std lib
-from datetime import date
-from datetime import timedelta
-import os.path
-
-# Country code -> Country names
-import country_info
-
-# write utf8 to file
-import codecs
-
-days = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
-
-def get_country_name_from_cc(country_code):
-  if (country_code.lower() in country_info.countries):
-    return country_info.countries[country_code.lower()]
-  return country_code # if we didn't find the cc in our map
-
-"""
-Represents a .csv file containing information on the number of
-connecting Tor users per country.
-
-'store': Dictionary with (<country code>, <counter>) as key, and the number of users as value.
-         <country code> can also be "date"...
-'all_dates': List of the data intervals (with default timedelta: 1 day).
-'country_codes': List of all relevant country codes.
-'MAX_INDEX': Number of data rows (dates) read from the .csv.
-'date_min': The oldest date found in the .csv.
-'date_max': The latest date found in the .csv.
-"""
-class torstatstore:
-  def __init__(self, file_name):
-    f = file(file_name)
-    country_codes = f.readline()
-    country_codes = country_codes.strip().split(",")
-
-    store = {}
-    MAX_INDEX = 0
-    for i, line in enumerate(f):
-        MAX_INDEX += 1
-        line_parsed = line.strip().split(",")
-        for j, (ccode, val) in enumerate(zip(country_codes,line_parsed)):
-            processed_val = None
-            if ccode == "date":
-                try:
-                    year, month, day = int(val[:4]), int(val[5:7]), int(val[8:10])
-                    processed_val = date(year, month, day)
-                except Exception, e:
-                    print "Parsing error (ignoring line %s):" % j
-                    print "%s" % val,e
-                    break
-
-            elif val != "NA":
-                processed_val = int(val)
-            store[(ccode, i)] = processed_val
-
-    # min and max
-    date_min = store[("date", 0)]
-    date_max = store[("date", i)]
-
-    all_dates = []
-    d = date_min
-    dt = timedelta(days=1)
-    while d <= date_max:
-        all_dates += [d]
-        d = d + dt
-
-    # Save for later
-    self.store = store
-    self.all_dates = all_dates
-    self.country_codes = country_codes
-    self.MAX_INDEX = MAX_INDEX
-    self.date_min = date_min
-    self.date_max = date_max
-
-  """Return a list representing a time series of 'ccode' with respect
-  to the number of connected users.
-  """
-  def get_country_series(self, ccode):
-    assert ccode in self.country_codes
-    series = {}
-    for d in self.all_dates:
-        series[d] = None
-    for i in range(self.MAX_INDEX):
-        series[self.store[("date", i)]] = self.store[(ccode, i)]
-    sx = []
-    for d in self.all_dates:
-        sx += [series[d]]
-    return sx
-
-  """Return an ordered list containing tuples of the form (<number of
-  users>, <country code>). The list is ordered with respect to the
-  number of users for each country.
-  """
-  def get_largest(self, number):
-    exclude = set(["all", "??", "date"])
-    l = [(self.store[(c, self.MAX_INDEX-1)], c) for c in self.country_codes if c not in exclude]
-    l.sort()
-    l.reverse()
-    return l[:number]
-
-  """Return a dictionary, with <country code> as key, and the time
-  series of the country code as the value.
-  """
-  def get_largest_locations(self, number):
-    l = self.get_largest(number)
-    res = {}
-    for _, ccode in l[:number]:
-      res[ccode] = self.get_country_series(ccode)
-    return res
-
-"""Return a list containing lists (?) where each such list contains
-the difference in users for a time delta of 'days'
-"""
-def n_day_rel(series, days):
-  rel = []
-  for i, v in enumerate(series):
-    if series[i] is None:
-      rel += [None]
-      continue
-
-    if i - days < 0 or series[i-days] is None or series[i-days] == 0:
-      rel += [None]
-    else:
-      rel += [ float(series[i]) / series[i-days]]
-  return rel
-
-# Main model: computes the expected min / max range of number of users
-def make_tendencies_minmax(l, INTERVAL = 1):
-  lminus1 = dict([(ccode, n_day_rel(l[ccode], INTERVAL)) for ccode in l])
-  c = lminus1[lminus1.keys()[0]]
-  dists = []
-  minx = []
-  maxx = []
-  for i in range(len(c)):
-    vals = [lminus1[ccode][i] for ccode in lminus1.keys() if lminus1[ccode][i] != None]
-    if len(vals) < 8:
-      dists += [None]
-      minx += [None]
-      maxx += [None]
-    else:
-      vals.sort()
-      median = vals[len(vals)/2]
-      q1 = vals[len(vals)/4]
-      q2 = vals[(3*len(vals))/4]
-      qd = q2 - q1
-      vals = [v for v in vals if median - qd*4 < v and  v < median + qd*4]
-      if len(vals) < 8:
-        dists += [None]
-        minx += [None]
-        maxx += [None]
-        continue
-      mu, sigma = norm.fit(vals)
-      dists += [(mu, sigma)]
-      maxx += [norm.ppf(0.9999, mu, sigma)]
-      minx += [norm.ppf(1 - 0.9999, mu, sigma)]
-  ## print minx[-1], maxx[-1]
-  return minx, maxx
-
-# Makes pretty plots
-def raw_plot(series, minc, maxc, labels, xtitle):
-    assert len(xtitle) == 3
-    fname, stitle, slegend = xtitle
-
-    font = {'family' : 'Bitstream Vera Sans',
-        'weight' : 'normal',
-        'size'   : 8}
-    matplotlib.rc('font', **font)
-
-    ylim( (-max(series)*0.1, max(series)*1.1) )
-    plot(labels, series, linewidth=1.0, label="Users")
-
-    wherefill = []
-    for mm,mx in zip(minc, maxc):
-      wherefill += [not (mm == None and mx == None)]
-      assert mm < mx or (mm == None and mx == None)
-
-    fill_between(labels, minc, maxc, where=wherefill, color="gray", label="Prediction")
-
-    vdown = []
-    vup = []
-    for i,v in enumerate(series):
-      if minc[i] != None and v < minc[i]:
-        vdown += [v]
-        vup += [None]
-      elif maxc[i] != None and v > maxc[i]:
-        vdown += [None]
-        vup += [v]
-      else:
-        vup += [None]
-        vdown += [None]
-
-    plot(labels, vdown, 'o', ms=10, lw=2, alpha=0.5, mfc='orange', label="Downturns")
-    plot(labels, vup, 'o', ms=10, lw=2, alpha=0.5, mfc='green', label="Upturns")
-
-    legend(loc=2)
-
-    xlabel('Time (days)')
-    ylabel('Users')
-    title(stitle)
-    grid(True)
-    F = gcf()
-
-    F.set_size_inches(10,5)
-    F.savefig(fname,  format="png", dpi = (150))
-    close()
-
-def absolute_plot(series, minc, maxc, labels,INTERVAL, xtitle):
-  in_minc = []
-  in_maxc = []
-  for i, v in enumerate(series):
-    if i > 0 and i - INTERVAL >= 0 and series[i] != None and series[i-INTERVAL] != None and series[i-INTERVAL] != 0 and minc[i]!= None and maxc[i]!= None:
-      in_minc += [minc[i] * poisson.ppf(1-0.9999, series[i-INTERVAL])]
-      in_maxc += [maxc[i] * poisson.ppf(0.9999, series[i-INTERVAL])]
-      if not in_minc[-1] < in_maxc[-1]:
-        print in_minc[-1], in_maxc[-1], series[i-INTERVAL], minc[i], maxc[i]
-      assert in_minc[-1] < in_maxc[-1]
-    else:
-      in_minc += [None]
-      in_maxc += [None]
-  raw_plot(series, in_minc, in_maxc, labels, xtitle)
-
-"""Return the number of downscores and upscores of a time series
-'series', given tendencies 'minc' and 'maxc' for the time interval
-'INTERVAL'.
-
-If 'scoring_interval' is specified, we only consider upscores/downscores
-that happened in the latest 'scoring_interval' days.
-"""
-def censor_score(series, minc, maxc, INTERVAL, scoring_interval=None):
-  upscore = 0
-  downscore = 0
-
-  if scoring_interval is None:
-    scoring_interval = len(series)
-  assert(len(series) >= scoring_interval)
-
-  for i, v in enumerate(series):
-    if i > 0 and i - INTERVAL >= 0 and series[i] != None and series[i-INTERVAL] != None and series[i-INTERVAL] != 0 and minc[i]!= None and maxc[i]!= None:
-      in_minc = minc[i] * poisson.ppf(1-0.9999, series[i-INTERVAL])
-      in_maxc = maxc[i] * poisson.ppf(0.9999, series[i-INTERVAL])
-      if (i >= (len(series) - scoring_interval)):
-        downscore += 1 if minc[i] != None and v < in_minc else 0
-        upscore += 1 if maxc[i] != None and v > in_maxc else 0
-
-  return downscore, upscore
-
-def plot_target(tss, TARGET, xtitle, minx, maxx, DAYS=365, INTERV = 7):
-  ctarget = tss.get_country_series(TARGET)
-  c = n_day_rel(ctarget, INTERV)
-  absolute_plot(ctarget[-DAYS:], minx[-DAYS:], maxx[-DAYS:], tss.all_dates[-DAYS:],INTERV, xtitle = xtitle)
-
-def write_censorship_report_prologue(report_file, dates, notification_period):
-  if (notification_period == 1):
-    date_str = "%s" % (dates[-1]) # no need for date range if it's just one day
-  else:
-    date_str = "%s to %s" % (dates[-notification_period], dates[-1])
-
-  prologue = "=======================\n"
-  prologue += "Automatic Censorship Report for %s\n" % (date_str)
-  prologue += "=======================\n\n"
-  report_file.write(prologue)
-
-## Make a league table of censorship + nice graphs
-def plot_all(tss, minx, maxx, INTERV, DAYS=None, rdir="img"):
-  rdir = os.path.realpath(rdir)
-  if not os.path.exists(rdir) or not os.path.isdir(rdir):
-    print "ERROR: %s does not exist or is not a directory." % rdir
-    return
-
-  summary_file = file(os.path.join(rdir, "summary.txt"), "w")
-
-  if DAYS == None:
-    DAYS = 6*31
-
-  s = tss.get_largest(200)
-  scores = []
-  for num, li in s:
-    print ".",
-    ds,us = censor_score(tss.get_country_series(li)[-DAYS:], minx[-DAYS:], maxx[-DAYS:], INTERV)
-    # print ds, us
-    scores += [(ds,num, us, li)]
-  scores.sort()
-  scores.reverse()
-  s = "\n=======================\n"
-  s+= "Report for %s to %s\n" % (tss.all_dates[-DAYS], tss.all_dates[-1])
-  s+= "=======================\n"
-  print s
-  summary_file.write(s)
-  for a,nx, b,c in scores:
-    if a > 0:
-      s = "%s -- down: %2d (up: %2d affected: %s)" % (c, a, b, nx)
-      print s
-      summary_file.write(s + "\n")
-      xtitle = (os.path.join(rdir, "%03d-%s-censor.png" % (a,c)), "Tor report for %s -- down: %2d (up: %2d affected: %s)" % (c, a, b, nx),"")
-      plot_target(tss, c,xtitle, minx, maxx, DAYS, INTERV)
-  summary_file.close()
-
-"""Write a CSV report on the minimum/maximum users of each country per date."""
-def write_all(tss, minc, maxc, RANGES_FILE, INTERVAL=7):
-  ranges_file = file(RANGES_FILE, "w")
-  ranges_file.write("date,country,minusers,maxusers\n")
-  exclude = set(["all", "??", "date"])
-  for c in tss.country_codes:
-    if c in exclude:
-      continue
-    series = tss.get_country_series(c)
-    for i, v in enumerate(series):
-      if i > 0 and i - INTERVAL >= 0 and series[i] != None and series[i-INTERVAL] != None and series[i-INTERVAL] != 0 and minc[i]!= None and maxc[i]!= None:
-        minv = minc[i] * poisson.ppf(1-0.9999, series[i-INTERVAL])
-        maxv = maxc[i] * poisson.ppf(0.9999, series[i-INTERVAL])
-        if not minv < maxv:
-          print minv, maxv, series[i-INTERVAL], minc[i], maxc[i]
-        assert minv < maxv
-        ranges_file.write("%s,%s,%s,%s\n" % (tss.all_dates[i], c, minv, maxv))
-  ranges_file.close()
-
-"""Return a URL that points to a graph in metrics.tpo that displays
-the number of direct Tor users in country 'country_code', for a
-'period'-day period.
-
-Let's hope that the metrics.tpo URL scheme doesn't change often.
-"""
-def get_tor_usage_graph_url_for_cc_and_date(country_code, dates, period):
-  url = "https://metrics.torproject.org/users.html?graph=userstats-relay-country&start=%s&end=%s&country=%s&events=on#userstats-relay-country\n" % \
-      (dates[-period], dates[-1], country_code)
-  return url
-
-"""Write a file containing a short censorship report over the last
-'notification_period' days.
-"""
-def write_ml_report(tss, minx, maxx, INTERV, DAYS, notification_period=None):
-  if notification_period is None:
-    notification_period = DAYS
-
-  report_file = codecs.open('short_censorship_report.txt', 'w', 'utf-8')
-  file_prologue_written = False
-
-  s = tss.get_largest(None) # no restrictions, get 'em all.
-  scores = []
-  for num, li in s:
-    ds,us = censor_score(tss.get_country_series(li)[-DAYS:], minx[-DAYS:], maxx[-DAYS:], INTERV, notification_period)
-    scores += [(ds,num, us, li)]
-  scores.sort()
-  scores.reverse()
-
-  for downscores,users_n,upscores,country_code in scores:
-    if (downscores > 0) or (upscores > 0):
-      if not file_prologue_written:
-        write_censorship_report_prologue(report_file, tss.all_dates, notification_period)
-        file_prologue_written = True
-
-      if ((upscores > 0) and (downscores == 0)):
-        s = "We detected an unusual spike of Tor users in %s (%d upscores, %d users):\n" % \
-            (get_country_name_from_cc(country_code), upscores, users_n)
-      else:
-        s = "We detected %d potential censorship events in %s (users: %d, upscores: %d):\n" % \
-            (downscores, get_country_name_from_cc(country_code), users_n, upscores)
-
-      # Also give out a link to the appropriate usage graph for a 90-day period.
-      s += get_tor_usage_graph_url_for_cc_and_date(country_code, tss.all_dates, 90)
-
-      report_file.write(s + "\n")
-
-  report_file.close()
-
-# INTERV is the time interval to model connection rates;
-# consider maximum DAYS days back.
-def detect(CSV_FILE = "userstats-detector.csv",
-           RANGES_FILE = "userstats-ranges.csv", GRAPH_DIR = "img",
-           INTERV = 7, DAYS = 6 * 31, REPORT = True):
-  tss = torstatstore(CSV_FILE)
-  l = tss.get_largest_locations(50)
-  minx, maxx = make_tendencies_minmax(l, INTERV)
-  #plot_all(tss, minx, maxx, INTERV, DAYS, rdir=GRAPH_DIR)
-  write_all(tss, minx, maxx, RANGES_FILE, INTERV)
-
-  if REPORT:
-    # Make our short report; only consider events of the last day
-    write_ml_report(tss, minx, maxx, INTERV, DAYS, 1)
-
-def main():
-  detect()
-
-if __name__ == "__main__":
-    main()
diff --git a/detector/detector.sh b/detector/detector.sh
deleted file mode 100755
index 56f6886..0000000
--- a/detector/detector.sh
+++ /dev/null
@@ -1,6 +0,0 @@
-#!/bin/bash
-wget -qO direct-users.csv --no-check-certificate https://metrics.torproject.org/csv/direct-users.csv
-wget -qO userstats-detector.csv --no-check-certificate https://metrics.torproject.org/csv/userstats-detector.csv
-python detector.py
-cat short_censorship_report.txt | mail -E -s 'Possible censorship events' tor-censorship-events at lists.torproject.org
-
diff --git a/modules/clients/.gitignore b/modules/clients/.gitignore
new file mode 100644
index 0000000..29a7166
--- /dev/null
+++ b/modules/clients/.gitignore
@@ -0,0 +1,2 @@
+*.csv
+
diff --git a/modules/clients/build.xml b/modules/clients/build.xml
new file mode 100644
index 0000000..f90e138
--- /dev/null
+++ b/modules/clients/build.xml
@@ -0,0 +1,44 @@
+<project default="run" name="clients" basedir=".">
+
+  <property name="sources" value="src"/>
+  <property name="classes" value="classes"/>
+  <path id="classpath">
+    <pathelement path="${classes}"/>
+    <fileset dir="/usr/share/java">
+      <include name="commons-codec-1.6.jar"/>
+      <include name="commons-compress-1.4.1.jar"/>
+      <include name="commons-lang-2.6.jar"/>
+    </fileset>
+    <fileset dir="../../deps/metrics-lib">
+      <include name="descriptor.jar"/>
+    </fileset>
+  </path>
+
+  <target name="metrics-lib">
+    <ant dir="../../deps/metrics-lib"/>
+  </target>
+
+  <target name="compile" depends="metrics-lib">
+    <mkdir dir="${classes}"/>
+    <javac destdir="${classes}"
+           srcdir="${sources}"
+           source="1.6"
+           target="1.6"
+           debug="true"
+           deprecation="true"
+           optimize="false"
+           failonerror="true"
+           includeantruntime="false">
+      <classpath refid="classpath"/>
+    </javac>
+  </target>
+
+  <target name="run" depends="compile">
+    <java fork="true"
+          maxmemory="2g"
+          classname="org.torproject.metrics.clients.Main">
+      <classpath refid="classpath"/>
+    </java>
+  </target>
+</project>
+
diff --git a/modules/clients/country_info.py b/modules/clients/country_info.py
new file mode 100644
index 0000000..e23728e
--- /dev/null
+++ b/modules/clients/country_info.py
@@ -0,0 +1,252 @@
+# -*- coding: utf-8 -*-
+
+countries = {
+    "ad" : "Andorra",
+    "ae" : "the United Arab Emirates",
+    "af" : "Afghanistan",
+    "ag" : "Antigua and Barbuda",
+    "ai" : "Anguilla",
+    "al" : "Albania",
+    "am" : "Armenia",
+    "an" : "the Netherlands Antilles",
+    "ao" : "Angola",
+    "aq" : "Antarctica",
+    "ar" : "Argentina",
+    "as" : "American Samoa",
+    "at" : "Austria",
+    "au" : "Australia",
+    "aw" : "Aruba",
+    "ax" : "the Aland Islands",
+    "az" : "Azerbaijan",
+    "ba" : "Bosnia and Herzegovina",
+    "bb" : "Barbados",
+    "bd" : "Bangladesh",
+    "be" : "Belgium",
+    "bf" : "Burkina Faso",
+    "bg" : "Bulgaria",
+    "bh" : "Bahrain",
+    "bi" : "Burundi",
+    "bj" : "Benin",
+    "bl" : "Saint Bartelemey",
+    "bm" : "Bermuda",
+    "bn" : "Brunei",
+    "bo" : "Bolivia",
+    "br" : "Brazil",
+    "bs" : "the Bahamas",
+    "bt" : "Bhutan",
+    "bv" : "the Bouvet Island",
+    "bw" : "Botswana",
+    "by" : "Belarus",
+    "bz" : "Belize",
+    "ca" : "Canada",
+    "cc" : "the Cocos (Keeling) Islands",
+    "cd" : "the Democratic Republic of the Congo",
+    "cf" : "Central African Republic",
+    "cg" : "Congo",
+    "ch" : "Switzerland",
+    "ci" :  u"Côte d'Ivoire",
+    "ck" : "the Cook Islands",
+    "cl" : "Chile",
+    "cm" : "Cameroon",
+    "cn" : "China",
+    "co" : "Colombia",
+    "cr" : "Costa Rica",
+    "cu" : "Cuba",
+    "cv" : "Cape Verde",
+    "cx" : "the Christmas Island",
+    "cy" : "Cyprus",
+    "cz" : "the Czech Republic",
+    "de" : "Germany",
+    "dj" : "Djibouti",
+    "dk" : "Denmark",
+    "dm" : "Dominica",
+    "do" : "the Dominican Republic",
+    "dz" : "Algeria",
+    "ec" : "Ecuador",
+    "ee" : "Estonia",
+    "eg" : "Egypt",
+    "eh" : "the Western Sahara",
+    "er" : "Eritrea",
+    "es" : "Spain",
+    "et" : "Ethiopia",
+    "fi" : "Finland",
+    "fj" : "Fiji",
+    "fk" : "the Falkland Islands (Malvinas)",
+    "fm" : "the Federated States of Micronesia",
+    "fo" : "the Faroe Islands",
+    "fr" : "France",
+    "fx" : "Metropolitan France",
+    "ga" : "Gabon",
+    "gb" : "the United Kingdom",
+    "gd" : "Grenada",
+    "ge" : "Georgia",
+    "gf" : "French Guiana",
+    "gg" : "Guernsey",
+    "gh" : "Ghana",
+    "gi" : "Gibraltar",
+    "gl" : "Greenland",
+    "gm" : "Gambia",
+    "gn" : "Guinea",
+    "gp" : "Guadeloupe",
+    "gq" : "Equatorial Guinea",
+    "gr" : "Greece",
+    "gs" : "South Georgia and the South Sandwich Islands",
+    "gt" : "Guatemala",
+    "gu" : "Guam",
+    "gw" : "Guinea-Bissau",
+    "gy" : "Guyana",
+    "hk" : "Hong Kong",
+    "hm" : "Heard Island and McDonald Islands",
+    "hn" : "Honduras",
+    "hr" : "Croatia",
+    "ht" : "Haiti",
+    "hu" : "Hungary",
+    "id" : "Indonesia",
+    "ie" : "Ireland",
+    "il" : "Israel",
+    "im" : "the Isle of Man",
+    "in" : "India",
+    "io" : "the British Indian Ocean Territory",
+    "iq" : "Iraq",
+    "ir" : "Iran",
+    "is" : "Iceland",
+    "it" : "Italy",
+    "je" : "Jersey",
+    "jm" : "Jamaica",
+    "jo" : "Jordan",
+    "jp" : "Japan",
+    "ke" : "Kenya",
+    "kg" : "Kyrgyzstan",
+    "kh" : "Cambodia",
+    "ki" : "Kiribati",
+    "km" : "Comoros",
+    "kn" : "Saint Kitts and Nevis",
+    "kp" : "North Korea",
+    "kr" : "the Republic of Korea",
+    "kw" : "Kuwait",
+    "ky" : "the Cayman Islands",
+    "kz" : "Kazakhstan",
+    "la" : "Laos",
+    "lb" : "Lebanon",
+    "lc" : "Saint Lucia",
+    "li" : "Liechtenstein",
+    "lk" : "Sri Lanka",
+    "lr" : "Liberia",
+    "ls" : "Lesotho",
+    "lt" : "Lithuania",
+    "lu" : "Luxembourg",
+    "lv" : "Latvia",
+    "ly" : "Libya",
+    "ma" : "Morocco",
+    "mc" : "Monaco",
+    "md" : "the Republic of Moldova",
+    "me" : "Montenegro",
+    "mf" : "Saint Martin",
+    "mg" : "Madagascar",
+    "mh" : "the Marshall Islands",
+    "mk" : "Macedonia",
+    "ml" : "Mali",
+    "mm" : "Burma",
+    "mn" : "Mongolia",
+    "mo" : "Macau",
+    "mp" : "the Northern Mariana Islands",
+    "mq" : "Martinique",
+    "mr" : "Mauritania",
+    "ms" : "Montserrat",
+    "mt" : "Malta",
+    "mu" : "Mauritius",
+    "mv" : "the Maldives",
+    "mw" : "Malawi",
+    "mx" : "Mexico",
+    "my" : "Malaysia",
+    "mz" : "Mozambique",
+    "na" : "Namibia",
+    "nc" : "New Caledonia",
+    "ne" : "Niger",
+    "nf" : "Norfolk Island",
+    "ng" : "Nigeria",
+    "ni" : "Nicaragua",
+    "nl" : "the Netherlands",
+    "no" : "Norway",
+    "np" : "Nepal",
+    "nr" : "Nauru",
+    "nu" : "Niue",
+    "nz" : "New Zealand",
+    "om" : "Oman",
+    "pa" : "Panama",
+    "pe" : "Peru",
+    "pf" : "French Polynesia",
+    "pg" : "Papua New Guinea",
+    "ph" : "the Philippines",
+    "pk" : "Pakistan",
+    "pl" : "Poland",
+    "pm" : "Saint Pierre and Miquelon",
+    "pn" : "the Pitcairn Islands",
+    "pr" : "Puerto Rico",
+    "ps" : "the Palestinian Territory",
+    "pt" : "Portugal",
+    "pw" : "Palau",
+    "py" : "Paraguay",
+    "qa" : "Qatar",
+    "re" : "Reunion",
+    "ro" : "Romania",
+    "rs" : "Serbia",
+    "ru" : "Russia",
+    "rw" : "Rwanda",
+    "sa" : "Saudi Arabia",
+    "sb" : "the Solomon Islands",
+    "sc" : "the Seychelles",
+    "sd" : "Sudan",
+    "se" : "Sweden",
+    "sg" : "Singapore",
+    "sh" : "Saint Helena",
+    "si" : "Slovenia",
+    "sj" : "Svalbard and Jan Mayen",
+    "sk" : "Slovakia",
+    "sl" : "Sierra Leone",
+    "sm" : "San Marino",
+    "sn" : "Senegal",
+    "so" : "Somalia",
+    "sr" : "Suriname",
+    "ss" : "South Sudan",
+    "st" : u"São Tomé and Príncipe",
+    "sv" : "El Salvador",
+    "sy" : "the Syrian Arab Republic",
+    "sz" : "Swaziland",
+    "tc" : "Turks and Caicos Islands",
+    "td" : "Chad",
+    "tf" : "the French Southern Territories",
+    "tg" : "Togo",
+    "th" : "Thailand",
+    "tj" : "Tajikistan",
+    "tk" : "Tokelau",
+    "tl" : "East Timor",
+    "tm" : "Turkmenistan",
+    "tn" : "Tunisia",
+    "to" : "Tonga",
+    "tr" : "Turkey",
+    "tt" : "Trinidad and Tobago",
+    "tv" : "Tuvalu",
+    "tw" : "Taiwan",
+    "tz" : "the United Republic of Tanzania",
+    "ua" : "Ukraine",
+    "ug" : "Uganda",
+    "um" : "the United States Minor Outlying Islands",
+    "us" : "the United States",
+    "uy" : "Uruguay",
+    "uz" : "Uzbekistan",
+    "va" : "Vatican City",
+    "vc" : "Saint Vincent and the Grenadines",
+    "ve" : "Venezuela",
+    "vg" : "the British Virgin Islands",
+    "vi" : "the United States Virgin Islands",
+    "vn" : "Vietnam",
+    "vu" : "Vanuatu",
+    "wf" : "Wallis and Futuna",
+    "ws" : "Samoa",
+    "ye" : "Yemen",
+    "yt" : "Mayotte",
+    "za" : "South Africa",
+    "zm" : "Zambia",
+    "zw" : "Zimbabwe"
+    }
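
The mapping above is consumed by detector.py's get_country_name_from_cc(), which lower-cases the code and falls back to the raw code when it is unknown (e.g. for the '??' pseudo-country). A minimal stand-alone sketch of that lookup, assuming country_info.py is on the Python path:

    # Sketch only -- mirrors get_country_name_from_cc() in detector.py.
    from country_info import countries

    def country_name(country_code):
        # Look up the lower-cased code; fall back to the code itself.
        return countries.get(country_code.lower(), country_code)

    print(country_name("DE"))   # Germany
    print(country_name("??"))   # ??
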
diff --git a/modules/clients/detector.py b/modules/clients/detector.py
new file mode 100644
index 0000000..611f25b
--- /dev/null
+++ b/modules/clients/detector.py
@@ -0,0 +1,437 @@
+##  Copyright (c) 2011 George Danezis <gdane at microsoft.com>
+##
+##  All rights reserved.
+##
+##  Redistribution and use in source and binary forms, with or without
+##  modification, are permitted (subject to the limitations in the
+##  disclaimer below) provided that the following conditions are met:
+##
+##   * Redistributions of source code must retain the above copyright
+##     notice, this list of conditions and the following disclaimer.
+##
+##   * Redistributions in binary form must reproduce the above copyright
+##     notice, this list of conditions and the following disclaimer in the
+##     documentation and/or other materials provided with the
+##     distribution.
+##
+##   * Neither the name of <Owner Organization> nor the names of its
+##     contributors may be used to endorse or promote products derived
+##     from this software without specific prior written permission.
+##
+##  NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE
+##  GRANTED BY THIS LICENSE.  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT
+##  HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
+##  WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+##  MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+##  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+##  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+##  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+##  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+##  BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+##  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
+##  OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
+##  IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+##
+##  (Clear BSD license: http://labs.metacarta.com/license-explanation.html#license)
+
+##  This script reads a .csv file of the number of Tor users and finds
+##  anomalies that might be indicative of censorship.
+
+# Dep: matplotlib
+from pylab import *
+import matplotlib
+
+# Dep: numpy
+import numpy
+
+# Dep: scipy
+import scipy.stats
+from scipy.stats.distributions import norm
+from scipy.stats.distributions import poisson
+
+# Std lib
+from datetime import date
+from datetime import timedelta
+import os.path
+
+# Country code -> Country names
+import country_info
+
+# write utf8 to file
+import codecs
+
+days = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
+
+def get_country_name_from_cc(country_code):
+  if (country_code.lower() in country_info.countries):
+    return country_info.countries[country_code.lower()]
+  return country_code # if we didn't find the cc in our map
+
+"""
+Represents a .csv file containing information on the number of
+connecting Tor users per country.
+
+'store': Dictionary with (<country code>, <counter>) as key, and the number of users as value.
+         <country code> can also be "date"...
+'all_dates': List of the data intervals (with default timedelta: 1 day).
+'country_codes': List of all relevant country codes.
+'MAX_INDEX': Number of data rows (dates) read from the .csv.
+'date_min': The oldest date found in the .csv.
+'date_max': The latest date found in the .csv.
+"""
+class torstatstore:
+  def __init__(self, file_name):
+    f = file(file_name)
+    country_codes = f.readline()
+    country_codes = country_codes.strip().split(",")
+
+    store = {}
+    MAX_INDEX = 0
+    for i, line in enumerate(f):
+        MAX_INDEX += 1
+        line_parsed = line.strip().split(",")
+        for j, (ccode, val) in enumerate(zip(country_codes,line_parsed)):
+            processed_val = None
+            if ccode == "date":
+                try:
+                    year, month, day = int(val[:4]), int(val[5:7]), int(val[8:10])
+                    processed_val = date(year, month, day)
+                except Exception, e:
+                    print "Parsing error (ignoring line %s):" % j
+                    print "%s" % val,e
+                    break
+
+            elif val != "NA":
+                processed_val = int(val)
+            store[(ccode, i)] = processed_val
+
+    # min and max
+    date_min = store[("date", 0)]
+    date_max = store[("date", i)]
+
+    all_dates = []
+    d = date_min
+    dt = timedelta(days=1)
+    while d <= date_max:
+        all_dates += [d]
+        d = d + dt
+
+    # Save for later
+    self.store = store
+    self.all_dates = all_dates
+    self.country_codes = country_codes
+    self.MAX_INDEX = MAX_INDEX
+    self.date_min = date_min
+    self.date_max = date_max
+
+  """Return a list representing a time series of 'ccode' with respect
+  to the number of connected users.
+  """
+  def get_country_series(self, ccode):
+    assert ccode in self.country_codes
+    series = {}
+    for d in self.all_dates:
+        series[d] = None
+    for i in range(self.MAX_INDEX):
+        series[self.store[("date", i)]] = self.store[(ccode, i)]
+    sx = []
+    for d in self.all_dates:
+        sx += [series[d]]
+    return sx
+
+  """Return an ordered list containing tuples of the form (<number of
+  users>, <country code>). The list is ordered with respect to the
+  number of users for each country.
+  """
+  def get_largest(self, number):
+    exclude = set(["all", "??", "date"])
+    l = [(self.store[(c, self.MAX_INDEX-1)], c) for c in self.country_codes if c not in exclude]
+    l.sort()
+    l.reverse()
+    return l[:number]
+
+  """Return a dictionary, with <country code> as key, and the time
+  series of the country code as the value.
+  """
+  def get_largest_locations(self, number):
+    l = self.get_largest(number)
+    res = {}
+    for _, ccode in l[:number]:
+      res[ccode] = self.get_country_series(ccode)
+    return res
+
+"""Return a list containing lists (?) where each such list contains
+the difference in users for a time delta of 'days'
+"""
+def n_day_rel(series, days):
+  rel = []
+  for i, v in enumerate(series):
+    if series[i] is None:
+      rel += [None]
+      continue
+
+    if i - days < 0 or series[i-days] is None or series[i-days] == 0:
+      rel += [None]
+    else:
+      rel += [ float(series[i]) / series[i-days]]
+  return rel
+
+# Main model: computes the expected min / max range of number of users
+def make_tendencies_minmax(l, INTERVAL = 1):
+  lminus1 = dict([(ccode, n_day_rel(l[ccode], INTERVAL)) for ccode in l])
+  c = lminus1[lminus1.keys()[0]]
+  dists = []
+  minx = []
+  maxx = []
+  for i in range(len(c)):
+    vals = [lminus1[ccode][i] for ccode in lminus1.keys() if lminus1[ccode][i] != None]
+    if len(vals) < 8:
+      dists += [None]
+      minx += [None]
+      maxx += [None]
+    else:
+      vals.sort()
+      median = vals[len(vals)/2]
+      q1 = vals[len(vals)/4]
+      q2 = vals[(3*len(vals))/4]
+      qd = q2 - q1
+      vals = [v for v in vals if median - qd*4 < v and  v < median + qd*4]
+      if len(vals) < 8:
+        dists += [None]
+        minx += [None]
+        maxx += [None]
+        continue
+      mu, sigma = norm.fit(vals)
+      dists += [(mu, sigma)]
+      maxx += [norm.ppf(0.9999, mu, sigma)]
+      minx += [norm.ppf(1 - 0.9999, mu, sigma)]
+  ## print minx[-1], maxx[-1]
+  return minx, maxx
+
+# Makes pretty plots
+def raw_plot(series, minc, maxc, labels, xtitle):
+    assert len(xtitle) == 3
+    fname, stitle, slegend = xtitle
+
+    font = {'family' : 'Bitstream Vera Sans',
+        'weight' : 'normal',
+        'size'   : 8}
+    matplotlib.rc('font', **font)
+
+    ylim( (-max(series)*0.1, max(series)*1.1) )
+    plot(labels, series, linewidth=1.0, label="Users")
+
+    wherefill = []
+    for mm,mx in zip(minc, maxc):
+      wherefill += [not (mm == None and mx == None)]
+      assert mm < mx or (mm == None and mx == None)
+
+    fill_between(labels, minc, maxc, where=wherefill, color="gray", label="Prediction")
+
+    vdown = []
+    vup = []
+    for i,v in enumerate(series):
+      if minc[i] != None and v < minc[i]:
+        vdown += [v]
+        vup += [None]
+      elif maxc[i] != None and v > maxc[i]:
+        vdown += [None]
+        vup += [v]
+      else:
+        vup += [None]
+        vdown += [None]
+
+    plot(labels, vdown, 'o', ms=10, lw=2, alpha=0.5, mfc='orange', label="Downturns")
+    plot(labels, vup, 'o', ms=10, lw=2, alpha=0.5, mfc='green', label="Upturns")
+
+    legend(loc=2)
+
+    xlabel('Time (days)')
+    ylabel('Users')
+    title(stitle)
+    grid(True)
+    F = gcf()
+
+    F.set_size_inches(10,5)
+    F.savefig(fname,  format="png", dpi = (150))
+    close()
+
+def absolute_plot(series, minc, maxc, labels,INTERVAL, xtitle):
+  in_minc = []
+  in_maxc = []
+  for i, v in enumerate(series):
+    if i > 0 and i - INTERVAL >= 0 and series[i] != None and series[i-INTERVAL] != None and series[i-INTERVAL] != 0 and minc[i]!= None and maxc[i]!= None:
+      in_minc += [minc[i] * poisson.ppf(1-0.9999, series[i-INTERVAL])]
+      in_maxc += [maxc[i] * poisson.ppf(0.9999, series[i-INTERVAL])]
+      if not in_minc[-1] < in_maxc[-1]:
+        print in_minc[-1], in_maxc[-1], series[i-INTERVAL], minc[i], maxc[i]
+      assert in_minc[-1] < in_maxc[-1]
+    else:
+      in_minc += [None]
+      in_maxc += [None]
+  raw_plot(series, in_minc, in_maxc, labels, xtitle)
+
+"""Return the number of downscores and upscores of a time series
+'series', given tendencies 'minc' and 'maxc' for the time interval
+'INTERVAL'.
+
+If 'scoring_interval' is specified, we only consider upscores/downscores
+that happened in the latest 'scoring_interval' days.
+"""
+def censor_score(series, minc, maxc, INTERVAL, scoring_interval=None):
+  upscore = 0
+  downscore = 0
+
+  if scoring_interval is None:
+    scoring_interval = len(series)
+  assert(len(series) >= scoring_interval)
+
+  for i, v in enumerate(series):
+    if i > 0 and i - INTERVAL >= 0 and series[i] != None and series[i-INTERVAL] != None and series[i-INTERVAL] != 0 and minc[i]!= None and maxc[i]!= None:
+      in_minc = minc[i] * poisson.ppf(1-0.9999, series[i-INTERVAL])
+      in_maxc = maxc[i] * poisson.ppf(0.9999, series[i-INTERVAL])
+      if (i >= (len(series) - scoring_interval)):
+        downscore += 1 if minc[i] != None and v < in_minc else 0
+        upscore += 1 if maxc[i] != None and v > in_maxc else 0
+
+  return downscore, upscore
+
+def plot_target(tss, TARGET, xtitle, minx, maxx, DAYS=365, INTERV = 7):
+  ctarget = tss.get_country_series(TARGET)
+  c = n_day_rel(ctarget, INTERV)
+  absolute_plot(ctarget[-DAYS:], minx[-DAYS:], maxx[-DAYS:], tss.all_dates[-DAYS:],INTERV, xtitle = xtitle)
+
+def write_censorship_report_prologue(report_file, dates, notification_period):
+  if (notification_period == 1):
+    date_str = "%s" % (dates[-1]) # no need for date range if it's just one day
+  else:
+    date_str = "%s to %s" % (dates[-notification_period], dates[-1])
+
+  prologue = "=======================\n"
+  prologue += "Automatic Censorship Report for %s\n" % (date_str)
+  prologue += "=======================\n\n"
+  report_file.write(prologue)
+
+## Make a league table of censorship + nice graphs
+def plot_all(tss, minx, maxx, INTERV, DAYS=None, rdir="img"):
+  rdir = os.path.realpath(rdir)
+  if not os.path.exists(rdir) or not os.path.isdir(rdir):
+    print "ERROR: %s does not exist or is not a directory." % rdir
+    return
+
+  summary_file = file(os.path.join(rdir, "summary.txt"), "w")
+
+  if DAYS == None:
+    DAYS = 6*31
+
+  s = tss.get_largest(200)
+  scores = []
+  for num, li in s:
+    print ".",
+    ds,us = censor_score(tss.get_country_series(li)[-DAYS:], minx[-DAYS:], maxx[-DAYS:], INTERV)
+    # print ds, us
+    scores += [(ds,num, us, li)]
+  scores.sort()
+  scores.reverse()
+  s = "\n=======================\n"
+  s+= "Report for %s to %s\n" % (tss.all_dates[-DAYS], tss.all_dates[-1])
+  s+= "=======================\n"
+  print s
+  summary_file.write(s)
+  for a,nx, b,c in scores:
+    if a > 0:
+      s = "%s -- down: %2d (up: %2d affected: %s)" % (c, a, b, nx)
+      print s
+      summary_file.write(s + "\n")
+      xtitle = (os.path.join(rdir, "%03d-%s-censor.png" % (a,c)), "Tor report for %s -- down: %2d (up: %2d affected: %s)" % (c, a, b, nx),"")
+      plot_target(tss, c,xtitle, minx, maxx, DAYS, INTERV)
+  summary_file.close()
+
+"""Write a CSV report on the minimum/maximum users of each country per date."""
+def write_all(tss, minc, maxc, RANGES_FILE, INTERVAL=7):
+  ranges_file = file(RANGES_FILE, "w")
+  ranges_file.write("date,country,minusers,maxusers\n")
+  exclude = set(["all", "??", "date"])
+  for c in tss.country_codes:
+    if c in exclude:
+      continue
+    series = tss.get_country_series(c)
+    for i, v in enumerate(series):
+      if i > 0 and i - INTERVAL >= 0 and series[i] != None and series[i-INTERVAL] != None and series[i-INTERVAL] != 0 and minc[i]!= None and maxc[i]!= None:
+        minv = minc[i] * poisson.ppf(1-0.9999, series[i-INTERVAL])
+        maxv = maxc[i] * poisson.ppf(0.9999, series[i-INTERVAL])
+        if not minv < maxv:
+          print minv, maxv, series[i-INTERVAL], minc[i], maxc[i]
+        assert minv < maxv
+        ranges_file.write("%s,%s,%s,%s\n" % (tss.all_dates[i], c, minv, maxv))
+  ranges_file.close()
+
+"""Return a URL that points to a graph in metrics.tpo that displays
+the number of direct Tor users in country 'country_code', for a
+'period'-day period.
+
+Let's hope that the metrics.tpo URL scheme doesn't change often.
+"""
+def get_tor_usage_graph_url_for_cc_and_date(country_code, dates, period):
+  url = "https://metrics.torproject.org/users.html?graph=userstats-relay-country&start=%s&end=%s&country=%s&events=on#userstats-relay-country\n" % \
+      (dates[-period], dates[-1], country_code)
+  return url
+
+"""Write a file containing a short censorship report over the last
+'notification_period' days.
+"""
+def write_ml_report(tss, minx, maxx, INTERV, DAYS, notification_period=None):
+  if notification_period is None:
+    notification_period = DAYS
+
+  report_file = codecs.open('short_censorship_report.txt', 'w', 'utf-8')
+  file_prologue_written = False
+
+  s = tss.get_largest(None) # no restrictions, get 'em all.
+  scores = []
+  for num, li in s:
+    ds,us = censor_score(tss.get_country_series(li)[-DAYS:], minx[-DAYS:], maxx[-DAYS:], INTERV, notification_period)
+    scores += [(ds,num, us, li)]
+  scores.sort()
+  scores.reverse()
+
+  for downscores,users_n,upscores,country_code in scores:
+    if (downscores > 0) or (upscores > 0):
+      if not file_prologue_written:
+        write_censorship_report_prologue(report_file, tss.all_dates, notification_period)
+        file_prologue_written = True
+
+      if ((upscores > 0) and (downscores == 0)):
+        s = "We detected an unusual spike of Tor users in %s (%d upscores, %d users):\n" % \
+            (get_country_name_from_cc(country_code), upscores, users_n)
+      else:
+        s = "We detected %d potential censorship events in %s (users: %d, upscores: %d):\n" % \
+            (downscores, get_country_name_from_cc(country_code), users_n, upscores)
+
+      # Also give out a link to the appropriate usage graph for a 90-day period.
+      s += get_tor_usage_graph_url_for_cc_and_date(country_code, tss.all_dates, 90)
+
+      report_file.write(s + "\n")
+
+  report_file.close()
+
+# INTERV is the time interval to model connection rates;
+# consider maximum DAYS days back.
+def detect(CSV_FILE = "userstats-detector.csv",
+           RANGES_FILE = "userstats-ranges.csv", GRAPH_DIR = "img",
+           INTERV = 7, DAYS = 6 * 31, REPORT = True):
+  tss = torstatstore(CSV_FILE)
+  l = tss.get_largest_locations(50)
+  minx, maxx = make_tendencies_minmax(l, INTERV)
+  #plot_all(tss, minx, maxx, INTERV, DAYS, rdir=GRAPH_DIR)
+  write_all(tss, minx, maxx, RANGES_FILE, INTERV)
+
+  if REPORT:
+    # Make our short report; only consider events of the last day
+    write_ml_report(tss, minx, maxx, INTERV, DAYS, 1)
+
+def main():
+  detect()
+
+if __name__ == "__main__":
+    main()
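
The detection model in detector.py above boils down to: per-country 7-day ratios, a normal fit to the trimmed cross-country ratio distribution for each day, and Poisson quantiles of the country's own count a week earlier. A compressed, stand-alone sketch of that computation follows (the ratios and previous count are made-up, illustrative values; scipy is assumed to be installed, as it already is for detector.py):

    # Sketch only -- same math as make_tendencies_minmax()/censor_score().
    from scipy.stats import norm, poisson

    def expected_range(ratios, prev_count):
        # 'ratios' are 7-day ratios across many countries for one day;
        # 'prev_count' is one country's user count 7 days earlier.
        vals = sorted(r for r in ratios if r is not None)
        median = vals[len(vals) // 2]
        q1, q3 = vals[len(vals) // 4], vals[(3 * len(vals)) // 4]
        iqr = q3 - q1
        # Drop outliers more than 4 interquartile ranges from the median.
        trimmed = [v for v in vals if median - 4 * iqr < v < median + 4 * iqr]
        mu, sigma = norm.fit(trimmed)
        low = norm.ppf(1 - 0.9999, mu, sigma) * poisson.ppf(1 - 0.9999, prev_count)
        high = norm.ppf(0.9999, mu, sigma) * poisson.ppf(0.9999, prev_count)
        return low, high

    # Counts falling below/above this range are counted as downscores/
    # upscores, which is what the short censorship report is built from.
    ratios = [0.97, 1.01, 1.03, 0.99, 1.02, 0.98, 1.00, 1.04, 0.96, 1.01]
    print(expected_range(ratios, prev_count=20000))
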
diff --git a/modules/clients/init-userstats.sql b/modules/clients/init-userstats.sql
new file mode 100644
index 0000000..7c5df3d
--- /dev/null
+++ b/modules/clients/init-userstats.sql
@@ -0,0 +1,575 @@
+-- Copyright 2013 The Tor Project
+-- See LICENSE for licensing information
+
+-- Use enum types for dimensions that may only change if we write new code
+-- to support them.  For example, if there's a new node type beyond relay
+-- and bridge, we'll have to write code to support it.  This is in
+-- contrast to dimensions like country, transport, or version which don't
+-- have their possible values hard-coded anywhere.
+CREATE TYPE node AS ENUM ('relay', 'bridge');
+CREATE TYPE metric AS ENUM ('responses', 'bytes', 'status');
+
+-- All new data first goes into the imported table.  The import tool
+-- should do some trivial checks for invalid or duplicate data, but
+-- ultimately, we're going to do these checks in the database.  For
+-- example, the import tool could avoid importing data from the same
+-- descriptor more than once, but it's fine to import the same history
+-- string from distinct descriptors multiple times.  The import tool must,
+-- however, make sure that stats_end is not greater than 00:00:00 of the
+-- day following stats_start.  There are no constraints set on this table,
+-- because importing data should be really, really fast.  Once the newly
+-- imported data is successfully processed, the imported table is emptied.
+CREATE TABLE imported (
+
+  -- The 40-character upper-case hex string identifies a descriptor
+  -- uniquely and is used to join metrics (responses, bytes, status)
+  -- published by the same node (relay or bridge).
+  fingerprint CHARACTER(40) NOT NULL,
+
+  -- The node type is used to decide the statistics that this entry will
+  -- be part of.
+  node node NOT NULL,
+
+  -- The metric of this entry describes the stored observation type.
+  -- We'll want to store different metrics published by a node:
+  -- - 'responses' are the number of v3 network status consensus requests
+  --   that the node responded to;
+  -- - 'bytes' are the number of bytes that the node wrote when answering
+  --   directory requests;
+  -- - 'status' are the intervals when the node was listed as running in
+  --   the network status published by either the directory authorities or
+  --   bridge authority.
+  metric metric NOT NULL,
+
+  -- The two-letter lower-case country code that the observation in this
+  -- entry can be attributed to; can be '??' if no country information is
+  -- known for this entry, or '' (empty string) if this entry summarizes
+  -- observations for all countries.
+  country CHARACTER VARYING(2) NOT NULL,
+
+  -- The pluggable transport name that the observation in this entry can
+  -- be attributed to; can be '<OR>' if no pluggable transport was used,
+  -- '<??>' if an unknown pluggable transport was used, or '' (empty
+  -- string) if this entry summarizes observations for all transports.
+  transport CHARACTER VARYING(20) NOT NULL,
+
+  -- The IP address version that the observation in this entry can be
+  -- attributed to; can be 'v4' or 'v6' or '' (empty string) if this entry
+  -- summarizes observations for all IP address versions.
+  version CHARACTER VARYING(2) NOT NULL,
+
+  -- The interval start of this observation.
+  stats_start TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+
+  -- The interval end of this observation.  This timestamp must be greater
+  -- than stats_start and must not be greater than 00:00:00 of the day
+  -- following stats_start, which the import tool must ensure.
+  stats_end TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+
+  -- Finally, the observed value.
+  val DOUBLE PRECISION NOT NULL
+);
+
+-- After importing new data into the imported table, they are merged into
+-- the merged table using the merge() function.  The merged table contains
+-- the same data as the imported table, except:
+-- (1) there are no duplicate or overlapping entries in the merged table
+--     with respect to stats_start and stats_end and the same fingerprint,
+--     node, metric, country, transport, and version columns;
+-- (2) all subsequent intervals with the same node, metric, country,
+--     transport, version, and stats_start date are compressed into a
+--     single entry.
+CREATE TABLE merged (
+
+  -- The unique key that is only used when merging newly imported data
+  -- into this table.
+  id SERIAL PRIMARY KEY,
+
+  -- All other columns have the same meaning as in the imported table.
+  fingerprint CHARACTER(40) NOT NULL,
+  node node NOT NULL,
+  metric metric NOT NULL,
+  country CHARACTER VARYING(2) NOT NULL,
+  transport CHARACTER VARYING(20) NOT NULL,
+  version CHARACTER VARYING(2) NOT NULL,
+  stats_start TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+  stats_end TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+  val DOUBLE PRECISION NOT NULL
+);
+
+-- After merging new data into the merged table, they are aggregated to
+-- daily user number estimates using the aggregate() function.  Only dates
+-- with new data in the imported table will be recomputed in the
+-- aggregated table.  The aggregated components follow the algorithm
+-- proposed in Tor Tech Report 2012-10-001.
+CREATE TABLE aggregated (
+
+  -- The date of these aggregated observations.
+  date DATE NOT NULL,
+
+  -- The node, country, transport, and version columns all have the same
+  -- meaning as in the imported table.
+  node node NOT NULL,
+  country CHARACTER VARYING(2) NOT NULL DEFAULT '',
+  transport CHARACTER VARYING(20) NOT NULL DEFAULT '',
+  version CHARACTER VARYING(2) NOT NULL DEFAULT '',
+
+  -- Total number of reported responses, possibly broken down by country,
+  -- transport, or version if either of them is not ''.  See r(R) in the
+  -- tech report.
+  rrx DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+  -- Total number of seconds of nodes reporting responses, possibly broken
+  -- down by country, transport, or version if any of them is not ''.
+  -- This would be referred to as n(R) in the tech report, though it's not
+  -- used there.
+  nrx DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+  -- Total number of reported bytes.  See h(H) in the tech report.
+  hh DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+  -- Total number of seconds of nodes in the status.  See n(N) in the tech
+  -- report.
+  nn DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+  -- Number of reported bytes of nodes that reported both responses and
+  -- bytes.  See h(R intersect H) in the tech report.
+  hrh DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+  -- Number of seconds of nodes reporting bytes.  See n(H) in the tech
+  -- report.
+  nh DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+  -- Number of seconds of nodes reporting responses but no bytes.  See
+  -- n(R \ H) in the tech report.
+  nrh DOUBLE PRECISION NOT NULL DEFAULT 0
+);
+
+CREATE LANGUAGE plpgsql;
+
+-- Merge new entries from the imported table into the merged table, and
+-- compress them while doing so.  This function first executes a query to
+-- match all entries in the imported table with adjacent or even
+-- overlapping entries in the merged table.  It then loops over query
+-- results and either inserts or updates entries in the merged table.  The
+-- idea is to leave query optimization to the database and only touch
+-- as few entries as possible while running this function.
+CREATE OR REPLACE FUNCTION merge() RETURNS VOID AS $$
+DECLARE
+
+  -- The current record that we're handling in the loop body.
+  cur RECORD;
+
+  -- Various information about the last record we processed, so that we
+  -- can merge the current record with the last one if possible.
+  last_fingerprint CHARACTER(40) := NULL;
+  last_node node;
+  last_metric metric;
+  last_country CHARACTER VARYING(2);
+  last_transport CHARACTER VARYING(20);
+  last_version CHARACTER VARYING(2);
+  last_start TIMESTAMP WITHOUT TIME ZONE;
+  last_end TIMESTAMP WITHOUT TIME ZONE;
+  last_id INTEGER;
+  last_val DOUBLE PRECISION;
+
+  -- Interval end and value of the last record before updating them in the
+  -- last loop step.  In a few edge cases, we may update an entry and
+  -- learn in the next loop step that the updated entry overlaps with the
+  -- subsequent entry.  In these cases we'll have to undo the update,
+  -- which is why we're storing the updated values.
+  undo_end TIMESTAMP WITHOUT TIME ZONE;
+  undo_val DOUBLE PRECISION;
+
+BEGIN
+  RAISE NOTICE '% Starting to merge.', timeofday();
+
+  -- TODO Maybe we'll have to materialize a merged_part table that only
+  -- contains dates IN (SELECT DISTINCT DATE(stats_start) FROM imported)
+  -- and use that in the query below.
+
+  -- Loop over results from a query that joins new entries in the imported
+  -- table with existing entries in the merged table.
+  FOR cur IN SELECT DISTINCT
+
+    -- Select id, interval start and end, and value of the existing entry
+    -- in merged; all these fields may be null if the imported entry is
+    -- not adjacent to an existing one.
+    merged.id AS merged_id,
+    merged.stats_start AS merged_start,
+    merged.stats_end AS merged_end,
+    merged.val AS merged_val,
+
+    -- Select interval start and end and value of the newly imported
+    -- entry.
+    imported.stats_start AS imported_start,
+    imported.stats_end AS imported_end,
+    imported.val AS imported_val,
+
+    -- Select columns that define the group of entries that can be merged
+    -- in the merged table.
+    imported.fingerprint AS fingerprint,
+    imported.node AS node,
+    imported.metric AS metric,
+    imported.country AS country,
+    imported.transport AS transport,
+    imported.version AS version
+
+    -- Select these columns from all entries in the imported table, plus
+    -- do an outer join on the merged table to find adjacent entries that
+    -- we might want to merge the new entries with.  It's possible that we
+    -- handle the same imported entry twice, if it starts directly after
+    -- one existing entry and ends directly before another existing entry.
+    FROM imported LEFT JOIN merged
+
+    -- First two join conditions are to find adjacent intervals.  In fact,
+    -- we also include overlapping intervals here, so that we can skip the
+    -- overlapping entry in the imported table.
+    ON imported.stats_end >= merged.stats_start AND
+       imported.stats_start <= merged.stats_end AND
+
+       -- Further join conditions are same date, fingerprint, node, etc.,
+       -- so that we don't merge entries that don't belong together.
+       DATE(imported.stats_start) = DATE(merged.stats_start) AND
+       imported.fingerprint = merged.fingerprint AND
+       imported.node = merged.node AND
+       imported.metric = merged.metric AND
+       imported.country = merged.country AND
+       imported.transport = merged.transport AND
+       imported.version = merged.version
+
+    -- Ordering is key, or our approach to merging subsequent entries is
+    -- going to break.
+    ORDER BY imported.fingerprint, imported.node, imported.metric,
+             imported.country, imported.transport, imported.version,
+             imported.stats_start, merged.stats_start, imported.stats_end
+
+  -- Now go through the results one by one.
+  LOOP
+
+    -- Log that we're done with the query and about to start merging.
+    IF last_fingerprint IS NULL THEN
+      RAISE NOTICE '% Query returned, now merging entries.', timeofday();
+    END IF;
+
+    -- If we're processing the very first entry or if we have reached a
+    -- new group of entries that belong together, (re-)set last_*
+    -- variables.
+    IF last_fingerprint IS NULL OR
+        DATE(cur.imported_start) <> DATE(last_start) OR
+        cur.fingerprint <> last_fingerprint OR
+        cur.node <> last_node OR
+        cur.metric <> last_metric OR
+        cur.country <> last_country OR
+        cur.transport <> last_transport OR
+        cur.version <> last_version THEN
+      last_id := -1;
+      last_start := '1970-01-01 00:00:00';
+      last_end := '1970-01-01 00:00:00';
+      last_val := -1;
+    END IF;
+
+    -- Remember all fields that determine which entries belong to the
+    -- same group.
+    last_fingerprint := cur.fingerprint;
+    last_node := cur.node;
+    last_metric := cur.metric;
+    last_country := cur.country;
+    last_transport := cur.transport;
+    last_version := cur.version;
+
+    -- If the existing entry that we're currently looking at starts before
+    -- the previous entry ends, we have created two overlapping entries in
+    -- the last iteration, and that is not allowed.  Undo the previous
+    -- change.
+    IF cur.merged_start IS NOT NULL AND
+        cur.merged_start < last_end AND
+        undo_end IS NOT NULL AND undo_val IS NOT NULL THEN
+      UPDATE merged SET stats_end = undo_end, val = undo_val
+        WHERE id = last_id;
+      undo_end := NULL;
+      undo_val := NULL;
+
+    -- If there is no adjacent entry to the one we're about to merge,
+    -- insert it as new entry.
+    ELSIF cur.merged_end IS NULL THEN
+      IF cur.imported_start > last_end THEN
+        last_start := cur.imported_start;
+        last_end := cur.imported_end;
+        last_val := cur.imported_val;
+        INSERT INTO merged (fingerprint, node, metric, country, transport,
+                            version, stats_start, stats_end, val)
+          VALUES (last_fingerprint, last_node, last_metric, last_country,
+                  last_transport, last_version, last_start, last_end,
+                  last_val)
+          RETURNING id INTO last_id;
+
+      -- If there was no adjacent entry before starting to merge, but
+      -- there is now one ending right before the new entry starts, merge
+      -- the new entry into the existing one.
+      ELSIF cur.imported_start = last_end THEN
+        last_val := last_val + cur.imported_val;
+        last_end := cur.imported_end;
+        UPDATE merged SET stats_end = last_end, val = last_val
+          WHERE id = last_id;
+      END IF;
+
+      -- There's no risk of this entry overlapping with the next.
+      undo_end := NULL;
+      undo_val := NULL;
+
+    -- If the new entry ends right when an existing entry starts, but
+    -- there's a gap between when the previously processed entry ends and
+    -- when the new entry starts, merge the new entry with the existing
+    -- entry we're currently looking at.
+    ELSIF cur.imported_end = cur.merged_start THEN
+      IF cur.imported_start > last_end THEN
+        last_id := cur.merged_id;
+        last_start := cur.imported_start;
+        last_end := cur.merged_end;
+        last_val := cur.imported_val + cur.merged_val;
+        UPDATE merged SET stats_start = last_start, val = last_val
+          WHERE id = last_id;
+
+      -- If the new entry ends right when an existing entry starts and
+      -- there's no gap between when the previously processed entry ends
+      -- and when the new entry starts, merge the new entry with the other
+      -- two entries.  This happens by deleting the previous entry and
+      -- expanding the subsequent entry to cover all three entries.
+      ELSIF cur.imported_start = last_end THEN
+        DELETE FROM merged WHERE id = last_id;
+        last_id := cur.merged_id;
+        last_end := cur.merged_end;
+        last_val := last_val + cur.merged_val;
+        UPDATE merged SET stats_start = last_start, val = last_val
+          WHERE id = last_id;
+      END IF;
+
+      -- There's no risk of this entry overlapping with the next.
+      undo_end := NULL;
+      undo_val := NULL;
+
+    -- If the new entry starts right when an existing entry ends, but
+    -- there's a gap between the previously processed entry and the
+    -- existing one, extend the existing entry.  There's a special case
+    -- when this operation is wrong and must be undone, which is when the
+    -- newly added entry overlaps with the subsequent entry.  That's why
+    -- we have to store the old interval end and value, so that this
+    -- operation can be undone in the next loop iteration.
+    ELSIF cur.imported_start = cur.merged_end THEN
+      IF last_end < cur.imported_start THEN
+        undo_end := cur.merged_end;
+        undo_val := cur.merged_val;
+        last_id := cur.merged_id;
+        last_start := cur.merged_start;
+        last_end := cur.imported_end;
+        last_val := cur.merged_val + cur.imported_val;
+        UPDATE merged SET stats_end = last_end, val = last_val
+          WHERE id = last_id;
+
+      -- If the new entry starts right when an existing entry ends and
+      -- there's no gap between the previously processed entry and the
+      -- existing entry, extend the existing entry.  This is very similar
+      -- to the previous case.  The same reasoning about possibly having
+      -- to undo this operation applies.
+      ELSE
+        undo_end := cur.merged_end;
+        undo_val := last_val;
+        last_end := cur.imported_end;
+        last_val := last_val + cur.imported_val;
+        UPDATE merged SET stats_end = last_end, val = last_val
+          WHERE id = last_id;
+      END IF;
+
+    -- If none of the cases above applies, there must have been an overlap
+    -- between the new entry and an existing one.  Skip the new entry.
+    ELSE
+      last_id := cur.merged_id;
+      last_start := cur.merged_start;
+      last_end := cur.merged_end;
+      last_val := cur.merged_val;
+      undo_end := NULL;
+      undo_val := NULL;
+    END IF;
+  END LOOP;
+
+  -- That's it, we're done merging.
+  RAISE NOTICE '% Finishing merge.', timeofday();
+  RETURN;
+END;
+$$ LANGUAGE plpgsql;
+
+-- Aggregate user estimates for all dates that have updated entries in the
+-- merged table.  This function first creates a temporary table with
+-- new or updated observations, then removes all existing estimates for
+-- the dates to be updated, and finally inserts newly computed aggregates
+-- for these dates.
+CREATE OR REPLACE FUNCTION aggregate() RETURNS VOID AS $$
+BEGIN
+  RAISE NOTICE '% Starting aggregate step.', timeofday();
+
+  -- Create a new temporary table containing all relevant information
+  -- needed to update the aggregated table.  In this table, we sum up all
+  -- observations of a given type by reporting node.  This query is
+  -- (temporarily) materialized, because we need to combine its entries
+  -- multiple times in various ways.  A (non-materialized) view would have
+  -- meant re-computing this query multiple times.
+  CREATE TEMPORARY TABLE update AS
+    SELECT fingerprint, node, metric, country, transport, version,
+           DATE(stats_start), SUM(val) AS val,
+           SUM(CAST(EXTRACT(EPOCH FROM stats_end - stats_start)
+               AS DOUBLE PRECISION)) AS seconds
+    FROM merged
+    WHERE DATE(stats_start) IN (
+          SELECT DISTINCT DATE(stats_start) FROM imported)
+    GROUP BY fingerprint, node, metric, country, transport, version,
+             DATE(stats_start);
+
+  -- Delete all entries from the aggregated table that we're about to
+  -- re-compute.
+  DELETE FROM aggregated WHERE date IN (SELECT DISTINCT date FROM update);
+
+  -- Insert partly empty results for all existing combinations of date,
+  -- node ('relay' or 'bridge'), country, transport, and version.  Only
+  -- the rrx and nrx fields will contain the number and seconds of reported
+  -- responses for the given combination of date, node, etc., while the
+  -- other fields will be updated below.
+  INSERT INTO aggregated (date, node, country, transport, version, rrx,
+      nrx)
+    SELECT date, node, country, transport, version, SUM(val) AS rrx,
+    SUM(seconds) AS nrx
+    FROM update WHERE metric = 'responses'
+    GROUP BY date, node, country, transport, version;
+
+  -- Create another temporary table with only those entries that aren't
+  -- broken down by any dimension.  This table is much smaller, so the
+  -- following operations are much faster.
+  CREATE TEMPORARY TABLE update_no_dimensions AS
+    SELECT fingerprint, node, metric, date, val, seconds FROM update
+    WHERE country = ''
+    AND transport = ''
+    AND version = '';
+
+  -- Update results in the aggregated table by setting aggregates based
+  -- on reported directory bytes.  These aggregates are only based on
+  -- date and node, so that the same values are set for all combinations
+  -- of country, transport, and version.
+  UPDATE aggregated
+    SET hh = aggregated_bytes.hh, nh = aggregated_bytes.nh
+    FROM (
+      SELECT date, node, SUM(val) AS hh, SUM(seconds) AS nh
+      FROM update_no_dimensions
+      WHERE metric = 'bytes'
+      GROUP BY date, node
+    ) aggregated_bytes
+    WHERE aggregated.date = aggregated_bytes.date
+    AND aggregated.node = aggregated_bytes.node;
+
+  -- Update results based on nodes being contained in the network status.
+  UPDATE aggregated
+    SET nn = aggregated_status.nn
+    FROM (
+      SELECT date, node, SUM(seconds) AS nn
+      FROM update_no_dimensions
+      WHERE metric = 'status'
+      GROUP BY date, node
+    ) aggregated_status
+    WHERE aggregated.date = aggregated_status.date
+    AND aggregated.node = aggregated_status.node;
+
+  -- Update results based on nodes reporting both bytes and responses.
+  UPDATE aggregated
+    SET hrh = aggregated_bytes_responses.hrh
+    FROM (
+      SELECT bytes.date, bytes.node,
+             SUM((LEAST(bytes.seconds, responses.seconds)
+                 * bytes.val) / bytes.seconds) AS hrh
+      FROM update_no_dimensions bytes
+      LEFT JOIN update_no_dimensions responses
+      ON bytes.date = responses.date
+      AND bytes.fingerprint = responses.fingerprint
+      AND bytes.node = responses.node
+      WHERE bytes.metric = 'bytes'
+      AND responses.metric = 'responses'
+      GROUP BY bytes.date, bytes.node
+    ) aggregated_bytes_responses
+    WHERE aggregated.date = aggregated_bytes_responses.date
+    AND aggregated.node = aggregated_bytes_responses.node;
+
+  -- Update results based on nodes reporting responses but no bytes.
+  UPDATE aggregated
+    SET nrh = aggregated_responses_bytes.nrh
+    FROM (
+      SELECT responses.date, responses.node,
+             SUM(GREATEST(0, responses.seconds
+                             - COALESCE(bytes.seconds, 0))) AS nrh
+      FROM update_no_dimensions responses
+      LEFT JOIN update_no_dimensions bytes
+      ON responses.date = bytes.date
+      AND responses.fingerprint = bytes.fingerprint
+      AND responses.node = bytes.node
+      WHERE responses.metric = 'responses'
+      AND bytes.metric = 'bytes'
+      GROUP BY responses.date, responses.node
+    ) aggregated_responses_bytes
+    WHERE aggregated.date = aggregated_responses_bytes.date
+    AND aggregated.node = aggregated_responses_bytes.node;
+
+  -- We're done aggregating new data.
+  RAISE NOTICE '% Finishing aggregate step.', timeofday();
+  RETURN;
+END;
+$$ LANGUAGE plpgsql;
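+
+-- For illustration only: the import tool (Main.java in this module)
+-- generates .sql files that drive the two functions above roughly like
+-- this:
+--
+--   BEGIN;
+--   LOCK TABLE imported NOWAIT;
+--   COPY imported (fingerprint, node, metric, country, transport,
+--     version, stats_start, stats_end, val) FROM stdin;
+--   [tab-separated rows written by the import tool]
+--   \.
+--   SELECT merge();
+--   SELECT aggregate();
+--   TRUNCATE imported;
+--   COMMIT;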
+
+-- User-friendly view on the aggregated table that implements the
+-- algorithm proposed in Tor Tech Report 2012-10-001.  This view returns
+-- user number estimates for both relay and bridge statistics, possibly
+-- broken down by country, transport, or version.
+CREATE OR REPLACE VIEW estimated AS SELECT
+
+  -- The date of this user number estimate.
+  a.date,
+
+  -- The node type, which is either 'relay' or 'bridge'.
+  a.node,
+
+  -- The two-letter lower-case country code of this estimate; can be '??'
+  -- for an estimate of users that could not be resolved to any country,
+  -- or '' (empty string) for an estimate of all users, regardless of
+  -- country.
+  a.country,
+
+  -- The pluggable transport name of this estimate; can be '<OR>' for an
+  -- estimate of users that did not use any pluggable transport, '<??>'
+  -- for unknown pluggable transports, or '' (empty string) for an
+  -- estimate of all users, regardless of transport.
+  a.transport,
+
+  -- The IP address version of this estimate; can be 'v4' or 'v6', or ''
+  -- (empty string) for an estimate of all users, regardless of IP address
+  -- version.
+  a.version,
+
+  -- Estimated fraction of nodes reporting directory requests, which is
+  -- used to extrapolate observed requests to estimated total requests in
+  -- the network.  The closer this fraction is to 1.0, the more precise
+  -- the estimation.
+  CAST(a.frac * 100 AS INTEGER) AS frac,
+
+  -- Finally, the estimated number of users.
+  CAST(a.rrx / (a.frac * 10) AS INTEGER) AS users
+
+  -- Implement the estimation method in a subquery, so that the ugly
+  -- formula only has to be written once.
+  FROM (
+    SELECT date, node, country, transport, version, rrx, nrx,
+           (hrh * nh + hh * nrh) / (hh * nn) AS frac
+    FROM aggregated WHERE hh * nn > 0.0) a
+
+  -- Only include estimates with at least 10% of nodes reporting directory
+  -- request statistics.
+  WHERE a.frac BETWEEN 0.1 AND 1.0
+
+  -- Order results.
+  ORDER BY date DESC, node, version, transport, country;
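+
+-- For illustration only (made-up numbers): with hrh = 500, nh = 86400,
+-- hh = 1000, nrh = 0, and nn = 172800, the estimated fraction is
+-- (500 * 86400 + 1000 * 0) / (1000 * 172800) = 0.25, reported as 25 in
+-- the frac column, and rrx = 10000 reported responses yield
+-- 10000 / (0.25 * 10) = 4000 estimated users.  A sketch of how daily
+-- estimates could be read from this view:
+--
+--   SELECT date, node, country, transport, version, frac, users
+--   FROM estimated
+--   WHERE node = 'relay' AND transport = '' AND version = ''
+--   ORDER BY date DESC, country;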
+
diff --git a/modules/clients/merge-clients.R b/modules/clients/merge-clients.R
new file mode 100644
index 0000000..cce7e9d
--- /dev/null
+++ b/modules/clients/merge-clients.R
@@ -0,0 +1,19 @@
+require(reshape)
+r <- read.csv("userstats-ranges.csv", stringsAsFactors = FALSE)
+r <- melt(r, id.vars = c("date", "country"))
+r <- data.frame(date = r$date, node = "relay", country = r$country,
+  transport = "", version = "",
+  variable = ifelse(r$variable == "maxusers", "upper", "lower"),
+  value = floor(r$value))
+u <- read.csv("userstats.csv", stringsAsFactors = FALSE)
+u <- melt(u, id.vars = c("date", "node", "country", "transport",
+  "version"))
+u <- data.frame(date = u$date, node = u$node, country = u$country,
+  transport = u$transport, version = u$version,
+  variable = ifelse(u$variable == "frac", "frac", "clients"),
+  value = u$value)
+c <- rbind(r, u)
+c <- cast(c, date + node + country + transport + version ~ variable)
+c <- c[order(as.Date(c$date), c$node, c$country, c$transport, c$version), ]
+write.csv(c, "clients.csv", quote = FALSE, row.names = FALSE, na = "")
+
diff --git a/modules/clients/src/org/torproject/metrics/clients/Main.java b/modules/clients/src/org/torproject/metrics/clients/Main.java
new file mode 100644
index 0000000..2e6712b
--- /dev/null
+++ b/modules/clients/src/org/torproject/metrics/clients/Main.java
@@ -0,0 +1,465 @@
+/* Copyright 2013 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.metrics.clients;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TimeZone;
+import java.util.TreeMap;
+
+import org.torproject.descriptor.BandwidthHistory;
+import org.torproject.descriptor.BridgeNetworkStatus;
+import org.torproject.descriptor.Descriptor;
+import org.torproject.descriptor.DescriptorFile;
+import org.torproject.descriptor.DescriptorReader;
+import org.torproject.descriptor.DescriptorSourceFactory;
+import org.torproject.descriptor.ExtraInfoDescriptor;
+import org.torproject.descriptor.NetworkStatusEntry;
+import org.torproject.descriptor.RelayNetworkStatusConsensus;
+
+public class Main {
+
+  public static void main(String[] args) throws Exception {
+    parseArgs(args);
+    parseRelayDescriptors();
+    parseBridgeDescriptors();
+    closeOutputFiles();
+  }
+
+  private static boolean writeToSingleFile = true;
+  private static boolean byStatsDateNotByDescHour = false;
+
+  private static void parseArgs(String[] args) {
+    if (args.length == 0) {
+      writeToSingleFile = true;
+    } else if (args.length == 1 && args[0].equals("--stats-date")) {
+      writeToSingleFile = false;
+      byStatsDateNotByDescHour = true;
+    } else if (args.length == 1 && args[0].equals("--desc-hour")) {
+      writeToSingleFile = false;
+      byStatsDateNotByDescHour = false;
+    } else {
+      System.err.println("Usage: java " + Main.class.getName()
+          + " [ --stats-date | --desc-hour ]");
+      System.exit(1);
+    }
+  }
+
+  private static final long ONE_HOUR_MILLIS = 60L * 60L * 1000L,
+      ONE_DAY_MILLIS = 24L * ONE_HOUR_MILLIS,
+      ONE_WEEK_MILLIS = 7L * ONE_DAY_MILLIS;
+
+  private static void parseRelayDescriptors() throws Exception {
+    DescriptorReader descriptorReader =
+        DescriptorSourceFactory.createDescriptorReader();
+    descriptorReader.setExcludeFiles(new File(
+        "status/relay-descriptors"));
+    descriptorReader.addDirectory(new File(
+        "../../shared/in/recent/relay-descriptors"));
+    Iterator<DescriptorFile> descriptorFiles =
+        descriptorReader.readDescriptors();
+    while (descriptorFiles.hasNext()) {
+      DescriptorFile descriptorFile = descriptorFiles.next();
+      for (Descriptor descriptor : descriptorFile.getDescriptors()) {
+        if (descriptor instanceof ExtraInfoDescriptor) {
+          parseRelayExtraInfoDescriptor((ExtraInfoDescriptor) descriptor);
+        } else if (descriptor instanceof RelayNetworkStatusConsensus) {
+          parseRelayNetworkStatusConsensus(
+              (RelayNetworkStatusConsensus) descriptor);
+        }
+      }
+    }
+  }
+
+  private static void parseRelayExtraInfoDescriptor(
+      ExtraInfoDescriptor descriptor) throws IOException {
+    long publishedMillis = descriptor.getPublishedMillis();
+    String fingerprint = descriptor.getFingerprint().
+        toUpperCase();
+    long dirreqStatsEndMillis = descriptor.getDirreqStatsEndMillis();
+    long dirreqStatsIntervalLengthMillis =
+        descriptor.getDirreqStatsIntervalLength() * 1000L;
+    SortedMap<String, Integer> requests = descriptor.getDirreqV3Reqs();
+    BandwidthHistory dirreqWriteHistory =
+        descriptor.getDirreqWriteHistory();
+    parseRelayDirreqV3Reqs(fingerprint, publishedMillis,
+        dirreqStatsEndMillis, dirreqStatsIntervalLengthMillis, requests);
+    parseRelayDirreqWriteHistory(fingerprint, publishedMillis,
+        dirreqWriteHistory);
+  }
+
+  private static void parseRelayDirreqV3Reqs(String fingerprint,
+      long publishedMillis, long dirreqStatsEndMillis,
+      long dirreqStatsIntervalLengthMillis,
+      SortedMap<String, Integer> requests) throws IOException {
+    if (requests == null ||
+        publishedMillis - dirreqStatsEndMillis > ONE_WEEK_MILLIS ||
+        dirreqStatsIntervalLengthMillis != ONE_DAY_MILLIS) {
+      /* Cut off all observations that are one week older than
+       * the descriptor publication time, or we'll have to update
+       * weeks of aggregate values every hour. */
+      return;
+    }
+    long statsStartMillis = dirreqStatsEndMillis
+        - dirreqStatsIntervalLengthMillis;
+    long utcBreakMillis = (dirreqStatsEndMillis / ONE_DAY_MILLIS)
+        * ONE_DAY_MILLIS;
+    for (int i = 0; i < 2; i++) {
+      long fromMillis = i == 0 ? statsStartMillis
+          : utcBreakMillis;
+      long toMillis = i == 0 ? utcBreakMillis : dirreqStatsEndMillis;
+      if (fromMillis >= toMillis) {
+        continue;
+      }
+      double intervalFraction = ((double) (toMillis - fromMillis))
+          / ((double) dirreqStatsIntervalLengthMillis);
+      double sum = 0.0;
+      for (Map.Entry<String, Integer> e : requests.entrySet()) {
+        String country = e.getKey();
+        double reqs = ((double) e.getValue()) - 4.0;
+        sum += reqs;
+        writeOutputLine(fingerprint, "relay", "responses", country,
+            "", "", fromMillis, toMillis, reqs * intervalFraction,
+            publishedMillis);
+      }
+      writeOutputLine(fingerprint, "relay", "responses", "", "",
+          "", fromMillis, toMillis, sum * intervalFraction,
+          publishedMillis);
+    }
+  }
+
+  private static void parseRelayDirreqWriteHistory(String fingerprint,
+      long publishedMillis, BandwidthHistory dirreqWriteHistory)
+      throws IOException {
+    if (dirreqWriteHistory == null ||
+        publishedMillis - dirreqWriteHistory.getHistoryEndMillis()
+        > ONE_WEEK_MILLIS) {
+      return;
+      /* Cut off all observations that are one week older than
+       * the descriptor publication time, or we'll have to update
+       * weeks of aggregate values every hour. */
+    }
+    long intervalLengthMillis =
+        dirreqWriteHistory.getIntervalLength() * 1000L;
+    for (Map.Entry<Long, Long> e :
+        dirreqWriteHistory.getBandwidthValues().entrySet()) {
+      long intervalEndMillis = e.getKey();
+      long intervalStartMillis =
+          intervalEndMillis - intervalLengthMillis;
+      for (int i = 0; i < 2; i++) {
+        long fromMillis = intervalStartMillis;
+        long toMillis = intervalEndMillis;
+        double writtenBytes = (double) e.getValue();
+        if (intervalStartMillis / ONE_DAY_MILLIS <
+            intervalEndMillis / ONE_DAY_MILLIS) {
+          long utcBreakMillis = (intervalEndMillis
+              / ONE_DAY_MILLIS) * ONE_DAY_MILLIS;
+          if (i == 0) {
+            toMillis = utcBreakMillis;
+          } else if (i == 1) {
+            fromMillis = utcBreakMillis;
+          }
+          double intervalFraction = ((double) (toMillis - fromMillis))
+              / ((double) intervalLengthMillis);
+          writtenBytes *= intervalFraction;
+        } else if (i == 1) {
+          break;
+        }
+        writeOutputLine(fingerprint, "relay", "bytes", "", "", "",
+            fromMillis, toMillis, writtenBytes, publishedMillis);
+      }
+    }
+  }
+
+  private static void parseRelayNetworkStatusConsensus(
+      RelayNetworkStatusConsensus consensus) throws IOException {
+    long fromMillis = consensus.getValidAfterMillis();
+    long toMillis = consensus.getFreshUntilMillis();
+    for (NetworkStatusEntry statusEntry :
+        consensus.getStatusEntries().values()) {
+      String fingerprint = statusEntry.getFingerprint().
+          toUpperCase();
+      if (statusEntry.getFlags().contains("Running")) {
+        writeOutputLine(fingerprint, "relay", "status", "", "", "",
+            fromMillis, toMillis, 0.0, fromMillis);
+      }
+    }
+  }
+
+  private static void parseBridgeDescriptors() throws Exception {
+    DescriptorReader descriptorReader =
+        DescriptorSourceFactory.createDescriptorReader();
+    descriptorReader.setExcludeFiles(new File(
+        "status/bridge-descriptors"));
+    descriptorReader.addDirectory(new File(
+        "../../shared/in/recent/bridge-descriptors"));
+    Iterator<DescriptorFile> descriptorFiles =
+        descriptorReader.readDescriptors();
+    while (descriptorFiles.hasNext()) {
+      DescriptorFile descriptorFile = descriptorFiles.next();
+      for (Descriptor descriptor : descriptorFile.getDescriptors()) {
+        if (descriptor instanceof ExtraInfoDescriptor) {
+          parseBridgeExtraInfoDescriptor(
+              (ExtraInfoDescriptor) descriptor);
+        } else if (descriptor instanceof BridgeNetworkStatus) {
+          parseBridgeNetworkStatus((BridgeNetworkStatus) descriptor);
+        }
+      }
+    }
+  }
+
+  private static void parseBridgeExtraInfoDescriptor(
+      ExtraInfoDescriptor descriptor) throws IOException {
+    String fingerprint = descriptor.getFingerprint().toUpperCase();
+    long publishedMillis = descriptor.getPublishedMillis();
+    long dirreqStatsEndMillis = descriptor.getDirreqStatsEndMillis();
+    long dirreqStatsIntervalLengthMillis =
+        descriptor.getDirreqStatsIntervalLength() * 1000L;
+    parseBridgeDirreqV3Resp(fingerprint, publishedMillis,
+        dirreqStatsEndMillis, dirreqStatsIntervalLengthMillis,
+        descriptor.getDirreqV3Resp(),
+        descriptor.getBridgeIps(),
+        descriptor.getBridgeIpTransports(),
+        descriptor.getBridgeIpVersions());
+
+    parseBridgeDirreqWriteHistory(fingerprint, publishedMillis,
+          descriptor.getDirreqWriteHistory());
+  }
+
+  private static void parseBridgeDirreqV3Resp(String fingerprint,
+      long publishedMillis, long dirreqStatsEndMillis,
+      long dirreqStatsIntervalLengthMillis,
+      SortedMap<String, Integer> responses,
+      SortedMap<String, Integer> bridgeIps,
+      SortedMap<String, Integer> bridgeIpTransports,
+      SortedMap<String, Integer> bridgeIpVersions) throws IOException {
+    if (responses == null ||
+        publishedMillis - dirreqStatsEndMillis > ONE_WEEK_MILLIS ||
+        dirreqStatsIntervalLengthMillis != ONE_DAY_MILLIS) {
+      /* Cut off all observations that are one week older than
+       * the descriptor publication time, or we'll have to update
+       * weeks of aggregate values every hour. */
+      return;
+    }
+    long statsStartMillis = dirreqStatsEndMillis
+        - dirreqStatsIntervalLengthMillis;
+    long utcBreakMillis = (dirreqStatsEndMillis / ONE_DAY_MILLIS)
+        * ONE_DAY_MILLIS;
+    double resp = ((double) responses.get("ok")) - 4.0;
+    if (resp > 0.0) {
+      for (int i = 0; i < 2; i++) {
+        long fromMillis = i == 0 ? statsStartMillis
+            : utcBreakMillis;
+        long toMillis = i == 0 ? utcBreakMillis : dirreqStatsEndMillis;
+        if (fromMillis >= toMillis) {
+          continue;
+        }
+        double intervalFraction = ((double) (toMillis - fromMillis))
+            / ((double) dirreqStatsIntervalLengthMillis);
+        writeOutputLine(fingerprint, "bridge", "responses", "", "",
+            "", fromMillis, toMillis, resp * intervalFraction,
+            publishedMillis);
+        parseBridgeRespByCategory(fingerprint, fromMillis, toMillis, resp,
+            dirreqStatsIntervalLengthMillis, "country", bridgeIps,
+            publishedMillis);
+        parseBridgeRespByCategory(fingerprint, fromMillis, toMillis, resp,
+            dirreqStatsIntervalLengthMillis, "transport",
+            bridgeIpTransports, publishedMillis);
+        parseBridgeRespByCategory(fingerprint, fromMillis, toMillis, resp,
+            dirreqStatsIntervalLengthMillis, "version", bridgeIpVersions,
+            publishedMillis);
+      }
+    }
+  }
+
+  private static void parseBridgeRespByCategory(String fingerprint,
+      long fromMillis, long toMillis, double resp,
+      long dirreqStatsIntervalLengthMillis, String category,
+      SortedMap<String, Integer> frequencies, long publishedMillis)
+      throws IOException {
+    double total = 0.0;
+    SortedMap<String, Double> frequenciesCopy =
+        new TreeMap<String, Double>();
+    if (frequencies != null) {
+      for (Map.Entry<String, Integer> e : frequencies.entrySet()) {
+        if (e.getValue() < 4.0) {
+          continue;
+        }
+        double r = ((double) e.getValue()) - 4.0;
+        frequenciesCopy.put(e.getKey(), r);
+        total += r;
+      }
+    }
+    /* If we're not told any frequencies, or at least none of them are
+     * greater than 4, put in a default that we'll attribute all responses
+     * to. */
+    if (total == 0) {
+      if (category.equals("country")) {
+        frequenciesCopy.put("??", 4.0);
+      } else if (category.equals("transport")) {
+        frequenciesCopy.put("<OR>", 4.0);
+      } else if (category.equals("version")) {
+        frequenciesCopy.put("v4", 4.0);
+      }
+      total = 4.0;
+    }
+    for (Map.Entry<String, Double> e : frequenciesCopy.entrySet()) {
+      double intervalFraction = ((double) (toMillis - fromMillis))
+          / ((double) dirreqStatsIntervalLengthMillis);
+      double val = resp * intervalFraction * e.getValue() / total;
+      if (category.equals("country")) {
+        writeOutputLine(fingerprint, "bridge", "responses", e.getKey(),
+            "", "", fromMillis, toMillis, val, publishedMillis);
+      } else if (category.equals("transport")) {
+        writeOutputLine(fingerprint, "bridge", "responses", "",
+            e.getKey(), "", fromMillis, toMillis, val, publishedMillis);
+      } else if (category.equals("version")) {
+        writeOutputLine(fingerprint, "bridge", "responses", "", "",
+            e.getKey(), fromMillis, toMillis, val, publishedMillis);
+      }
+    }
+  }
+
+  private static void parseBridgeDirreqWriteHistory(String fingerprint,
+      long publishedMillis, BandwidthHistory dirreqWriteHistory)
+      throws IOException {
+    if (dirreqWriteHistory == null ||
+        publishedMillis - dirreqWriteHistory.getHistoryEndMillis()
+        > ONE_WEEK_MILLIS) {
+      /* Cut off all observations that are one week older than
+       * the descriptor publication time, or we'll have to update
+       * weeks of aggregate values every hour. */
+      return;
+    }
+    long intervalLengthMillis =
+        dirreqWriteHistory.getIntervalLength() * 1000L;
+    for (Map.Entry<Long, Long> e :
+        dirreqWriteHistory.getBandwidthValues().entrySet()) {
+      long intervalEndMillis = e.getKey();
+      long intervalStartMillis =
+          intervalEndMillis - intervalLengthMillis;
+      for (int i = 0; i < 2; i++) {
+        long fromMillis = intervalStartMillis;
+        long toMillis = intervalEndMillis;
+        double writtenBytes = (double) e.getValue();
+        if (intervalStartMillis / ONE_DAY_MILLIS <
+            intervalEndMillis / ONE_DAY_MILLIS) {
+          long utcBreakMillis = (intervalEndMillis
+              / ONE_DAY_MILLIS) * ONE_DAY_MILLIS;
+          if (i == 0) {
+            toMillis = utcBreakMillis;
+          } else if (i == 1) {
+            fromMillis = utcBreakMillis;
+          }
+          double intervalFraction = ((double) (toMillis - fromMillis))
+              / ((double) intervalLengthMillis);
+          writtenBytes *= intervalFraction;
+        } else if (i == 1) {
+          break;
+        }
+        writeOutputLine(fingerprint, "bridge", "bytes", "",
+            "", "", fromMillis, toMillis, writtenBytes, publishedMillis);
+      }
+    }
+  }
+
+  private static void parseBridgeNetworkStatus(BridgeNetworkStatus status)
+      throws IOException {
+    long publishedMillis = status.getPublishedMillis();
+    long fromMillis = (publishedMillis / ONE_HOUR_MILLIS)
+        * ONE_HOUR_MILLIS;
+    long toMillis = fromMillis + ONE_HOUR_MILLIS;
+    for (NetworkStatusEntry statusEntry :
+        status.getStatusEntries().values()) {
+      String fingerprint = statusEntry.getFingerprint().
+          toUpperCase();
+      if (statusEntry.getFlags().contains("Running")) {
+        writeOutputLine(fingerprint, "bridge", "status", "", "", "",
+            fromMillis, toMillis, 0.0, publishedMillis);
+      }
+    }
+  }
+
+  private static Map<String, BufferedWriter> openOutputFiles =
+      new HashMap<String, BufferedWriter>();
+  private static void writeOutputLine(String fingerprint, String node,
+      String metric, String country, String transport, String version,
+      long fromMillis, long toMillis, double val, long publishedMillis)
+      throws IOException {
+    if (fromMillis > toMillis) {
+      return;
+    }
+    String fromDateTime = formatDateTimeMillis(fromMillis);
+    String toDateTime = formatDateTimeMillis(toMillis);
+    BufferedWriter bw = getOutputFile(fromDateTime, publishedMillis);
+    bw.write(String.format("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%.1f\n",
+        fingerprint, node, metric, country, transport, version,
+        fromDateTime, toDateTime, val));
+  }
+
+  private static SimpleDateFormat dateTimeFormat = null;
+  private static String formatDateTimeMillis(long millis) {
+    if (dateTimeFormat == null) {
+      dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+      dateTimeFormat.setLenient(false);
+      dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+    }
+    return dateTimeFormat.format(millis);
+  }
+
+  private static BufferedWriter getOutputFile(String fromDateTime,
+      long publishedMillis) throws IOException {
+    String outputFileName;
+    if (writeToSingleFile) {
+      outputFileName = "out/userstats.sql";
+    } else if (byStatsDateNotByDescHour) {
+      outputFileName = "out/userstats-" + fromDateTime.substring(0, 10)
+          + ".sql";
+    } else {
+      String publishedHourDateTime = formatDateTimeMillis(
+          (publishedMillis / ONE_HOUR_MILLIS) * ONE_HOUR_MILLIS);
+      outputFileName = "out/userstats-"
+          + publishedHourDateTime.substring(0, 10) + "-"
+          + publishedHourDateTime.substring(11, 13) + ".sql";
+    }
+    BufferedWriter bw = openOutputFiles.get(outputFileName);
+    if (bw == null) {
+      bw = openOutputFile(outputFileName);
+      openOutputFiles.put(outputFileName, bw);
+    }
+    return bw;
+  }
+
+  private static BufferedWriter openOutputFile(String outputFileName)
+      throws IOException {
+    File outputFile = new File(outputFileName);
+    outputFile.getParentFile().mkdirs();
+    BufferedWriter bw = new BufferedWriter(new FileWriter(
+        outputFileName));
+    bw.write("BEGIN;\n");
+    bw.write("LOCK TABLE imported NOWAIT;\n");
+    bw.write("COPY imported (fingerprint, node, metric, country, "
+        + "transport, version, stats_start, stats_end, val) FROM "
+        + "stdin;\n");
+    return bw;
+  }
+
+  private static void closeOutputFiles() throws IOException {
+    for (BufferedWriter bw : openOutputFiles.values()) {
+      bw.write("\\.\n");
+      bw.write("SELECT merge();\n");
+      bw.write("SELECT aggregate();\n");
+      bw.write("TRUNCATE imported;\n");
+      bw.write("COMMIT;\n");
+      bw.close();
+    }
+  }
+}
+
diff --git a/modules/clients/test-userstats.sql b/modules/clients/test-userstats.sql
new file mode 100644
index 0000000..66f8b82
--- /dev/null
+++ b/modules/clients/test-userstats.sql
@@ -0,0 +1,478 @@
+BEGIN;
+SET search_path TO tap, public;
+SELECT plan(152);
+SET client_min_messages = warning;
+
+-- Make sure enums are as expected.
+SELECT has_enum('node');
+SELECT enum_has_labels('node', ARRAY['relay', 'bridge']);
+SELECT has_enum('metric');
+SELECT enum_has_labels('metric', ARRAY['responses', 'bytes', 'status']);
+
+-- Make sure that the imported table is exactly as the importer expects
+-- it.
+SELECT has_table('imported');
+SELECT has_column('imported', 'fingerprint');
+SELECT col_type_is('imported', 'fingerprint', 'CHARACTER(40)');
+SELECT col_not_null('imported', 'fingerprint');
+SELECT has_column('imported', 'node');
+SELECT col_type_is('imported', 'node', 'node');
+SELECT col_not_null('imported', 'node');
+SELECT has_column('imported', 'metric');
+SELECT col_type_is('imported', 'metric', 'metric');
+SELECT col_not_null('imported', 'metric');
+SELECT has_column('imported', 'country');
+SELECT col_type_is('imported', 'country', 'CHARACTER VARYING(2)');
+SELECT col_not_null('imported', 'country');
+SELECT has_column('imported', 'transport');
+SELECT col_type_is('imported', 'transport', 'CHARACTER VARYING(20)');
+SELECT col_not_null('imported', 'transport');
+SELECT has_column('imported', 'version');
+SELECT col_type_is('imported', 'version', 'CHARACTER VARYING(2)');
+SELECT col_not_null('imported', 'version');
+SELECT has_column('imported', 'stats_start');
+SELECT col_type_is('imported', 'stats_start',
+  'TIMESTAMP WITHOUT TIME ZONE');
+SELECT col_not_null('imported', 'stats_start');
+SELECT has_column('imported', 'stats_end');
+SELECT col_type_is('imported', 'stats_end',
+  'TIMESTAMP WITHOUT TIME ZONE');
+SELECT col_not_null('imported', 'stats_end');
+SELECT has_column('imported', 'val');
+SELECT col_type_is('imported', 'val', 'DOUBLE PRECISION');
+SELECT col_not_null('imported', 'val');
+SELECT hasnt_pk('imported');
+
+-- Make sure that the internally-used merged table is exactly as merge()
+-- expects it.
+SELECT has_table('merged');
+SELECT has_column('merged', 'id');
+SELECT col_type_is('merged', 'id', 'INTEGER');
+SELECT col_is_pk('merged', 'id');
+SELECT has_column('merged', 'fingerprint');
+SELECT col_type_is('merged', 'fingerprint', 'CHARACTER(40)');
+SELECT col_not_null('merged', 'fingerprint');
+SELECT has_column('merged', 'node');
+SELECT col_type_is('merged', 'node', 'node');
+SELECT col_not_null('merged', 'node');
+SELECT has_column('merged', 'metric');
+SELECT col_type_is('merged', 'metric', 'metric');
+SELECT col_not_null('merged', 'metric');
+SELECT has_column('merged', 'country');
+SELECT col_type_is('merged', 'country', 'CHARACTER VARYING(2)');
+SELECT col_not_null('merged', 'country');
+SELECT has_column('merged', 'transport');
+SELECT col_type_is('merged', 'transport', 'CHARACTER VARYING(20)');
+SELECT col_not_null('merged', 'transport');
+SELECT has_column('merged', 'version');
+SELECT col_type_is('merged', 'version', 'CHARACTER VARYING(2)');
+SELECT col_not_null('merged', 'version');
+SELECT has_column('merged', 'stats_start');
+SELECT col_type_is('merged', 'stats_start',
+  'TIMESTAMP WITHOUT TIME ZONE');
+SELECT col_not_null('merged', 'stats_start');
+SELECT has_column('merged', 'stats_end');
+SELECT col_type_is('merged', 'stats_end',
+  'TIMESTAMP WITHOUT TIME ZONE');
+SELECT col_not_null('merged', 'stats_end');
+SELECT has_column('merged', 'val');
+SELECT col_type_is('merged', 'val', 'DOUBLE PRECISION');
+SELECT col_not_null('merged', 'val');
+
+-- Make sure that the internally-used aggregated table is exactly as
+-- aggregate() expects it.
+SELECT has_table('aggregated');
+SELECT has_column('aggregated', 'date');
+SELECT col_type_is('aggregated', 'date', 'DATE');
+SELECT col_not_null('aggregated', 'date');
+SELECT has_column('aggregated', 'node');
+SELECT col_type_is('aggregated', 'node', 'node');
+SELECT col_not_null('aggregated', 'node');
+SELECT has_column('aggregated', 'country');
+SELECT col_type_is('aggregated', 'country', 'CHARACTER VARYING(2)');
+SELECT col_not_null('aggregated', 'country');
+SELECT col_default_is('aggregated', 'country', '');
+SELECT has_column('aggregated', 'transport');
+SELECT col_type_is('aggregated', 'transport', 'CHARACTER VARYING(20)');
+SELECT col_not_null('aggregated', 'transport');
+SELECT col_default_is('aggregated', 'transport', '');
+SELECT has_column('aggregated', 'version');
+SELECT col_type_is('aggregated', 'version', 'CHARACTER VARYING(2)');
+SELECT col_not_null('aggregated', 'version');
+SELECT col_default_is('aggregated', 'version', '');
+SELECT has_column('aggregated', 'rrx');
+SELECT col_type_is('aggregated', 'rrx', 'DOUBLE PRECISION');
+SELECT col_not_null('aggregated', 'rrx');
+SELECT col_default_is('aggregated', 'rrx', 0);
+SELECT has_column('aggregated', 'nrx');
+SELECT col_type_is('aggregated', 'nrx', 'DOUBLE PRECISION');
+SELECT col_not_null('aggregated', 'nrx');
+SELECT col_default_is('aggregated', 'nrx', 0);
+SELECT has_column('aggregated', 'hh');
+SELECT col_type_is('aggregated', 'hh', 'DOUBLE PRECISION');
+SELECT col_not_null('aggregated', 'hh');
+SELECT col_default_is('aggregated', 'hh', 0);
+SELECT has_column('aggregated', 'nn');
+SELECT col_type_is('aggregated', 'nn', 'DOUBLE PRECISION');
+SELECT col_not_null('aggregated', 'nn');
+SELECT col_default_is('aggregated', 'nn', 0);
+SELECT has_column('aggregated', 'hrh');
+SELECT col_type_is('aggregated', 'hrh', 'DOUBLE PRECISION');
+SELECT col_not_null('aggregated', 'hrh');
+SELECT col_default_is('aggregated', 'hrh', 0);
+SELECT has_column('aggregated', 'nh');
+SELECT col_type_is('aggregated', 'nh', 'DOUBLE PRECISION');
+SELECT col_not_null('aggregated', 'nh');
+SELECT col_default_is('aggregated', 'nh', 0);
+SELECT has_column('aggregated', 'nrh');
+SELECT col_type_is('aggregated', 'nrh', 'DOUBLE PRECISION');
+SELECT col_not_null('aggregated', 'nrh');
+SELECT col_default_is('aggregated', 'nrh', 0);
+
+-- Create temporary tables that hide the actual tables, so that we don't
+-- have to care about existing data, not even in a transaction that we're
+-- going to roll back.  Temporarily set log level to warning to avoid
+-- messages about implicitly created sequences and indexes.
+CREATE TEMPORARY TABLE imported (
+  fingerprint CHARACTER(40) NOT NULL,
+  node node NOT NULL,
+  metric metric NOT NULL,
+  country CHARACTER VARYING(2) NOT NULL,
+  transport CHARACTER VARYING(20) NOT NULL,
+  version CHARACTER VARYING(2) NOT NULL,
+  stats_start TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+  stats_end TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+  val DOUBLE PRECISION NOT NULL
+);
+CREATE TEMPORARY TABLE merged (
+  id SERIAL PRIMARY KEY,
+  fingerprint CHARACTER(40) NOT NULL,
+  node node NOT NULL,
+  metric metric NOT NULL,
+  country CHARACTER VARYING(2) NOT NULL,
+  transport CHARACTER VARYING(20) NOT NULL,
+  version CHARACTER VARYING(2) NOT NULL,
+  stats_start TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+  stats_end TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+  val DOUBLE PRECISION NOT NULL
+);
+CREATE TEMPORARY TABLE aggregated (
+  date DATE NOT NULL,
+  node node NOT NULL,
+  country CHARACTER VARYING(2) NOT NULL DEFAULT '',
+  transport CHARACTER VARYING(20) NOT NULL DEFAULT '',
+  version CHARACTER VARYING(2) NOT NULL DEFAULT '',
+  rrx DOUBLE PRECISION NOT NULL DEFAULT 0,
+  nrx DOUBLE PRECISION NOT NULL DEFAULT 0,
+  hh DOUBLE PRECISION NOT NULL DEFAULT 0,
+  nn DOUBLE PRECISION NOT NULL DEFAULT 0,
+  hrh DOUBLE PRECISION NOT NULL DEFAULT 0,
+  nh DOUBLE PRECISION NOT NULL DEFAULT 0,
+  nrh DOUBLE PRECISION NOT NULL DEFAULT 0
+);
+
+-- Test merging newly imported data.
+PREPARE new_imported(TIMESTAMP WITHOUT TIME ZONE,
+  TIMESTAMP WITHOUT TIME ZONE) AS INSERT INTO imported
+  (fingerprint, node, metric, country, transport, version, stats_start,
+  stats_end, val) VALUES ('1234567890123456789012345678901234567890',
+  'relay', 'status', '', '', '', $1, $2, 0);
+PREPARE new_merged(TIMESTAMP WITHOUT TIME ZONE,
+  TIMESTAMP WITHOUT TIME ZONE) AS INSERT INTO merged
+  (fingerprint, node, metric, country, transport, version, stats_start,
+  stats_end, val) VALUES ('1234567890123456789012345678901234567890',
+  'relay', 'status', '', '', '', $1, $2, 0);
+
+EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 15:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 15:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should insert new entry into empty table as is');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 14:00:00');
+EXECUTE new_imported('2013-04-11 16:00:00', '2013-04-11 17:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 14:00:00'::TIMESTAMP WITHOUT TIME ZONE),
+           ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should insert two non-contiguous entries');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 15:00:00');
+EXECUTE new_imported('2013-04-11 15:00:00', '2013-04-11 17:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should merge two contiguous entries');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 16:00:00');
+EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 17:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should skip entry that starts before and ends after the start of ' ||
+  'another new entry');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 15:00:00');
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 16:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 15:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should skip entry that starts at and ends after the start of ' ||
+  'another new entry');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 16:00:00');
+EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 15:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should skip entry that starts after another new entry starts and ' ||
+  'ends before that entry ends');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 16:00:00');
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 16:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should skip entry that has same start and end as another new entry');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 16:00:00');
+EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 16:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should skip entry that starts before and ends at the end of ' ||
+  'another new entry');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 16:00:00', '2013-04-11 17:00:00');
+EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 15:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 15:00:00'::TIMESTAMP WITHOUT TIME ZONE),
+           ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should insert entry that ends before existing entry starts');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 15:00:00', '2013-04-11 16:00:00');
+EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 15:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should merge entry that ends when existing entry starts');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 15:00:00');
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 14:30:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_start FROM merged',
+  $$VALUES ('2013-04-11 14:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should skip entry that starts before but ends after existing entry ' ||
+  'starts');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 11:00:00', '2013-04-11 13:00:00');
+EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 16:00:00');
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 15:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 13:00:00'::TIMESTAMP WITHOUT TIME ZONE),
+           ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should skip entry that starts when existing entry ends but ' ||
+  'ends before another entry starts');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 17:00:00');
+EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 15:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should skip entry that starts when existing entry starts');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 17:00:00');
+EXECUTE new_imported('2013-04-11 15:00:00', '2013-04-11 16:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should skip entry that starts after and ends before existing entry');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 17:00:00');
+EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 17:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should skip entry that is already contained');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 17:00:00');
+EXECUTE new_imported('2013-04-11 16:00:00', '2013-04-11 17:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should skip entry that ends when existing entry ends');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 17:00:00');
+EXECUTE new_imported('2013-04-11 16:00:00', '2013-04-11 18:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should skip entry that starts before but ends after existing entry ' ||
+  'ends');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 17:00:00');
+EXECUTE new_merged('2013-04-11 18:00:00', '2013-04-11 19:00:00');
+EXECUTE new_imported('2013-04-11 16:00:00', '2013-04-11 18:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE),
+           ('2013-04-11 19:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should skip entry that starts before existing entry ends and ends ' ||
+  'when another entry starts');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 11:00:00', '2013-04-11 13:00:00');
+EXECUTE new_merged('2013-04-11 15:00:00', '2013-04-11 17:00:00');
+EXECUTE new_imported('2013-04-11 12:00:00', '2013-04-11 16:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 13:00:00'::TIMESTAMP WITHOUT TIME ZONE),
+           ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should skip entry that starts before existing entry ends and ends ' ||
+  'after another entry starts');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 15:00:00');
+EXECUTE new_imported('2013-04-11 15:00:00', '2013-04-11 16:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should merge entry that ends when existing entry starts');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 14:00:00', '2013-04-11 15:00:00');
+EXECUTE new_imported('2013-04-11 16:00:00', '2013-04-11 17:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 15:00:00'::TIMESTAMP WITHOUT TIME ZONE),
+           ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should insert entry that starts after existing entry ends');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 15:00:00', '2013-04-11 16:00:00');
+EXECUTE new_imported('2013-04-11 14:00:00', '2013-04-11 17:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should skip entry that starts before existing entry starts and ' ||
+  'ends after that entry ends');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_merged('2013-04-11 13:00:00', '2013-04-11 14:00:00');
+EXECUTE new_merged('2013-04-11 15:00:00', '2013-04-11 16:00:00');
+EXECUTE new_imported('2013-04-11 12:00:00', '2013-04-11 17:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 14:00:00'::TIMESTAMP WITHOUT TIME ZONE),
+           ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should skip entry that starts before and ends after multiple ' ||
+  'existing entries');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_imported('2013-04-11 23:00:00', '2013-04-12 00:00:00');
+EXECUTE new_imported('2013-04-12 00:00:00', '2013-04-12 01:00:00');
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-12 00:00:00'::TIMESTAMP WITHOUT TIME ZONE),
+           ('2013-04-12 01:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should insert two contiguous entries that end and start at midnight');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_imported('2013-04-11 12:00:00', '2013-04-11 17:00:00');
+INSERT INTO imported (fingerprint, node, metric, country, transport,
+  version, stats_start, stats_end, val) VALUES
+  ('9876543210987654321098765432109876543210', 'relay', 'status', '', '',
+  '', '2013-04-11 12:00:00', '2013-04-11 17:00:00', 0);
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE),
+           ('2013-04-11 17:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should import two entries with different fingerprints and same ' ||
+  'start and end');
+DELETE FROM imported;
+DELETE FROM merged;
+
+EXECUTE new_imported('2013-04-11 13:00:00', '2013-04-11 15:00:00');
+INSERT INTO imported (fingerprint, node, metric, country, transport,
+  version, stats_start, stats_end, val) VALUES
+  ('9876543210987654321098765432109876543210', 'relay', 'status', '', '',
+  '', '2013-04-11 14:00:00', '2013-04-11 16:00:00', 0);
+SELECT merge();
+SELECT bag_eq('SELECT stats_end FROM merged',
+  $$VALUES ('2013-04-11 15:00:00'::TIMESTAMP WITHOUT TIME ZONE),
+           ('2013-04-11 16:00:00'::TIMESTAMP WITHOUT TIME ZONE)$$,
+  'Should import two entries with overlapping starts and ends and ' ||
+  'different fingerprints');
+DELETE FROM imported;
+DELETE FROM merged;
+
+-- TODO Test aggregating imported and merged data.
+
+-- Make sure that the 'estimated' results view has exactly the definition
+-- expected for the .csv export.
+SELECT has_view('estimated');
+SELECT has_column('estimated', 'date');
+SELECT col_type_is('estimated', 'date', 'DATE');
+SELECT has_column('estimated', 'node');
+SELECT col_type_is('estimated', 'node', 'node');
+SELECT has_column('estimated', 'country');
+SELECT col_type_is('estimated', 'country', 'CHARACTER VARYING(2)');
+SELECT has_column('estimated', 'transport');
+SELECT col_type_is('estimated', 'transport', 'CHARACTER VARYING(20)');
+SELECT has_column('estimated', 'version');
+SELECT col_type_is('estimated', 'version', 'CHARACTER VARYING(2)');
+SELECT has_column('estimated', 'frac');
+SELECT col_type_is('estimated', 'frac', 'INTEGER');
+SELECT has_column('estimated', 'users');
+SELECT col_type_is('estimated', 'users', 'INTEGER');
+
+-- TODO Test that frac and users are computed correctly in the view.
+
+-- Finish tests.
+SELECT * FROM finish();
+RESET client_min_messages;
+ROLLBACK;
+
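
The tests above use pgTAP and end with a ROLLBACK, so they leave no state behind and can be re-run at will. A minimal sketch of a local test run, assuming pgTAP is installed where CREATE EXTENSION can find it, that the schema under test comes from init-userstats.sql, and that the throwaway database name is your own choice:

  # Sketch only; database name and pgTAP installation method are assumptions.
  createdb userstats-test
  psql -c 'CREATE EXTENSION pgtap;' userstats-test
  psql -f modules/clients/init-userstats.sql userstats-test   # schema and functions under test
  psql -f modules/clients/test-userstats.sql userstats-test   # prints the TAP results of the checks above
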
diff --git a/modules/clients/userstats-detector.R b/modules/clients/userstats-detector.R
new file mode 100644
index 0000000..c3a9041
--- /dev/null
+++ b/modules/clients/userstats-detector.R
@@ -0,0 +1,18 @@
+library("reshape")
+export_userstats_detector <- function(path) {
+  c <- read.csv("userstats.csv", stringsAsFactors = FALSE)
+  c <- c[c$country != '' & c$transport == '' & c$version == '' &
+         c$node == 'relay', ]
+  u <- data.frame(country = c$country, date = c$date, users = c$users,
+                  stringsAsFactors = FALSE)
+  u <- rbind(u, data.frame(country = "zy",
+                aggregate(list(users = u$users),
+                          by = list(date = u$date), sum)))
+  u <- data.frame(date = u$date, country = u$country,
+                  users = floor(u$users))
+  u <- cast(u, date ~ country, value = "users")
+  names(u)[names(u) == "zy"] <- "all"
+  write.csv(u, path, quote = FALSE, row.names = FALSE)
+}
+export_userstats_detector("userstats-detector.csv")
+
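
userstats-detector.R reshapes the relay-user estimates in userstats.csv into one column per country, adds a summed total under the placeholder country code "zy" that is renamed to "all" after the cast, and writes the wide table to userstats-detector.csv for the censorship detector. A quick standalone check, assuming userstats.csv (the export of the estimated view) has already been placed in modules/clients/:

  # Assumes userstats.csv is already present in the working directory.
  cd modules/clients/
  R --slave -f userstats-detector.R
  head -1 userstats-detector.csv   # a date column, one column per country code, plus the renamed total column
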
diff --git a/shared/bin/80-run-clients-stats.sh b/shared/bin/80-run-clients-stats.sh
new file mode 100755
index 0000000..325570c
--- /dev/null
+++ b/shared/bin/80-run-clients-stats.sh
@@ -0,0 +1,30 @@
+#!/bin/sh
+set -e
+
+cd modules/clients/
+
+echo `date` "Parsing descriptors."
+ant | grep "\[java\]"
+
+for i in out/*.sql
+do
+  echo `date` "Importing $i."
+  psql -f "$i" userstats
+done
+
+echo `date` "Exporting results."
+psql -c 'COPY (SELECT * FROM estimated) TO STDOUT WITH CSV HEADER;' userstats > userstats.csv
+
+echo `date` "Running censorship detector."
+R --slave -f userstats-detector.R > /dev/null 2>&1
+python detector.py
+
+echo `date` "Merging censorship detector results."
+R --slave -f merge-clients.R > /dev/null 2>&1
+mkdir -p stats/
+cp clients.csv stats/
+
+echo `date` "Terminating."
+
+cd ../../
+
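
80-run-clients-stats.sh changes into modules/clients/ itself, so it is meant to be invoked from the repository root, presumably by whatever runs the numbered scripts in shared/bin/ in sequence. A hedged sketch of a one-off manual run, assuming a local PostgreSQL database named userstats that has been initialized from init-userstats.sql:

  # Assumed one-time setup; the database name 'userstats' matches the psql calls in the script.
  createdb userstats
  psql -f modules/clients/init-userstats.sql userstats
  # From the repository root; the ant step parses descriptors and writes the out/*.sql files the script imports.
  shared/bin/80-run-clients-stats.sh
  shared/bin/99-copy-stats-files.sh   # copies clients.csv into shared/stats/ along with the other modules' files
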
diff --git a/shared/bin/99-copy-stats-files.sh b/shared/bin/99-copy-stats-files.sh
index 5493cf8..4a30f24 100755
--- a/shared/bin/99-copy-stats-files.sh
+++ b/shared/bin/99-copy-stats-files.sh
@@ -3,4 +3,5 @@ mkdir -p shared/stats
 cp -a modules/legacy/stats/*.csv shared/stats/
 cp -a modules/advbwdist/stats/advbwdist.csv shared/stats/
 cp -a modules/hidserv/stats/hidserv.csv shared/stats/
+cp -a modules/clients/stats/clients.csv shared/stats/
 