[tor-commits] [metrics-web/master] Split up ArchiveReader in two parts.

karsten at torproject.org karsten at torproject.org
Thu Dec 13 15:56:28 UTC 2012


commit af7f95dcc86328b558f4711a9b8e4b72688df27f
Author: Karsten Loesing <karsten.loesing at gmx.net>
Date:   Thu Dec 13 13:50:33 2012 +0100

    Split up ArchiveReader in two parts.
---
 src/org/torproject/ernie/cron/ArchiveReader.java   |  173 --------------------
 .../ernie/cron/BridgeStatsFileHandler.java         |   68 +++++++-
 src/org/torproject/ernie/cron/Main.java            |   15 ++-
 .../cron/RelayDescriptorDatabaseImporter.java      |  135 +++++++++++++++-
 4 files changed, 207 insertions(+), 184 deletions(-)

diff --git a/src/org/torproject/ernie/cron/ArchiveReader.java b/src/org/torproject/ernie/cron/ArchiveReader.java
deleted file mode 100644
index 9126787..0000000
--- a/src/org/torproject/ernie/cron/ArchiveReader.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/* Copyright 2011, 2012 The Tor Project
- * See LICENSE for licensing information */
-package org.torproject.ernie.cron;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Logger;
-
-import org.apache.commons.codec.DecoderException;
-import org.apache.commons.codec.binary.Hex;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.torproject.descriptor.Descriptor;
-import org.torproject.descriptor.DescriptorFile;
-import org.torproject.descriptor.DescriptorReader;
-import org.torproject.descriptor.DescriptorSourceFactory;
-import org.torproject.descriptor.ExtraInfoDescriptor;
-import org.torproject.descriptor.NetworkStatusEntry;
-import org.torproject.descriptor.RelayNetworkStatusConsensus;
-import org.torproject.descriptor.ServerDescriptor;
-
-/**
- * Read in all files in a given directory and pass buffered readers of
- * them to the relay descriptor parser.
- */
-public class ArchiveReader {
-
-  /**
-   * Stats file handler that accepts parse results for bridge statistics.
-   */
-  private BridgeStatsFileHandler bsfh;
-
-  /**
-   * Relay descriptor database importer that stores relay descriptor
-   * contents for later evaluation.
-   */
-  private RelayDescriptorDatabaseImporter rddi;
-
-  /**
-   * Logger for this class.
-   */
-  private Logger logger;
-
-  public ArchiveReader(RelayDescriptorDatabaseImporter rddi,
-      BridgeStatsFileHandler bsfh, File archivesDirectory,
-      File statsDirectory, boolean keepImportHistory) {
-
-    if (archivesDirectory == null ||
-        statsDirectory == null) {
-      throw new IllegalArgumentException();
-    }
-
-    this.rddi = rddi;
-    this.bsfh = bsfh;
-
-    this.logger = Logger.getLogger(ArchiveReader.class.getName());
-    if (archivesDirectory.exists()) {
-      logger.fine("Importing files in directory " + archivesDirectory
-          + "/...");
-      DescriptorReader reader =
-          DescriptorSourceFactory.createDescriptorReader();
-      reader.addDirectory(archivesDirectory);
-      if (keepImportHistory) {
-        reader.setExcludeFiles(new File(statsDirectory,
-            "relay-descriptor-history"));
-      }
-      Iterator<DescriptorFile> descriptorFiles = reader.readDescriptors();
-      while (descriptorFiles.hasNext()) {
-        DescriptorFile descriptorFile = descriptorFiles.next();
-        if (descriptorFile.getDescriptors() != null) {
-          for (Descriptor descriptor : descriptorFile.getDescriptors()) {
-            if (descriptor instanceof RelayNetworkStatusConsensus) {
-              this.addRelayNetworkStatusConsensus(
-                  (RelayNetworkStatusConsensus) descriptor);
-            } else if (descriptor instanceof ServerDescriptor) {
-              this.addServerDescriptor((ServerDescriptor) descriptor);
-            } else if (descriptor instanceof ExtraInfoDescriptor) {
-              this.addExtraInfoDescriptor(
-                  (ExtraInfoDescriptor) descriptor);
-            }
-          }
-        }
-      }
-    }
-
-    logger.info("Finished importing relay descriptors.");
-  }
-
-  private void addRelayNetworkStatusConsensus(
-      RelayNetworkStatusConsensus consensus) {
-    for (NetworkStatusEntry statusEntry :
-      consensus.getStatusEntries().values()) {
-      this.rddi.addStatusEntry(consensus.getValidAfterMillis(),
-          statusEntry.getNickname(),
-          statusEntry.getFingerprint().toLowerCase(),
-          statusEntry.getDescriptor().toLowerCase(),
-          statusEntry.getPublishedMillis(), statusEntry.getAddress(),
-          statusEntry.getOrPort(), statusEntry.getDirPort(),
-          statusEntry.getFlags(), statusEntry.getVersion(),
-          statusEntry.getBandwidth(), statusEntry.getPortList(),
-          statusEntry.getStatusEntryBytes());
-      try {
-        this.bsfh.addHashedRelay(DigestUtils.shaHex(Hex.decodeHex(
-            statusEntry.getFingerprint().toCharArray())).toUpperCase());
-      } catch (DecoderException e) {
-      }
-    }
-    this.rddi.addConsensus(consensus.getValidAfterMillis(),
-        consensus.getRawDescriptorBytes());
-  }
-
-  private void addServerDescriptor(ServerDescriptor descriptor) {
-    this.rddi.addServerDescriptor(descriptor.getServerDescriptorDigest(),
-        descriptor.getNickname(), descriptor.getAddress(),
-        descriptor.getOrPort(), descriptor.getDirPort(),
-        descriptor.getFingerprint(), descriptor.getBandwidthRate(),
-        descriptor.getBandwidthBurst(), descriptor.getBandwidthObserved(),
-        descriptor.getPlatform(), descriptor.getPublishedMillis(),
-        descriptor.getUptime(), descriptor.getExtraInfoDigest(),
-        descriptor.getRawDescriptorBytes());
-  }
-
-  private void addExtraInfoDescriptor(ExtraInfoDescriptor descriptor) {
-    if (descriptor.getDirreqV3Reqs() != null) {
-      int allUsers = 0;
-      Map<String, String> obs = new HashMap<String, String>();
-      for (Map.Entry<String, Integer> e :
-          descriptor.getDirreqV3Reqs().entrySet()) {
-        String country = e.getKey();
-        int users = e.getValue() - 4;
-        allUsers += users;
-        obs.put(country, "" + users);
-      }
-      obs.put("zy", "" + allUsers);
-      this.rddi.addDirReqStats(descriptor.getFingerprint(),
-          descriptor.getDirreqStatsEndMillis(),
-          descriptor.getDirreqStatsIntervalLength(), obs);
-    }
-    if (descriptor.getConnBiDirectStatsEndMillis() >= 0L) {
-      this.rddi.addConnBiDirect(descriptor.getFingerprint(),
-          descriptor.getConnBiDirectStatsEndMillis(),
-          descriptor.getConnBiDirectStatsIntervalLength(),
-          descriptor.getConnBiDirectBelow(),
-          descriptor.getConnBiDirectRead(),
-          descriptor.getConnBiDirectWrite(),
-          descriptor.getConnBiDirectBoth());
-    }
-    List<String> bandwidthHistoryLines = new ArrayList<String>();
-    if (descriptor.getWriteHistory() != null) {
-      bandwidthHistoryLines.add(descriptor.getWriteHistory().getLine());
-    }
-    if (descriptor.getReadHistory() != null) {
-      bandwidthHistoryLines.add(descriptor.getReadHistory().getLine());
-    }
-    if (descriptor.getDirreqWriteHistory() != null) {
-      bandwidthHistoryLines.add(
-          descriptor.getDirreqWriteHistory().getLine());
-    }
-    if (descriptor.getDirreqReadHistory() != null) {
-      bandwidthHistoryLines.add(
-          descriptor.getDirreqReadHistory().getLine());
-    }
-    this.rddi.addExtraInfoDescriptor(descriptor.getExtraInfoDigest(),
-        descriptor.getNickname(),
-        descriptor.getFingerprint().toLowerCase(),
-        descriptor.getPublishedMillis(),
-        descriptor.getRawDescriptorBytes(), bandwidthHistoryLines);
-  }
-}
-
diff --git a/src/org/torproject/ernie/cron/BridgeStatsFileHandler.java b/src/org/torproject/ernie/cron/BridgeStatsFileHandler.java
index 4534d2f..085394c 100644
--- a/src/org/torproject/ernie/cron/BridgeStatsFileHandler.java
+++ b/src/org/torproject/ernie/cron/BridgeStatsFileHandler.java
@@ -29,11 +29,16 @@ import java.util.TreeSet;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.torproject.descriptor.Descriptor;
 import org.torproject.descriptor.DescriptorFile;
 import org.torproject.descriptor.DescriptorReader;
 import org.torproject.descriptor.DescriptorSourceFactory;
 import org.torproject.descriptor.ExtraInfoDescriptor;
