[tor-commits] [collector/master] Circumvent Collection (integer) size limit.

karsten at torproject.org karsten at torproject.org
Mon Feb 26 13:19:46 UTC 2018


commit d05b4e4aee3bc15c3e4d5bac660dfcee5bc26279
Author: iwakeh <iwakeh at torproject.org>
Date:   Tue Feb 20 16:30:14 2018 +0000

    Circumvent Collection (integer) size limit.
    
    Clean log lines immediately when they are read and also make use of the
    sanitized logs' high redundancy right away, i.e., continue with maps of
    type Map<LocalDate, Map<String, Long>>.
    
    Rename method(s) to reflect what they do.
---
 .../collector/webstats/SanitizeWeblogs.java        | 89 ++++++++++++++++------
 1 file changed, 65 insertions(+), 24 deletions(-)

diff --git a/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java b/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java
index 1f2e922..5a270dd 100644
--- a/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java
+++ b/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java
@@ -4,8 +4,10 @@
 package org.torproject.collector.webstats;
 
 import static java.util.stream.Collectors.counting;
+import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.groupingByConcurrent;
-import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.reducing;
+import static java.util.stream.Collectors.summingLong;
 
 import org.torproject.collector.conf.Configuration;
 import org.torproject.collector.conf.ConfigurationException;
@@ -35,6 +37,8 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.LocalDate;
 import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -111,35 +115,36 @@ public class SanitizeWeblogs extends CollecTorMain {
           : virtualEntry.getValue().entrySet()) {
         String physicalHost = physicalEntry.getKey();
         log.info("Processing logs for {} on {}.", virtualHost, physicalHost);
-        Map<LocalDate, List<WebServerAccessLogLine>> linesByDate
+        Map<LocalDate, Map<String, Long>> linesByDate
             = physicalEntry.getValue().values().stream().parallel()
-            .flatMap((LogMetadata metadata) -> lineStream(metadata)
-               .filter((line) -> line.isValid())).parallel()
-            .collect(groupingByConcurrent(WebServerAccessLogLine::getDate));
+            .flatMap(metadata -> sanitzedLineStream(metadata).entrySet()
+            .stream())
+            .collect(groupingBy(Map.Entry::getKey,
+              reducing(Collections.emptyMap(), Map.Entry::getValue,
+                (e1, e2) -> Stream.concat(e1.entrySet().stream(), e2.entrySet()
+                  .stream())
+                  .collect(groupingByConcurrent(Map.Entry::getKey,
+                  summingLong(Map.Entry::getValue))))));
         LocalDate[] interval = determineInterval(linesByDate.keySet());
         linesByDate.entrySet().stream()
             .filter((entry) -> entry.getKey().isAfter(interval[0])
               && entry.getKey().isBefore(interval[1])).parallel()
-            .forEach((entry) -> storeSanitized(virtualHost, physicalHost,
+            .forEach((entry) -> storeSortedAndForget(virtualHost, physicalHost,
               entry.getKey(), entry.getValue()));
       }
     }
   }
 
