[tor-commits] [collector/master] Refactor ArchiveReader.
karsten at torproject.org
karsten at torproject.org
Mon Aug 27 12:34:23 UTC 2018
commit 1caca7c1f4786ef31207b42ed8298998c989487b
Author: Karsten Loesing <karsten.loesing at gmx.net>
Date: Mon Aug 20 21:08:19 2018 +0200
Refactor ArchiveReader.
---
.../collector/relaydescs/ArchiveReader.java | 100 +++++++++++++--------
.../collector/relaydescs/ArchiveWriter.java | 3 +-
2 files changed, 66 insertions(+), 37 deletions(-)
diff --git a/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveReader.java b/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveReader.java
index 74700f7..7c59054 100644
--- a/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveReader.java
+++ b/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveReader.java
@@ -46,30 +46,52 @@ public class ArchiveReader {
private Map<String, Set<String>> microdescriptorValidAfterTimes =
new HashMap<>();
- /** Reads all descriptors from the given directory, possibly using a
- * parse history file, and passes them to the given descriptor
- * parser. */
- public ArchiveReader(RelayDescriptorParser rdp, File archivesDirectory,
- File statsDirectory, boolean keepImportHistory) {
+ private RelayDescriptorParser rdp;
+
+ private File archivesDirectory;
+
+ private boolean keepImportHistory;
+ private int parsedFiles = 0;
+
+ private int ignoredFiles = 0;
+
+ private SortedSet<String> archivesImportHistory = new TreeSet<>();
+
+ private File archivesImportHistoryFile;
+
+ /** Initializes an archive reader but without reading any descriptors yet. */
+ ArchiveReader(RelayDescriptorParser rdp, File archivesDirectory,
+ File statsDirectory, boolean keepImportHistory) {
if (rdp == null || archivesDirectory == null
|| statsDirectory == null) {
throw new IllegalArgumentException();
}
-
- rdp.setArchiveReader(this);
- int parsedFiles = 0;
- int ignoredFiles = 0;
- SortedSet<String> archivesImportHistory = new TreeSet<>();
- File archivesImportHistoryFile = new File(statsDirectory,
+ this.rdp = rdp;
+ this.rdp.setArchiveReader(this);
+ this.archivesDirectory = archivesDirectory;
+ this.keepImportHistory = keepImportHistory;
+ this.archivesImportHistoryFile = new File(statsDirectory,
"archives-import-history");
- if (keepImportHistory && archivesImportHistoryFile.exists()) {
+ }
+
+ /** Reads all descriptors from the given directory, possibly using a
+ * parse history file, and passes them to the given descriptor
+ * parser. */
+ public void readDescriptors() {
+ this.readHistoryFile();
+ this.readDescriptorFiles();
+ this.writeHistoryFile();
+ }
+
+ private void readHistoryFile() {
+ if (this.keepImportHistory && this.archivesImportHistoryFile.exists()) {
try {
BufferedReader br = new BufferedReader(new FileReader(
- archivesImportHistoryFile));
+ this.archivesImportHistoryFile));
String line;
while ((line = br.readLine()) != null) {
- archivesImportHistory.add(line);
+ this.archivesImportHistory.add(line);
}
br.close();
} catch (IOException e) {
@@ -77,11 +99,14 @@ public class ArchiveReader {
+ "history file. Skipping.", e);
}
}
- if (archivesDirectory.exists()) {
- logger.debug("Importing files in directory " + archivesDirectory
+ }
+
+ private void readDescriptorFiles() {
+ if (this.archivesDirectory.exists()) {
+ logger.debug("Importing files in directory " + this.archivesDirectory
+ "/...");
Stack<File> filesInInputDir = new Stack<>();
- filesInInputDir.add(archivesDirectory);
+ filesInInputDir.add(this.archivesDirectory);
List<File> problems = new ArrayList<>();
Set<File> filesToRetry = new HashSet<>();
while (!filesInInputDir.isEmpty()) {
@@ -91,9 +116,9 @@ public class ArchiveReader {
} else {
try {
BufferedInputStream bis;
- if (keepImportHistory
- && archivesImportHistory.contains(pop.getName())) {
- ignoredFiles++;
+ if (this.keepImportHistory
+ && this.archivesImportHistory.contains(pop.getName())) {
+ this.ignoredFiles++;
continue;
} else if (pop.getName().endsWith(".tar.bz2")) {
logger.warn("Cannot parse compressed tarball "
@@ -116,15 +141,15 @@ public class ArchiveReader {
}
bis.close();
byte[] allData = baos.toByteArray();
- boolean stored = rdp.parse(allData);
+ boolean stored = this.rdp.parse(allData);
if (!stored) {
filesToRetry.add(pop);
continue;
}
- if (keepImportHistory) {
- archivesImportHistory.add(pop.getName());
+ if (this.keepImportHistory) {
+ this.archivesImportHistory.add(pop.getName());
}
- parsedFiles++;
+ this.parsedFiles++;
} catch (IOException e) {
problems.add(pop);
if (problems.size() > 3) {
@@ -219,10 +244,10 @@ public class ArchiveReader {
}
}
}
- if (keepImportHistory) {
- archivesImportHistory.add(pop.getName());
+ if (this.keepImportHistory) {
+ this.archivesImportHistory.add(pop.getName());
}
- parsedFiles++;
+ this.parsedFiles++;
} catch (IOException e) {
problems.add(pop);
if (problems.size() > 3) {
@@ -232,10 +257,10 @@ public class ArchiveReader {
}
if (problems.isEmpty()) {
logger.debug("Finished importing files in directory "
- + archivesDirectory + "/.");
+ + this.archivesDirectory + "/.");
} else {
StringBuilder sb = new StringBuilder("Failed importing files in "
- + "directory " + archivesDirectory + "/:");
+ + "directory " + this.archivesDirectory + "/:");
int printed = 0;
for (File f : problems) {
sb.append("\n ").append(f.getAbsolutePath());
@@ -246,12 +271,15 @@ public class ArchiveReader {
}
}
}
- if (keepImportHistory) {
+ }
+
+ private void writeHistoryFile() {
+ if (this.keepImportHistory) {
try {
- archivesImportHistoryFile.getParentFile().mkdirs();
+ this.archivesImportHistoryFile.getParentFile().mkdirs();
BufferedWriter bw = new BufferedWriter(new FileWriter(
- archivesImportHistoryFile));
- for (String line : archivesImportHistory) {
+ this.archivesImportHistoryFile));
+ for (String line : this.archivesImportHistory) {
bw.write(line + "\n");
}
bw.close();
@@ -261,15 +289,15 @@ public class ArchiveReader {
}
}
logger.info("Finished importing relay descriptors from local "
- + "directory:\nParsed " + parsedFiles + ", ignored "
- + ignoredFiles + " files.");
+ + "directory:\nParsed " + this.parsedFiles + ", ignored "
+ + this.ignoredFiles + " files.");
}
/** Stores the valid-after time and microdescriptor digests of a given
* microdesc consensus, so that microdescriptors (which don't contain a
* publication time) can later be sorted into the correct month
* folders. */
- public void haveParsedMicrodescConsensus(String validAfterTime,
+ void haveParsedMicrodescConsensus(String validAfterTime,
SortedSet<String> microdescriptorDigests) {
for (String microdescriptor : microdescriptorDigests) {
if (!this.microdescriptorValidAfterTimes.containsKey(
diff --git a/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveWriter.java b/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveWriter.java
index ac3f5e3..8679439 100644
--- a/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveWriter.java
+++ b/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveWriter.java
@@ -169,7 +169,8 @@ public class ArchiveWriter extends CollecTorMain {
new ArchiveReader(rdp,
config.getPath(Key.RelayLocalOrigins).toFile(),
statsDirectory,
- config.getBool(Key.KeepDirectoryArchiveImportHistory));
+ config.getBool(Key.KeepDirectoryArchiveImportHistory))
+ .readDescriptors();
this.intermediateStats("importing relay descriptors from local "
+ "directory");
}
More information about the tor-commits mailing list