[tor-commits] [metrics-web/master] Split up ArchiveReader in two parts.
karsten at torproject.org
karsten at torproject.org
Thu Dec 13 15:56:28 UTC 2012
commit af7f95dcc86328b558f4711a9b8e4b72688df27f
Author: Karsten Loesing <karsten.loesing at gmx.net>
Date: Thu Dec 13 13:50:33 2012 +0100
Split up ArchiveReader in two parts.
---
src/org/torproject/ernie/cron/ArchiveReader.java | 173 --------------------
.../ernie/cron/BridgeStatsFileHandler.java | 68 +++++++-
src/org/torproject/ernie/cron/Main.java | 15 ++-
.../cron/RelayDescriptorDatabaseImporter.java | 135 +++++++++++++++-
4 files changed, 207 insertions(+), 184 deletions(-)
diff --git a/src/org/torproject/ernie/cron/ArchiveReader.java b/src/org/torproject/ernie/cron/ArchiveReader.java
deleted file mode 100644
index 9126787..0000000
--- a/src/org/torproject/ernie/cron/ArchiveReader.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/* Copyright 2011, 2012 The Tor Project
- * See LICENSE for licensing information */
-package org.torproject.ernie.cron;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Logger;
-
-import org.apache.commons.codec.DecoderException;
-import org.apache.commons.codec.binary.Hex;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.torproject.descriptor.Descriptor;
-import org.torproject.descriptor.DescriptorFile;
-import org.torproject.descriptor.DescriptorReader;
-import org.torproject.descriptor.DescriptorSourceFactory;
-import org.torproject.descriptor.ExtraInfoDescriptor;
-import org.torproject.descriptor.NetworkStatusEntry;
-import org.torproject.descriptor.RelayNetworkStatusConsensus;
-import org.torproject.descriptor.ServerDescriptor;
-
-/**
- * Read in all files in a given directory and pass buffered readers of
- * them to the relay descriptor parser.
- */
-public class ArchiveReader {
-
- /**
- * Stats file handler that accepts parse results for bridge statistics.
- */
- private BridgeStatsFileHandler bsfh;
-
- /**
- * Relay descriptor database importer that stores relay descriptor
- * contents for later evaluation.
- */
- private RelayDescriptorDatabaseImporter rddi;
-
- /**
- * Logger for this class.
- */
- private Logger logger;
-
- public ArchiveReader(RelayDescriptorDatabaseImporter rddi,
- BridgeStatsFileHandler bsfh, File archivesDirectory,
- File statsDirectory, boolean keepImportHistory) {
-
- if (archivesDirectory == null ||
- statsDirectory == null) {
- throw new IllegalArgumentException();
- }
-
- this.rddi = rddi;
- this.bsfh = bsfh;
-
- this.logger = Logger.getLogger(ArchiveReader.class.getName());
- if (archivesDirectory.exists()) {
- logger.fine("Importing files in directory " + archivesDirectory
- + "/...");
- DescriptorReader reader =
- DescriptorSourceFactory.createDescriptorReader();
- reader.addDirectory(archivesDirectory);
- if (keepImportHistory) {
- reader.setExcludeFiles(new File(statsDirectory,
- "relay-descriptor-history"));
- }
- Iterator<DescriptorFile> descriptorFiles = reader.readDescriptors();
- while (descriptorFiles.hasNext()) {
- DescriptorFile descriptorFile = descriptorFiles.next();
- if (descriptorFile.getDescriptors() != null) {
- for (Descriptor descriptor : descriptorFile.getDescriptors()) {
- if (descriptor instanceof RelayNetworkStatusConsensus) {
- this.addRelayNetworkStatusConsensus(
- (RelayNetworkStatusConsensus) descriptor);
- } else if (descriptor instanceof ServerDescriptor) {
- this.addServerDescriptor((ServerDescriptor) descriptor);
- } else if (descriptor instanceof ExtraInfoDescriptor) {
- this.addExtraInfoDescriptor(
- (ExtraInfoDescriptor) descriptor);
- }
- }
- }
- }
- }
-
- logger.info("Finished importing relay descriptors.");
- }
-
- private void addRelayNetworkStatusConsensus(
- RelayNetworkStatusConsensus consensus) {
- for (NetworkStatusEntry statusEntry :
- consensus.getStatusEntries().values()) {
- this.rddi.addStatusEntry(consensus.getValidAfterMillis(),
- statusEntry.getNickname(),
- statusEntry.getFingerprint().toLowerCase(),
- statusEntry.getDescriptor().toLowerCase(),
- statusEntry.getPublishedMillis(), statusEntry.getAddress(),
- statusEntry.getOrPort(), statusEntry.getDirPort(),
- statusEntry.getFlags(), statusEntry.getVersion(),
- statusEntry.getBandwidth(), statusEntry.getPortList(),
- statusEntry.getStatusEntryBytes());
- try {
- this.bsfh.addHashedRelay(DigestUtils.shaHex(Hex.decodeHex(
- statusEntry.getFingerprint().toCharArray())).toUpperCase());
- } catch (DecoderException e) {
- }
- }
- this.rddi.addConsensus(consensus.getValidAfterMillis(),
- consensus.getRawDescriptorBytes());
- }
-
- private void addServerDescriptor(ServerDescriptor descriptor) {
- this.rddi.addServerDescriptor(descriptor.getServerDescriptorDigest(),
- descriptor.getNickname(), descriptor.getAddress(),
- descriptor.getOrPort(), descriptor.getDirPort(),
- descriptor.getFingerprint(), descriptor.getBandwidthRate(),
- descriptor.getBandwidthBurst(), descriptor.getBandwidthObserved(),
- descriptor.getPlatform(), descriptor.getPublishedMillis(),
- descriptor.getUptime(), descriptor.getExtraInfoDigest(),
- descriptor.getRawDescriptorBytes());
- }
-
- private void addExtraInfoDescriptor(ExtraInfoDescriptor descriptor) {
- if (descriptor.getDirreqV3Reqs() != null) {
- int allUsers = 0;
- Map<String, String> obs = new HashMap<String, String>();
- for (Map.Entry<String, Integer> e :
- descriptor.getDirreqV3Reqs().entrySet()) {
- String country = e.getKey();
- int users = e.getValue() - 4;
- allUsers += users;
- obs.put(country, "" + users);
- }
- obs.put("zy", "" + allUsers);
- this.rddi.addDirReqStats(descriptor.getFingerprint(),
- descriptor.getDirreqStatsEndMillis(),
- descriptor.getDirreqStatsIntervalLength(), obs);
- }
- if (descriptor.getConnBiDirectStatsEndMillis() >= 0L) {
- this.rddi.addConnBiDirect(descriptor.getFingerprint(),
- descriptor.getConnBiDirectStatsEndMillis(),
- descriptor.getConnBiDirectStatsIntervalLength(),
- descriptor.getConnBiDirectBelow(),
- descriptor.getConnBiDirectRead(),
- descriptor.getConnBiDirectWrite(),
- descriptor.getConnBiDirectBoth());
- }
- List<String> bandwidthHistoryLines = new ArrayList<String>();
- if (descriptor.getWriteHistory() != null) {
- bandwidthHistoryLines.add(descriptor.getWriteHistory().getLine());
- }
- if (descriptor.getReadHistory() != null) {
- bandwidthHistoryLines.add(descriptor.getReadHistory().getLine());
- }
- if (descriptor.getDirreqWriteHistory() != null) {
- bandwidthHistoryLines.add(
- descriptor.getDirreqWriteHistory().getLine());
- }
- if (descriptor.getDirreqReadHistory() != null) {
- bandwidthHistoryLines.add(
- descriptor.getDirreqReadHistory().getLine());
- }
- this.rddi.addExtraInfoDescriptor(descriptor.getExtraInfoDigest(),
- descriptor.getNickname(),
- descriptor.getFingerprint().toLowerCase(),
- descriptor.getPublishedMillis(),
- descriptor.getRawDescriptorBytes(), bandwidthHistoryLines);
- }
-}
-
diff --git a/src/org/torproject/ernie/cron/BridgeStatsFileHandler.java b/src/org/torproject/ernie/cron/BridgeStatsFileHandler.java
index 4534d2f..085394c 100644
--- a/src/org/torproject/ernie/cron/BridgeStatsFileHandler.java
+++ b/src/org/torproject/ernie/cron/BridgeStatsFileHandler.java
@@ -29,11 +29,16 @@ import java.util.TreeSet;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.codec.digest.DigestUtils;
import org.torproject.descriptor.Descriptor;
import org.torproject.descriptor.DescriptorFile;
import org.torproject.descriptor.DescriptorReader;
import org.torproject.descriptor.DescriptorSourceFactory;
import org.torproject.descriptor.ExtraInfoDescriptor;
+import org.torproject.descriptor.NetworkStatusEntry;
+import org.torproject.descriptor.RelayNetworkStatusConsensus;
import org.torproject.descriptor.ServerDescriptor;
/**
@@ -117,7 +122,11 @@ public class BridgeStatsFileHandler {
private File statsDirectory;
- private boolean keepImportHistory;
+ private boolean keepBridgeDescriptorImportHistory;
+
+ private File archivesDirectory;
+
+ private boolean keepRelayDescriptorImportHistory;
/**
* Initializes this class, including reading in intermediate results
@@ -125,14 +134,21 @@ public class BridgeStatsFileHandler {
* <code>stats/hashed-relay-identities</code>.
*/
public BridgeStatsFileHandler(String connectionURL,
- File bridgesDir, File statsDirectory, boolean keepImportHistory) {
+ File bridgesDir, File statsDirectory,
+ boolean keepBridgeDescriptorImportHistory, File archivesDirectory,
+ boolean keepRelayDescriptorImportHistory) {
- if (bridgesDir == null || statsDirectory == null) {
- throw new IllegalArgumentException();
+ if (bridgesDir == null || statsDirectory == null ||
+ archivesDirectory == null || statsDirectory == null) {
+ throw new IllegalArgumentException();
}
this.bridgesDir = bridgesDir;
this.statsDirectory = statsDirectory;
- this.keepImportHistory = keepImportHistory;
+ this.keepBridgeDescriptorImportHistory =
+ keepBridgeDescriptorImportHistory;
+ this.archivesDirectory = archivesDirectory;
+ this.keepRelayDescriptorImportHistory =
+ keepRelayDescriptorImportHistory;
/* Initialize set of known countries. */
this.countries = new TreeSet<String>();
@@ -356,7 +372,7 @@ public class BridgeStatsFileHandler {
DescriptorReader reader =
DescriptorSourceFactory.createDescriptorReader();
reader.addDirectory(bridgesDir);
- if (keepImportHistory) {
+ if (keepBridgeDescriptorImportHistory) {
reader.setExcludeFiles(new File(statsDirectory,
"bridge-descriptor-history"));
}
@@ -424,6 +440,46 @@ public class BridgeStatsFileHandler {
}
}
+ public void importRelayDescriptors() {
+ if (archivesDirectory.exists()) {
+ logger.fine("Importing files in directory " + archivesDirectory
+ + "/...");
+ DescriptorReader reader =
+ DescriptorSourceFactory.createDescriptorReader();
+ reader.addDirectory(archivesDirectory);
+ if (keepRelayDescriptorImportHistory) {
+ reader.setExcludeFiles(new File(statsDirectory,
+ "relay-descriptor-history"));
+ }
+ Iterator<DescriptorFile> descriptorFiles = reader.readDescriptors();
+ while (descriptorFiles.hasNext()) {
+ DescriptorFile descriptorFile = descriptorFiles.next();
+ if (descriptorFile.getDescriptors() != null) {
+ for (Descriptor descriptor : descriptorFile.getDescriptors()) {
+ if (descriptor instanceof RelayNetworkStatusConsensus) {
+ this.addRelayNetworkStatusConsensus(
+ (RelayNetworkStatusConsensus) descriptor);
+ }
+ }
+ }
+ }
+ }
+
+ logger.info("Finished importing relay descriptors.");
+ }
+
+ private void addRelayNetworkStatusConsensus(
+ RelayNetworkStatusConsensus consensus) {
+ for (NetworkStatusEntry statusEntry :
+ consensus.getStatusEntries().values()) {
+ try {
+ this.addHashedRelay(DigestUtils.shaHex(Hex.decodeHex(
+ statusEntry.getFingerprint().toCharArray())).toUpperCase());
+ } catch (DecoderException e) {
+ }
+ }
+ }
+
/**
* Writes the list of hashed relay identities and bridge user numbers as
* observed by single bridges to disk, aggregates per-day statistics for
diff --git a/src/org/torproject/ernie/cron/Main.java b/src/org/torproject/ernie/cron/Main.java
index 457433f..a3f38e6 100644
--- a/src/org/torproject/ernie/cron/Main.java
+++ b/src/org/torproject/ernie/cron/Main.java
@@ -37,7 +37,9 @@ public class Main {
new BridgeStatsFileHandler(
config.getRelayDescriptorDatabaseJDBC(),
new File(config.getSanitizedBridgesDirectory()),
- statsDirectory, config.getKeepSanitizedBridgesImportHistory()) :
+ statsDirectory, config.getKeepSanitizedBridgesImportHistory(),
+ new File(config.getDirectoryArchivesDirectory()),
+ config.getKeepDirectoryArchiveImportHistory()) :
null;
// Import relay descriptors
@@ -49,11 +51,16 @@ public class Main {
config.getWriteRelayDescriptorDatabase() ?
config.getRelayDescriptorDatabaseJDBC() : null,
config.getWriteRelayDescriptorsRawFiles() ?
- config.getRelayDescriptorRawFilesDirectory() : null) : null;
- new ArchiveReader(rddi, bsfh,
+ config.getRelayDescriptorRawFilesDirectory() : null,
new File(config.getDirectoryArchivesDirectory()),
statsDirectory,
- config.getKeepDirectoryArchiveImportHistory());
+ config.getKeepDirectoryArchiveImportHistory()) : null;
+ if (rddi != null) {
+ rddi.importRelayDescriptors();
+ }
+ if (bsfh != null) {
+ bsfh.importRelayDescriptors();
+ }
rddi.closeConnection();
}
diff --git a/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java b/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java
index 3e7e694..c9d9bc4 100644
--- a/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java
+++ b/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java
@@ -16,8 +16,11 @@ import java.sql.SQLException;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.Calendar;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -28,6 +31,14 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import org.postgresql.util.PGbytea;
+import org.torproject.descriptor.Descriptor;
+import org.torproject.descriptor.DescriptorFile;
+import org.torproject.descriptor.DescriptorReader;
+import org.torproject.descriptor.DescriptorSourceFactory;
+import org.torproject.descriptor.ExtraInfoDescriptor;
+import org.torproject.descriptor.NetworkStatusEntry;
+import org.torproject.descriptor.RelayNetworkStatusConsensus;
+import org.torproject.descriptor.ServerDescriptor;
/**
* Parse directory data.
@@ -207,12 +218,25 @@ public final class RelayDescriptorDatabaseImporter {
private boolean importIntoDatabase;
private boolean writeRawImportFiles;
+ private File archivesDirectory;
+ private File statsDirectory;
+ private boolean keepImportHistory;
+
/**
* Initialize database importer by connecting to the database and
* preparing statements.
*/
public RelayDescriptorDatabaseImporter(String connectionURL,
- String rawFilesDirectory) {
+ String rawFilesDirectory, File archivesDirectory,
+ File statsDirectory, boolean keepImportHistory) {
+
+ if (archivesDirectory == null ||
+ statsDirectory == null) {
+ throw new IllegalArgumentException();
+ }
+ this.archivesDirectory = archivesDirectory;
+ this.statsDirectory = statsDirectory;
+ this.keepImportHistory = keepImportHistory;
/* Initialize logger. */
this.logger = Logger.getLogger(
@@ -979,6 +1003,115 @@ public final class RelayDescriptorDatabaseImporter {
}
}
+ public void importRelayDescriptors() {
+ if (archivesDirectory.exists()) {
+ logger.fine("Importing files in directory " + archivesDirectory
+ + "/...");
+ DescriptorReader reader =
+ DescriptorSourceFactory.createDescriptorReader();
+ reader.addDirectory(archivesDirectory);
+ if (keepImportHistory) {
+ reader.setExcludeFiles(new File(statsDirectory,
+ "relay-descriptor-history"));
+ }
+ Iterator<DescriptorFile> descriptorFiles = reader.readDescriptors();
+ while (descriptorFiles.hasNext()) {
+ DescriptorFile descriptorFile = descriptorFiles.next();
+ if (descriptorFile.getDescriptors() != null) {
+ for (Descriptor descriptor : descriptorFile.getDescriptors()) {
+ if (descriptor instanceof RelayNetworkStatusConsensus) {
+ this.addRelayNetworkStatusConsensus(
+ (RelayNetworkStatusConsensus) descriptor);
+ } else if (descriptor instanceof ServerDescriptor) {
+ this.addServerDescriptor((ServerDescriptor) descriptor);
+ } else if (descriptor instanceof ExtraInfoDescriptor) {
+ this.addExtraInfoDescriptor(
+ (ExtraInfoDescriptor) descriptor);
+ }
+ }
+ }
+ }
+ }
+
+ logger.info("Finished importing relay descriptors.");
+ }
+
+ private void addRelayNetworkStatusConsensus(
+ RelayNetworkStatusConsensus consensus) {
+ for (NetworkStatusEntry statusEntry :
+ consensus.getStatusEntries().values()) {
+ this.addStatusEntry(consensus.getValidAfterMillis(),
+ statusEntry.getNickname(),
+ statusEntry.getFingerprint().toLowerCase(),
+ statusEntry.getDescriptor().toLowerCase(),
+ statusEntry.getPublishedMillis(), statusEntry.getAddress(),
+ statusEntry.getOrPort(), statusEntry.getDirPort(),
+ statusEntry.getFlags(), statusEntry.getVersion(),
+ statusEntry.getBandwidth(), statusEntry.getPortList(),
+ statusEntry.getStatusEntryBytes());
+ }
+ this.addConsensus(consensus.getValidAfterMillis(),
+ consensus.getRawDescriptorBytes());
+ }
+
+ private void addServerDescriptor(ServerDescriptor descriptor) {
+ this.addServerDescriptor(descriptor.getServerDescriptorDigest(),
+ descriptor.getNickname(), descriptor.getAddress(),
+ descriptor.getOrPort(), descriptor.getDirPort(),
+ descriptor.getFingerprint(), descriptor.getBandwidthRate(),
+ descriptor.getBandwidthBurst(), descriptor.getBandwidthObserved(),
+ descriptor.getPlatform(), descriptor.getPublishedMillis(),
+ descriptor.getUptime(), descriptor.getExtraInfoDigest(),
+ descriptor.getRawDescriptorBytes());
+ }
+
+ private void addExtraInfoDescriptor(ExtraInfoDescriptor descriptor) {
+ if (descriptor.getDirreqV3Reqs() != null) {
+ int allUsers = 0;
+ Map<String, String> obs = new HashMap<String, String>();
+ for (Map.Entry<String, Integer> e :
+ descriptor.getDirreqV3Reqs().entrySet()) {
+ String country = e.getKey();
+ int users = e.getValue() - 4;
+ allUsers += users;
+ obs.put(country, "" + users);
+ }
+ obs.put("zy", "" + allUsers);
+ this.addDirReqStats(descriptor.getFingerprint(),
+ descriptor.getDirreqStatsEndMillis(),
+ descriptor.getDirreqStatsIntervalLength(), obs);
+ }
+ if (descriptor.getConnBiDirectStatsEndMillis() >= 0L) {
+ this.addConnBiDirect(descriptor.getFingerprint(),
+ descriptor.getConnBiDirectStatsEndMillis(),
+ descriptor.getConnBiDirectStatsIntervalLength(),
+ descriptor.getConnBiDirectBelow(),
+ descriptor.getConnBiDirectRead(),
+ descriptor.getConnBiDirectWrite(),
+ descriptor.getConnBiDirectBoth());
+ }
+ List<String> bandwidthHistoryLines = new ArrayList<String>();
+ if (descriptor.getWriteHistory() != null) {
+ bandwidthHistoryLines.add(descriptor.getWriteHistory().getLine());
+ }
+ if (descriptor.getReadHistory() != null) {
+ bandwidthHistoryLines.add(descriptor.getReadHistory().getLine());
+ }
+ if (descriptor.getDirreqWriteHistory() != null) {
+ bandwidthHistoryLines.add(
+ descriptor.getDirreqWriteHistory().getLine());
+ }
+ if (descriptor.getDirreqReadHistory() != null) {
+ bandwidthHistoryLines.add(
+ descriptor.getDirreqReadHistory().getLine());
+ }
+ this.addExtraInfoDescriptor(descriptor.getExtraInfoDigest(),
+ descriptor.getNickname(),
+ descriptor.getFingerprint().toLowerCase(),
+ descriptor.getPublishedMillis(),
+ descriptor.getRawDescriptorBytes(), bandwidthHistoryLines);
+ }
+
/**
* Close the relay descriptor database connection.
*/
More information about the tor-commits
mailing list