-  private void storeSanitized(String virtualHost, String physicalHost,
-      LocalDate date, List<WebServerAccessLogLine> lines) {
+  private void storeSortedAndForget(String virtualHost, String physicalHost,
+      LocalDate date, Map<String, Long> lineCounts) {
     String name = new StringJoiner(InternalLogDescriptor.SEP)
         .add(virtualHost).add(physicalHost)
         .add(InternalWebServerAccessLog.MARKER)
         .add(date.format(DateTimeFormatter.BASIC_ISO_DATE))
         .toString() + "." + FileType.XZ.name().toLowerCase();
-    log.debug("Sanitizing {}.", name);
-    Map<String, Long> retainedLines = new TreeMap<>(lines
-        .stream().parallel().map((line) -> sanitize(line, date))
-        .filter((line) -> line.isPresent())
-        .map((line) -> line.get())
-        .collect(groupingByConcurrent(line -> line, counting())));
-    lines.clear(); // not needed anymore
+    log.debug("Storing {}.", name);
+    Map<String, Long> retainedLines = new TreeMap<>(lineCounts);
+    lineCounts.clear(); // not needed anymore
     try {
       WebServerAccessLogPersistence walp
           = new WebServerAccessLogPersistence(
@@ -187,8 +192,8 @@ public class SanitizeWeblogs extends CollecTorMain {
         .collect(Collectors.joining("\n", "", "\n")).getBytes();
   }
 
-  static Optional<String> sanitize(WebServerAccessLogLine logLine,
-      LocalDate date) {
+  static Optional<WebServerAccessLogLine>
+      sanitize(WebServerAccessLogLine logLine) {
     if (!logLine.isValid()
         || !(Method.GET == logLine.getMethod()
              || Method.HEAD == logLine.getMethod())
@@ -203,10 +208,13 @@ public class SanitizeWeblogs extends CollecTorMain {
     if (queryStart > 0) {
       logLine.setRequest(logLine.getRequest().substring(0, queryStart));
     }
-    return Optional.of(logLine.toLogString());
+    return Optional.of(logLine);
   }
 
   LocalDate[] determineInterval(Set<LocalDate> dates) {
+    if (dates.isEmpty()) { // return the empty interval
+      return new LocalDate[]{LocalDate.MAX, LocalDate.MIN};
+    }
     SortedSet<LocalDate> sorted = new TreeSet<>();
     sorted.addAll(dates);
     if (this.limits) {
@@ -214,7 +222,7 @@ public class SanitizeWeblogs extends CollecTorMain {
         sorted.remove(sorted.last());
       }
     }
-    if (sorted.isEmpty()) {
+    if (sorted.isEmpty()) { // return the empty interval
       return new LocalDate[]{LocalDate.MAX, LocalDate.MIN};
     }
     if (!this.limits) {
@@ -224,18 +232,51 @@ public class SanitizeWeblogs extends CollecTorMain {
     return new LocalDate[]{sorted.first(), sorted.last()};
   }
 
-  private Stream<WebServerAccessLogLine> lineStream(LogMetadata metadata) {
+  private static final int LISTLIMIT = Integer.MAX_VALUE / 2;
+
+  private Map<LocalDate, Map<String, Long>>
+      sanitzedLineStream(LogMetadata metadata) {
     log.debug("Processing file {}.", metadata.path);
     try (BufferedReader br
         = new BufferedReader(new InputStreamReader(
          metadata.fileType.decompress(Files.newInputStream(metadata.path))))) {
-      return br.lines()
-          .map((String line) -> WebServerAccessLogLine.makeLine(line))
-          .collect(toList()).stream();
+      List<List<WebServerAccessLogLine>> lists = new ArrayList<>();
+      List<WebServerAccessLogLine> currentList = new ArrayList<>();
+      lists.add(currentList);
+      String lineStr = br.readLine();
+      int count = 0;
+      while (null != lineStr) {
+        WebServerAccessLogLine wsal = WebServerAccessLogLine.makeLine(lineStr);
+        if (wsal.isValid()) {
+          currentList.add(wsal);
+          count++;
+        }
+        if (count >= LISTLIMIT) {
+          currentList = new ArrayList<>();
+          lists.add(currentList);
+          count = 0;
+        }
+        lineStr = br.readLine();
+      }
+      br.close();
+      return lists.parallelStream()
+          .map(list -> list.stream()
+              .map(line -> sanitize(line))
+              .filter(line -> line.isPresent())
+              .map(line -> line.get())
+              .collect(groupingBy(WebServerAccessLogLine::getDate,
+                  groupingBy(WebServerAccessLogLine::toLogString, counting()))))
+          .flatMap(map -> map.entrySet().stream()).parallel()
+          .collect(groupingByConcurrent(Map.Entry::getKey,
+              reducing(Collections.emptyMap(), Map.Entry::getValue,
+                (e1, e2) -> Stream.concat(e1.entrySet().stream(),
+                    e2.entrySet().stream()).parallel()
+                    .collect(groupingByConcurrent(Map.Entry::getKey,
+                        summingLong(Map.Entry::getValue))))));
     } catch (Exception ex) {
       log.debug("Skipping log-file {}.", metadata.path, ex);
     }
-    return Stream.empty();
+    return Collections.emptyMap();
   }
 
 }



More information about the tor-commits mailing list