+import org.torproject.descriptor.NetworkStatusEntry;
+import org.torproject.descriptor.RelayNetworkStatusConsensus;
 import org.torproject.descriptor.ServerDescriptor;
 
 /**
@@ -117,7 +122,11 @@ public class BridgeStatsFileHandler {
 
   private File statsDirectory;
 
-  private boolean keepImportHistory;
+  private boolean keepBridgeDescriptorImportHistory;
+
+  private File archivesDirectory;
+
+  private boolean keepRelayDescriptorImportHistory;
 
   /**
    * Initializes this class, including reading in intermediate results
@@ -125,14 +134,21 @@ public class BridgeStatsFileHandler {
    * <code>stats/hashed-relay-identities</code>.
    */
   public BridgeStatsFileHandler(String connectionURL,
-      File bridgesDir, File statsDirectory, boolean keepImportHistory) {
+      File bridgesDir, File statsDirectory,
+      boolean keepBridgeDescriptorImportHistory, File archivesDirectory,
+      boolean keepRelayDescriptorImportHistory) {

-    if (bridgesDir == null || statsDirectory == null) {
+    if (bridgesDir == null || statsDirectory == null ||
+        archivesDirectory == null) {
       throw new IllegalArgumentException();
     }
     this.bridgesDir = bridgesDir;
     this.statsDirectory = statsDirectory;
-    this.keepImportHistory = keepImportHistory;
+    this.keepBridgeDescriptorImportHistory =
+        keepBridgeDescriptorImportHistory;
+    this.archivesDirectory = archivesDirectory;
+    this.keepRelayDescriptorImportHistory =
+        keepRelayDescriptorImportHistory;

     /* Initialize set of known countries. */
     this.countries = new TreeSet<String>();
@@ -356,7 +372,7 @@ public class BridgeStatsFileHandler {
       DescriptorReader reader =
           DescriptorSourceFactory.createDescriptorReader();
       reader.addDirectory(bridgesDir);
-      if (keepImportHistory) {
+      if (keepBridgeDescriptorImportHistory) {
         reader.setExcludeFiles(new File(statsDirectory,
             "bridge-descriptor-history"));
       }
@@ -424,6 +440,61 @@ public class BridgeStatsFileHandler {
     }
   }

+  /**
+   * Reads relay network status consensuses from the directory archives
+   * directory and passes hashed fingerprints of all listed relays to
+   * addHashedRelay. Uses its own import history file instead of
+   * relay-descriptor-history, which RelayDescriptorDatabaseImporter uses
+   * for the same directory: DescriptorReader#setExcludeFiles both skips
+   * files listed in the history file and records newly read files there,
+   * so a shared file would make one importer skip what the other read.
+   */
+  public void importRelayDescriptors() {
+    if (archivesDirectory.exists()) {
+      logger.fine("Importing files in directory " + archivesDirectory
+          + "/...");
+      DescriptorReader reader =
+          DescriptorSourceFactory.createDescriptorReader();
+      reader.addDirectory(archivesDirectory);
+      if (keepRelayDescriptorImportHistory) {
+        reader.setExcludeFiles(new File(statsDirectory,
+            "bridge-stats-relay-descriptor-history"));
+      }
+      Iterator<DescriptorFile> descriptorFiles = reader.readDescriptors();
+      while (descriptorFiles.hasNext()) {
+        DescriptorFile descriptorFile = descriptorFiles.next();
+        if (descriptorFile.getDescriptors() != null) {
+          for (Descriptor descriptor : descriptorFile.getDescriptors()) {
+            if (descriptor instanceof RelayNetworkStatusConsensus) {
+              this.addRelayNetworkStatusConsensus(
+                  (RelayNetworkStatusConsensus) descriptor);
+            }
+          }
+        }
+      }
+    }
+
+    logger.info("Finished importing relay descriptors.");
+  }
+
+  /**
+   * Passes the upper-case hex SHA-1 digest of each relay fingerprint in
+   * the given consensus to addHashedRelay.
+   */
+  private void addRelayNetworkStatusConsensus(
+      RelayNetworkStatusConsensus consensus) {
+    for (NetworkStatusEntry statusEntry :
+      consensus.getStatusEntries().values()) {
+      try {
+        this.addHashedRelay(DigestUtils.shaHex(Hex.decodeHex(
+            statusEntry.getFingerprint().toCharArray())).toUpperCase());
+      } catch (DecoderException e) {
+        logger.log(Level.WARNING, "Could not decode relay fingerprint '"
+            + statusEntry.getFingerprint() + "'. Skipping.", e);
+      }
+    }
+  }
+
   /**
    * Writes the list of hashed relay identities and bridge user numbers as
    * observed by single bridges to disk, aggregates per-day statistics for
diff --git a/src/org/torproject/ernie/cron/Main.java b/src/org/torproject/ernie/cron/Main.java
index 457433f..a3f38e6 100644
--- a/src/org/torproject/ernie/cron/Main.java
+++ b/src/org/torproject/ernie/cron/Main.java
@@ -37,7 +37,9 @@ public class Main {
         new BridgeStatsFileHandler(
         config.getRelayDescriptorDatabaseJDBC(),
         new File(config.getSanitizedBridgesDirectory()),
-        statsDirectory, config.getKeepSanitizedBridgesImportHistory()) :
+        statsDirectory, config.getKeepSanitizedBridgesImportHistory(),
+        new File(config.getDirectoryArchivesDirectory()),
+        config.getKeepDirectoryArchiveImportHistory()) :
         null;
 
     // Import relay descriptors
@@ -49,11 +51,18 @@ public class Main {
           config.getWriteRelayDescriptorDatabase() ?
           config.getRelayDescriptorDatabaseJDBC() : null,
           config.getWriteRelayDescriptorsRawFiles() ?
-          config.getRelayDescriptorRawFilesDirectory() : null) : null;
-      new ArchiveReader(rddi, bsfh,
+          config.getRelayDescriptorRawFilesDirectory() : null,
           new File(config.getDirectoryArchivesDirectory()),
           statsDirectory,
-          config.getKeepDirectoryArchiveImportHistory());
-      rddi.closeConnection();
+          config.getKeepDirectoryArchiveImportHistory()) : null;
+      /* rddi may be null (see the conditional expression above), so
+       * only import into it and close it when it was created. */
+      if (rddi != null) {
+        rddi.importRelayDescriptors();
+        rddi.closeConnection();
+      }
+      if (bsfh != null) {
+        bsfh.importRelayDescriptors();
+      }
     }

diff --git a/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java b/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java
index 3e7e694..c9d9bc4 100644
--- a/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java
+++ b/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java
@@ -16,8 +16,11 @@ import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -28,6 +31,14 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.postgresql.util.PGbytea;
+import org.torproject.descriptor.Descriptor;
+import org.torproject.descriptor.DescriptorFile;
+import org.torproject.descriptor.DescriptorReader;
+import org.torproject.descriptor.DescriptorSourceFactory;
+import org.torproject.descriptor.ExtraInfoDescriptor;
+import org.torproject.descriptor.NetworkStatusEntry;
+import org.torproject.descriptor.RelayNetworkStatusConsensus;
+import org.torproject.descriptor.ServerDescriptor;
 
 /**
  * Parse directory data.
@@ -207,12 +218,25 @@ public final class RelayDescriptorDatabaseImporter {
   private boolean importIntoDatabase;
   private boolean writeRawImportFiles;
 
+  private File archivesDirectory;
+  private File statsDirectory;
+  private boolean keepImportHistory;
+
   /**
    * Initialize database importer by connecting to the database and
    * preparing statements.
    */
   public RelayDescriptorDatabaseImporter(String connectionURL,
-      String rawFilesDirectory) {
+      String rawFilesDirectory, File archivesDirectory,
+      File statsDirectory, boolean keepImportHistory) {
+
+    if (archivesDirectory == null ||
+        statsDirectory == null) {
+      throw new IllegalArgumentException();
+    }
+    this.archivesDirectory = archivesDirectory;
+    this.statsDirectory = statsDirectory;
+    this.keepImportHistory = keepImportHistory;
 
     /* Initialize logger. */
     this.logger = Logger.getLogger(
@@ -979,6 +1003,138 @@ public final class RelayDescriptorDatabaseImporter {
     }
   }

+  /**
+   * Reads all descriptors in the directory archives directory and
+   * imports the relay network status consensuses, server descriptors,
+   * and extra-info descriptors contained in them. If import history is
+   * kept, the reader is told to exclude files recorded in
+   * relay-descriptor-history in the stats directory.
+   */
+  public void importRelayDescriptors() {
+    if (archivesDirectory.exists()) {
+      logger.fine("Importing files in directory " + archivesDirectory
+          + "/...");
+      DescriptorReader reader =
+          DescriptorSourceFactory.createDescriptorReader();
+      reader.addDirectory(archivesDirectory);
+      if (keepImportHistory) {
+        reader.setExcludeFiles(new File(statsDirectory,
+            "relay-descriptor-history"));
+      }
+      Iterator<DescriptorFile> descriptorFiles = reader.readDescriptors();
+      while (descriptorFiles.hasNext()) {
+        DescriptorFile descriptorFile = descriptorFiles.next();
+        if (descriptorFile.getDescriptors() != null) {
+          for (Descriptor descriptor : descriptorFile.getDescriptors()) {
+            if (descriptor instanceof RelayNetworkStatusConsensus) {
+              this.addRelayNetworkStatusConsensus(
+                  (RelayNetworkStatusConsensus) descriptor);
+            } else if (descriptor instanceof ServerDescriptor) {
+              this.addServerDescriptor((ServerDescriptor) descriptor);
+            } else if (descriptor instanceof ExtraInfoDescriptor) {
+              this.addExtraInfoDescriptor(
+                  (ExtraInfoDescriptor) descriptor);
+            }
+          }
+        }
+      }
+    }
+
+    logger.info("Finished importing relay descriptors.");
+  }
+
+  /**
+   * Imports all status entries of the given consensus, followed by the
+   * consensus itself.
+   */
+  private void addRelayNetworkStatusConsensus(
+      RelayNetworkStatusConsensus consensus) {
+    for (NetworkStatusEntry statusEntry :
+      consensus.getStatusEntries().values()) {
+      this.addStatusEntry(consensus.getValidAfterMillis(),
+          statusEntry.getNickname(),
+          statusEntry.getFingerprint().toLowerCase(),
+          statusEntry.getDescriptor().toLowerCase(),
+          statusEntry.getPublishedMillis(), statusEntry.getAddress(),
+          statusEntry.getOrPort(), statusEntry.getDirPort(),
+          statusEntry.getFlags(), statusEntry.getVersion(),
+          statusEntry.getBandwidth(), statusEntry.getPortList(),
+          statusEntry.getStatusEntryBytes());
+    }
+    this.addConsensus(consensus.getValidAfterMillis(),
+        consensus.getRawDescriptorBytes());
+  }
+
+  /**
+   * Imports the contents of the given server descriptor.
+   */
+  private void addServerDescriptor(ServerDescriptor descriptor) {
+    this.addServerDescriptor(descriptor.getServerDescriptorDigest(),
+        descriptor.getNickname(), descriptor.getAddress(),
+        descriptor.getOrPort(), descriptor.getDirPort(),
+        descriptor.getFingerprint(), descriptor.getBandwidthRate(),
+        descriptor.getBandwidthBurst(), descriptor.getBandwidthObserved(),
+        descriptor.getPlatform(), descriptor.getPublishedMillis(),
+        descriptor.getUptime(), descriptor.getExtraInfoDigest(),
+        descriptor.getRawDescriptorBytes());
+  }
+
+  /**
+   * Imports directory request statistics and connection bi-direct
+   * statistics contained in the given extra-info descriptor, if present,
+   * followed by the descriptor itself together with its bandwidth
+   * history lines.
+   */
+  private void addExtraInfoDescriptor(ExtraInfoDescriptor descriptor) {
+    if (descriptor.getDirreqV3Reqs() != null) {
+      int allUsers = 0;
+      Map<String, String> obs = new HashMap<String, String>();
+      for (Map.Entry<String, Integer> e :
+          descriptor.getDirreqV3Reqs().entrySet()) {
+        String country = e.getKey();
+        /* NOTE(review): the constant 4 presumably corrects for rounding
+         * of the reported request counts; confirm. */
+        int users = e.getValue() - 4;
+        allUsers += users;
+        obs.put(country, "" + users);
+      }
+      /* "zy" carries the sum over all reported countries. */
+      obs.put("zy", "" + allUsers);
+      this.addDirReqStats(descriptor.getFingerprint(),
+          descriptor.getDirreqStatsEndMillis(),
+          descriptor.getDirreqStatsIntervalLength(), obs);
+    }
+    if (descriptor.getConnBiDirectStatsEndMillis() >= 0L) {
+      this.addConnBiDirect(descriptor.getFingerprint(),
+          descriptor.getConnBiDirectStatsEndMillis(),
+          descriptor.getConnBiDirectStatsIntervalLength(),
+          descriptor.getConnBiDirectBelow(),
+          descriptor.getConnBiDirectRead(),
+          descriptor.getConnBiDirectWrite(),
+          descriptor.getConnBiDirectBoth());
+    }
+    List<String> bandwidthHistoryLines = new ArrayList<String>();
+    if (descriptor.getWriteHistory() != null) {
+      bandwidthHistoryLines.add(descriptor.getWriteHistory().getLine());
+    }
+    if (descriptor.getReadHistory() != null) {
+      bandwidthHistoryLines.add(descriptor.getReadHistory().getLine());
+    }
+    if (descriptor.getDirreqWriteHistory() != null) {
+      bandwidthHistoryLines.add(
+          descriptor.getDirreqWriteHistory().getLine());
+    }
+    if (descriptor.getDirreqReadHistory() != null) {
+      bandwidthHistoryLines.add(
+          descriptor.getDirreqReadHistory().getLine());
+    }
+    this.addExtraInfoDescriptor(descriptor.getExtraInfoDigest(),
+        descriptor.getNickname(),
+        descriptor.getFingerprint().toLowerCase(),
+        descriptor.getPublishedMillis(),
+        descriptor.getRawDescriptorBytes(), bandwidthHistoryLines);
+  }
+
   /**
    * Close the relay descriptor database connection.
    */





More information about the tor-commits mailing list