[tor-commits] [metrics-web/master] Make R object generation thread-safe.

karsten at torproject.org karsten at torproject.org
Wed Mar 21 09:56:24 UTC 2012


commit b0b2f22e160750f9c7acc9de509d2467d3b5c572
Author: Karsten Loesing <karsten.loesing at gmx.net>
Date:   Wed Mar 21 10:53:55 2012 +0100

    Make R object generation thread-safe.
    
    Problems with concurrent R object generation were highly unlikely before
    this patch.  But that makes them even harder to track down.  Now, we're
    generating all R objects in a separate thread that we join.  Before
    starting a new thread we check if there's already a thread running to
    generate the same object, and if so, we join that one.
---
 src/org/torproject/ernie/web/RObjectGenerator.java |  200 +++++++++++---------
 1 files changed, 108 insertions(+), 92 deletions(-)

diff --git a/src/org/torproject/ernie/web/RObjectGenerator.java b/src/org/torproject/ernie/web/RObjectGenerator.java
index 011eccd..3482dce 100644
--- a/src/org/torproject/ernie/web/RObjectGenerator.java
+++ b/src/org/torproject/ernie/web/RObjectGenerator.java
@@ -4,11 +4,13 @@ package org.torproject.ernie.web;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -181,48 +183,11 @@ public class RObjectGenerator implements ServletContextListener {
     /* See if we need to generate this graph. */
     File imageFile = new File(this.cachedGraphsDirectory + "/"
         + imageFilename);
-    long now = System.currentTimeMillis();
-    if (!checkCache || !imageFile.exists() ||
-        imageFile.lastModified() < now - this.maxCacheAge * 1000L) {
-
-      /* We do. Update the R query to contain the absolute path to the
-       * file to be generated, create a connection to Rserve, run the R
-       * query, and close the connection. The generated graph will be on
-       * disk. */
-      rQuery = String.format(rQuery, imageFile.getAbsolutePath());
-      try {
-        RConnection rc = new RConnection(rserveHost, rservePort);
-        rc.eval(rQuery);
-        rc.close();
-      } catch (RserveException e) {
-        return null;
-      }
-
-      /* Check that we really just generated the file */
-      if (!imageFile.exists() || imageFile.lastModified() < now
-          - this.maxCacheAge * 1000L) {
-        return null;
-      }
-    }
-
-    /* Read the image from disk and write it to a byte array. */
-    byte[] result = null;
-    try {
-      BufferedInputStream bis = new BufferedInputStream(
-          new FileInputStream(imageFile), 1024);
-      ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      byte[] buffer = new byte[1024];
-      int length;
-      while ((length = bis.read(buffer)) > 0) {
-        baos.write(buffer, 0, length);
-      }
-      result = baos.toByteArray();
-    } catch (IOException e) {
-      return null;
-    }
+    byte[] imageBytes = this.generateRObject(rQuery, imageFile,
+        checkCache);
 
     /* Return the graph bytes. */
-    return result;
+    return imageBytes;
   }
 
   public SortedSet<String> getAvailableCsvFiles() {
@@ -245,35 +210,14 @@ public class RObjectGenerator implements ServletContextListener {
     /* See if we need to generate this .csv file. */
     File csvFile = new File(this.cachedGraphsDirectory + "/"
         + csvFilename);
-    long now = System.currentTimeMillis();
-    if (!checkCache || !csvFile.exists() ||
-        csvFile.lastModified() < now - this.maxCacheAge * 1000L) {
-
-      /* We do. Update the R query to contain the absolute path to the
-       * file to be generated, create a connection to Rserve, run the R
-       * query, and close the connection. The generated csv file will be
-       * on disk in the same directory as the generated graphs. */
-      rQuery = String.format(rQuery, csvFile.getAbsolutePath());
-      try {
-        RConnection rc = new RConnection(rserveHost, rservePort);
-        rc.eval(rQuery);
-        rc.close();
-      } catch (RserveException e) {
-        return null;
-      }
-
-      /* Check that we really just generated the file */
-      if (!csvFile.exists() || csvFile.lastModified() < now
-          - this.maxCacheAge * 1000L) {
-        return null;
-      }
-    }
+    byte[] csvBytes = this.generateRObject(rQuery, csvFile, checkCache);
 
     /* Read the text file from disk and write it to a string. */
     String result = null;
     try {
       StringBuilder sb = new StringBuilder();
-      BufferedReader br = new BufferedReader(new FileReader(csvFile));
+      BufferedReader br = new BufferedReader(new InputStreamReader(
+          new ByteArrayInputStream(csvBytes)));
       String line = null;
       while ((line = br.readLine()) != null) {
         sb.append(line + "\n");
@@ -283,7 +227,7 @@ public class RObjectGenerator implements ServletContextListener {
       return null;
     }
 
-    /* Return the csv file. */
+    /* Return the csv file content. */
     return result;
   }
 
@@ -343,36 +287,15 @@ public class RObjectGenerator implements ServletContextListener {
     /* See if we need to generate this table. */
     File tableFile = new File(this.cachedGraphsDirectory + "/"
         + tableFilename);
-    long now = System.currentTimeMillis();
-    if (!checkCache || !tableFile.exists() ||
-        tableFile.lastModified() < now - this.maxCacheAge * 1000L) {
-
-      /* We do. Update the R query to contain the absolute path to the
-       * file to be generated, create a connection to Rserve, run the R
-       * query, and close the connection. The generated csv file will be
-       * on disk in the same directory as the generated graphs. */
-      rQuery = String.format(rQuery, tableFile.getAbsolutePath());
-      try {
-        RConnection rc = new RConnection(rserveHost, rservePort);
-        rc.eval(rQuery);
-        rc.close();
-      } catch (RserveException e) {
-        return null;
-      }
+    byte[] tableBytes = this.generateRObject(rQuery, tableFile,
+        checkCache);
 
-      /* Check that we really just generated the file */
-      if (!tableFile.exists() || tableFile.lastModified() < now
-          - this.maxCacheAge * 1000L) {
-        return null;
-      }
-    }
-
-    /* Read the text file from disk and write the table content to a
-     * map. */
+    /* Write the table content to a map. */
     List<Map<String, String>> result = null;
     try {
       result = new ArrayList<Map<String, String>>();
-      BufferedReader br = new BufferedReader(new FileReader(tableFile));
+      BufferedReader br = new BufferedReader(new InputStreamReader(
+          new ByteArrayInputStream(tableBytes)));
       String line = br.readLine();
       if (line != null) {
         List<String> headers = new ArrayList<String>(Arrays.asList(
@@ -396,5 +319,98 @@ public class RObjectGenerator implements ServletContextListener {
     /* Return table values. */
     return result;
   }
-}
 
+  /* Generate an R object in a separate worker thread, or join a worker
+   * already running for the same query; returns null on failure. */
+  private byte[] generateRObject(String rQuery, File rObjectFile,
+      boolean checkCache) {
+    RObjectGeneratorWorker worker;
+    synchronized (this.rObjectGeneratorThreads) {
+      worker = this.rObjectGeneratorThreads.get(rQuery);
+      if (worker == null) {
+        worker = new RObjectGeneratorWorker(rQuery, rObjectFile,
+            checkCache);
+        this.rObjectGeneratorThreads.put(rQuery, worker);
+        worker.start();
+      }
+    }
+    try {
+      worker.join();
+    } catch (InterruptedException e) {
+      /* Stop waiting, but keep the interrupt visible to our caller. */
+      Thread.currentThread().interrupt();
+    }
+    synchronized (this.rObjectGeneratorThreads) {
+      if (this.rObjectGeneratorThreads.get(rQuery) == worker) {
+        this.rObjectGeneratorThreads.remove(rQuery);
+      }
+    }
+    return worker.getRObjectBytes();
+  }
+
+  private Map<String, RObjectGeneratorWorker> rObjectGeneratorThreads =
+      new HashMap<String, RObjectGeneratorWorker>();
+
+  /* Worker thread that produces the content of one R object file. */
+  private class RObjectGeneratorWorker extends Thread {
+    private String rQuery;
+    private File rObjectFile;
+    private boolean checkCache;
+    private byte[] result = null;
+
+    public RObjectGeneratorWorker(String rQuery, File rObjectFile,
+        boolean checkCache) {
+      this.rQuery = rQuery;
+      this.rObjectFile = rObjectFile;
+      this.checkCache = checkCache;
+    }
+
+    public void run() {
+      /* See if we need to generate this R object. */
+      long now = System.currentTimeMillis();
+      if (!this.checkCache || !this.rObjectFile.exists() ||
+          this.rObjectFile.lastModified() < now - maxCacheAge * 1000L) {
+        /* We do. Put the absolute path of the file to generate into the
+         * R query, connect to Rserve, run the query, and close the
+         * connection. The generated object will be on disk. */
+        this.rQuery = String.format(this.rQuery,
+            this.rObjectFile.getAbsolutePath());
+        try {
+          RConnection rc = new RConnection(rserveHost, rservePort);
+          rc.eval(this.rQuery);
+          rc.close();
+        } catch (RserveException e) {
+          return;
+        }
+        /* Check that we really just generated the R object. */
+        if (!this.rObjectFile.exists() || this.rObjectFile.lastModified()
+            < now - maxCacheAge * 1000L) {
+          return;
+        }
+      }
+
+      /* Read the R object from disk, always closing the stream. */
+      try {
+        BufferedInputStream bis = new BufferedInputStream(
+            new FileInputStream(this.rObjectFile), 1024);
+        try {
+          ByteArrayOutputStream baos = new ByteArrayOutputStream();
+          byte[] buffer = new byte[1024];
+          int length;
+          while ((length = bis.read(buffer)) > 0) {
+            baos.write(buffer, 0, length);
+          }
+          this.result = baos.toByteArray();
+        } finally {
+          bis.close();
+        }
+      } catch (IOException e) {
+        return;
+      }
+    }
+
+    public byte[] getRObjectBytes() {
+      return result;
+    }
+  }
+}



More information about the tor-commits mailing list