[tor-commits] [collector/master] Circumvent Collection (integer) size limit.
karsten at torproject.org
karsten at torproject.org
Mon Feb 26 13:19:46 UTC 2018
commit d05b4e4aee3bc15c3e4d5bac660dfcee5bc26279
Author: iwakeh <iwakeh at torproject.org>
Date: Tue Feb 20 16:30:14 2018 +0000
Circumvent Collection (integer) size limit.
Clean log lines immediately when they are read and also make use of the
sanitized logs' high redundancy immediately, i.e., continue with maps of type
Map<LocalDate, Map<String, Long>>.
Rename method(s) to reflect what they do.
---
.../collector/webstats/SanitizeWeblogs.java | 89 ++++++++++++++++------
1 file changed, 65 insertions(+), 24 deletions(-)
diff --git a/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java b/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java
index 1f2e922..5a270dd 100644
--- a/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java
+++ b/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java
@@ -4,8 +4,10 @@
package org.torproject.collector.webstats;
import static java.util.stream.Collectors.counting;
+import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.groupingByConcurrent;
-import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.reducing;
+import static java.util.stream.Collectors.summingLong;
import org.torproject.collector.conf.Configuration;
import org.torproject.collector.conf.ConfigurationException;
@@ -35,6 +37,8 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -111,35 +115,36 @@ public class SanitizeWeblogs extends CollecTorMain {
: virtualEntry.getValue().entrySet()) {
String physicalHost = physicalEntry.getKey();
log.info("Processing logs for {} on {}.", virtualHost, physicalHost);
- Map<LocalDate, List<WebServerAccessLogLine>> linesByDate
+ Map<LocalDate, Map<String, Long>> linesByDate
= physicalEntry.getValue().values().stream().parallel()
- .flatMap((LogMetadata metadata) -> lineStream(metadata)
- .filter((line) -> line.isValid())).parallel()
- .collect(groupingByConcurrent(WebServerAccessLogLine::getDate));
+ .flatMap(metadata -> sanitzedLineStream(metadata).entrySet()
+ .stream())
+ .collect(groupingBy(Map.Entry::getKey,
+ reducing(Collections.emptyMap(), Map.Entry::getValue,
+ (e1, e2) -> Stream.concat(e1.entrySet().stream(), e2.entrySet()
+ .stream())
+ .collect(groupingByConcurrent(Map.Entry::getKey,
+ summingLong(Map.Entry::getValue))))));
LocalDate[] interval = determineInterval(linesByDate.keySet());
linesByDate.entrySet().stream()
.filter((entry) -> entry.getKey().isAfter(interval[0])
&& entry.getKey().isBefore(interval[1])).parallel()
- .forEach((entry) -> storeSanitized(virtualHost, physicalHost,
+ .forEach((entry) -> storeSortedAndForget(virtualHost, physicalHost,
entry.getKey(), entry.getValue()));
}
}
}
- private void storeSanitized(String virtualHost, String physicalHost,
- LocalDate date, List<WebServerAccessLogLine> lines) {
+ private void storeSortedAndForget(String virtualHost, String physicalHost,
+ LocalDate date, Map<String, Long> lineCounts) {
String name = new StringJoiner(InternalLogDescriptor.SEP)
.add(virtualHost).add(physicalHost)
.add(InternalWebServerAccessLog.MARKER)
.add(date.format(DateTimeFormatter.BASIC_ISO_DATE))
.toString() + "." + FileType.XZ.name().toLowerCase();
- log.debug("Sanitizing {}.", name);
- Map<String, Long> retainedLines = new TreeMap<>(lines
- .stream().parallel().map((line) -> sanitize(line, date))
- .filter((line) -> line.isPresent())
- .map((line) -> line.get())
- .collect(groupingByConcurrent(line -> line, counting())));
- lines.clear(); // not needed anymore
+ log.debug("Storing {}.", name);
+ Map<String, Long> retainedLines = new TreeMap<>(lineCounts);
+ lineCounts.clear(); // not needed anymore
try {
WebServerAccessLogPersistence walp
= new WebServerAccessLogPersistence(
@@ -187,8 +192,8 @@ public class SanitizeWeblogs extends CollecTorMain {
.collect(Collectors.joining("\n", "", "\n")).getBytes();
}
- static Optional<String> sanitize(WebServerAccessLogLine logLine,
- LocalDate date) {
+ static Optional<WebServerAccessLogLine>
+ sanitize(WebServerAccessLogLine logLine) {
if (!logLine.isValid()
|| !(Method.GET == logLine.getMethod()
|| Method.HEAD == logLine.getMethod())
@@ -203,10 +208,13 @@ public class SanitizeWeblogs extends CollecTorMain {
if (queryStart > 0) {
logLine.setRequest(logLine.getRequest().substring(0, queryStart));
}
- return Optional.of(logLine.toLogString());
+ return Optional.of(logLine);
}
LocalDate[] determineInterval(Set<LocalDate> dates) {
+ if (dates.isEmpty()) { // return the empty interval
+ return new LocalDate[]{LocalDate.MAX, LocalDate.MIN};
+ }
SortedSet<LocalDate> sorted = new TreeSet<>();
sorted.addAll(dates);
if (this.limits) {
@@ -214,7 +222,7 @@ public class SanitizeWeblogs extends CollecTorMain {
sorted.remove(sorted.last());
}
}
- if (sorted.isEmpty()) {
+ if (sorted.isEmpty()) { // return the empty interval
return new LocalDate[]{LocalDate.MAX, LocalDate.MIN};
}
if (!this.limits) {
@@ -224,18 +232,51 @@ public class SanitizeWeblogs extends CollecTorMain {
return new LocalDate[]{sorted.first(), sorted.last()};
}
- private Stream<WebServerAccessLogLine> lineStream(LogMetadata metadata) {
+ private static final int LISTLIMIT = Integer.MAX_VALUE / 2;
+
+ private Map<LocalDate, Map<String, Long>>
+ sanitzedLineStream(LogMetadata metadata) {
log.debug("Processing file {}.", metadata.path);
try (BufferedReader br
= new BufferedReader(new InputStreamReader(
metadata.fileType.decompress(Files.newInputStream(metadata.path))))) {
- return br.lines()
- .map((String line) -> WebServerAccessLogLine.makeLine(line))
- .collect(toList()).stream();
+ List<List<WebServerAccessLogLine>> lists = new ArrayList<>();
+ List<WebServerAccessLogLine> currentList = new ArrayList<>();
+ lists.add(currentList);
+ String lineStr = br.readLine();
+ int count = 0;
+ while (null != lineStr) {
+ WebServerAccessLogLine wsal = WebServerAccessLogLine.makeLine(lineStr);
+ if (wsal.isValid()) {
+ currentList.add(wsal);
+ count++;
+ }
+ if (count >= LISTLIMIT) {
+ currentList = new ArrayList<>();
+ lists.add(currentList);
+ count = 0;
+ }
+ lineStr = br.readLine();
+ }
+ br.close();
+ return lists.parallelStream()
+ .map(list -> list.stream()
+ .map(line -> sanitize(line))
+ .filter(line -> line.isPresent())
+ .map(line -> line.get())
+ .collect(groupingBy(WebServerAccessLogLine::getDate,
+ groupingBy(WebServerAccessLogLine::toLogString, counting()))))
+ .flatMap(map -> map.entrySet().stream()).parallel()
+ .collect(groupingByConcurrent(Map.Entry::getKey,
+ reducing(Collections.emptyMap(), Map.Entry::getValue,
+ (e1, e2) -> Stream.concat(e1.entrySet().stream(),
+ e2.entrySet().stream()).parallel()
+ .collect(groupingByConcurrent(Map.Entry::getKey,
+ summingLong(Map.Entry::getValue))))));
} catch (Exception ex) {
log.debug("Skipping log-file {}.", metadata.path, ex);
}
- return Stream.empty();
+ return Collections.emptyMap();
}
}
More information about the tor-commits
mailing list