[or-cvs] [metrics-db/master] Add bandwidth histories to database.
karsten at torproject.org
karsten at torproject.org
Thu Sep 23 14:57:43 UTC 2010
Author: Karsten Loesing <karsten.loesing at gmx.net>
Date: Thu, 23 Sep 2010 12:16:01 +0200
Subject: Add bandwidth histories to database.
Commit: 2b5617e6f5ed80b3624740ef2a60feedaf1652e1
---
db/refresh.sql | 1 +
db/tordir.sql | 99 +++++++++++++++
.../ernie/db/RelayDescriptorDatabaseImporter.java | 128 ++++++++++++++++----
.../torproject/ernie/db/RelayDescriptorParser.java | 37 +++++-
4 files changed, 235 insertions(+), 30 deletions(-)
diff --git a/db/refresh.sql b/db/refresh.sql
index 9e55016..9b98b1a 100644
--- a/db/refresh.sql
+++ b/db/refresh.sql
@@ -13,6 +13,7 @@ SELECT * FROM refresh_network_size();
SELECT * FROM refresh_relay_platforms();
SELECT * FROM refresh_relay_versions();
SELECT * FROM refresh_total_bandwidth();
+SELECT * FROM refresh_total_bwhist();
-- Clear the updates table, since we have just updated everything.
DELETE FROM updates;
diff --git a/db/tordir.sql b/db/tordir.sql
index bb74a62..37654de 100644
--- a/db/tordir.sql
+++ b/db/tordir.sql
@@ -32,6 +32,25 @@ CREATE TABLE extrainfo (
CONSTRAINT extrainfo_pkey PRIMARY KEY (extrainfo)
);
+-- TABLE bwhist
+-- Contains bandwidth histories contained in extra-info descriptors.
+-- Every row represents a 15-minute interval and can have read, written,
+-- dirread, and dirwritten set or not. We're making sure that there's only
+-- one interval for each extrainfo. However, it's possible that an
+-- interval is contained in another extra-info descriptor of the same
+-- relay. These duplicates need to be filtered when aggregating bandwidth
+-- histories.
+CREATE TABLE bwhist (
+ fingerprint CHARACTER(40) NOT NULL,
+ extrainfo CHARACTER(40) NOT NULL,
+ intervalend TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+ read BIGINT,
+ written BIGINT,
+ dirread BIGINT,
+ dirwritten BIGINT,
+ CONSTRAINT bwhist_pkey PRIMARY KEY (extrainfo, intervalend)
+);
+
-- TABLE statusentry
-- Contains all of the consensus entries published by the directories.
-- Each statusentry references a valid descriptor.
@@ -134,6 +153,19 @@ CREATE TABLE total_bandwidth (
CONSTRAINT total_bandwidth_pkey PRIMARY KEY(date)
);
+-- TABLE total_bwhist
+-- Contains the total number of bytes read/written and the number of dir
+-- bytes read/written by all relays in the network on a given day. The dir
+-- bytes are an estimate based on the subset of relays that count dir bytes.
+CREATE TABLE total_bwhist (
+ date DATE NOT NULL,
+ read BIGINT,
+ written BIGINT,
+ dirread BIGINT,
+ dirwritten BIGINT,
+ CONSTRAINT total_bwhist_pkey PRIMARY KEY(date)
+);
+
-- TABLE relay_statuses_per_day
-- A helper table which is commonly used to update the tables above in the
-- refresh_* functions.
@@ -227,6 +259,36 @@ AFTER INSERT OR UPDATE OR DELETE
ON descriptor
FOR EACH ROW EXECUTE PROCEDURE update_desc();
+-- FUNCTION update_bwhist
+-- This keeps the updates table up to date for the time graphs.
+CREATE OR REPLACE FUNCTION update_bwhist() RETURNS TRIGGER AS $$
+ BEGIN
+ IF (TG_OP='INSERT' OR TG_OP='UPDATE') THEN
+ IF (SELECT COUNT(*) FROM updates
+ WHERE DATE = DATE(NEW.intervalend)) = 0 THEN
+ INSERT INTO updates
+ VALUES (DATE(NEW.intervalend));
+ END IF;
+ END IF;
+ IF (TG_OP='DELETE' OR TG_OP='UPDATE') THEN
+ IF (SELECT COUNT(*) FROM updates
+ WHERE DATE = DATE(OLD.intervalend)) = 0 THEN
+ INSERT INTO updates
+ VALUES (DATE(OLD.intervalend));
+ END IF;
+ END IF;
+ RETURN NULL; -- result is ignored since this is an AFTER trigger
+END;
+$$ LANGUAGE plpgsql;
+
+-- TRIGGER update_bwhist
+-- This calls the function update_bwhist() each time a row is inserted,
+-- updated, or deleted from the bwhist table.
+CREATE TRIGGER update_bwhist
+AFTER INSERT OR UPDATE OR DELETE
+ON bwhist
+ FOR EACH ROW EXECUTE PROCEDURE update_bwhist();
+
-- FUNCTION refresh_relay_statuses_per_day()
-- Updates helper table which is used to refresh the aggregate tables.
CREATE OR REPLACE FUNCTION refresh_relay_statuses_per_day()
@@ -401,3 +463,40 @@ CREATE OR REPLACE FUNCTION refresh_total_bandwidth() RETURNS INTEGER AS $$
END;
$$ LANGUAGE plpgsql;
+-- FUNCTION refresh_total_bwhist()
+-- Rebuilds the total_bwhist rows for all dates listed in updates.
+CREATE OR REPLACE FUNCTION refresh_total_bwhist() RETURNS INTEGER AS $$
+  BEGIN
+  DELETE FROM total_bwhist WHERE date IN (SELECT * FROM updates);
+  -- INSERT ... SELECT assigns values by position, not by alias, so the
+  -- select list has to follow the column list (dirread, then dirwritten).
+  INSERT INTO total_bwhist (date, read, written, dirread, dirwritten)
+  SELECT date,
+         SUM(read) AS read,
+         SUM(written) AS written,
+         SUM(CASE WHEN dirread IS NULL THEN NULL ELSE read END) AS dirread,
+         SUM(CASE WHEN dirwritten IS NULL THEN NULL ELSE written END)
+         AS dirwritten
+  FROM (
+    SELECT fingerprint,
+           DATE(intervalend) as date,
+           SUM(read) AS read,
+           SUM(written) AS written,
+           SUM(dirread) AS dirread,
+           SUM(dirwritten) AS dirwritten
+    FROM (
+      -- DISTINCT drops intervals repeated across a relay's descriptors.
+      SELECT DISTINCT fingerprint, intervalend, read, written,
+             dirread, dirwritten
+      FROM bwhist
+      WHERE DATE(intervalend) >= (SELECT MIN(date) FROM updates)
+      AND DATE(intervalend) <= (SELECT MAX(date) FROM updates)
+      AND DATE(intervalend) IN (SELECT date FROM updates)
+    ) byinterval
+    GROUP BY fingerprint, DATE(intervalend)
+  ) byrelay
+  GROUP BY date;
+  RETURN 1;
+  END;
+$$ LANGUAGE plpgsql;
+
diff --git a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
index 588fe27..7eeb9d7 100644
--- a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
+++ b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
@@ -25,6 +25,7 @@ public final class RelayDescriptorDatabaseImporter {
*/
private int rdsCount = 0;
private int resCount = 0;
+ private int rhsCount = 0;
private int rrsCount = 0;
private int rcsCount = 0;
private int rvsCount = 0;
@@ -47,6 +48,12 @@ public final class RelayDescriptorDatabaseImporter {
private PreparedStatement psEs;
/**
+ * Prepared statement to check whether the bandwidth history of an
+ * extra-info descriptor has been imported into the database before.
+ */
+ private PreparedStatement psHs;
+
+ /**
* Prepared statement to check whether a given server descriptor has
* been imported into the database before.
*/
@@ -82,6 +89,12 @@ public final class RelayDescriptorDatabaseImporter {
private PreparedStatement psE;
/**
+ * Prepared statement to insert the bandwidth history of an extra-info
+ * descriptor into the database.
+ */
+ private PreparedStatement psH;
+
+ /**
* Prepared statement to insert a network status consensus into the
* database.
*/
@@ -104,6 +117,8 @@ public final class RelayDescriptorDatabaseImporter {
private BufferedWriter consensusOut;
private BufferedWriter voteOut;
+ private SimpleDateFormat dateTimeFormat;
+
/**
* Initialize database importer by connecting to the database and
* preparing statements.
@@ -130,6 +145,8 @@ public final class RelayDescriptorDatabaseImporter {
+ "FROM descriptor WHERE descriptor = ?");
this.psEs = conn.prepareStatement("SELECT COUNT(*) "
+ "FROM extrainfo WHERE extrainfo = ?");
+ this.psHs = conn.prepareStatement("SELECT COUNT(*) "
+ + "FROM bwhist WHERE extrainfo = ?");
this.psCs = conn.prepareStatement("SELECT COUNT(*) "
+ "FROM consensus WHERE validafter = ?");
this.psVs = conn.prepareStatement("SELECT COUNT(*) "
@@ -151,6 +168,9 @@ public final class RelayDescriptorDatabaseImporter {
this.psE = conn.prepareStatement("INSERT INTO extrainfo "
+ "(extrainfo, nickname, fingerprint, published, rawdesc) "
+ "VALUES (?, ?, ?, ?, ?)");
+ this.psH = conn.prepareStatement("INSERT INTO bwhist "
+ + "(fingerprint, extrainfo, intervalend, read, written, "
+ + "dirread, dirwritten) VALUES (?, ?, ?, ?, ?, ?, ?)");
this.psC = conn.prepareStatement("INSERT INTO consensus "
+ "(validafter, rawdesc) VALUES (?, ?)");
this.psV = conn.prepareStatement("INSERT INTO vote "
@@ -178,6 +198,9 @@ public final class RelayDescriptorDatabaseImporter {
this.logger.log(Level.WARNING, "Could not open raw database "
+ "import files.", e);
}
+
+ this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
}
}
@@ -234,15 +257,12 @@ public final class RelayDescriptorDatabaseImporter {
}
}
if (this.statusentryOut != null) {
- SimpleDateFormat dateTimeFormat =
- new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
this.statusentryOut.write(
- dateTimeFormat.format(validAfter) + "\t" + nickname
+ this.dateTimeFormat.format(validAfter) + "\t" + nickname
+ "\t" + fingerprint.toLowerCase() + "\t"
+ descriptor.toLowerCase() + "\t"
- + dateTimeFormat.format(published) + "\t" + address + "\t"
- + orPort + "\t" + dirPort + "\t"
+ + this.dateTimeFormat.format(published) + "\t" + address
+ + "\t" + orPort + "\t" + dirPort + "\t"
+ (flags.contains("Authority") ? "t" : "f") + "\t"
+ (flags.contains("BadExit") ? "t" : "f") + "\t"
+ (flags.contains("BadDirectory") ? "t" : "f") + "\t"
@@ -312,16 +332,13 @@ public final class RelayDescriptorDatabaseImporter {
}
}
if (this.descriptorOut != null) {
- SimpleDateFormat dateTimeFormat =
- new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
this.descriptorOut.write(descriptor.toLowerCase() + "\t"
+ nickname + "\t" + address + "\t" + orPort + "\t" + dirPort
+ "\t" + relayIdentifier + "\t" + bandwidthAvg + "\t"
+ bandwidthBurst + "\t" + bandwidthObserved + "\t"
+ (platform != null && platform.length() > 0
? new String(platform.getBytes(), "US-ASCII") : "\\N") + "\t"
- + dateTimeFormat.format(published) + "\t"
+ + this.dateTimeFormat.format(published) + "\t"
+ (uptime >= 0 ? uptime : "\\N") + "\t"
+ (extraInfoDigest != null ? extraInfoDigest : "\\N") + "\t");
this.descriptorOut.write(PGbytea.toPGString(rawDescriptor).
@@ -344,10 +361,10 @@ public final class RelayDescriptorDatabaseImporter {
*/
public void addExtraInfoDescriptor(String extraInfoDigest,
String nickname, String fingerprint, long published,
- byte[] rawDescriptor) {
+ byte[] rawDescriptor, SortedMap<String, String> bandwidthHistory) {
try {
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
if (this.psEs != null && this.psE != null) {
- Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
this.psEs.setString(1, extraInfoDigest);
ResultSet rs = psEs.executeQuery();
rs.next();
@@ -366,13 +383,81 @@ public final class RelayDescriptorDatabaseImporter {
}
}
}
+ if (this.psHs != null && this.psH != null) {
+ this.psHs.setString(1, extraInfoDigest);
+ ResultSet rs = this.psHs.executeQuery();
+ rs.next();
+ if (rs.getInt(1) == 0) {
+ this.psH.clearParameters();
+ String lastIntervalEnd = null;
+ List<String> bandwidthHistoryValues = new ArrayList<String>();
+ bandwidthHistoryValues.addAll(bandwidthHistory.values());
+ bandwidthHistoryValues.add("EOL");
+ String readBytes = null, writtenBytes = null,
+ dirReadBytes = null, dirWrittenBytes = null;
+ for (String bandwidthHistoryValue : bandwidthHistoryValues) {
+ String[] entryParts = bandwidthHistoryValue.split(",");
+ String intervalEnd = entryParts[0];
+ if ((intervalEnd.equals("EOL") ||
+ !intervalEnd.equals(lastIntervalEnd)) &&
+ lastIntervalEnd != null) {
+ this.psH.setString(1, fingerprint);
+ this.psH.setString(2, extraInfoDigest);
+ try {
+ this.psH.setTimestamp(3, new Timestamp(Long.parseLong(
+ lastIntervalEnd)), cal);
+ if (readBytes != null) {
+ this.psH.setLong(4, Long.parseLong(readBytes));
+ } else {
+ this.psH.setNull(4, Types.BIGINT);
+ }
+ if (writtenBytes != null) {
+ this.psH.setLong(5, Long.parseLong(writtenBytes));
+ } else {
+ this.psH.setNull(5, Types.BIGINT);
+ }
+ if (dirReadBytes != null) {
+ this.psH.setLong(6, Long.parseLong(dirReadBytes));
+ } else {
+ this.psH.setNull(6, Types.BIGINT);
+ }
+ if (dirWrittenBytes != null) {
+ this.psH.setLong(7, Long.parseLong(dirWrittenBytes));
+ } else {
+ this.psH.setNull(7, Types.BIGINT);
+ }
+ } catch (NumberFormatException e) {
+ break;
+ }
+ this.psH.executeUpdate();
+ }
+ if (intervalEnd.equals("EOL")) {
+ break;
+ }
+ lastIntervalEnd = intervalEnd;
+ String type = entryParts[1];
+ String bytes = entryParts[2];
+ if (type.equals("read-history")) {
+ readBytes = bytes;
+ } else if (type.equals("write-history")) {
+ writtenBytes = bytes;
+ } else if (type.equals("dirreq-read-history")) {
+ dirReadBytes = bytes;
+ } else if (type.equals("dirreq-write-history")) {
+ dirWrittenBytes = bytes;
+ }
+ }
+ rhsCount++;
+ if (rhsCount % autoCommitCount == 0) {
+ this.conn.commit();
+ rhsCount = 0;
+ }
+ }
+ }
if (this.extrainfoOut != null) {
- SimpleDateFormat dateTimeFormat =
- new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
this.extrainfoOut.write(extraInfoDigest.toLowerCase() + "\t"
+ nickname + "\t" + fingerprint.toLowerCase() + "\t"
- + dateTimeFormat.format(published) + "\t");
+ + this.dateTimeFormat.format(published) + "\t");
this.extrainfoOut.write(PGbytea.toPGString(rawDescriptor).
replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
}
@@ -409,10 +494,8 @@ public final class RelayDescriptorDatabaseImporter {
}
}
if (this.consensusOut != null) {
- SimpleDateFormat dateTimeFormat =
- new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
- this.consensusOut.write(dateTimeFormat.format(validAfter) + "\t");
+ this.consensusOut.write(this.dateTimeFormat.format(validAfter)
+ + "\t");
this.consensusOut.write(PGbytea.toPGString(rawDescriptor).
replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
}
@@ -452,10 +535,7 @@ public final class RelayDescriptorDatabaseImporter {
}
}
if (this.voteOut != null) {
- SimpleDateFormat dateTimeFormat =
- new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
- this.voteOut.write(dateTimeFormat.format(validAfter) + "\t"
+ this.voteOut.write(this.dateTimeFormat.format(validAfter) + "\t"
+ dirSource + "\t");
this.voteOut.write(PGbytea.toPGString(rawDescriptor).
replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
diff --git a/src/org/torproject/ernie/db/RelayDescriptorParser.java b/src/org/torproject/ernie/db/RelayDescriptorParser.java
index 2e0e17f..d82b041 100644
--- a/src/org/torproject/ernie/db/RelayDescriptorParser.java
+++ b/src/org/torproject/ernie/db/RelayDescriptorParser.java
@@ -76,6 +76,8 @@ public class RelayDescriptorParser {
*/
private Logger logger;
+ private SimpleDateFormat dateTimeFormat;
+
/**
* Initializes this class.
*/
@@ -96,6 +98,9 @@ public class RelayDescriptorParser {
/* Initialize logger. */
this.logger = Logger.getLogger(RelayDescriptorParser.class.getName());
+
+ this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
}
public void setRelayDescriptorDownloader(
@@ -351,18 +356,38 @@ public class RelayDescriptorParser {
long published = -1L;
String dir = line.split(" ")[2];
String date = null, v3Reqs = null;
+ SortedMap<String, String> bandwidthHistory =
+ new TreeMap<String, String>();
boolean skip = false;
while ((line = br.readLine()) != null) {
if (line.startsWith("published ")) {
publishedTime = line.substring("published ".length());
published = parseFormat.parse(publishedTime).getTime();
+ } else if (line.startsWith("read-history ") ||
+ line.startsWith("write-history ") ||
+ line.startsWith("dirreq-read-history ") ||
+ line.startsWith("dirreq-write-history ")) {
+ String[] parts = line.split(" ");
+ if (parts.length == 6) {
+ String type = parts[0];
+ long intervalEnd = dateTimeFormat.parse(parts[1] + " "
+ + parts[2]).getTime();
+ try {
+ long intervalLength = Long.parseLong(parts[3].
+ substring(1));
+ String[] values = parts[5].split(",");
+ for (int i = values.length - 1; i >= 0; i--) {
+ Long.parseLong(values[i]);
+ bandwidthHistory.put(intervalEnd + "," + type,
+ intervalEnd + "," + type + "," + values[i]);
+ intervalEnd -= intervalLength * 1000L;
+ }
+ } catch (NumberFormatException e) {
+ break;
+ }
+ }
} else if (line.startsWith("dirreq-stats-end ")) {
date = line.split(" ")[1];
- // trusted had very strange dirreq-v3-shares here...
- // TODO don't check that here, but in DirreqStatsFileHandler
- skip = dir.equals("8522EB98C91496E80EC238E732594D1509158E77")
- && (date.equals("2009-09-10") ||
- date.equals("2009-09-11"));
} else if (line.startsWith("dirreq-v3-reqs ")
&& line.length() > "dirreq-v3-reqs ".length()) {
v3Reqs = line.split(" ")[1];
@@ -410,7 +435,7 @@ public class RelayDescriptorParser {
}
if (this.rddi != null && digest != null) {
this.rddi.addExtraInfoDescriptor(digest, nickname,
- dir.toLowerCase(), published, data);
+ dir.toLowerCase(), published, data, bandwidthHistory);
}
}
} catch (IOException e) {
--
1.7.1
More information about the tor-commits
mailing list