[tor-commits] [metrics-web/master] Make R object generation thread-safe.
karsten at torproject.org
karsten at torproject.org
Wed Mar 21 09:56:24 UTC 2012
commit b0b2f22e160750f9c7acc9de509d2467d3b5c572
Author: Karsten Loesing <karsten.loesing at gmx.net>
Date: Wed Mar 21 10:53:55 2012 +0100
Make R object generation thread-safe.
Problems with concurrent R object generation were highly unlikely before
this patch, but that would have made them even harder to track down. Now
we generate every R object in a separate worker thread and join it.
Before starting a new thread, we check whether a thread generating the
same object is already running, and if so, we join that one instead.
---
src/org/torproject/ernie/web/RObjectGenerator.java | 200 +++++++++++---------
1 files changed, 108 insertions(+), 92 deletions(-)
diff --git a/src/org/torproject/ernie/web/RObjectGenerator.java b/src/org/torproject/ernie/web/RObjectGenerator.java
index 011eccd..3482dce 100644
--- a/src/org/torproject/ernie/web/RObjectGenerator.java
+++ b/src/org/torproject/ernie/web/RObjectGenerator.java
@@ -4,11 +4,13 @@ package org.torproject.ernie.web;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -181,48 +183,11 @@ public class RObjectGenerator implements ServletContextListener {
/* See if we need to generate this graph. */
File imageFile = new File(this.cachedGraphsDirectory + "/"
+ imageFilename);
- long now = System.currentTimeMillis();
- if (!checkCache || !imageFile.exists() ||
- imageFile.lastModified() < now - this.maxCacheAge * 1000L) {
-
- /* We do. Update the R query to contain the absolute path to the
- * file to be generated, create a connection to Rserve, run the R
- * query, and close the connection. The generated graph will be on
- * disk. */
- rQuery = String.format(rQuery, imageFile.getAbsolutePath());
- try {
- RConnection rc = new RConnection(rserveHost, rservePort);
- rc.eval(rQuery);
- rc.close();
- } catch (RserveException e) {
- return null;
- }
-
- /* Check that we really just generated the file */
- if (!imageFile.exists() || imageFile.lastModified() < now
- - this.maxCacheAge * 1000L) {
- return null;
- }
- }
-
- /* Read the image from disk and write it to a byte array. */
- byte[] result = null;
- try {
- BufferedInputStream bis = new BufferedInputStream(
- new FileInputStream(imageFile), 1024);
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- byte[] buffer = new byte[1024];
- int length;
- while ((length = bis.read(buffer)) > 0) {
- baos.write(buffer, 0, length);
- }
- result = baos.toByteArray();
- } catch (IOException e) {
- return null;
- }
+ byte[] imageBytes = this.generateRObject(rQuery, imageFile,
+ checkCache);
/* Return the graph bytes. */
- return result;
+ return imageBytes;
}
public SortedSet<String> getAvailableCsvFiles() {
@@ -245,35 +210,14 @@ public class RObjectGenerator implements ServletContextListener {
/* See if we need to generate this .csv file. */
File csvFile = new File(this.cachedGraphsDirectory + "/"
+ csvFilename);
- long now = System.currentTimeMillis();
- if (!checkCache || !csvFile.exists() ||
- csvFile.lastModified() < now - this.maxCacheAge * 1000L) {
-
- /* We do. Update the R query to contain the absolute path to the
- * file to be generated, create a connection to Rserve, run the R
- * query, and close the connection. The generated csv file will be
- * on disk in the same directory as the generated graphs. */
- rQuery = String.format(rQuery, csvFile.getAbsolutePath());
- try {
- RConnection rc = new RConnection(rserveHost, rservePort);
- rc.eval(rQuery);
- rc.close();
- } catch (RserveException e) {
- return null;
- }
-
- /* Check that we really just generated the file */
- if (!csvFile.exists() || csvFile.lastModified() < now
- - this.maxCacheAge * 1000L) {
- return null;
- }
- }
+ byte[] csvBytes = this.generateRObject(rQuery, csvFile, checkCache);
/* Read the text file from disk and write it to a string. */
String result = null;
try {
StringBuilder sb = new StringBuilder();
- BufferedReader br = new BufferedReader(new FileReader(csvFile));
+ BufferedReader br = new BufferedReader(new InputStreamReader(
+ new ByteArrayInputStream(csvBytes)));
String line = null;
while ((line = br.readLine()) != null) {
sb.append(line + "\n");
@@ -283,7 +227,7 @@ public class RObjectGenerator implements ServletContextListener {
return null;
}
- /* Return the csv file. */
+ /* Return the csv file content. */
return result;
}
@@ -343,36 +287,15 @@ public class RObjectGenerator implements ServletContextListener {
/* See if we need to generate this table. */
File tableFile = new File(this.cachedGraphsDirectory + "/"
+ tableFilename);
- long now = System.currentTimeMillis();
- if (!checkCache || !tableFile.exists() ||
- tableFile.lastModified() < now - this.maxCacheAge * 1000L) {
-
- /* We do. Update the R query to contain the absolute path to the
- * file to be generated, create a connection to Rserve, run the R
- * query, and close the connection. The generated csv file will be
- * on disk in the same directory as the generated graphs. */
- rQuery = String.format(rQuery, tableFile.getAbsolutePath());
- try {
- RConnection rc = new RConnection(rserveHost, rservePort);
- rc.eval(rQuery);
- rc.close();
- } catch (RserveException e) {
- return null;
- }
+ byte[] tableBytes = this.generateRObject(rQuery, tableFile,
+ checkCache);
- /* Check that we really just generated the file */
- if (!tableFile.exists() || tableFile.lastModified() < now
- - this.maxCacheAge * 1000L) {
- return null;
- }
- }
-
- /* Read the text file from disk and write the table content to a
- * map. */
+ /* Write the table content to a map. */
List<Map<String, String>> result = null;
try {
result = new ArrayList<Map<String, String>>();
- BufferedReader br = new BufferedReader(new FileReader(tableFile));
+ BufferedReader br = new BufferedReader(new InputStreamReader(
+ new ByteArrayInputStream(tableBytes)));
String line = br.readLine();
if (line != null) {
List<String> headers = new ArrayList<String>(Arrays.asList(
@@ -396,5 +319,98 @@ public class RObjectGenerator implements ServletContextListener {
/* Return table values. */
return result;
}
-}
+  /**
+   * Generates an R object in a separate worker thread, or waits for an
+   * already running worker thread that generates the same object, and
+   * returns the generated file's content.
+   *
+   * Workers are keyed by the unformatted R query, so concurrent requests
+   * for the same object join a single worker instead of running the same
+   * R query, and writing the same file, more than once at a time.
+   *
+   * @param rQuery R query containing a format specifier that is replaced
+   *     by the absolute path of the file to be generated
+   * @param rObjectFile file that the R query writes and that is read
+   *     afterwards
+   * @param checkCache whether an existing file that is younger than the
+   *     maximum cache age may be reused without running the R query
+   * @return the bytes of the generated file, or null if generation or
+   *     reading failed, or if the calling thread was interrupted while
+   *     waiting for the worker
+   */
+  private byte[] generateRObject(String rQuery, File rObjectFile,
+      boolean checkCache) {
+    RObjectGeneratorWorker worker;
+    synchronized (this.rObjectGeneratorThreads) {
+      worker = this.rObjectGeneratorThreads.get(rQuery);
+      if (worker == null) {
+        worker = new RObjectGeneratorWorker(rQuery, rObjectFile,
+            checkCache);
+        this.rObjectGeneratorThreads.put(rQuery, worker);
+        worker.start();
+      }
+    }
+    boolean interrupted = false;
+    try {
+      worker.join();
+    } catch (InterruptedException e) {
+      /* Restore the interrupt flag so that code further up the stack can
+       * react to it.  The worker may still be running, so its result must
+       * not be read in this case. */
+      Thread.currentThread().interrupt();
+      interrupted = true;
+    }
+    synchronized (this.rObjectGeneratorThreads) {
+      /* Only remove the worker we joined, not a newer one that another
+       * request may have started for the same query in the meantime. */
+      if (this.rObjectGeneratorThreads.get(rQuery) == worker) {
+        this.rObjectGeneratorThreads.remove(rQuery);
+      }
+    }
+    return interrupted ? null : worker.getRObjectBytes();
+  }
+
+ private Map<String, RObjectGeneratorWorker> rObjectGeneratorThreads =
+ new HashMap<String, RObjectGeneratorWorker>();
+
+  /**
+   * Worker thread that runs a single R query via Rserve if the output
+   * file is missing or outdated (or if caching is disabled), and then
+   * reads the output file into memory.  The result may be read with
+   * {@link #getRObjectBytes()} after joining this thread.
+   */
+  private class RObjectGeneratorWorker extends Thread {
+
+    /* R query containing a format specifier for the output file path. */
+    private final String rQuery;
+
+    /* File that the R query writes and that is read afterwards. */
+    private final File rObjectFile;
+
+    /* Whether an existing file younger than the maximum cache age may be
+     * reused without running the R query. */
+    private final boolean checkCache;
+
+    /* Content of the generated file, or null if generation or reading
+     * failed. */
+    private byte[] result = null;
+
+    public RObjectGeneratorWorker(String rQuery, File rObjectFile,
+        boolean checkCache) {
+      this.rQuery = rQuery;
+      this.rObjectFile = rObjectFile;
+      this.checkCache = checkCache;
+    }
+
+    @Override
+    public void run() {
+
+      /* See if we need to generate this R object. */
+      long now = System.currentTimeMillis();
+      long oldestAcceptable = now - maxCacheAge * 1000L;
+      if (!this.checkCache || !this.rObjectFile.exists() ||
+          this.rObjectFile.lastModified() < oldestAcceptable) {
+
+        /* We do.  Run the R query, then check that the file really exists
+         * and is recent enough; otherwise leave the result null. */
+        if (!this.generateFile() || !this.rObjectFile.exists() ||
+            this.rObjectFile.lastModified() < oldestAcceptable) {
+          return;
+        }
+      }
+
+      /* Read the R object from disk into a byte array. */
+      this.result = this.readFile();
+    }
+
+    /* Run the R query with the absolute output path filled in, always
+     * closing the Rserve connection; return false if connecting to Rserve
+     * or evaluating the query failed. */
+    private boolean generateFile() {
+      String formattedQuery = String.format(this.rQuery,
+          this.rObjectFile.getAbsolutePath());
+      RConnection rc = null;
+      try {
+        rc = new RConnection(rserveHost, rservePort);
+        rc.eval(formattedQuery);
+        return true;
+      } catch (RserveException e) {
+        return false;
+      } finally {
+        if (rc != null) {
+          rc.close();
+        }
+      }
+    }
+
+    /* Read the whole output file into a byte array, always closing the
+     * file; return null if reading fails. */
+    private byte[] readFile() {
+      BufferedInputStream bis = null;
+      try {
+        bis = new BufferedInputStream(
+            new FileInputStream(this.rObjectFile), 1024);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        byte[] buffer = new byte[1024];
+        int length;
+        while ((length = bis.read(buffer)) != -1) {
+          baos.write(buffer, 0, length);
+        }
+        return baos.toByteArray();
+      } catch (IOException e) {
+        return null;
+      } finally {
+        if (bis != null) {
+          try {
+            bis.close();
+          } catch (IOException e) {
+            /* Ignored: the content was already read, or reading already
+             * failed and null is returned. */
+          }
+        }
+      }
+    }
+
+    /* Return the generated file's content, or null if generation or
+     * reading failed; only meaningful after this thread has been joined. */
+    public byte[] getRObjectBytes() {
+      return this.result;
+    }
+  }
+}
More information about the tor-commits
mailing list