[tor-commits] [onionoo/master] Split clients data writer into two classes.
karsten at torproject.org
Fri Apr 11 07:38:01 UTC 2014
commit 864c148564a6ec3be0d979b126b8ca88f01c4912
Author: Karsten Loesing <karsten.loesing at gmx.net>
Date: Tue Apr 8 21:13:02 2014 +0200
Split clients data writer into two classes.
---
src/org/torproject/onionoo/ClientsDataWriter.java | 486 --------------------
.../torproject/onionoo/ClientsDocumentWriter.java | 302 ++++++++++++
.../torproject/onionoo/ClientsStatusUpdater.java | 230 +++++++++
src/org/torproject/onionoo/Main.java | 10 +-
4 files changed, 538 insertions(+), 490 deletions(-)
diff --git a/src/org/torproject/onionoo/ClientsDataWriter.java b/src/org/torproject/onionoo/ClientsDataWriter.java
deleted file mode 100644
index 7662f54..0000000
--- a/src/org/torproject/onionoo/ClientsDataWriter.java
+++ /dev/null
@@ -1,486 +0,0 @@
-/* Copyright 2014 The Tor Project
- * See LICENSE for licensing information */
-package org.torproject.onionoo;
-
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TimeZone;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.torproject.descriptor.Descriptor;
-import org.torproject.descriptor.ExtraInfoDescriptor;
-
-/*
- * Example extra-info descriptor used as input:
- *
- * extra-info ndnop2 DE6397A047ABE5F78B4C87AF725047831B221AAB
- * dirreq-stats-end 2014-02-16 16:42:11 (86400 s)
- * dirreq-v3-resp ok=856,not-enough-sigs=0,unavailable=0,not-found=0,
- * not-modified=40,busy=0
- * bridge-stats-end 2014-02-16 16:42:17 (86400 s)
- * bridge-ips ??=8,in=8,se=8
- * bridge-ip-versions v4=8,v6=0
- *
- * Clients status file produced as intermediate output:
- *
- * 2014-02-15 16:42:11 2014-02-16 00:00:00
- * 259.042 in=86.347,se=86.347 v4=259.042
- * 2014-02-16 00:00:00 2014-02-16 16:42:11
- * 592.958 in=197.653,se=197.653 v4=592.958
- *
- * Clients document file produced as output:
- *
- * "1_month":{
- * "first":"2014-02-03 12:00:00",
- * "last":"2014-02-28 12:00:00",
- * "interval":86400,
- * "factor":0.139049349,
- * "count":26,
- * "values":[371,354,349,374,432,null,485,458,493,536,null,null,524,576,
- * 607,622,null,635,null,566,774,999,945,690,656,681],
- * "countries":{"cn":0.0192,"in":0.1768,"ir":0.2487,"ru":0.0104,
- * "se":0.1698,"sy":0.0325,"us":0.0406},
- * "transports":{"obfs2":0.4581},
- * "versions":{"v4":1.0000}}
- */
-public class ClientsDataWriter implements DescriptorListener,
- StatusUpdater, FingerprintListener, DocumentWriter {
-
- private DescriptorSource descriptorSource;
-
- private DocumentStore documentStore;
-
- private long now;
-
- public ClientsDataWriter(DescriptorSource descriptorSource,
- DocumentStore documentStore, Time time) {
- this.descriptorSource = descriptorSource;
- this.documentStore = documentStore;
- this.now = time.currentTimeMillis();
- this.registerDescriptorListeners();
- this.registerFingerprintListeners();
- }
-
- private void registerDescriptorListeners() {
- this.descriptorSource.registerDescriptorListener(this,
- DescriptorType.BRIDGE_EXTRA_INFOS);
- }
-
- private void registerFingerprintListeners() {
- this.descriptorSource.registerFingerprintListener(this,
- DescriptorType.BRIDGE_EXTRA_INFOS);
- }
-
- public void processDescriptor(Descriptor descriptor, boolean relay) {
- if (descriptor instanceof ExtraInfoDescriptor && !relay) {
- this.processBridgeExtraInfoDescriptor(
- (ExtraInfoDescriptor) descriptor);
- }
- }
-
- private static final long ONE_HOUR_MILLIS = 60L * 60L * 1000L,
- ONE_DAY_MILLIS = 24L * ONE_HOUR_MILLIS;
-
- private SortedMap<String, SortedSet<ClientsHistory>> newResponses =
- new TreeMap<String, SortedSet<ClientsHistory>>();
-
- private void processBridgeExtraInfoDescriptor(
- ExtraInfoDescriptor descriptor) {
- long dirreqStatsEndMillis = descriptor.getDirreqStatsEndMillis();
- long dirreqStatsIntervalLengthMillis =
- descriptor.getDirreqStatsIntervalLength() * 1000L;
- SortedMap<String, Integer> responses = descriptor.getDirreqV3Resp();
- if (dirreqStatsEndMillis < 0L ||
- dirreqStatsIntervalLengthMillis != ONE_DAY_MILLIS ||
- responses == null || !responses.containsKey("ok")) {
- return;
- }
- double okResponses = (double) (responses.get("ok") - 4);
- if (okResponses < 0.0) {
- return;
- }
- String hashedFingerprint = descriptor.getFingerprint().toUpperCase();
- long dirreqStatsStartMillis = dirreqStatsEndMillis
- - dirreqStatsIntervalLengthMillis;
- long utcBreakMillis = (dirreqStatsEndMillis / ONE_DAY_MILLIS)
- * ONE_DAY_MILLIS;
- for (int i = 0; i < 2; i++) {
- long startMillis = i == 0 ? dirreqStatsStartMillis : utcBreakMillis;
- long endMillis = i == 0 ? utcBreakMillis : dirreqStatsEndMillis;
- if (startMillis >= endMillis) {
- continue;
- }
- double totalResponses = okResponses
- * ((double) (endMillis - startMillis))
- / ((double) ONE_DAY_MILLIS);
- SortedMap<String, Double> responsesByCountry =
- this.weightResponsesWithUniqueIps(totalResponses,
- descriptor.getBridgeIps(), "??");
- SortedMap<String, Double> responsesByTransport =
- this.weightResponsesWithUniqueIps(totalResponses,
- descriptor.getBridgeIpTransports(), "<??>");
- SortedMap<String, Double> responsesByVersion =
- this.weightResponsesWithUniqueIps(totalResponses,
- descriptor.getBridgeIpVersions(), "");
- ClientsHistory newResponseHistory = new ClientsHistory(
- startMillis, endMillis, totalResponses, responsesByCountry,
- responsesByTransport, responsesByVersion);
- if (!this.newResponses.containsKey(hashedFingerprint)) {
- this.newResponses.put(hashedFingerprint,
- new TreeSet<ClientsHistory>());
- }
- this.newResponses.get(hashedFingerprint).add(
- newResponseHistory);
- }
- }
-
- private SortedMap<String, Double> weightResponsesWithUniqueIps(
- double totalResponses, SortedMap<String, Integer> uniqueIps,
- String omitString) {
- SortedMap<String, Double> weightedResponses =
- new TreeMap<String, Double>();
- int totalUniqueIps = 0;
- if (uniqueIps != null) {
- for (Map.Entry<String, Integer> e : uniqueIps.entrySet()) {
- if (e.getValue() > 4) {
- totalUniqueIps += e.getValue() - 4;
- }
- }
- }
- if (totalUniqueIps > 0) {
- for (Map.Entry<String, Integer> e : uniqueIps.entrySet()) {
- if (!e.getKey().equals(omitString) && e.getValue() > 4) {
- weightedResponses.put(e.getKey(),
- (((double) (e.getValue() - 4)) * totalResponses)
- / ((double) totalUniqueIps));
- }
- }
- }
- return weightedResponses;
- }
-
- public void updateStatuses() {
- for (Map.Entry<String, SortedSet<ClientsHistory>> e :
- this.newResponses.entrySet()) {
- String hashedFingerprint = e.getKey();
- ClientsStatus clientsStatus = this.documentStore.retrieve(
- ClientsStatus.class, true, hashedFingerprint);
- if (clientsStatus == null) {
- clientsStatus = new ClientsStatus();
- }
- this.addToHistory(clientsStatus, e.getValue());
- this.compressHistory(clientsStatus);
- this.documentStore.store(clientsStatus, hashedFingerprint);
- }
- Logger.printStatusTime("Updated clients status files");
- }
-
- private void addToHistory(ClientsStatus clientsStatus,
- SortedSet<ClientsHistory> newIntervals) {
- SortedSet<ClientsHistory> history = clientsStatus.history;
- for (ClientsHistory interval : newIntervals) {
- if ((history.headSet(interval).isEmpty() ||
- history.headSet(interval).last().endMillis <=
- interval.startMillis) &&
- (history.tailSet(interval).isEmpty() ||
- history.tailSet(interval).first().startMillis >=
- interval.endMillis)) {
- history.add(interval);
- }
- }
- }
-
- private void compressHistory(ClientsStatus clientsStatus) {
- SortedSet<ClientsHistory> history = clientsStatus.history;
- SortedSet<ClientsHistory> compressedHistory =
- new TreeSet<ClientsHistory>();
- ClientsHistory lastResponses = null;
- SimpleDateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM");
- dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
- String lastMonthString = "1970-01";
- for (ClientsHistory responses : history) {
- long intervalLengthMillis;
- if (this.now - responses.endMillis <=
- 92L * 24L * 60L * 60L * 1000L) {
- intervalLengthMillis = 24L * 60L * 60L * 1000L;
- } else if (this.now - responses.endMillis <=
- 366L * 24L * 60L * 60L * 1000L) {
- intervalLengthMillis = 2L * 24L * 60L * 60L * 1000L;
- } else {
- intervalLengthMillis = 10L * 24L * 60L * 60L * 1000L;
- }
- String monthString = dateTimeFormat.format(responses.startMillis);
- if (lastResponses != null &&
- lastResponses.endMillis == responses.startMillis &&
- ((lastResponses.endMillis - 1L) / intervalLengthMillis) ==
- ((responses.endMillis - 1L) / intervalLengthMillis) &&
- lastMonthString.equals(monthString)) {
- lastResponses.addResponses(responses);
- } else {
- if (lastResponses != null) {
- compressedHistory.add(lastResponses);
- }
- lastResponses = responses;
- }
- lastMonthString = monthString;
- }
- if (lastResponses != null) {
- compressedHistory.add(lastResponses);
- }
- clientsStatus.history = compressedHistory;
- }
-
- public void processFingerprints(SortedSet<String> fingerprints,
- boolean relay) {
- if (!relay) {
- this.updateDocuments.addAll(fingerprints);
- }
- }
-
- private SortedSet<String> updateDocuments = new TreeSet<String>();
-
- public void writeDocuments() {
- for (String hashedFingerprint : this.updateDocuments) {
- ClientsStatus clientsStatus = this.documentStore.retrieve(
- ClientsStatus.class, true, hashedFingerprint);
- if (clientsStatus == null) {
- continue;
- }
- SortedSet<ClientsHistory> history = clientsStatus.history;
- ClientsDocument clientsDocument = new ClientsDocument();
- clientsDocument.documentString = this.formatHistoryString(
- hashedFingerprint, history);
- this.documentStore.store(clientsDocument, hashedFingerprint);
- }
- Logger.printStatusTime("Wrote clients document files");
- }
-
- private String[] graphNames = new String[] {
- "1_week",
- "1_month",
- "3_months",
- "1_year",
- "5_years" };
-
- private long[] graphIntervals = new long[] {
- 7L * 24L * 60L * 60L * 1000L,
- 31L * 24L * 60L * 60L * 1000L,
- 92L * 24L * 60L * 60L * 1000L,
- 366L * 24L * 60L * 60L * 1000L,
- 5L * 366L * 24L * 60L * 60L * 1000L };
-
- private long[] dataPointIntervals = new long[] {
- 24L * 60L * 60L * 1000L,
- 24L * 60L * 60L * 1000L,
- 24L * 60L * 60L * 1000L,
- 2L * 24L * 60L * 60L * 1000L,
- 10L * 24L * 60L * 60L * 1000L };
-
- private String formatHistoryString(String hashedFingerprint,
- SortedSet<ClientsHistory> history) {
- StringBuilder sb = new StringBuilder();
- sb.append("{\"fingerprint\":\"" + hashedFingerprint + "\"");
- sb.append(",\n\"average_clients\":{");
- int graphIntervalsWritten = 0;
- for (int graphIntervalIndex = 0; graphIntervalIndex <
- this.graphIntervals.length; graphIntervalIndex++) {
- String timeline = this.formatTimeline(graphIntervalIndex, history);
- if (timeline != null) {
- sb.append((graphIntervalsWritten++ > 0 ? "," : "") + "\n"
- + timeline);
- }
- }
- sb.append("}");
- sb.append("\n}\n");
- return sb.toString();
- }
-
- private String formatTimeline(int graphIntervalIndex,
- SortedSet<ClientsHistory> history) {
- String graphName = this.graphNames[graphIntervalIndex];
- long graphInterval = this.graphIntervals[graphIntervalIndex];
- long dataPointInterval =
- this.dataPointIntervals[graphIntervalIndex];
- List<Double> dataPoints = new ArrayList<Double>();
- long intervalStartMillis = ((this.now - graphInterval)
- / dataPointInterval) * dataPointInterval;
- long millis = 0L;
- double responses = 0.0, totalResponses = 0.0;
- SortedMap<String, Double>
- totalResponsesByCountry = new TreeMap<String, Double>(),
- totalResponsesByTransport = new TreeMap<String, Double>(),
- totalResponsesByVersion = new TreeMap<String, Double>();
- for (ClientsHistory hist : history) {
- if (hist.endMillis < intervalStartMillis) {
- continue;
- }
- while ((intervalStartMillis / dataPointInterval) !=
- (hist.endMillis / dataPointInterval)) {
- dataPoints.add(millis * 2L < dataPointInterval
- ? -1.0 : responses * ((double) ONE_DAY_MILLIS)
- / (((double) millis) * 10.0));
- responses = 0.0;
- millis = 0L;
- intervalStartMillis += dataPointInterval;
- }
- responses += hist.totalResponses;
- totalResponses += hist.totalResponses;
- for (Map.Entry<String, Double> e :
- hist.responsesByCountry.entrySet()) {
- if (!totalResponsesByCountry.containsKey(e.getKey())) {
- totalResponsesByCountry.put(e.getKey(), 0.0);
- }
- totalResponsesByCountry.put(e.getKey(), e.getValue()
- + totalResponsesByCountry.get(e.getKey()));
- }
- for (Map.Entry<String, Double> e :
- hist.responsesByTransport.entrySet()) {
- if (!totalResponsesByTransport.containsKey(e.getKey())) {
- totalResponsesByTransport.put(e.getKey(), 0.0);
- }
- totalResponsesByTransport.put(e.getKey(), e.getValue()
- + totalResponsesByTransport.get(e.getKey()));
- }
- for (Map.Entry<String, Double> e :
- hist.responsesByVersion.entrySet()) {
- if (!totalResponsesByVersion.containsKey(e.getKey())) {
- totalResponsesByVersion.put(e.getKey(), 0.0);
- }
- totalResponsesByVersion.put(e.getKey(), e.getValue()
- + totalResponsesByVersion.get(e.getKey()));
- }
- millis += (hist.endMillis - hist.startMillis);
- }
- dataPoints.add(millis * 2L < dataPointInterval
- ? -1.0 : responses * ((double) ONE_DAY_MILLIS)
- / (((double) millis) * 10.0));
- double maxValue = 0.0;
- int firstNonNullIndex = -1, lastNonNullIndex = -1;
- for (int dataPointIndex = 0; dataPointIndex < dataPoints.size();
- dataPointIndex++) {
- double dataPoint = dataPoints.get(dataPointIndex);
- if (dataPoint >= 0.0) {
- if (firstNonNullIndex < 0) {
- firstNonNullIndex = dataPointIndex;
- }
- lastNonNullIndex = dataPointIndex;
- if (dataPoint > maxValue) {
- maxValue = dataPoint;
- }
- }
- }
- if (firstNonNullIndex < 0) {
- return null;
- }
- long firstDataPointMillis = (((this.now - graphInterval)
- / dataPointInterval) + firstNonNullIndex) * dataPointInterval
- + dataPointInterval / 2L;
- if (graphIntervalIndex > 0 && firstDataPointMillis >=
- this.now - graphIntervals[graphIntervalIndex - 1]) {
- /* Skip clients history object, because it doesn't contain
- * anything new that wasn't already contained in the last
- * clients history object(s). */
- return null;
- }
- long lastDataPointMillis = firstDataPointMillis
- + (lastNonNullIndex - firstNonNullIndex) * dataPointInterval;
- double factor = ((double) maxValue) / 999.0;
- int count = lastNonNullIndex - firstNonNullIndex + 1;
- StringBuilder sb = new StringBuilder();
- SimpleDateFormat dateTimeFormat = new SimpleDateFormat(
- "yyyy-MM-dd HH:mm:ss");
- dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
- sb.append("\"" + graphName + "\":{"
- + "\"first\":\"" + dateTimeFormat.format(firstDataPointMillis)
- + "\",\"last\":\"" + dateTimeFormat.format(lastDataPointMillis)
- + "\",\"interval\":" + String.valueOf(dataPointInterval / 1000L)
- + ",\"factor\":" + String.format(Locale.US, "%.9f", factor)
- + ",\"count\":" + String.valueOf(count) + ",\"values\":[");
- int dataPointsWritten = 0, previousNonNullIndex = -2;
- boolean foundTwoAdjacentDataPoints = false;
- for (int dataPointIndex = firstNonNullIndex; dataPointIndex <=
- lastNonNullIndex; dataPointIndex++) {
- double dataPoint = dataPoints.get(dataPointIndex);
- if (dataPoint >= 0.0) {
- if (dataPointIndex - previousNonNullIndex == 1) {
- foundTwoAdjacentDataPoints = true;
- }
- previousNonNullIndex = dataPointIndex;
- }
- sb.append((dataPointsWritten++ > 0 ? "," : "")
- + (dataPoint < 0.0 ? "null" :
- String.valueOf((long) ((dataPoint * 999.0) / maxValue))));
- }
- sb.append("]");
- if (!totalResponsesByCountry.isEmpty()) {
- sb.append(",\"countries\":{");
- int written = 0;
- for (Map.Entry<String, Double> e :
- totalResponsesByCountry.entrySet()) {
- if (e.getValue() > totalResponses / 100.0) {
- sb.append((written++ > 0 ? "," : "") + "\"" + e.getKey()
- + "\":" + String.format(Locale.US, "%.4f",
- e.getValue() / totalResponses));
- }
- }
- sb.append("}");
- }
- if (!totalResponsesByTransport.isEmpty()) {
- sb.append(",\"transports\":{");
- int written = 0;
- for (Map.Entry<String, Double> e :
- totalResponsesByTransport.entrySet()) {
- if (e.getValue() > totalResponses / 100.0) {
- sb.append((written++ > 0 ? "," : "") + "\"" + e.getKey()
- + "\":" + String.format(Locale.US, "%.4f",
- e.getValue() / totalResponses));
- }
- }
- sb.append("}");
- }
- if (!totalResponsesByVersion.isEmpty()) {
- sb.append(",\"versions\":{");
- int written = 0;
- for (Map.Entry<String, Double> e :
- totalResponsesByVersion.entrySet()) {
- if (e.getValue() > totalResponses / 100.0) {
- sb.append((written++ > 0 ? "," : "") + "\"" + e.getKey()
- + "\":" + String.format(Locale.US, "%.4f",
- e.getValue() / totalResponses));
- }
- }
- sb.append("}");
- }
- sb.append("}");
- if (foundTwoAdjacentDataPoints) {
- return sb.toString();
- } else {
- return null;
- }
- }
-
- public String getStatsString() {
- int newIntervals = 0;
- for (SortedSet<ClientsHistory> hist : this.newResponses.values()) {
- newIntervals += hist.size();
- }
- StringBuilder sb = new StringBuilder();
- sb.append(" "
- + Logger.formatDecimalNumber(newIntervals / 2)
- + " client statistics processed from extra-info descriptors\n");
- sb.append(" "
- + Logger.formatDecimalNumber(this.newResponses.size())
- + " client status files updated\n");
- sb.append(" "
- + Logger.formatDecimalNumber(this.updateDocuments.size())
- + " client document files updated\n");
- return sb.toString();
- }
-}
-
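
For reference, a minimal standalone sketch (not part of the commit; the class
name and main() wrapper are invented for the example) of how the unique-IP
weighting in weightResponsesWithUniqueIps() turns the example descriptor's
bridge-ips line into the in=86.347,se=86.347 figures shown in the class
comment above. Counts of 4 or fewer are dropped and larger counts are reduced
by 4, presumably to correct for the rounding applied to published counts; the
"??" (unknown country) bucket is omitted from the result.

import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class WeightingExample {
  public static void main(String[] args) {
    /* bridge-ips ??=8,in=8,se=8 and 259.042 total responses for the
     * first (pre-midnight) interval, as in the class comment. */
    SortedMap<String, Integer> uniqueIps = new TreeMap<String, Integer>();
    uniqueIps.put("??", 8);
    uniqueIps.put("in", 8);
    uniqueIps.put("se", 8);
    double totalResponses = 259.042;
    int totalUniqueIps = 0;
    for (int value : uniqueIps.values()) {
      if (value > 4) {
        totalUniqueIps += value - 4;  /* 3 * (8 - 4) = 12 */
      }
    }
    for (Map.Entry<String, Integer> e : uniqueIps.entrySet()) {
      if (!e.getKey().equals("??") && e.getValue() > 4) {
        double weighted = ((double) (e.getValue() - 4)) * totalResponses
            / ((double) totalUniqueIps);
        /* Prints in=86.347 and se=86.347, matching the intermediate
         * clients status file shown in the class comment. */
        System.out.printf("%s=%.3f%n", e.getKey(), weighted);
      }
    }
  }
}
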
diff --git a/src/org/torproject/onionoo/ClientsDocumentWriter.java b/src/org/torproject/onionoo/ClientsDocumentWriter.java
new file mode 100644
index 0000000..9d3b8dc
--- /dev/null
+++ b/src/org/torproject/onionoo/ClientsDocumentWriter.java
@@ -0,0 +1,302 @@
+/* Copyright 2014 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.onionoo;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/*
+ * Clients status file produced as intermediate output:
+ *
+ * 2014-02-15 16:42:11 2014-02-16 00:00:00
+ * 259.042 in=86.347,se=86.347 v4=259.042
+ * 2014-02-16 00:00:00 2014-02-16 16:42:11
+ * 592.958 in=197.653,se=197.653 v4=592.958
+ *
+ * Clients document file produced as output:
+ *
+ * "1_month":{
+ * "first":"2014-02-03 12:00:00",
+ * "last":"2014-02-28 12:00:00",
+ * "interval":86400,
+ * "factor":0.139049349,
+ * "count":26,
+ * "values":[371,354,349,374,432,null,485,458,493,536,null,null,524,576,
+ * 607,622,null,635,null,566,774,999,945,690,656,681],
+ * "countries":{"cn":0.0192,"in":0.1768,"ir":0.2487,"ru":0.0104,
+ * "se":0.1698,"sy":0.0325,"us":0.0406},
+ * "transports":{"obfs2":0.4581},
+ * "versions":{"v4":1.0000}}
+ */
+public class ClientsDocumentWriter implements FingerprintListener,
+ DocumentWriter {
+
+ private DescriptorSource descriptorSource;
+
+ private DocumentStore documentStore;
+
+ private long now;
+
+ public ClientsDocumentWriter(DescriptorSource descriptorSource,
+ DocumentStore documentStore, Time time) {
+ this.descriptorSource = descriptorSource;
+ this.documentStore = documentStore;
+ this.now = time.currentTimeMillis();
+ this.registerFingerprintListeners();
+ }
+
+ private void registerFingerprintListeners() {
+ this.descriptorSource.registerFingerprintListener(this,
+ DescriptorType.BRIDGE_EXTRA_INFOS);
+ }
+
+ private SortedSet<String> updateDocuments = new TreeSet<String>();
+
+ public void processFingerprints(SortedSet<String> fingerprints,
+ boolean relay) {
+ if (!relay) {
+ this.updateDocuments.addAll(fingerprints);
+ }
+ }
+
+ private int writtenDocuments = 0;
+
+ public void writeDocuments() {
+ for (String hashedFingerprint : this.updateDocuments) {
+ ClientsStatus clientsStatus = this.documentStore.retrieve(
+ ClientsStatus.class, true, hashedFingerprint);
+ if (clientsStatus == null) {
+ continue;
+ }
+ SortedSet<ClientsHistory> history = clientsStatus.history;
+ ClientsDocument clientsDocument = new ClientsDocument();
+ clientsDocument.documentString = this.formatHistoryString(
+ hashedFingerprint, history);
+ this.documentStore.store(clientsDocument, hashedFingerprint);
+ this.writtenDocuments++;
+ }
+ Logger.printStatusTime("Wrote clients document files");
+ }
+
+ private String[] graphNames = new String[] {
+ "1_week",
+ "1_month",
+ "3_months",
+ "1_year",
+ "5_years" };
+
+ private long[] graphIntervals = new long[] {
+ 7L * 24L * 60L * 60L * 1000L,
+ 31L * 24L * 60L * 60L * 1000L,
+ 92L * 24L * 60L * 60L * 1000L,
+ 366L * 24L * 60L * 60L * 1000L,
+ 5L * 366L * 24L * 60L * 60L * 1000L };
+
+ private long[] dataPointIntervals = new long[] {
+ 24L * 60L * 60L * 1000L,
+ 24L * 60L * 60L * 1000L,
+ 24L * 60L * 60L * 1000L,
+ 2L * 24L * 60L * 60L * 1000L,
+ 10L * 24L * 60L * 60L * 1000L };
+
+ private String formatHistoryString(String hashedFingerprint,
+ SortedSet<ClientsHistory> history) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{\"fingerprint\":\"" + hashedFingerprint + "\"");
+ sb.append(",\n\"average_clients\":{");
+ int graphIntervalsWritten = 0;
+ for (int graphIntervalIndex = 0; graphIntervalIndex <
+ this.graphIntervals.length; graphIntervalIndex++) {
+ String timeline = this.formatTimeline(graphIntervalIndex, history);
+ if (timeline != null) {
+ sb.append((graphIntervalsWritten++ > 0 ? "," : "") + "\n"
+ + timeline);
+ }
+ }
+ sb.append("}");
+ sb.append("\n}\n");
+ return sb.toString();
+ }
+
+ private static final long ONE_HOUR_MILLIS = 60L * 60L * 1000L,
+ ONE_DAY_MILLIS = 24L * ONE_HOUR_MILLIS;
+
+ private String formatTimeline(int graphIntervalIndex,
+ SortedSet<ClientsHistory> history) {
+ String graphName = this.graphNames[graphIntervalIndex];
+ long graphInterval = this.graphIntervals[graphIntervalIndex];
+ long dataPointInterval =
+ this.dataPointIntervals[graphIntervalIndex];
+ List<Double> dataPoints = new ArrayList<Double>();
+ long intervalStartMillis = ((this.now - graphInterval)
+ / dataPointInterval) * dataPointInterval;
+ long millis = 0L;
+ double responses = 0.0, totalResponses = 0.0;
+ SortedMap<String, Double>
+ totalResponsesByCountry = new TreeMap<String, Double>(),
+ totalResponsesByTransport = new TreeMap<String, Double>(),
+ totalResponsesByVersion = new TreeMap<String, Double>();
+ for (ClientsHistory hist : history) {
+ if (hist.endMillis < intervalStartMillis) {
+ continue;
+ }
+ while ((intervalStartMillis / dataPointInterval) !=
+ (hist.endMillis / dataPointInterval)) {
+ dataPoints.add(millis * 2L < dataPointInterval
+ ? -1.0 : responses * ((double) ONE_DAY_MILLIS)
+ / (((double) millis) * 10.0));
+ responses = 0.0;
+ millis = 0L;
+ intervalStartMillis += dataPointInterval;
+ }
+ responses += hist.totalResponses;
+ totalResponses += hist.totalResponses;
+ for (Map.Entry<String, Double> e :
+ hist.responsesByCountry.entrySet()) {
+ if (!totalResponsesByCountry.containsKey(e.getKey())) {
+ totalResponsesByCountry.put(e.getKey(), 0.0);
+ }
+ totalResponsesByCountry.put(e.getKey(), e.getValue()
+ + totalResponsesByCountry.get(e.getKey()));
+ }
+ for (Map.Entry<String, Double> e :
+ hist.responsesByTransport.entrySet()) {
+ if (!totalResponsesByTransport.containsKey(e.getKey())) {
+ totalResponsesByTransport.put(e.getKey(), 0.0);
+ }
+ totalResponsesByTransport.put(e.getKey(), e.getValue()
+ + totalResponsesByTransport.get(e.getKey()));
+ }
+ for (Map.Entry<String, Double> e :
+ hist.responsesByVersion.entrySet()) {
+ if (!totalResponsesByVersion.containsKey(e.getKey())) {
+ totalResponsesByVersion.put(e.getKey(), 0.0);
+ }
+ totalResponsesByVersion.put(e.getKey(), e.getValue()
+ + totalResponsesByVersion.get(e.getKey()));
+ }
+ millis += (hist.endMillis - hist.startMillis);
+ }
+ dataPoints.add(millis * 2L < dataPointInterval
+ ? -1.0 : responses * ((double) ONE_DAY_MILLIS)
+ / (((double) millis) * 10.0));
+ double maxValue = 0.0;
+ int firstNonNullIndex = -1, lastNonNullIndex = -1;
+ for (int dataPointIndex = 0; dataPointIndex < dataPoints.size();
+ dataPointIndex++) {
+ double dataPoint = dataPoints.get(dataPointIndex);
+ if (dataPoint >= 0.0) {
+ if (firstNonNullIndex < 0) {
+ firstNonNullIndex = dataPointIndex;
+ }
+ lastNonNullIndex = dataPointIndex;
+ if (dataPoint > maxValue) {
+ maxValue = dataPoint;
+ }
+ }
+ }
+ if (firstNonNullIndex < 0) {
+ return null;
+ }
+ long firstDataPointMillis = (((this.now - graphInterval)
+ / dataPointInterval) + firstNonNullIndex) * dataPointInterval
+ + dataPointInterval / 2L;
+ if (graphIntervalIndex > 0 && firstDataPointMillis >=
+ this.now - graphIntervals[graphIntervalIndex - 1]) {
+ /* Skip clients history object, because it doesn't contain
+ * anything new that wasn't already contained in the last
+ * clients history object(s). */
+ return null;
+ }
+ long lastDataPointMillis = firstDataPointMillis
+ + (lastNonNullIndex - firstNonNullIndex) * dataPointInterval;
+ double factor = ((double) maxValue) / 999.0;
+ int count = lastNonNullIndex - firstNonNullIndex + 1;
+ StringBuilder sb = new StringBuilder();
+ SimpleDateFormat dateTimeFormat = new SimpleDateFormat(
+ "yyyy-MM-dd HH:mm:ss");
+ dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+ sb.append("\"" + graphName + "\":{"
+ + "\"first\":\"" + dateTimeFormat.format(firstDataPointMillis)
+ + "\",\"last\":\"" + dateTimeFormat.format(lastDataPointMillis)
+ + "\",\"interval\":" + String.valueOf(dataPointInterval / 1000L)
+ + ",\"factor\":" + String.format(Locale.US, "%.9f", factor)
+ + ",\"count\":" + String.valueOf(count) + ",\"values\":[");
+ int dataPointsWritten = 0, previousNonNullIndex = -2;
+ boolean foundTwoAdjacentDataPoints = false;
+ for (int dataPointIndex = firstNonNullIndex; dataPointIndex <=
+ lastNonNullIndex; dataPointIndex++) {
+ double dataPoint = dataPoints.get(dataPointIndex);
+ if (dataPoint >= 0.0) {
+ if (dataPointIndex - previousNonNullIndex == 1) {
+ foundTwoAdjacentDataPoints = true;
+ }
+ previousNonNullIndex = dataPointIndex;
+ }
+ sb.append((dataPointsWritten++ > 0 ? "," : "")
+ + (dataPoint < 0.0 ? "null" :
+ String.valueOf((long) ((dataPoint * 999.0) / maxValue))));
+ }
+ sb.append("]");
+ if (!totalResponsesByCountry.isEmpty()) {
+ sb.append(",\"countries\":{");
+ int written = 0;
+ for (Map.Entry<String, Double> e :
+ totalResponsesByCountry.entrySet()) {
+ if (e.getValue() > totalResponses / 100.0) {
+ sb.append((written++ > 0 ? "," : "") + "\"" + e.getKey()
+ + "\":" + String.format(Locale.US, "%.4f",
+ e.getValue() / totalResponses));
+ }
+ }
+ sb.append("}");
+ }
+ if (!totalResponsesByTransport.isEmpty()) {
+ sb.append(",\"transports\":{");
+ int written = 0;
+ for (Map.Entry<String, Double> e :
+ totalResponsesByTransport.entrySet()) {
+ if (e.getValue() > totalResponses / 100.0) {
+ sb.append((written++ > 0 ? "," : "") + "\"" + e.getKey()
+ + "\":" + String.format(Locale.US, "%.4f",
+ e.getValue() / totalResponses));
+ }
+ }
+ sb.append("}");
+ }
+ if (!totalResponsesByVersion.isEmpty()) {
+ sb.append(",\"versions\":{");
+ int written = 0;
+ for (Map.Entry<String, Double> e :
+ totalResponsesByVersion.entrySet()) {
+ if (e.getValue() > totalResponses / 100.0) {
+ sb.append((written++ > 0 ? "," : "") + "\"" + e.getKey()
+ + "\":" + String.format(Locale.US, "%.4f",
+ e.getValue() / totalResponses));
+ }
+ }
+ sb.append("}");
+ }
+ sb.append("}");
+ if (foundTwoAdjacentDataPoints) {
+ return sb.toString();
+ } else {
+ return null;
+ }
+ }
+
+ public String getStatsString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(" " + Logger.formatDecimalNumber(this.writtenDocuments)
+ + " clients document files updated\n");
+ return sb.toString();
+ }
+}
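
The "values" array in the clients document is scaled into the range 0..999:
formatTimeline() stores each point as (long) (dataPoint * 999 / maxValue) and
publishes factor = maxValue / 999, so a consumer multiplies back to recover
the estimate. A minimal decoding sketch (not part of the commit; the class
name and the shortened values array are made up, the factor is taken from the
example document in the class comment):

public class ClientsDocumentDecoding {
  public static void main(String[] args) {
    double factor = 0.139049349;    /* "factor" from the example document */
    Integer[] values = { 371, 354, 349, null, 999 };
    for (Integer value : values) {
      if (value == null) {
        /* Null marks data points where less than half of the interval
         * was covered by reported statistics. */
        System.out.println("n/a");
      } else {
        /* Estimated average client number for this data point. */
        System.out.printf("%.1f%n", value * factor);
      }
    }
  }
}
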
diff --git a/src/org/torproject/onionoo/ClientsStatusUpdater.java b/src/org/torproject/onionoo/ClientsStatusUpdater.java
new file mode 100644
index 0000000..e15c11a
--- /dev/null
+++ b/src/org/torproject/onionoo/ClientsStatusUpdater.java
@@ -0,0 +1,230 @@
+/* Copyright 2014 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.onionoo;
+
+import java.text.SimpleDateFormat;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.torproject.descriptor.Descriptor;
+import org.torproject.descriptor.ExtraInfoDescriptor;
+
+/*
+ * Example extra-info descriptor used as input:
+ *
+ * extra-info ndnop2 DE6397A047ABE5F78B4C87AF725047831B221AAB
+ * dirreq-stats-end 2014-02-16 16:42:11 (86400 s)
+ * dirreq-v3-resp ok=856,not-enough-sigs=0,unavailable=0,not-found=0,
+ * not-modified=40,busy=0
+ * bridge-stats-end 2014-02-16 16:42:17 (86400 s)
+ * bridge-ips ??=8,in=8,se=8
+ * bridge-ip-versions v4=8,v6=0
+ *
+ * Clients status file produced as intermediate output:
+ *
+ * 2014-02-15 16:42:11 2014-02-16 00:00:00
+ * 259.042 in=86.347,se=86.347 v4=259.042
+ * 2014-02-16 00:00:00 2014-02-16 16:42:11
+ * 592.958 in=197.653,se=197.653 v4=592.958
+ */
+public class ClientsStatusUpdater implements DescriptorListener,
+ StatusUpdater {
+
+ private DescriptorSource descriptorSource;
+
+ private DocumentStore documentStore;
+
+ private long now;
+
+ public ClientsStatusUpdater(DescriptorSource descriptorSource,
+ DocumentStore documentStore, Time time) {
+ this.descriptorSource = descriptorSource;
+ this.documentStore = documentStore;
+ this.now = time.currentTimeMillis();
+ this.registerDescriptorListeners();
+ }
+
+ private void registerDescriptorListeners() {
+ this.descriptorSource.registerDescriptorListener(this,
+ DescriptorType.BRIDGE_EXTRA_INFOS);
+ }
+
+ public void processDescriptor(Descriptor descriptor, boolean relay) {
+ if (descriptor instanceof ExtraInfoDescriptor && !relay) {
+ this.processBridgeExtraInfoDescriptor(
+ (ExtraInfoDescriptor) descriptor);
+ }
+ }
+
+ private static final long ONE_HOUR_MILLIS = 60L * 60L * 1000L,
+ ONE_DAY_MILLIS = 24L * ONE_HOUR_MILLIS;
+
+ private SortedMap<String, SortedSet<ClientsHistory>> newResponses =
+ new TreeMap<String, SortedSet<ClientsHistory>>();
+
+ private void processBridgeExtraInfoDescriptor(
+ ExtraInfoDescriptor descriptor) {
+ long dirreqStatsEndMillis = descriptor.getDirreqStatsEndMillis();
+ long dirreqStatsIntervalLengthMillis =
+ descriptor.getDirreqStatsIntervalLength() * 1000L;
+ SortedMap<String, Integer> responses = descriptor.getDirreqV3Resp();
+ if (dirreqStatsEndMillis < 0L ||
+ dirreqStatsIntervalLengthMillis != ONE_DAY_MILLIS ||
+ responses == null || !responses.containsKey("ok")) {
+ return;
+ }
+ double okResponses = (double) (responses.get("ok") - 4);
+ if (okResponses < 0.0) {
+ return;
+ }
+ String hashedFingerprint = descriptor.getFingerprint().toUpperCase();
+ long dirreqStatsStartMillis = dirreqStatsEndMillis
+ - dirreqStatsIntervalLengthMillis;
+ long utcBreakMillis = (dirreqStatsEndMillis / ONE_DAY_MILLIS)
+ * ONE_DAY_MILLIS;
+ for (int i = 0; i < 2; i++) {
+ long startMillis = i == 0 ? dirreqStatsStartMillis : utcBreakMillis;
+ long endMillis = i == 0 ? utcBreakMillis : dirreqStatsEndMillis;
+ if (startMillis >= endMillis) {
+ continue;
+ }
+ double totalResponses = okResponses
+ * ((double) (endMillis - startMillis))
+ / ((double) ONE_DAY_MILLIS);
+ SortedMap<String, Double> responsesByCountry =
+ this.weightResponsesWithUniqueIps(totalResponses,
+ descriptor.getBridgeIps(), "??");
+ SortedMap<String, Double> responsesByTransport =
+ this.weightResponsesWithUniqueIps(totalResponses,
+ descriptor.getBridgeIpTransports(), "<??>");
+ SortedMap<String, Double> responsesByVersion =
+ this.weightResponsesWithUniqueIps(totalResponses,
+ descriptor.getBridgeIpVersions(), "");
+ ClientsHistory newResponseHistory = new ClientsHistory(
+ startMillis, endMillis, totalResponses, responsesByCountry,
+ responsesByTransport, responsesByVersion);
+ if (!this.newResponses.containsKey(hashedFingerprint)) {
+ this.newResponses.put(hashedFingerprint,
+ new TreeSet<ClientsHistory>());
+ }
+ this.newResponses.get(hashedFingerprint).add(
+ newResponseHistory);
+ }
+ }
+
+ private SortedMap<String, Double> weightResponsesWithUniqueIps(
+ double totalResponses, SortedMap<String, Integer> uniqueIps,
+ String omitString) {
+ SortedMap<String, Double> weightedResponses =
+ new TreeMap<String, Double>();
+ int totalUniqueIps = 0;
+ if (uniqueIps != null) {
+ for (Map.Entry<String, Integer> e : uniqueIps.entrySet()) {
+ if (e.getValue() > 4) {
+ totalUniqueIps += e.getValue() - 4;
+ }
+ }
+ }
+ if (totalUniqueIps > 0) {
+ for (Map.Entry<String, Integer> e : uniqueIps.entrySet()) {
+ if (!e.getKey().equals(omitString) && e.getValue() > 4) {
+ weightedResponses.put(e.getKey(),
+ (((double) (e.getValue() - 4)) * totalResponses)
+ / ((double) totalUniqueIps));
+ }
+ }
+ }
+ return weightedResponses;
+ }
+
+ public void updateStatuses() {
+ for (Map.Entry<String, SortedSet<ClientsHistory>> e :
+ this.newResponses.entrySet()) {
+ String hashedFingerprint = e.getKey();
+ ClientsStatus clientsStatus = this.documentStore.retrieve(
+ ClientsStatus.class, true, hashedFingerprint);
+ if (clientsStatus == null) {
+ clientsStatus = new ClientsStatus();
+ }
+ this.addToHistory(clientsStatus, e.getValue());
+ this.compressHistory(clientsStatus);
+ this.documentStore.store(clientsStatus, hashedFingerprint);
+ }
+ Logger.printStatusTime("Updated clients status files");
+ }
+
+ private void addToHistory(ClientsStatus clientsStatus,
+ SortedSet<ClientsHistory> newIntervals) {
+ SortedSet<ClientsHistory> history = clientsStatus.history;
+ for (ClientsHistory interval : newIntervals) {
+ if ((history.headSet(interval).isEmpty() ||
+ history.headSet(interval).last().endMillis <=
+ interval.startMillis) &&
+ (history.tailSet(interval).isEmpty() ||
+ history.tailSet(interval).first().startMillis >=
+ interval.endMillis)) {
+ history.add(interval);
+ }
+ }
+ }
+
+ private void compressHistory(ClientsStatus clientsStatus) {
+ SortedSet<ClientsHistory> history = clientsStatus.history;
+ SortedSet<ClientsHistory> compressedHistory =
+ new TreeSet<ClientsHistory>();
+ ClientsHistory lastResponses = null;
+ SimpleDateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM");
+ dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+ String lastMonthString = "1970-01";
+ for (ClientsHistory responses : history) {
+ long intervalLengthMillis;
+ if (this.now - responses.endMillis <=
+ 92L * 24L * 60L * 60L * 1000L) {
+ intervalLengthMillis = 24L * 60L * 60L * 1000L;
+ } else if (this.now - responses.endMillis <=
+ 366L * 24L * 60L * 60L * 1000L) {
+ intervalLengthMillis = 2L * 24L * 60L * 60L * 1000L;
+ } else {
+ intervalLengthMillis = 10L * 24L * 60L * 60L * 1000L;
+ }
+ String monthString = dateTimeFormat.format(responses.startMillis);
+ if (lastResponses != null &&
+ lastResponses.endMillis == responses.startMillis &&
+ ((lastResponses.endMillis - 1L) / intervalLengthMillis) ==
+ ((responses.endMillis - 1L) / intervalLengthMillis) &&
+ lastMonthString.equals(monthString)) {
+ lastResponses.addResponses(responses);
+ } else {
+ if (lastResponses != null) {
+ compressedHistory.add(lastResponses);
+ }
+ lastResponses = responses;
+ }
+ lastMonthString = monthString;
+ }
+ if (lastResponses != null) {
+ compressedHistory.add(lastResponses);
+ }
+ clientsStatus.history = compressedHistory;
+ }
+
+ public String getStatsString() {
+ int newIntervals = 0;
+ for (SortedSet<ClientsHistory> hist : this.newResponses.values()) {
+ newIntervals += hist.size();
+ }
+ StringBuilder sb = new StringBuilder();
+ sb.append(" "
+ + Logger.formatDecimalNumber(newIntervals / 2)
+ + " client statistics processed from extra-info descriptors\n");
+ sb.append(" "
+ + Logger.formatDecimalNumber(this.newResponses.size())
+ + " client status files updated\n");
+ return sb.toString();
+ }
+}
+
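
And a standalone sketch (not part of the commit; the class name, constant and
main() wrapper are invented for the example) of the interval splitting that
processBridgeExtraInfoDescriptor() performs on the example descriptor from
the class comment: the ok=856 responses are reduced by 4 and distributed over
the parts of the reported 24-hour interval before and after UTC midnight.

import java.text.SimpleDateFormat;
import java.util.TimeZone;

public class IntervalSplitExample {
  private static final long ONE_DAY_MILLIS = 24L * 60L * 60L * 1000L;

  public static void main(String[] args) throws Exception {
    SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    f.setTimeZone(TimeZone.getTimeZone("UTC"));
    long statsEnd = f.parse("2014-02-16 16:42:11").getTime();
    long statsStart = statsEnd - ONE_DAY_MILLIS;
    long utcBreak = (statsEnd / ONE_DAY_MILLIS) * ONE_DAY_MILLIS;
    double okResponses = 856 - 4;
    long[][] intervals = {
        { statsStart, utcBreak },   /* 2014-02-15 16:42:11 .. midnight */
        { utcBreak, statsEnd } };   /* midnight .. 2014-02-16 16:42:11 */
    for (long[] interval : intervals) {
      double total = okResponses
          * ((double) (interval[1] - interval[0]))
          / ((double) ONE_DAY_MILLIS);
      /* Prints 259.042 for the first part and 592.958 for the second,
       * matching the clients status file shown in the class comment. */
      System.out.printf("%s %s %.3f%n", f.format(interval[0]),
          f.format(interval[1]), total);
    }
  }
}
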
diff --git a/src/org/torproject/onionoo/Main.java b/src/org/torproject/onionoo/Main.java
index 1a5d083..af33124 100644
--- a/src/org/torproject/onionoo/Main.java
+++ b/src/org/torproject/onionoo/Main.java
@@ -36,12 +36,14 @@ public class Main {
Logger.printStatusTime("Initialized bandwidth data writer");
WeightsDataWriter wdw = new WeightsDataWriter(dso, ds, t);
Logger.printStatusTime("Initialized weights data writer");
- ClientsDataWriter cdw = new ClientsDataWriter(dso, ds, t);
- Logger.printStatusTime("Initialized clients data writer");
+ ClientsStatusUpdater csu = new ClientsStatusUpdater(dso, ds, t);
+ Logger.printStatusTime("Initialized clients status updater");
UptimeStatusUpdater usu = new UptimeStatusUpdater(dso, ds);
Logger.printStatusTime("Initialized uptime status updater");
- StatusUpdater[] sus = new StatusUpdater[] { ndw, bdw, wdw, cdw, usu };
+ StatusUpdater[] sus = new StatusUpdater[] { ndw, bdw, wdw, csu, usu };
+ ClientsDocumentWriter cdw = new ClientsDocumentWriter(dso, ds, t);
+ Logger.printStatusTime("Initialized clients document writer");
UptimeDocumentWriter udw = new UptimeDocumentWriter(dso, ds, t);
Logger.printStatusTime("Initialized uptime document writer");
DocumentWriter[] dws = new DocumentWriter[] { ndw, bdw, wdw, cdw,
@@ -91,7 +93,7 @@ public class Main {
}
/* TODO Print status updater statistics for *all* status updaters once
* all data writers have been separated. */
- for (DocumentWriter dw : new DocumentWriter[] { udw }) {
+ for (DocumentWriter dw : new DocumentWriter[] { cdw, udw }) {
String statsString = dw.getStatsString();
if (statsString != null) {
Logger.printStatistics(dw.getClass().getSimpleName(),