[tor-commits] [metrics-db/master] Remove database import from metrics-db.
karsten at torproject.org
karsten at torproject.org
Wed Mar 2 15:45:00 UTC 2011
commit 0cb4eefde5bc6e8ae98cb823bda25229ff755e47
Author: Karsten Loesing <karsten.loesing at gmx.net>
Date: Tue Mar 1 16:41:26 2011 +0100
Remove database import from metrics-db.
---
config.template | 11 -
src/org/torproject/ernie/db/Configuration.java | 28 +-
src/org/torproject/ernie/db/Main.java | 29 +-
.../ernie/db/RelayDescriptorDatabaseImporter.java | 1213 --------------------
.../torproject/ernie/db/RelayDescriptorParser.java | 184 +---
5 files changed, 17 insertions(+), 1448 deletions(-)
diff --git a/config.template b/config.template
index 1ef915c..7385f7b 100644
--- a/config.template
+++ b/config.template
@@ -67,9 +67,6 @@
## Relative path to directory to write directory archives to
#DirectoryArchivesOutputDirectory directory-archive/
#
-## Write relay descriptors to a database for later evaluation
-#WriteRelayDescriptorDatabase 0
-#
## Write aggregate statistics (bridges and bridge users per day, directory
## clients per day, torperf results, packages requested from GetTor, etc.)
## to database for later evaluation
@@ -78,14 +75,6 @@
## JDBC string for relay descriptor database
#RelayDescriptorDatabaseJDBC jdbc:postgresql://localhost/tordir?user=ernie&password=password
#
-## Write relay descriptors to raw text files for importing them into a
-## database using PostgreSQL's \copy command
-#WriteRelayDescriptorsRawFiles 0
-#
-## Relative path to directory to write raw text files; note that existing
-## files will be overwritten!
-#RelayDescriptorRawFilesDirectory pg-import/
-#
## Write statistics about the current consensus and votes to the
## website
#WriteConsensusHealth 0
diff --git a/src/org/torproject/ernie/db/Configuration.java b/src/org/torproject/ernie/db/Configuration.java
index 262ec78..b8f8f39 100644
--- a/src/org/torproject/ernie/db/Configuration.java
+++ b/src/org/torproject/ernie/db/Configuration.java
@@ -23,12 +23,9 @@ public class Configuration {
private boolean importDirectoryArchives = false;
private String directoryArchivesDirectory = "archives/";
private boolean keepDirectoryArchiveImportHistory = false;
- private boolean writeRelayDescriptorDatabase = false;
private boolean writeAggregateStatsDatabase = false;
private String relayDescriptorDatabaseJdbc =
"jdbc:postgresql://localhost/tordir?user=ernie&password=password";
- private boolean writeRelayDescriptorsRawFiles = false;
- private String relayDescriptorRawFilesDirectory = "pg-import/";
private boolean writeSanitizedBridges = false;
private boolean replaceIPAddressesWithHashes = false;
private long limitBridgeDescriptorMappings = -1L;
@@ -98,19 +95,11 @@ public class Configuration {
} else if (line.startsWith("KeepDirectoryArchiveImportHistory")) {
this.keepDirectoryArchiveImportHistory = Integer.parseInt(
line.split(" ")[1]) != 0;
- } else if (line.startsWith("WriteRelayDescriptorDatabase")) {
- this.writeRelayDescriptorDatabase = Integer.parseInt(
- line.split(" ")[1]) != 0;
} else if (line.startsWith("WriteAggregateStatsDatabase")) {
this.writeAggregateStatsDatabase = Integer.parseInt(
line.split(" ")[1]) != 0;
} else if (line.startsWith("RelayDescriptorDatabaseJDBC")) {
this.relayDescriptorDatabaseJdbc = line.split(" ")[1];
- } else if (line.startsWith("WriteRelayDescriptorsRawFiles")) {
- this.writeRelayDescriptorsRawFiles = Integer.parseInt(
- line.split(" ")[1]) != 0;
- } else if (line.startsWith("RelayDescriptorRawFilesDirectory")) {
- this.relayDescriptorRawFilesDirectory = line.split(" ")[1];
} else if (line.startsWith("WriteSanitizedBridges")) {
this.writeSanitizedBridges = Integer.parseInt(
line.split(" ")[1]) != 0;
@@ -199,7 +188,6 @@ public class Configuration {
!this.importWriteTorperfStats &&
!this.downloadProcessGetTorStats && !this.downloadExitList &&
!this.writeDirectoryArchives &&
- !this.writeRelayDescriptorDatabase &&
!this.writeAggregateStatsDatabase &&
!this.writeSanitizedBridges && !this.writeConsensusStats &&
!this.writeBridgeStats) {
@@ -211,9 +199,7 @@ public class Configuration {
}
if ((this.importCachedRelayDescriptors ||
this.importDirectoryArchives || this.downloadRelayDescriptors) &&
- !(this.writeDirectoryArchives ||
- this.writeRelayDescriptorDatabase ||
- this.writeRelayDescriptorsRawFiles || this.writeConsensusStats ||
+ !(this.writeDirectoryArchives || this.writeConsensusStats ||
this.writeBridgeStats)) {
logger.warning("We are configured to import/download relay "
+ "descriptors, but we don't have a single data sink to write "
@@ -221,8 +207,7 @@ public class Configuration {
}
if (!(this.importCachedRelayDescriptors ||
this.importDirectoryArchives || this.downloadRelayDescriptors) &&
- (this.writeDirectoryArchives ||
- this.writeRelayDescriptorDatabase)) {
+ this.writeDirectoryArchives) {
logger.warning("We are configured to write relay descriptor to at "
+ "least one data sink, but we don't have a single data source "
+ "containing relay descriptors.");
@@ -281,21 +266,12 @@ public class Configuration {
public boolean getKeepDirectoryArchiveImportHistory() {
return this.keepDirectoryArchiveImportHistory;
}
- public boolean getWriteRelayDescriptorDatabase() {
- return this.writeRelayDescriptorDatabase;
- }
public boolean getWriteAggregateStatsDatabase() {
return this.writeAggregateStatsDatabase;
}
public String getRelayDescriptorDatabaseJDBC() {
return this.relayDescriptorDatabaseJdbc;
}
- public boolean getWriteRelayDescriptorsRawFiles() {
- return this.writeRelayDescriptorsRawFiles;
- }
- public String getRelayDescriptorRawFilesDirectory() {
- return this.relayDescriptorRawFilesDirectory;
- }
public boolean getWriteSanitizedBridges() {
return this.writeSanitizedBridges;
}
diff --git a/src/org/torproject/ernie/db/Main.java b/src/org/torproject/ernie/db/Main.java
index a495a03..4dced2c 100644
--- a/src/org/torproject/ernie/db/Main.java
+++ b/src/org/torproject/ernie/db/Main.java
@@ -44,23 +44,11 @@ public class Main {
new ArchiveWriter(
new File(config.getDirectoryArchivesOutputDirectory())) : null;
- // Prepare writing relay descriptors to database
- RelayDescriptorDatabaseImporter rddi =
- config.getWriteRelayDescriptorDatabase() ||
- config.getWriteRelayDescriptorsRawFiles() ?
- new RelayDescriptorDatabaseImporter(
- config.getWriteRelayDescriptorDatabase() ?
- config.getRelayDescriptorDatabaseJDBC() : null,
- config.getWriteRelayDescriptorsRawFiles() ?
- config.getRelayDescriptorRawFilesDirectory() : null) : null;
-
// Prepare relay descriptor parser (only if we are writing stats or
// directory archives to disk)
RelayDescriptorParser rdp = config.getWriteBridgeStats() ||
- config.getWriteDirectoryArchives() ||
- config.getWriteRelayDescriptorDatabase() ||
- config.getWriteRelayDescriptorsRawFiles() ?
- new RelayDescriptorParser(bsfh, aw, rddi) : null;
+ config.getWriteDirectoryArchives() ?
+ new RelayDescriptorParser(bsfh, aw) : null;
// Import/download relay descriptors from the various sources
if (rdp != null) {
@@ -68,12 +56,10 @@ public class Main {
if (config.getDownloadRelayDescriptors()) {
List<String> dirSources =
config.getDownloadFromDirectoryAuthorities();
- boolean downloadCurrentConsensus = aw != null || bsfh != null ||
- rddi != null;
+ boolean downloadCurrentConsensus = aw != null || bsfh != null;
boolean downloadCurrentVotes = aw != null;
- boolean downloadAllServerDescriptors = aw != null ||
- rddi != null;
- boolean downloadAllExtraInfos = aw != null || rddi != null;
+ boolean downloadAllServerDescriptors = aw != null;
+ boolean downloadAllExtraInfos = aw != null;
rdd = new RelayDescriptorDownloader(rdp, dirSources,
downloadCurrentConsensus, downloadCurrentVotes,
downloadAllServerDescriptors, downloadAllExtraInfos);
@@ -108,11 +94,6 @@ public class Main {
}
}
- // Close database connection (if active)
- if (rddi != null) {
- rddi.closeConnection();
- }
-
// Write output to disk that only depends on relay descriptors
if (aw != null) {
aw.dumpStats();
diff --git a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
deleted file mode 100644
index 3a06a58..0000000
--- a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
+++ /dev/null
@@ -1,1213 +0,0 @@
-/* Copyright 2010 The Tor Project
- * See LICENSE for licensing information */
-package org.torproject.ernie.db;
-
-import java.io.*;
-import java.sql.*;
-import java.text.*;
-import java.util.*;
-import java.util.logging.*;
-import org.postgresql.util.*;
-
-/**
- * Parse directory data.
- */
-
-public final class RelayDescriptorDatabaseImporter {
-
- /**
- * How many records to commit with each database transaction.
- */
- private final long autoCommitCount = 500;
-
- /**
- * Keep track of the number of records committed before each transaction
- */
- private int rdsCount = 0;
- private int resCount = 0;
- private int rhsCount = 0;
- private int rrsCount = 0;
- private int rcsCount = 0;
- private int rvsCount = 0;
- private int rbsCount = 0;
- private int rqsCount = 0;
-
- /**
- * Relay descriptor database connection.
- */
- private Connection conn;
-
- /**
- * Prepared statement to check whether any network status consensus
- * entries matching a given valid-after time have been imported into the
- * database before.
- */
- private PreparedStatement psSs;
-
- /**
- * Prepared statement to check whether a given network status consensus
- * entry has been imported into the database before.
- */
- private PreparedStatement psRs;
-
- /**
- * Prepared statement to check whether a given extra-info descriptor has
- * been imported into the database before.
- */
- private PreparedStatement psEs;
-
- /**
- * Prepared statement to check whether a given server descriptor has
- * been imported into the database before.
- */
- private PreparedStatement psDs;
-
- /**
- * Prepared statement to check whether a given network status consensus
- * has been imported into the database before.
- */
- private PreparedStatement psCs;
-
- /**
- * Prepared statement to check whether a given network status vote has
- * been imported into the database before.
- */
- private PreparedStatement psVs;
-
- /**
- * Prepared statement to check whether a given conn-bi-direct stats
- * string has been imported into the database before.
- */
- private PreparedStatement psBs;
-
- /**
- * Prepared statement to check whether a given dirreq stats string has
- * been imported into the database before.
- */
- private PreparedStatement psQs;
-
- /**
- * Set of dates that have been inserted into the database for being
- * included in the next refresh run.
- */
- private Set<Long> scheduledUpdates;
-
- /**
- * Prepared statement to insert a date into the database that shall be
- * included in the next refresh run.
- */
- private PreparedStatement psU;
-
- /**
- * Prepared statement to insert a network status consensus entry into
- * the database.
- */
- private PreparedStatement psR;
-
- /**
- * Prepared statement to insert a server descriptor into the database.
- */
- private PreparedStatement psD;
-
- /**
- * Prepared statement to insert an extra-info descriptor into the
- * database.
- */
- private PreparedStatement psE;
-
- /**
- * Callable statement to insert the bandwidth history of an extra-info
- * descriptor into the database.
- */
- private CallableStatement csH;
-
- /**
- * Prepared statement to insert a network status consensus into the
- * database.
- */
- private PreparedStatement psC;
-
- /**
- * Prepared statement to insert a network status vote into the
- * database.
- */
- private PreparedStatement psV;
-
- /**
- * Prepared statement to insert a conn-bi-direct stats string into the
- * database.
- */
- private PreparedStatement psB;
-
- /**
- * Prepared statement to insert a given dirreq stats string into the
- * database.
- */
- private PreparedStatement psQ;
-
- /**
- * Logger for this class.
- */
- private Logger logger;
-
- /**
- * Directory for writing raw import files.
- */
- private String rawFilesDirectory;
-
- /**
- * Raw import file containing status entries.
- */
- private BufferedWriter statusentryOut;
-
- /**
- * Raw import file containing server descriptors.
- */
- private BufferedWriter descriptorOut;
-
- /**
- * Raw import file containing extra-info descriptors.
- */
- private BufferedWriter extrainfoOut;
-
- /**
- * Raw import file containing bandwidth histories.
- */
- private BufferedWriter bwhistOut;
-
- /**
- * Raw import file containing consensuses.
- */
- private BufferedWriter consensusOut;
-
- /**
- * Raw import file containing votes.
- */
- private BufferedWriter voteOut;
-
- /**
- * Raw import file containing conn-bi-direct stats strings.
- */
- private BufferedWriter connBiDirectOut;
-
- /**
- * Raw import file containing dirreq stats.
- */
- private BufferedWriter dirReqOut;
-
- /**
- * Date format to parse timestamps.
- */
- private SimpleDateFormat dateTimeFormat;
-
- /**
- * The last valid-after time for which we checked whether they have been
- * any network status entries in the database.
- */
- private long lastCheckedStatusEntries;
-
- /**
- * Set of fingerprints that we imported for the valid-after time in
- * <code>lastCheckedStatusEntries</code>.
- */
- private Set<String> insertedStatusEntries;
-
- /**
- * Flag that tells us whether we need to check whether a network status
- * entry is already contained in the database or not.
- */
- private boolean separateStatusEntryCheckNecessary;
-
- private boolean importIntoDatabase;
- private boolean writeRawImportFiles;
-
- /**
- * Initialize database importer by connecting to the database and
- * preparing statements.
- */
- public RelayDescriptorDatabaseImporter(String connectionURL,
- String rawFilesDirectory) {
-
- /* Initialize logger. */
- this.logger = Logger.getLogger(
- RelayDescriptorDatabaseImporter.class.getName());
-
- if (connectionURL != null) {
- try {
- /* Connect to database. */
- this.conn = DriverManager.getConnection(connectionURL);
-
- /* Turn autocommit off */
- this.conn.setAutoCommit(false);
-
- /* Prepare statements. */
- this.psSs = conn.prepareStatement("SELECT COUNT(*) "
- + "FROM statusentry WHERE validafter = ?");
- this.psRs = conn.prepareStatement("SELECT COUNT(*) "
- + "FROM statusentry WHERE validafter = ? AND descriptor = ?");
- this.psDs = conn.prepareStatement("SELECT COUNT(*) "
- + "FROM descriptor WHERE descriptor = ?");
- this.psEs = conn.prepareStatement("SELECT COUNT(*) "
- + "FROM extrainfo WHERE extrainfo = ?");
- this.psCs = conn.prepareStatement("SELECT COUNT(*) "
- + "FROM consensus WHERE validafter = ?");
- this.psVs = conn.prepareStatement("SELECT COUNT(*) "
- + "FROM vote WHERE validafter = ? AND dirsource = ?");
- this.psBs = conn.prepareStatement("SELECT COUNT(*) "
- + "FROM connbidirect WHERE source = ? AND statsend = ?");
- this.psQs = conn.prepareStatement("SELECT COUNT(*) "
- + "FROM dirreq_stats WHERE source = ? AND statsend = ?");
- this.psR = conn.prepareStatement("INSERT INTO statusentry "
- + "(validafter, nickname, fingerprint, descriptor, "
- + "published, address, orport, dirport, isauthority, "
- + "isbadexit, isbaddirectory, isexit, isfast, isguard, "
- + "ishsdir, isnamed, isstable, isrunning, isunnamed, "
- + "isvalid, isv2dir, isv3dir, version, bandwidth, ports, "
- + "rawdesc) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, "
- + "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
- this.psD = conn.prepareStatement("INSERT INTO descriptor "
- + "(descriptor, nickname, address, orport, dirport, "
- + "fingerprint, bandwidthavg, bandwidthburst, "
- + "bandwidthobserved, platform, published, uptime, "
- + "extrainfo, rawdesc) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, "
- + "?, ?, ?, ?)");
- this.psE = conn.prepareStatement("INSERT INTO extrainfo "
- + "(extrainfo, nickname, fingerprint, published, rawdesc) "
- + "VALUES (?, ?, ?, ?, ?)");
- this.csH = conn.prepareCall("{call insert_bwhist(?, ?, ?, ?, ?, "
- + "?)}");
- this.psC = conn.prepareStatement("INSERT INTO consensus "
- + "(validafter, rawdesc) VALUES (?, ?)");
- this.psV = conn.prepareStatement("INSERT INTO vote "
- + "(validafter, dirsource, rawdesc) VALUES (?, ?, ?)");
- this.psB = conn.prepareStatement("INSERT INTO connbidirect "
- + "(source, statsend, seconds, belownum, readnum, writenum, "
- + "bothnum) VALUES (?, ?, ?, ?, ?, ?, ?)");
- this.psQ = conn.prepareStatement("INSERT INTO dirreq_stats "
- + "(source, statsend, seconds, country, requests) VALUES "
- + "(?, ?, ?, ?, ?)");
- this.psU = conn.prepareStatement("INSERT INTO scheduled_updates "
- + "(date) VALUES (?)");
- this.scheduledUpdates = new HashSet<Long>();
- this.importIntoDatabase = true;
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not connect to database or "
- + "prepare statements.", e);
- }
-
- /* Initialize set of fingerprints to remember which status entries
- * we already imported. */
- this.insertedStatusEntries = new HashSet<String>();
- }
-
- /* Remember where we want to write raw import files. */
- if (rawFilesDirectory != null) {
- this.rawFilesDirectory = rawFilesDirectory;
- this.writeRawImportFiles = true;
- }
-
- /* Initialize date format, so that we can format timestamps. */
- this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
- }
-
- private void addDateToScheduledUpdates(long timestamp)
- throws SQLException {
- if (!this.importIntoDatabase) {
- return;
- }
- long dateMillis = 0L;
- try {
- dateMillis = this.dateTimeFormat.parse(
- this.dateTimeFormat.format(timestamp).substring(0, 10)
- + " 00:00:00").getTime();
- } catch (ParseException e) {
- this.logger.log(Level.WARNING, "Internal parsing error.", e);
- return;
- }
- if (!this.scheduledUpdates.contains(dateMillis)) {
- this.psU.setDate(1, new java.sql.Date(dateMillis));
- this.psU.execute();
- this.scheduledUpdates.add(dateMillis);
- }
- }
-
- /**
- * Insert network status consensus entry into database.
- */
- public void addStatusEntry(long validAfter, String nickname,
- String fingerprint, String descriptor, long published,
- String address, long orPort, long dirPort,
- SortedSet<String> flags, String version, long bandwidth,
- String ports, byte[] rawDescriptor) {
- if (this.importIntoDatabase) {
- try {
- this.addDateToScheduledUpdates(validAfter);
- Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
- Timestamp validAfterTimestamp = new Timestamp(validAfter);
- if (lastCheckedStatusEntries != validAfter) {
- this.psSs.setTimestamp(1, validAfterTimestamp, cal);
- ResultSet rs = psSs.executeQuery();
- rs.next();
- if (rs.getInt(1) == 0) {
- separateStatusEntryCheckNecessary = false;
- insertedStatusEntries.clear();
- } else {
- separateStatusEntryCheckNecessary = true;
- }
- rs.close();
- lastCheckedStatusEntries = validAfter;
- }
- boolean alreadyContained = false;
- if (separateStatusEntryCheckNecessary ||
- insertedStatusEntries.contains(fingerprint)) {
- this.psRs.setTimestamp(1, validAfterTimestamp, cal);
- this.psRs.setString(2, descriptor);
- ResultSet rs = psRs.executeQuery();
- rs.next();
- if (rs.getInt(1) > 0) {
- alreadyContained = true;
- }
- rs.close();
- } else {
- insertedStatusEntries.add(fingerprint);
- }
- if (!alreadyContained) {
- this.psR.clearParameters();
- this.psR.setTimestamp(1, validAfterTimestamp, cal);
- this.psR.setString(2, nickname);
- this.psR.setString(3, fingerprint);
- this.psR.setString(4, descriptor);
- this.psR.setTimestamp(5, new Timestamp(published), cal);
- this.psR.setString(6, address);
- this.psR.setLong(7, orPort);
- this.psR.setLong(8, dirPort);
- this.psR.setBoolean(9, flags.contains("Authority"));
- this.psR.setBoolean(10, flags.contains("BadExit"));
- this.psR.setBoolean(11, flags.contains("BadDirectory"));
- this.psR.setBoolean(12, flags.contains("Exit"));
- this.psR.setBoolean(13, flags.contains("Fast"));
- this.psR.setBoolean(14, flags.contains("Guard"));
- this.psR.setBoolean(15, flags.contains("HSDir"));
- this.psR.setBoolean(16, flags.contains("Named"));
- this.psR.setBoolean(17, flags.contains("Stable"));
- this.psR.setBoolean(18, flags.contains("Running"));
- this.psR.setBoolean(19, flags.contains("Unnamed"));
- this.psR.setBoolean(20, flags.contains("Valid"));
- this.psR.setBoolean(21, flags.contains("V2Dir"));
- this.psR.setBoolean(22, flags.contains("V3Dir"));
- this.psR.setString(23, version);
- this.psR.setLong(24, bandwidth);
- this.psR.setString(25, ports);
- this.psR.setBytes(26, rawDescriptor);
- this.psR.executeUpdate();
- rrsCount++;
- if (rrsCount % autoCommitCount == 0) {
- this.conn.commit();
- }
- }
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not add network status "
- + "consensus entry. We won't make any further SQL requests "
- + "in this execution.", e);
- this.importIntoDatabase = false;
- }
- }
- if (this.writeRawImportFiles) {
- try {
- if (this.statusentryOut == null) {
- new File(rawFilesDirectory).mkdirs();
- this.statusentryOut = new BufferedWriter(new FileWriter(
- rawFilesDirectory + "/statusentry.sql"));
- this.statusentryOut.write(" COPY statusentry (validafter, "
- + "nickname, fingerprint, descriptor, published, address, "
- + "orport, dirport, isauthority, isbadExit, "
- + "isbaddirectory, isexit, isfast, isguard, ishsdir, "
- + "isnamed, isstable, isrunning, isunnamed, isvalid, "
- + "isv2dir, isv3dir, version, bandwidth, ports, rawdesc) "
- + "FROM stdin;\n");
- }
- this.statusentryOut.write(
- this.dateTimeFormat.format(validAfter) + "\t" + nickname
- + "\t" + fingerprint.toLowerCase() + "\t"
- + descriptor.toLowerCase() + "\t"
- + this.dateTimeFormat.format(published) + "\t" + address
- + "\t" + orPort + "\t" + dirPort + "\t"
- + (flags.contains("Authority") ? "t" : "f") + "\t"
- + (flags.contains("BadExit") ? "t" : "f") + "\t"
- + (flags.contains("BadDirectory") ? "t" : "f") + "\t"
- + (flags.contains("Exit") ? "t" : "f") + "\t"
- + (flags.contains("Fast") ? "t" : "f") + "\t"
- + (flags.contains("Guard") ? "t" : "f") + "\t"
- + (flags.contains("HSDir") ? "t" : "f") + "\t"
- + (flags.contains("Named") ? "t" : "f") + "\t"
- + (flags.contains("Stable") ? "t" : "f") + "\t"
- + (flags.contains("Running") ? "t" : "f") + "\t"
- + (flags.contains("Unnamed") ? "t" : "f") + "\t"
- + (flags.contains("Valid") ? "t" : "f") + "\t"
- + (flags.contains("V2Dir") ? "t" : "f") + "\t"
- + (flags.contains("V3Dir") ? "t" : "f") + "\t"
- + (version != null ? version : "\\N") + "\t"
- + (bandwidth >= 0 ? bandwidth : "\\N") + "\t"
- + (ports != null ? ports : "\\N") + "\t");
- this.statusentryOut.write(PGbytea.toPGString(rawDescriptor).
- replaceAll("\\\\", "\\\\\\\\") + "\n");
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not write network status "
- + "consensus entry to raw database import file. We won't "
- + "make any further attempts to write raw import files in "
- + "this execution.", e);
- this.writeRawImportFiles = false;
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Could not write network status "
- + "consensus entry to raw database import file. We won't "
- + "make any further attempts to write raw import files in "
- + "this execution.", e);
- this.writeRawImportFiles = false;
- }
- }
- }
-
- /**
- * Insert server descriptor into database.
- */
- public void addServerDescriptor(String descriptor, String nickname,
- String address, int orPort, int dirPort, String relayIdentifier,
- long bandwidthAvg, long bandwidthBurst, long bandwidthObserved,
- String platform, long published, long uptime,
- String extraInfoDigest, byte[] rawDescriptor) {
- if (this.importIntoDatabase) {
- try {
- this.addDateToScheduledUpdates(published);
- this.addDateToScheduledUpdates(
- published + 24L * 60L * 60L * 1000L);
- Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
- this.psDs.setString(1, descriptor);
- ResultSet rs = psDs.executeQuery();
- rs.next();
- if (rs.getInt(1) == 0) {
- this.psD.clearParameters();
- this.psD.setString(1, descriptor);
- this.psD.setString(2, nickname);
- this.psD.setString(3, address);
- this.psD.setInt(4, orPort);
- this.psD.setInt(5, dirPort);
- this.psD.setString(6, relayIdentifier);
- this.psD.setLong(7, bandwidthAvg);
- this.psD.setLong(8, bandwidthBurst);
- this.psD.setLong(9, bandwidthObserved);
- this.psD.setString(10, new String(platform.getBytes(),
- "US-ASCII"));
- this.psD.setTimestamp(11, new Timestamp(published), cal);
- this.psD.setLong(12, uptime);
- this.psD.setString(13, extraInfoDigest);
- this.psD.setBytes(14, rawDescriptor);
- this.psD.executeUpdate();
- rdsCount++;
- if (rdsCount % autoCommitCount == 0) {
- this.conn.commit();
- }
- }
- } catch (UnsupportedEncodingException e) {
- // US-ASCII is supported for sure
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not add server "
- + "descriptor. We won't make any further SQL requests in "
- + "this execution.", e);
- this.importIntoDatabase = false;
- }
- }
- if (this.writeRawImportFiles) {
- try {
- if (this.descriptorOut == null) {
- new File(rawFilesDirectory).mkdirs();
- this.descriptorOut = new BufferedWriter(new FileWriter(
- rawFilesDirectory + "/descriptor.sql"));
- this.descriptorOut.write(" COPY descriptor (descriptor, "
- + "nickname, address, orport, dirport, fingerprint, "
- + "bandwidthavg, bandwidthburst, bandwidthobserved, "
- + "platform, published, uptime, extrainfo, rawdesc) FROM "
- + "stdin;\n");
- }
- this.descriptorOut.write(descriptor.toLowerCase() + "\t"
- + nickname + "\t" + address + "\t" + orPort + "\t" + dirPort
- + "\t" + relayIdentifier + "\t" + bandwidthAvg + "\t"
- + bandwidthBurst + "\t" + bandwidthObserved + "\t"
- + (platform != null && platform.length() > 0
- ? new String(platform.getBytes(), "US-ASCII") : "\\N")
- + "\t" + this.dateTimeFormat.format(published) + "\t"
- + (uptime >= 0 ? uptime : "\\N") + "\t"
- + (extraInfoDigest != null ? extraInfoDigest : "\\N")
- + "\t");
- this.descriptorOut.write(PGbytea.toPGString(rawDescriptor).
- replaceAll("\\\\", "\\\\\\\\") + "\n");
- } catch (UnsupportedEncodingException e) {
- // US-ASCII is supported for sure
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not write server "
- + "descriptor to raw database import file. We won't make "
- + "any further attempts to write raw import files in this "
- + "execution.", e);
- this.writeRawImportFiles = false;
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Could not write server "
- + "descriptor to raw database import file. We won't make "
- + "any further attempts to write raw import files in this "
- + "execution.", e);
- this.writeRawImportFiles = false;
- }
- }
- }
-
- /**
- * Insert extra-info descriptor into database.
- */
- public void addExtraInfoDescriptor(String extraInfoDigest,
- String nickname, String fingerprint, long published,
- byte[] rawDescriptor, List<String> bandwidthHistoryLines) {
- if (this.importIntoDatabase) {
- try {
- Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
- this.psEs.setString(1, extraInfoDigest);
- ResultSet rs = psEs.executeQuery();
- rs.next();
- if (rs.getInt(1) == 0) {
- this.psE.clearParameters();
- this.psE.setString(1, extraInfoDigest);
- this.psE.setString(2, nickname);
- this.psE.setString(3, fingerprint);
- this.psE.setTimestamp(4, new Timestamp(published), cal);
- this.psE.setBytes(5, rawDescriptor);
- this.psE.executeUpdate();
- resCount++;
- if (resCount % autoCommitCount == 0) {
- this.conn.commit();
- }
- }
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not add extra-info "
- + "descriptor. We won't make any further SQL requests in "
- + "this execution.", e);
- this.importIntoDatabase = false;
- }
- }
- if (this.writeRawImportFiles) {
- try {
- if (this.extrainfoOut == null) {
- new File(rawFilesDirectory).mkdirs();
- this.extrainfoOut = new BufferedWriter(new FileWriter(
- rawFilesDirectory + "/extrainfo.sql"));
- this.extrainfoOut.write(" COPY extrainfo (extrainfo, nickname, "
- + "fingerprint, published, rawdesc) FROM stdin;\n");
- }
- this.extrainfoOut.write(extraInfoDigest.toLowerCase() + "\t"
- + nickname + "\t" + fingerprint.toLowerCase() + "\t"
- + this.dateTimeFormat.format(published) + "\t");
- this.extrainfoOut.write(PGbytea.toPGString(rawDescriptor).
- replaceAll("\\\\", "\\\\\\\\") + "\n");
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Could not write extra-info "
- + "descriptor to raw database import file. We won't make "
- + "any further attempts to write raw import files in this "
- + "execution.", e);
- this.writeRawImportFiles = false;
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not write extra-info "
- + "descriptor to raw database import file. We won't make "
- + "any further attempts to write raw import files in this "
- + "execution.", e);
- this.writeRawImportFiles = false;
- }
- }
- if (!bandwidthHistoryLines.isEmpty()) {
- this.addBandwidthHistory(fingerprint.toLowerCase(), published,
- bandwidthHistoryLines);
- }
- }
-
- private static class BigIntArray implements java.sql.Array {
-
- private final String stringValue;
-
- public BigIntArray(long[] array, int offset) {
- if (array == null) {
- this.stringValue = "[-1:-1]={0}";
- } else {
- StringBuilder sb = new StringBuilder("[" + offset + ":"
- + (offset + array.length - 1) + "]={");
- for (int i = 0; i < array.length; i++) {
- sb.append((i > 0 ? "," : "") + array[i]);
- }
- sb.append('}');
- this.stringValue = sb.toString();
- }
- }
-
- public String toString() {
- return stringValue;
- }
-
- public String getBaseTypeName() {
- return "int8";
- }
-
- /* The other methods are never called; no need to implement them. */
- public void free() {
- throw new UnsupportedOperationException();
- }
- public Object getArray() {
- throw new UnsupportedOperationException();
- }
- public Object getArray(long index, int count) {
- throw new UnsupportedOperationException();
- }
- public Object getArray(long index, int count,
- Map<String, Class<?>> map) {
- throw new UnsupportedOperationException();
- }
- public Object getArray(Map<String, Class<?>> map) {
- throw new UnsupportedOperationException();
- }
- public int getBaseType() {
- throw new UnsupportedOperationException();
- }
- public ResultSet getResultSet() {
- throw new UnsupportedOperationException();
- }
- public ResultSet getResultSet(long index, int count) {
- throw new UnsupportedOperationException();
- }
- public ResultSet getResultSet(long index, int count,
- Map<String, Class<?>> map) {
- throw new UnsupportedOperationException();
- }
- public ResultSet getResultSet(Map<String, Class<?>> map) {
- throw new UnsupportedOperationException();
- }
- }
-
- public void addBandwidthHistory(String fingerprint, long published,
- List<String> bandwidthHistoryStrings) {
-
- /* Split history lines by date and rewrite them so that the date
- * comes first. */
- SortedSet<String> historyLinesByDate = new TreeSet<String>();
- for (String bandwidthHistoryString : bandwidthHistoryStrings) {
- String[] parts = bandwidthHistoryString.split(" ");
- if (parts.length != 6) {
- this.logger.finer("Bandwidth history line does not have expected "
- + "number of elements. Ignoring this line.");
- continue;
- }
- long intervalLength = 0L;
- try {
- intervalLength = Long.parseLong(parts[3].substring(1));
- } catch (NumberFormatException e) {
- this.logger.fine("Bandwidth history line does not have valid "
- + "interval length '" + parts[3] + " " + parts[4] + "'. "
- + "Ignoring this line.");
- continue;
- }
- if (intervalLength != 900L) {
- this.logger.fine("Bandwidth history line does not consist of "
- + "15-minute intervals. Ignoring this line.");
- continue;
- }
- String type = parts[0];
- String intervalEndTime = parts[1] + " " + parts[2];
- long intervalEnd, dateStart;
- try {
- intervalEnd = dateTimeFormat.parse(intervalEndTime).getTime();
- dateStart = dateTimeFormat.parse(parts[1] + " 00:00:00").
- getTime();
- } catch (ParseException e) {
- this.logger.fine("Parse exception while parsing timestamp in "
- + "bandwidth history line. Ignoring this line.");
- continue;
- }
- if (Math.abs(published - intervalEnd) >
- 7L * 24L * 60L * 60L * 1000L) {
- this.logger.fine("Extra-info descriptor publication time "
- + dateTimeFormat.format(published) + " and last interval "
- + "time " + intervalEndTime + " in " + type + " line differ "
- + "by more than 7 days! Not adding this line!");
- continue;
- }
- long currentIntervalEnd = intervalEnd;
- StringBuilder sb = new StringBuilder();
- String[] values = parts[5].split(",");
- SortedSet<String> newHistoryLines = new TreeSet<String>();
- try {
- for (int i = values.length - 1; i >= -1; i--) {
- if (i == -1 || currentIntervalEnd < dateStart) {
- sb.insert(0, intervalEndTime + " " + type + " ("
- + intervalLength + " s) ");
- sb.setLength(sb.length() - 1);
- String historyLine = sb.toString();
- newHistoryLines.add(historyLine);
- sb = new StringBuilder();
- dateStart -= 24L * 60L * 60L * 1000L;
- intervalEndTime = dateTimeFormat.format(currentIntervalEnd);
- }
- if (i == -1) {
- break;
- }
- Long.parseLong(values[i]);
- sb.insert(0, values[i] + ",");
- currentIntervalEnd -= intervalLength * 1000L;
- }
- } catch (NumberFormatException e) {
- this.logger.fine("Number format exception while parsing "
- + "bandwidth history line. Ignoring this line.");
- continue;
- }
- historyLinesByDate.addAll(newHistoryLines);
- }
-
- /* Add split history lines to database. */
- String lastDate = null;
- historyLinesByDate.add("EOL");
- long[] readArray = null, writtenArray = null, dirreadArray = null,
- dirwrittenArray = null;
- int readOffset = 0, writtenOffset = 0, dirreadOffset = 0,
- dirwrittenOffset = 0;
- for (String historyLine : historyLinesByDate) {
- String[] parts = historyLine.split(" ");
- String currentDate = parts[0];
- if (lastDate != null && (historyLine.equals("EOL") ||
- !currentDate.equals(lastDate))) {
- BigIntArray readIntArray = new BigIntArray(readArray,
- readOffset);
- BigIntArray writtenIntArray = new BigIntArray(writtenArray,
- writtenOffset);
- BigIntArray dirreadIntArray = new BigIntArray(dirreadArray,
- dirreadOffset);
- BigIntArray dirwrittenIntArray = new BigIntArray(dirwrittenArray,
- dirwrittenOffset);
- if (this.importIntoDatabase) {
- try {
- long dateMillis = dateTimeFormat.parse(lastDate
- + " 00:00:00").getTime();
- this.addDateToScheduledUpdates(dateMillis);
- this.csH.setString(1, fingerprint);
- this.csH.setDate(2, new java.sql.Date(dateMillis));
- this.csH.setArray(3, readIntArray);
- this.csH.setArray(4, writtenIntArray);
- this.csH.setArray(5, dirreadIntArray);
- this.csH.setArray(6, dirwrittenIntArray);
- this.csH.addBatch();
- rhsCount++;
- if (rhsCount % autoCommitCount == 0) {
- this.csH.executeBatch();
- }
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not insert bandwidth "
- + "history line into database. We won't make any "
- + "further SQL requests in this execution.", e);
- this.importIntoDatabase = false;
- } catch (ParseException e) {
- this.logger.log(Level.WARNING, "Could not insert bandwidth "
- + "history line into database. We won't make any "
- + "further SQL requests in this execution.", e);
- this.importIntoDatabase = false;
- }
- }
- if (this.writeRawImportFiles) {
- try {
- if (this.bwhistOut == null) {
- new File(rawFilesDirectory).mkdirs();
- this.bwhistOut = new BufferedWriter(new FileWriter(
- rawFilesDirectory + "/bwhist.sql"));
- }
- this.bwhistOut.write("SELECT insert_bwhist('" + fingerprint
- + "','" + lastDate + "','" + readIntArray.toString()
- + "','" + writtenIntArray.toString() + "','"
- + dirreadIntArray.toString() + "','"
- + dirwrittenIntArray.toString() + "');\n");
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Could not write bandwidth "
- + "history to raw database import file. We won't make "
- + "any further attempts to write raw import files in "
- + "this execution.", e);
- this.writeRawImportFiles = false;
- }
- }
- readArray = writtenArray = dirreadArray = dirwrittenArray = null;
- }
- if (historyLine.equals("EOL")) {
- break;
- }
- long lastIntervalTime;
- try {
- lastIntervalTime = dateTimeFormat.parse(parts[0] + " "
- + parts[1]).getTime() - dateTimeFormat.parse(parts[0]
- + " 00:00:00").getTime();
- } catch (ParseException e) {
- continue;
- }
- String[] stringValues = parts[5].split(",");
- long[] longValues = new long[stringValues.length];
- for (int i = 0; i < longValues.length; i++) {
- longValues[i] = Long.parseLong(stringValues[i]);
- }
-
- int offset = (int) (lastIntervalTime / (15L * 60L * 1000L))
- - longValues.length + 1;
- String type = parts[2];
- if (type.equals("read-history")) {
- readArray = longValues;
- readOffset = offset;
- } else if (type.equals("write-history")) {
- writtenArray = longValues;
- writtenOffset = offset;
- } else if (type.equals("dirreq-read-history")) {
- dirreadArray = longValues;
- dirreadOffset = offset;
- } else if (type.equals("dirreq-write-history")) {
- dirwrittenArray = longValues;
- dirwrittenOffset = offset;
- }
- lastDate = currentDate;
- }
- }
-
- /**
- * Insert network status consensus into database.
- */
- public void addConsensus(long validAfter, byte[] rawDescriptor) {
- if (this.importIntoDatabase) {
- try {
- this.addDateToScheduledUpdates(validAfter);
- Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
- Timestamp validAfterTimestamp = new Timestamp(validAfter);
- this.psCs.setTimestamp(1, validAfterTimestamp, cal);
- ResultSet rs = psCs.executeQuery();
- rs.next();
- if (rs.getInt(1) == 0) {
- this.psC.clearParameters();
- this.psC.setTimestamp(1, validAfterTimestamp, cal);
- this.psC.setBytes(2, rawDescriptor);
- this.psC.executeUpdate();
- rcsCount++;
- if (rcsCount % autoCommitCount == 0) {
- this.conn.commit();
- }
- }
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not add network status "
- + "consensus. We won't make any further SQL requests in "
- + "this execution.", e);
- this.importIntoDatabase = false;
- }
- }
- if (this.writeRawImportFiles) {
- try {
- if (this.consensusOut == null) {
- new File(rawFilesDirectory).mkdirs();
- this.consensusOut = new BufferedWriter(new FileWriter(
- rawFilesDirectory + "/consensus.sql"));
- this.consensusOut.write(" COPY consensus (validafter, rawdesc) "
- + "FROM stdin;\n");
- }
- String validAfterString = this.dateTimeFormat.format(validAfter);
- this.consensusOut.write(validAfterString + "\t");
- this.consensusOut.write(PGbytea.toPGString(rawDescriptor).
- replaceAll("\\\\", "\\\\\\\\") + "\n");
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not write network status "
- + "consensus to raw database import file. We won't make "
- + "any further attempts to write raw import files in this "
- + "execution.", e);
- this.writeRawImportFiles = false;
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Could not write network status "
- + "consensus to raw database import file. We won't make "
- + "any further attempts to write raw import files in this "
- + "execution.", e);
- this.writeRawImportFiles = false;
- }
- }
- }
-
- /**
- * Insert network status vote into database.
- */
- public void addVote(long validAfter, String dirSource,
- byte[] rawDescriptor) {
- if (this.importIntoDatabase) {
- try {
- Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
- Timestamp validAfterTimestamp = new Timestamp(validAfter);
- this.psVs.setTimestamp(1, validAfterTimestamp, cal);
- this.psVs.setString(2, dirSource);
- ResultSet rs = psVs.executeQuery();
- rs.next();
- if (rs.getInt(1) == 0) {
- this.psV.clearParameters();
- this.psV.setTimestamp(1, validAfterTimestamp, cal);
- this.psV.setString(2, dirSource);
- this.psV.setBytes(3, rawDescriptor);
- this.psV.executeUpdate();
- rvsCount++;
- if (rvsCount % autoCommitCount == 0) {
- this.conn.commit();
- }
- }
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not add network status "
- + "vote. We won't make any further SQL requests in this "
- + "execution.", e);
- this.importIntoDatabase = false;
- }
- }
- if (this.writeRawImportFiles) {
- try {
- if (this.voteOut == null) {
- new File(rawFilesDirectory).mkdirs();
- this.voteOut = new BufferedWriter(new FileWriter(
- rawFilesDirectory + "/vote.sql"));
- this.voteOut.write(" COPY vote (validafter, dirsource, "
- + "rawdesc) FROM stdin;\n");
- }
- String validAfterString = this.dateTimeFormat.format(validAfter);
- this.voteOut.write(validAfterString + "\t" + dirSource + "\t");
- this.voteOut.write(PGbytea.toPGString(rawDescriptor).
- replaceAll("\\\\", "\\\\\\\\") + "\n");
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not write network status "
- + "vote to raw database import file. We won't make any "
- + "further attempts to write raw import files in this "
- + "execution.", e);
- this.writeRawImportFiles = false;
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Could not write network status "
- + "vote to raw database import file. We won't make any "
- + "further attempts to write raw import files in this "
- + "execution.", e);
- this.writeRawImportFiles = false;
- }
- }
- }
-
- /**
- * Insert a conn-bi-direct stats string into the database.
- */
- public void addConnBiDirect(String source, String statsEnd,
- long seconds, long below, long read, long write, long both) {
- long statsEndTime = 0L;
- try {
- statsEndTime = this.dateTimeFormat.parse(statsEnd).getTime();
- } catch (ParseException e) {
- this.logger.log(Level.WARNING, "Could not add conn-bi-direct "
- + "stats string with interval ending '" + statsEnd + "'.", e);
- return;
- }
- if (this.importIntoDatabase) {
- try {
- this.addDateToScheduledUpdates(statsEndTime);
- Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
- Timestamp statsEndTimestamp = new Timestamp(statsEndTime);
- this.psBs.setString(1, source);
- this.psBs.setTimestamp(2, statsEndTimestamp, cal);
- ResultSet rs = psBs.executeQuery();
- rs.next();
- if (rs.getInt(1) == 0) {
- this.psB.clearParameters();
- this.psB.setString(1, source);
- this.psB.setTimestamp(2, statsEndTimestamp, cal);
- this.psB.setLong(3, seconds);
- this.psB.setLong(4, below);
- this.psB.setLong(5, read);
- this.psB.setLong(6, write);
- this.psB.setLong(7, both);
- this.psB.executeUpdate();
- rbsCount++;
- if (rbsCount % autoCommitCount == 0) {
- this.conn.commit();
- }
- }
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not add conn-bi-direct "
- + "stats string. We won't make any further SQL requests in "
- + "this execution.", e);
- this.importIntoDatabase = false;
- }
- }
- if (this.writeRawImportFiles) {
- try {
- if (this.connBiDirectOut == null) {
- new File(rawFilesDirectory).mkdirs();
- this.connBiDirectOut = new BufferedWriter(new FileWriter(
- rawFilesDirectory + "/connbidirect.sql"));
- this.connBiDirectOut.write(" COPY connbidirect (source, "
- + "statsend, seconds, belownum, readnum, writenum, "
- + "bothnum) FROM stdin;\n");
- }
- this.connBiDirectOut.write(source + "\t" + statsEnd + "\t"
- + seconds + "\t" + below + "\t" + read + "\t" + write + "\t"
- + both + "\n");
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Could not write conn-bi-direct "
- + "stats string to raw database import file. We won't make "
- + "any further attempts to write raw import files in this "
- + "execution.", e);
- this.writeRawImportFiles = false;
- }
- }
- }
-
- /**
- * Adds observations on the number of directory requests by country as
- * seen on a directory at a given date to the database.
- */
- public void addDirReqStats(String source, String statsEnd, long seconds,
- Map<String, String> dirReqsPerCountry) {
- long statsEndTime = 0L;
- try {
- statsEndTime = this.dateTimeFormat.parse(statsEnd).getTime();
- } catch (ParseException e) {
- this.logger.log(Level.WARNING, "Could not add dirreq stats with "
- + "interval ending '" + statsEnd + "'.", e);
- return;
- }
- if (this.importIntoDatabase) {
- try {
- this.addDateToScheduledUpdates(statsEndTime);
- Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
- Timestamp statsEndTimestamp = new Timestamp(statsEndTime);
- this.psQs.setString(1, source);
- this.psQs.setTimestamp(2, statsEndTimestamp, cal);
- ResultSet rs = psQs.executeQuery();
- rs.next();
- if (rs.getInt(1) == 0) {
- for (Map.Entry<String, String> e :
- dirReqsPerCountry.entrySet()) {
- this.psQ.clearParameters();
- this.psQ.setString(1, source);
- this.psQ.setTimestamp(2, statsEndTimestamp, cal);
- this.psQ.setLong(3, seconds);
- this.psQ.setString(4, e.getKey());
- this.psQ.setLong(5, Long.parseLong(e.getValue()));
- this.psQ.executeUpdate();
- rqsCount++;
- if (rqsCount % autoCommitCount == 0) {
- this.conn.commit();
- }
- }
- }
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not add dirreq stats. We "
- + "won't make any further SQL requests in this execution.",
- e);
- this.importIntoDatabase = false;
- }
- }
- if (this.writeRawImportFiles) {
- try {
- if (this.dirReqOut == null) {
- new File(rawFilesDirectory).mkdirs();
- this.dirReqOut = new BufferedWriter(new FileWriter(
- rawFilesDirectory + "/dirreq_stats.sql"));
- this.dirReqOut.write(" COPY dirreq_stats (source, statsend, "
- + "seconds, country, requests) FROM stdin;\n");
- }
- for (Map.Entry<String, String> e :
- dirReqsPerCountry.entrySet()) {
- this.dirReqOut.write(source + "\t" + statsEnd + "\t" + seconds
- + "\t" + e.getKey() + "\t" + e.getValue() + "\n");
- }
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Could not write dirreq stats to "
- + "raw database import file. We won't make any further "
- + "attempts to write raw import files in this execution.", e);
- this.writeRawImportFiles = false;
- }
- }
- }
-
- /**
- * Close the relay descriptor database connection.
- */
- public void closeConnection() {
-
- /* Log stats about imported descriptors. */
- this.logger.info(String.format("Finished importing relay "
- + "descriptors: %d consensuses, %d network status entries, %d "
- + "votes, %d server descriptors, %d extra-info descriptors, %d "
- + "bandwidth history elements, %d dirreq stats elements, and %d "
- + "conn-bi-direct stats lines", rcsCount, rrsCount, rvsCount,
- rdsCount, resCount, rhsCount, rqsCount, rbsCount));
-
- /* Insert scheduled updates a second time, just in case the refresh
- * run has started since inserting them the first time in which case
- * it will miss the data inserted afterwards. We cannot, however,
- * insert them only now, because if a Java execution fails at a random
- * point, we might have added data, but not the corresponding dates to
- * update statistics. */
- if (this.importIntoDatabase) {
- try {
- for (long dateMillis : this.scheduledUpdates) {
- this.psU.setDate(1, new java.sql.Date(dateMillis));
- this.psU.execute();
- }
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not add scheduled dates "
- + "for the next refresh run.", e);
- }
- }
-
- /* Commit any stragglers before closing. */
- if (this.conn != null) {
- try {
- this.csH.executeBatch();
-
- this.conn.commit();
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not commit final records to "
- + "database", e);
- }
- try {
- this.conn.close();
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not close database "
- + "connection.", e);
- }
- }
-
- /* Close raw import files. */
- try {
- if (this.statusentryOut != null) {
- this.statusentryOut.write("\\.\n");
- this.statusentryOut.close();
- }
- if (this.descriptorOut != null) {
- this.descriptorOut.write("\\.\n");
- this.descriptorOut.close();
- }
- if (this.extrainfoOut != null) {
- this.extrainfoOut.write("\\.\n");
- this.extrainfoOut.close();
- }
- if (this.bwhistOut != null) {
- this.bwhistOut.write("\\.\n");
- this.bwhistOut.close();
- }
- if (this.consensusOut != null) {
- this.consensusOut.write("\\.\n");
- this.consensusOut.close();
- }
- if (this.voteOut != null) {
- this.voteOut.write("\\.\n");
- this.voteOut.close();
- }
- if (this.connBiDirectOut != null) {
- this.connBiDirectOut.write("\\.\n");
- this.connBiDirectOut.close();
- }
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Could not close one or more raw "
- + "database import files.", e);
- }
- }
-}
-
diff --git a/src/org/torproject/ernie/db/RelayDescriptorParser.java b/src/org/torproject/ernie/db/RelayDescriptorParser.java
index e94b55b..fcfc0eb 100644
--- a/src/org/torproject/ernie/db/RelayDescriptorParser.java
+++ b/src/org/torproject/ernie/db/RelayDescriptorParser.java
@@ -35,12 +35,6 @@ public class RelayDescriptorParser {
private RelayDescriptorDownloader rdd;
/**
- * Relay descriptor database importer that stores relay descriptor
- * contents for later evaluation.
- */
- private RelayDescriptorDatabaseImporter rddi;
-
- /**
* Logger for this class.
*/
private Logger logger;
@@ -51,10 +45,9 @@ public class RelayDescriptorParser {
* Initializes this class.
*/
public RelayDescriptorParser(BridgeStatsFileHandler bsfh,
- ArchiveWriter aw, RelayDescriptorDatabaseImporter rddi) {
+ ArchiveWriter aw) {
this.bsfh = bsfh;
this.aw = aw;
- this.rddi = rddi;
/* Initialize logger. */
this.logger = Logger.getLogger(RelayDescriptorParser.class.getName());
@@ -89,18 +82,12 @@ public class RelayDescriptorParser {
// time to see when we switch from hourly to half-hourly
// consensuses
boolean isConsensus = true;
- int exit = 0, fast = 0, guard = 0, running = 0, stable = 0;
- String validAfterTime = null, nickname = null,
- relayIdentity = null, serverDesc = null, version = null,
- ports = null;
- String fingerprint = null, dirSource = null, address = null;
- long validAfter = -1L, published = -1L, bandwidth = -1L,
- orPort = 0L, dirPort = 0L;
+ String validAfterTime = null, fingerprint = null,
+ dirSource = null;
+ long validAfter = -1L;
SortedSet<String> dirSources = new TreeSet<String>();
SortedSet<String> serverDescriptors = new TreeSet<String>();
SortedSet<String> hashedRelayIdentities = new TreeSet<String>();
- SortedSet<String> relayFlags = null;
- StringBuilder rawStatusEntry = null;
while ((line = br.readLine()) != null) {
if (line.equals("vote-status vote")) {
isConsensus = false;
@@ -114,19 +101,6 @@ public class RelayDescriptorParser {
} else if (line.startsWith("fingerprint ")) {
fingerprint = line.split(" ")[1];
} else if (line.startsWith("r ")) {
- if (isConsensus && relayIdentity != null &&
- this.rddi != null) {
- byte[] rawDescriptor = rawStatusEntry.toString().getBytes();
- this.rddi.addStatusEntry(validAfter, nickname,
- relayIdentity, serverDesc, published, address, orPort,
- dirPort, relayFlags, version, bandwidth, ports,
- rawDescriptor);
- relayFlags = null;
- version = null;
- bandwidth = -1L;
- ports = null;
- }
- rawStatusEntry = new StringBuilder(line + "\n");
String[] parts = line.split(" ");
if (parts.length < 9) {
this.logger.log(Level.WARNING, "Could not parse r line '"
@@ -134,65 +108,19 @@ public class RelayDescriptorParser {
break;
}
String publishedTime = parts[4] + " " + parts[5];
- nickname = parts[1];
- relayIdentity = Hex.encodeHexString(
+ String relayIdentity = Hex.encodeHexString(
Base64.decodeBase64(parts[2] + "=")).
toLowerCase();
- serverDesc = Hex.encodeHexString(Base64.decodeBase64(
+ String serverDesc = Hex.encodeHexString(Base64.decodeBase64(
parts[3] + "=")).toLowerCase();
serverDescriptors.add(publishedTime + "," + relayIdentity
+ "," + serverDesc);
hashedRelayIdentities.add(DigestUtils.shaHex(
Base64.decodeBase64(parts[2] + "=")).
toUpperCase());
- published = parseFormat.parse(parts[4] + " " + parts[5]).
- getTime();
- address = parts[6];
- orPort = Long.parseLong(parts[7]);
- dirPort = Long.parseLong(parts[8]);
- } else if (line.startsWith("s ") || line.equals("s")) {
- rawStatusEntry.append(line + "\n");
- if (line.contains(" Running")) {
- exit += line.contains(" Exit") ? 1 : 0;
- fast += line.contains(" Fast") ? 1 : 0;
- guard += line.contains(" Guard") ? 1 : 0;
- stable += line.contains(" Stable") ? 1 : 0;
- running++;
- }
- relayFlags = new TreeSet<String>();
- if (line.length() > 2) {
- for (String flag : line.substring(2).split(" ")) {
- relayFlags.add(flag);
- }
- }
- } else if (line.startsWith("v ")) {
- rawStatusEntry.append(line + "\n");
- version = line.substring(2);
- } else if (line.startsWith("w ")) {
- rawStatusEntry.append(line + "\n");
- String[] parts = line.split(" ");
- for (String part : parts) {
- if (part.startsWith("Bandwidth=")) {
- bandwidth = Long.parseLong(part.substring(
- "Bandwidth=".length()));
- }
- }
- } else if (line.startsWith("p ")) {
- rawStatusEntry.append(line + "\n");
- ports = line.substring(2);
}
}
if (isConsensus) {
- if (this.rddi != null) {
- this.rddi.addConsensus(validAfter, data);
- if (relayIdentity != null) {
- byte[] rawDescriptor = rawStatusEntry.toString().getBytes();
- this.rddi.addStatusEntry(validAfter, nickname,
- relayIdentity, serverDesc, published, address, orPort,
- dirPort, relayFlags, version, bandwidth, ports,
- rawDescriptor);
- }
- }
if (this.bsfh != null) {
for (String hashedRelayIdentity : hashedRelayIdentities) {
this.bsfh.addHashedRelay(hashedRelayIdentity);
@@ -206,9 +134,6 @@ public class RelayDescriptorParser {
this.aw.storeConsensus(data, validAfter);
}
} else {
- if (this.rddi != null) {
- this.rddi.addVote(validAfter, dirSource, data);
- }
if (this.rdd != null) {
this.rdd.haveParsedVote(validAfterTime, fingerprint,
serverDescriptors);
@@ -231,19 +156,16 @@ public class RelayDescriptorParser {
}
}
} else if (line.startsWith("router ")) {
- String platformLine = null, publishedLine = null,
- publishedTime = null, bandwidthLine = null,
+ String publishedLine = null, publishedTime = null,
extraInfoDigest = null, relayIdentifier = null;
String[] parts = line.split(" ");
String nickname = parts[1];
String address = parts[2];
int orPort = Integer.parseInt(parts[3]);
int dirPort = Integer.parseInt(parts[4]);
- long published = -1L, uptime = -1L;
+ long published = -1L;
while ((line = br.readLine()) != null) {
- if (line.startsWith("platform ")) {
- platformLine = line;
- } else if (line.startsWith("published ")) {
+ if (line.startsWith("published ")) {
publishedTime = line.substring("published ".length());
published = parseFormat.parse(publishedTime).getTime();
} else if (line.startsWith("opt fingerprint") ||
@@ -251,15 +173,11 @@ public class RelayDescriptorParser {
relayIdentifier = line.substring(line.startsWith("opt ") ?
"opt fingerprint".length() : "fingerprint".length()).
replaceAll(" ", "").toLowerCase();
- } else if (line.startsWith("bandwidth ")) {
- bandwidthLine = line;
} else if (line.startsWith("opt extra-info-digest ") ||
line.startsWith("extra-info-digest ")) {
extraInfoDigest = line.startsWith("opt ") ?
line.split(" ")[2].toLowerCase() :
line.split(" ")[1].toLowerCase();
- } else if (line.startsWith("uptime ")) {
- uptime = Long.parseLong(line.substring("uptime ".length()));
}
}
String ascii = new String(data, "US-ASCII");
@@ -280,93 +198,14 @@ public class RelayDescriptorParser {
this.rdd.haveParsedServerDescriptor(publishedTime,
relayIdentifier, digest, extraInfoDigest);
}
- if (this.rddi != null && digest != null) {
- String[] bwParts = bandwidthLine.split(" ");
- long bandwidthAvg = Long.parseLong(bwParts[1]);
- long bandwidthBurst = Long.parseLong(bwParts[2]);
- long bandwidthObserved = Long.parseLong(bwParts[3]);
- String platform = platformLine.substring("platform ".length());
- this.rddi.addServerDescriptor(digest, nickname, address, orPort,
- dirPort, relayIdentifier, bandwidthAvg, bandwidthBurst,
- bandwidthObserved, platform, published, uptime,
- extraInfoDigest, data);
- }
} else if (line.startsWith("extra-info ")) {
- String nickname = line.split(" ")[1];
String publishedTime = null, relayIdentifier = line.split(" ")[2];
long published = -1L;
- String dir = line.split(" ")[2];
- String statsEnd = null;
- long seconds = -1L;
- List<String> bandwidthHistory = new ArrayList<String>();
boolean skip = false;
while ((line = br.readLine()) != null) {
if (line.startsWith("published ")) {
publishedTime = line.substring("published ".length());
published = parseFormat.parse(publishedTime).getTime();
- } else if (line.startsWith("read-history ") ||
- line.startsWith("write-history ") ||
- line.startsWith("dirreq-read-history ") ||
- line.startsWith("dirreq-write-history ")) {
- bandwidthHistory.add(line);
- } else if (line.startsWith("dirreq-stats-end ")) {
- String[] parts = line.split(" ");
- if (parts.length < 5) {
- this.logger.warning("Could not parse dirreq-stats-end "
- + "line '" + line + "' in descriptor. Skipping.");
- break;
- }
- statsEnd = parts[1] + " " + parts[2];
- seconds = Long.parseLong(parts[3].substring(1));
- } else if (line.startsWith("dirreq-v3-reqs ")
- && line.length() > "dirreq-v3-reqs ".length()) {
- if (this.rddi != null) {
- try {
- int allUsers = 0;
- Map<String, String> obs = new HashMap<String, String>();
- String[] parts = line.substring("dirreq-v3-reqs ".
- length()).split(",");
- for (String p : parts) {
- String country = p.substring(0, 2);
- int users = Integer.parseInt(p.substring(3)) - 4;
- allUsers += users;
- obs.put(country, "" + users);
- }
- obs.put("zy", "" + allUsers);
- this.rddi.addDirReqStats(dir, statsEnd, seconds, obs);
- } catch (NumberFormatException e) {
- this.logger.log(Level.WARNING, "Could not parse "
- + "dirreq-v3-reqs line '" + line + "' in descriptor. "
- + "Skipping.", e);
- break;
- }
- }
- } else if (line.startsWith("conn-bi-direct ")) {
- if (this.rddi != null) {
- String[] parts = line.split(" ");
- if (parts.length == 6 &&
- parts[5].split(",").length == 4) {
- try {
- String connBiDirectStatsEnd = parts[1] + " " + parts[2];
- long connBiDirectSeconds = Long.parseLong(parts[3].
- substring(1));
- String[] parts2 = parts[5].split(",");
- long below = Long.parseLong(parts2[0]);
- long read = Long.parseLong(parts2[1]);
- long write = Long.parseLong(parts2[2]);
- long both = Long.parseLong(parts2[3]);
- this.rddi.addConnBiDirect(dir, connBiDirectStatsEnd,
- connBiDirectSeconds, below, read, write, both);
- } catch (NumberFormatException e) {
- this.logger.log(Level.WARNING, "Number format "
- + "exception while parsing conn-bi-direct stats "
- + "string '" + line + "'. Skipping.", e);
- }
- } else {
- this.logger.warning("Skipping invalid conn-bi-direct "
- + "stats string '" + line + "'.");
- }
- }
}
}
String ascii = new String(data, "US-ASCII");
@@ -387,10 +226,6 @@ public class RelayDescriptorParser {
this.rdd.haveParsedExtraInfoDescriptor(publishedTime,
relayIdentifier.toLowerCase(), digest);
}
- if (this.rddi != null && digest != null) {
- this.rddi.addExtraInfoDescriptor(digest, nickname,
- dir.toLowerCase(), published, data, bandwidthHistory);
- }
}
} catch (IOException e) {
this.logger.log(Level.WARNING, "Could not parse descriptor. "
@@ -401,3 +236,4 @@ public class RelayDescriptorParser {
}
}
}
+
More information about the tor-commits
mailing list