[tbb-commits] [torbutton/master] Bug 40679: Missing features on first-time launch in esr91

gk at torproject.org gk at torproject.org
Wed Jan 19 13:35:24 UTC 2022


commit 10ecf6dd6ecf4ddf8f4cacfdfcb36be0881e83bb
Author: Richard Pospesel <richard at torproject.org>
Date:   Thu Dec 16 14:38:05 2021 +0100

    Bug 40679: Missing features on first-time launch in esr91
    
    - fixes myriad problems with our async control port communication
      logic
    - fixes tor-browser#40679
---
 chrome/content/tor-circuit-display.js |  21 +-
 chrome/content/torbutton.js           |   4 +-
 modules/tor-control-port.js           | 573 +++++++++++++++++++++-------------
 3 files changed, 361 insertions(+), 237 deletions(-)

diff --git a/chrome/content/tor-circuit-display.js b/chrome/content/tor-circuit-display.js
index 4e0088f0..d6034384 100644
--- a/chrome/content/tor-circuit-display.js
+++ b/chrome/content/tor-circuit-display.js
@@ -29,7 +29,7 @@ let createTorCircuitDisplay = (function () {
 const { Services } = ChromeUtils.import("resource://gre/modules/Services.jsm");
 
 // Import the controller code.
-let { controller } = ChromeUtils.import("resource://torbutton/modules/tor-control-port.js", {});
+let { wait_for_controller } = ChromeUtils.import("resource://torbutton/modules/tor-control-port.js", {});
 
 // Utility functions
 let { bindPrefAndInit, observe, getLocale, getDomainForBrowser, torbutton_get_property_string } = ChromeUtils.import("resource://torbutton/modules/utils.js", {});
@@ -458,7 +458,7 @@ let setupDisplay = function (enablePrefName) {
         syncDisplayWithSelectedTab(false);
         if (myController) {
           if (stopCollectingIsolationData) {
-	    stopCollectingIsolationData();
+            stopCollectingIsolationData();
           }
           if (stopCollectingBrowserCredentials) {
             stopCollectingBrowserCredentials();
@@ -469,19 +469,20 @@ let setupDisplay = function (enablePrefName) {
           myController = null;
         }
       },
-      start = function () {
+      start = async function () {
         if (!myController) {
-          myController = controller(function (err) {
-            // An error has occurred.
+          try {
+            myController = await wait_for_controller();
+            syncDisplayWithSelectedTab(true);
+            stopCollectingIsolationData = collectIsolationData(myController, updateCircuitDisplay);
+            stopCollectingBrowserCredentials = collectBrowserCredentials();
+            stopEnsuringCorrectPopupDimensions = ensureCorrectPopupDimensions();
+          } catch (err) {
             logger.eclog(5, err);
             logger.eclog(5, "Disabling tor display circuit because of an error.");
             myController.close();
             stop();
-          });
-          syncDisplayWithSelectedTab(true);
-          stopCollectingIsolationData = collectIsolationData(myController, updateCircuitDisplay);
-          stopCollectingBrowserCredentials = collectBrowserCredentials();
-          stopEnsuringCorrectPopupDimensions = ensureCorrectPopupDimensions();
+          }
        }
      };
   try {
diff --git a/chrome/content/torbutton.js b/chrome/content/torbutton.js
index 65048b1a..3bfaa1f8 100644
--- a/chrome/content/torbutton.js
+++ b/chrome/content/torbutton.js
@@ -28,7 +28,7 @@ let {
   torbutton_log,
   torbutton_get_property_string,
 } = ChromeUtils.import("resource://torbutton/modules/utils.js", {});
-let { configureControlPortModule, controller } = Cu.import("resource://torbutton/modules/tor-control-port.js", {});
+let { configureControlPortModule, wait_for_controller } = Cu.import("resource://torbutton/modules/tor-control-port.js", {});
 
 const k_tb_tor_check_failed_topic = "Torbutton:TorCheckFailed";
 
@@ -352,7 +352,7 @@ async function torbutton_send_ctrl_cmd(command) {
   let response = null;
   try {
     const avoidCache = true;
-    let torController = controller(e => { throw e; }, avoidCache);
+    let torController = await wait_for_controller(avoidCache);
 
     let bytes = await torController.sendCommand(command);
     if (!bytes.startsWith("250")) {
diff --git a/modules/tor-control-port.js b/modules/tor-control-port.js
index ef7bbe69..51ac8ac0 100644
--- a/modules/tor-control-port.js
+++ b/modules/tor-control-port.js
@@ -7,16 +7,16 @@
 //
 // To import the module, use
 //
-//  let { configureControlPortModule, controller } =
+//  let { configureControlPortModule, controller, wait_for_controller } =
 //                Components.utils.import("path/to/tor-control-port.js", {});
 //
-// See the second-to-last function defined in this file:
+// See the third-to-last function defined in this file:
 //   configureControlPortModule(ipcFile, host, port, password)
 // for usage of the configureControlPortModule function.
 //
-// See the last function defined in this file:
-//   controller(onError)
-// for usage of the controller function.
+// See the last two functions defined in this file:
+//   controller(avoidCache), wait_for_controller(avoidCache)
+// for usage of the controller functions.
 
 /* jshint esnext: true */
 /* jshint -W097 */
@@ -29,6 +29,14 @@ let { Constructor: CC } = Components;
 // ### Import Mozilla Services
 const { Services } = ChromeUtils.import("resource://gre/modules/Services.jsm");
 
+const { TorProtocolService, TorProcessStatus } = ChromeUtils.import(
+    "resource:///modules/TorProtocolService.jsm"
+);
+// tor-launcher observer topics
+const TorTopics = Object.freeze({
+    ProcessIsReady: "TorProcessIsReady",
+});
+
 // __log__.
 // Logging function
 let logger = Cc["@torproject.org/torbutton-logger;1"]
@@ -38,159 +46,289 @@ let log = x => logger.eclog(3, x.trimRight().replace(/\r\n/g, "\n"));
 // ### announce this file
 log("Loading tor-control-port.js\n");
 
-// ## io
-// I/O utilities namespace
-let io = {};
+class AsyncSocket {
+  constructor(ipcFile, host, port) {
+    let sts = Cc["@mozilla.org/network/socket-transport-service;1"].getService(Ci.nsISocketTransportService);
+    const OPEN_UNBUFFERED = Ci.nsITransport.OPEN_UNBUFFERED;
 
-// __io.asyncSocketStreams(ipcFile, host, port)__.
-// Creates a pair of asynchronous input and output streams for a socket at the
-// given ipcFile or host and port.
-io.asyncSocketStreams = function (ipcFile, host, port) {
-  let sts = Cc["@mozilla.org/network/socket-transport-service;1"]
-              .getService(Ci.nsISocketTransportService),
-      UNBUFFERED = Ci.nsITransport.OPEN_UNBUFFERED;
-
-  // Create an instance of a socket transport.
-  let socketTransport;
-  if (ipcFile) {
-    socketTransport = sts.createUnixDomainTransport(ipcFile);
-  } else {
-    socketTransport = sts.createTransport([], host, port, null, null);
+    let socketTransport = ipcFile ?
+                          sts.createUnixDomainTransport(ipcFile) :
+                          sts.createTransport([], host, port, null, null);
+
+
+    this.outputStream = socketTransport.openOutputStream(OPEN_UNBUFFERED, 1, 1).QueryInterface(Ci.nsIAsyncOutputStream);
+    this.outputQueue = [];
+
+    this.inputStream = socketTransport.openInputStream(OPEN_UNBUFFERED, 1, 1).QueryInterface(Ci.nsIAsyncInputStream);
+    this.scriptableInputStream = Cc["@mozilla.org/scriptableinputstream;1"].createInstance(Ci.nsIScriptableInputStream);
+    this.scriptableInputStream.init(this.inputStream);
+    this.inputQueue = [];
   }
 
-  // Open unbuffered asynchronous outputStream.
-  let outputStream = socketTransport.openOutputStream(UNBUFFERED, 1, 1)
-                      .QueryInterface(Ci.nsIAsyncOutputStream),
-      // Open unbuffered asynchronous inputStream.
-      inputStream = socketTransport.openInputStream(UNBUFFERED, 1, 1)
-                      .QueryInterface(Ci.nsIAsyncInputStream);
-  return [inputStream, outputStream];
-};
+  // asynchronously write string to underlying socket and return number of bytes written
+  async write(str) {
+    return new Promise((resolve, reject) => {
 
-// __io.pumpInputStream(scriptableInputStream, onInputData, onError)__.
-// Take an input stream and asynchronously pass data to the onInputData callback.
-io.pumpInputStream = function (inputStream, onInputData, onError) {
-  // Wrap raw inputStream with a "ScriptableInputStream" so we can read incoming data.
-  let ScriptableInputStream = Components.Constructor(
-    "@mozilla.org/scriptableinputstream;1", "nsIScriptableInputStream", "init"),
-      scriptableInputStream = new ScriptableInputStream(inputStream),
-      awaitNextChunk = function () {
-        inputStream.asyncWait({
-          onInputStreamReady: (stream) => {
-            try {
-              let chunk = scriptableInputStream.read(scriptableInputStream.available());
-              onInputData(chunk);
-              awaitNextChunk();
-            } catch (err) {
-              if (err.result !== Cr.NS_BASE_STREAM_CLOSED) {
-                onError(err);
-              }
-            }
+      // asyncWait next write request
+      const tryAsyncWait = () => {
+        if (this.outputQueue.length > 0) {
+          this.outputStream.asyncWait(
+            this.outputQueue.at(0), // next request
+            0, 0, Services.tm.currentThread);
+        }
+      };
+
+      // output stream can only have 1 registered callback at a time, so multiple writes
+      // need to be queued up (see nsIAsyncOutputStream.idl)
+      this.outputQueue.push({
+        // Implement an nsIOutputStreamCallback:
+        onOutputStreamReady: () => {
+          try {
+            let bytesWritten = this.outputStream.write(str, str.length);
+
+            // remove this callback object from queue as it is now completed
+            this.outputQueue.shift();
+
+            // request next wait if there is one
+            tryAsyncWait();
+
+            // finally resolve promise
+            resolve(bytesWritten);
+          } catch (err) {
+            // reject promise on error
+            reject(err);
           }
-        }, 0, 0, Services.tm.currentThread);
+        }
+      });
+
+      // length 1 implies that there is no in-flight asyncWait, so we may immediately
+      // follow through on this write
+      if (this.outputQueue.length == 1) {
+        tryAsyncWait();
+      }
+    });
+  }
+
+  // asynchronously read string from underlying socket and return it
+  async read() {
+    return new Promise((resolve, reject) => {
+
+      const tryAsyncWait = () => {
+        if (this.inputQueue.length > 0) {
+          this.inputStream.asyncWait(
+            this.inputQueue.at(0),  // next input request
+            0, 0, Services.tm.currentThread);
+        }
       };
-  awaitNextChunk();
-};
 
-// __io.asyncSocket(ipcFile, host, port, onInputData, onError)__.
-// Creates an asynchronous, text-oriented IPC socket (if ipcFile is defined)
-// or a TCP socket at host:port.
-// The onInputData callback should accept a single argument, which will be called
-// repeatedly, whenever incoming text arrives. Returns a socket object with two methods:
-// socket.write(text) and socket.close(). onError will be passed the error object
-// whenever a write fails.
-io.asyncSocket = function (ipcFile, host, port, onInputData, onError) {
-  let [inputStream, outputStream] = io.asyncSocketStreams(ipcFile, host, port),
-      pendingWrites = [];
-  // Run an input stream pump to send incoming data to the onInputData callback.
-  io.pumpInputStream(inputStream, onInputData, onError);
-  // Return the "socket object" as described.
-  return {
-           // Write a message to the socket.
-           write : function(aString) {
-             pendingWrites.push(aString);
-             outputStream.asyncWait(
-               // Implement an nsIOutputStreamCallback:
-               { onOutputStreamReady : function () {
-                 let totalString = pendingWrites.join("");
-                   try {
-                     outputStream.write(totalString, totalString.length);
-                     log("controlPort << " + totalString);
-                   } catch (err) {
-                     onError(err);
-                   }
-                   pendingWrites = [];
-               } },
-               0, 0, Services.tm.currentThread);
-           },
-           // Close the socket.
-           close : function () {
-             // Close stream objects.
-             inputStream.close();
-             outputStream.close();
-           }
-         };
-};
+      this.inputQueue.push({
+        onInputStreamReady: (stream) => {
+          try {
+            // read our string from input stream
+            let str = this.scriptableInputStream.read(this.scriptableInputStream.available());
 
-// __io.onDataFromOnLine(onLine)__.
-// Converts a callback that expects incoming individual lines of text to a callback that
-// expects incoming raw socket string data.
-io.onDataFromOnLine = function (onLine) {
-  // A private variable that stores the last unfinished line.
-  let pendingData = "";
-  // Return a callback to be passed to io.asyncSocket. First, splits data into lines of
-  // text. If the incoming data is not terminated by CRLF, then the last
-  // unfinished line will be stored in pendingData, to be prepended to the data in the
-  // next call to onData. The already complete lines of text are then passed in sequence
-  // to onLine.
-  return function (data) {
-    let totalData = pendingData + data,
-        lines = totalData.split("\r\n"),
-        n = lines.length;
-    pendingData = lines[n - 1];
-    // Call onLine for all completed lines.
-    lines.slice(0,-1).map(onLine);
-  };
+            // remove this callback object from queue now that we have read
+            this.inputQueue.shift();
+
+            // request next wait if there is one
+            tryAsyncWait();
+
+            // finally resolve promise
+            resolve(str);
+          } catch (err) {
+            reject(err);
+          }
+        }
+      });
+
+      // length 1 implies that there is no in-flight asyncWait, so we may immediately
+      // follow through on this read
+      if (this.inputQueue.length == 1) {
+        tryAsyncWait();
+      }
+    });
+  }
+
+  close() {
+    this.outputStream.close();
+    this.inputStream.close();
+  }
 };
 
-// __io.onLineFromOnMessage(onMessage)__.
-// Converts a callback that expects incoming control port multiline message strings to a
-// callback that expects individual lines.
-io.onLineFromOnMessage = function (onMessage) {
-  // A private variable that stores the last unfinished line.
-  let pendingLines = [],
-      // A private variable to monitor whether we are receiving a multiline
-      // value, beginning with ###+ and ending with a single ".".
-      multilineValueInProgress = false;
-  // Return a callback that expects individual lines.
-  return function (line) {
-    // Add to the list of pending lines.
-    pendingLines.push(line);
-    // 'Multiline values' are possible. We avoid interrupting one by detecting it
-    // and waiting for a terminating "." on its own line.
-    // (See control-spec section 3.9 and https://trac.torproject.org/16990#comment:28
-    if (line.match(/^\d\d\d\+.+?=$/) && pendingLines.length === 1) {
-      multilineValueInProgress = true;
+class ControlSocket {
+  constructor(asyncSocket) {
+    this.socket = asyncSocket;
+    this._isOpen = true;
+    this.pendingData = "";
+    this.pendingLines = [];
+
+    this.mainDispatcher = io.callbackDispatcher();
+    this.notificationDispatcher = io.callbackDispatcher();
+    // mainDispatcher pushes only async notifications (650) to notificationDispatcher
+    this.mainDispatcher.addCallback(/^650/, this._handleNotification.bind(this));
+    // callback for handling responses and errors
+    this.mainDispatcher.addCallback(/^[245]\d\d/, this._handleCommandReply.bind(this) );
+
+    this.commandQueue = [];
+
+    this._startMessagePump();
+  }
+
+  // blocks until an entire line is read and returns it
+  // immediately returns next line in queue (pendingLines) if present
+  async _readLine() {
+    // keep reading from socket until we have a full line to return
+    while(this.pendingLines.length == 0) {
+      // read data from our socket and split on newline tokens
+      this.pendingData += await this.socket.read();
+      let lines = this.pendingData.split("\r\n");
+
+      // the last line will either be empty string, or a partial read of a response/event
+      // so save it off for the next socket read
+      this.pendingData = lines.pop();
+
+      // copy remaining full lines to our pendingLines list
+      this.pendingLines = this.pendingLines.concat(lines);
+
     }
-    if (multilineValueInProgress && line.match(/^\.$/)) {
-      multilineValueInProgress = false;
+    return this.pendingLines.shift();
+  }
+
+  // blocks until an entire message is ready and returns it
+  async _readMessage() {
+    // whether we are searching for the end of a multi-line value
+    // See control-spec section 3.9
+    let handlingMultlineValue = false;
+    let endOfMessageFound = false;
+    const message = [];
+
+    do {
+      const line = await this._readLine();
+      message.push(line);
+
+      if (handlingMultlineValue) {
+        // look for end of multiline
+        if (line.match(/^\.$/)) {
+          handlingMultlineValue = false;
+        }
+      } else {
+        // 'Multiline values' are possible. We avoid interrupting one by detecting it
+        // and waiting for a terminating "." on its own line.
+        // (See control-spec section 3.9 and https://trac.torproject.org/16990#comment:28)
+        // Ensure this is the first line of a new message
+        if (message.length === 1 && line.match(/^\d\d\d\+.+?=$/)) {
+          handlingMultlineValue = true;
+        }
+        // look for end of message (note the space character at end of the regex)
+        else if(line.match(/^\d\d\d /)) {
+          if (message.length == 1) {
+            endOfMessageFound = true;
+          } else {
+            let firstReplyCode = message[0].substring(0,3);
+            let lastReplyCode = line.substring(0,3);
+            if (firstReplyCode == lastReplyCode) {
+              endOfMessageFound = true;
+            }
+          }
+        }
+      }
+    } while(!endOfMessageFound);
+
+    // join our lines back together to form one message
+    return message.join("\r\n");
+  }
+
+  async _startMessagePump() {
+    try {
+      while(true) {
+        let message = await this._readMessage();
+        log("controlPort >> " + message);
+        this.mainDispatcher.pushMessage(message);
+      }
+    } catch (err) {
+      this._isOpen = false;
+      for(const cmd of this.commandQueue) {
+        cmd.reject(err);
+      }
+      this.commandQueue = [];
     }
-    // If line is the last in a message, then pass on the full multiline message.
-    if (!multilineValueInProgress &&
-        line.match(/^\d\d\d /) &&
-        (pendingLines.length === 1 ||
-         pendingLines[0].substring(0,3) === line.substring(0,3))) {
-      // Combine pending lines to form message.
-      let message = pendingLines.join("\r\n");
-      log("controlPort >> " + message);
-      // Wipe pendingLines before we call onMessage, in case onMessage throws an error.
-      pendingLines = [];
-      // Pass multiline message to onMessage.
-      onMessage(message);
+  }
+
+  _writeNextCommand() {
+    let cmd = this.commandQueue[0];
+    log("controlPort << " + cmd.commandString);
+    this.socket.write(`${cmd.commandString}\r\n`).catch(cmd.reject);
+  }
+
+  async sendCommand(commandString) {
+    if (!this.isOpen()) {
+      throw new Error("ControlSocket not open");
     }
-  };
+
+    // this promise is resolved either in _handleCommandReply, or
+    // in _startMessagePump (on stream error)
+    return new Promise((resolve, reject) => {
+      let command = {
+        commandString: commandString,
+        resolve: resolve,
+        reject: reject,
+      };
+
+      this.commandQueue.push(command);
+      if (this.commandQueue.length == 1) {
+        this._writeNextCommand();
+      }
+    });
+  }
+
+  _handleCommandReply(message) {
+    let cmd = this.commandQueue.shift();
+    if (message.match(/^2/)) {
+      cmd.resolve(message);
+    } else if (message.match(/^[45]/)) {
+      let myErr = new Error(cmd.commandString + " -> " + message);
+      // Add Tor-specific information to the Error object.
+      let idx = message.indexOf(' ');
+      if (idx > 0) {
+        myErr.torStatusCode = message.substring(0, idx);
+        myErr.torMessage = message.substring(idx);
+      } else {
+        myErr.torStatusCode = message;
+      }
+      cmd.reject(myErr);
+    } else {
+      cmd.reject(new Error(`ControlSocket::_handleCommandReply received unexpected message:\n----\n${message}\n----`));
+    }
+
+    // send next command if one is available
+    if (this.commandQueue.length > 0) {
+      this._writeNextCommand();
+    }
+  }
+
+  _handleNotification(message) {
+    this.notificationDispatcher.pushMessage(message);
+  }
+
+  close() {
+    this.socket.close();
+    this._isOpen = false;
+  }
+
+  addNotificationCallback(regex, callback) {
+    this.notificationDispatcher.addCallback(regex, callback);
+  }
+
+  isOpen() {
+    return this._isOpen;
+  }
 };
 
+// ## io
+// I/O utilities namespace
+
+let io = {};
+
 // __io.callbackDispatcher()__.
 // Returns dispatcher object with three member functions:
 // dispatcher.addCallback(regex, callback), dispatcher.removeCallback(callback),
@@ -222,77 +360,30 @@ io.callbackDispatcher = function () {
            addCallback : addCallback };
 };
 
-// __io.matchRepliesToCommands(asyncSend, dispatcher)__.
-// Takes asyncSend(message), an asynchronous send function, and the callback
-// dispatcher, and returns a function Promise<response> sendCommand(command).
-io.matchRepliesToCommands = function (asyncSend, dispatcher) {
-  let commandQueue = [],
-      sendCommand = function (command, replyCallback, errorCallback) {
-        commandQueue.push([command, replyCallback, errorCallback]);
-        asyncSend(command);
-      };
-  // Watch for responses (replies or error messages)
-  dispatcher.addCallback(/^[245]\d\d/, function (message) {
-    let [command, replyCallback, errorCallback] = commandQueue.shift();
-    if (message.match(/^2/) && replyCallback) replyCallback(message);
-    if (message.match(/^[45]/) && errorCallback) {
-      let myErr = new Error(command + " -> " + message);
-      // Add Tor-specific information to the Error object.
-      let idx = message.indexOf(' ');
-      if (idx > 0) {
-        myErr.torStatusCode = message.substring(0, idx);
-        myErr.torMessage = message.substring(idx);
-      } else {
-        myErr.torStatusCode = message;
-      }
-      errorCallback(myErr);
-    }
-  });
-  // Create and return a version of sendCommand that returns a Promise.
-  return command => new Promise(function (replyCallback, errorCallback) {
-    sendCommand(command, replyCallback, errorCallback);
-  });
-};
-
-// __io.controlSocket(ipcFile, host, port, password, onError)__.
+// __io.controlSocket(ipcFile, host, port, password)__.
 // Instantiates and returns a socket to a tor ControlPort at ipcFile or
-// host:port, authenticating with the given password. onError is called with an
-// error object as its single argument whenever an error occurs. Example:
+// host:port, authenticating with the given password. Example:
 //
 //     // Open the socket
-//     let socket = controlSocket(undefined, "127.0.0.1", 9151, "MyPassw0rd",
-//                    function (error) { console.log(error.message || error); });
-//     // Send command and receive "250" reply or error message
-//     socket.sendCommand(commandText, replyCallback, errorCallback);
+//     let socket = await io.controlSocket(undefined, "127.0.0.1", 9151, "MyPassw0rd");
+//     // Send command and receive "250" reply; an error is thrown on failure
+//     await socket.sendCommand(commandText);
 //     // Register or deregister for "650" notifications
 //     // that match regex
 //     socket.addNotificationCallback(regex, callback);
 //     socket.removeNotificationCallback(callback);
 //     // Close the socket permanently
 //     socket.close();
-io.controlSocket = function (ipcFile, host, port, password, onError) {
-  // Produce a callback dispatcher for Tor messages.
-  let mainDispatcher = io.callbackDispatcher(),
-      // Open the socket and convert format to Tor messages.
-      socket = io.asyncSocket(ipcFile, host, port,
-                              io.onDataFromOnLine(
-                                   io.onLineFromOnMessage(mainDispatcher.pushMessage)),
-                              onError),
-      // Controllers should send commands terminated by CRLF.
-      writeLine = function (text) { socket.write(text + "\r\n"); },
-      // Create a sendCommand method from writeLine.
-      sendCommand = io.matchRepliesToCommands(writeLine, mainDispatcher),
-      // Create a secondary callback dispatcher for Tor notification messages.
-      notificationDispatcher = io.callbackDispatcher();
-  // Pass asynchronous notifications to notification dispatcher.
-  mainDispatcher.addCallback(/^650/, notificationDispatcher.pushMessage);
+io.controlSocket = async function (ipcFile, host, port, password) {
+  let socket = new AsyncSocket(ipcFile, host, port);
+  let controlSocket = new ControlSocket(socket);
+
   // Log in to control port.
-  sendCommand("authenticate " + (password || "")).catch(onError);
+  await controlSocket.sendCommand("authenticate " + (password || ""));
   // Activate needed events.
-  sendCommand("setevents stream").catch(onError);
-  return { close : socket.close, sendCommand : sendCommand,
-           addNotificationCallback : notificationDispatcher.addCallback,
-           removeNotificationCallback : notificationDispatcher.removeCallback };
+  await controlSocket.sendCommand("setevents stream");
+
+  return controlSocket;
 };
 
 // ## utils
@@ -684,13 +775,11 @@ let tor = {};
 // redundant instantiation of control sockets.
 tor.controllerCache = new Map();
 
-// __tor.controller(ipcFile, host, port, password, onError)__.
+// __tor.controller(ipcFile, host, port, password)__.
 // Creates a tor controller at the given ipcFile or host and port, with the
 // given password.
-// onError returns asynchronously whenever a connection error occurs.
-tor.controller = function (ipcFile, host, port, password, onError) {
-  let socket = io.controlSocket(ipcFile, host, port, password, onError),
-      isOpen = true;
+tor.controller = async function (ipcFile, host, port, password) {
+  let socket = await io.controlSocket(ipcFile, host, port, password);
   return { getInfo : key => info.getInfo(socket, key),
            getConf : key => info.getConf(socket, key),
            onionAuthViewKeys : () => onionAuth.viewKeys(socket),
@@ -701,8 +790,8 @@ tor.controller = function (ipcFile, host, port, password, onError) {
                                onionAuth.remove(socket, hsAddress),
            watchEvent : (type, filter, onData, raw=false) =>
                           event.watchEvent(socket, type, filter, onData, raw),
-           isOpen : () => isOpen,
-           close : () => { isOpen = false; socket.close(); },
+           isOpen : () => socket.isOpen(),
+           close : () => { socket.close(); },
            sendCommand: cmd => socket.sendCommand(cmd),
          };
 };
@@ -722,23 +811,24 @@ var configureControlPortModule = function (ipcFile, host, port, password) {
   controlPortInfo.password = password;
 };
 
-// __controller(onError)__.
+// __controller(avoidCache)__.
 // Instantiates and returns a controller object that is connected and
 // authenticated to a Tor ControlPort using the connection parameters
 // provided in the most recent call to configureControlPortModule(), if
 // the controller doesn't yet exist. Otherwise returns the existing
-// controller to the given ipcFile or host:port.
-// onError is called with an error object as its single argument whenever
-// an error occurs. Example:
+// controller to the given ipcFile or host:port. Throws on error.
 //
-//     // Get the controller
-//     let c = controller(
-//                    function (error) { console.log(error.message || error); });
+// Example:
+//
+//     // Get a new controller
+//     const avoidCache = true;
+//     let c = await controller(avoidCache);
 //     // Send command and receive `250` reply or error message in a promise:
 //     let replyPromise = c.getInfo("ip-to-country/16.16.16.16");
 //     // Close the controller permanently
 //     c.close();
-var controller = function (onError, avoidCache) {
+var controller = async function (avoidCache) {
+
   if (!controlPortInfo.ipcFile && !controlPortInfo.host)
     throw new Error("Please call configureControlPortModule first");
 
@@ -748,18 +838,17 @@ var controller = function (onError, avoidCache) {
 
   // constructor shorthand
   const newTorController =
-    () => {
-      return tor.controller(
+    async () => {
+      return await tor.controller(
         controlPortInfo.ipcFile,
         controlPortInfo.host,
         controlPortInfo.port,
-        controlPortInfo.password,
-        onError);
+        controlPortInfo.password);
     };
 
   // avoid cache so always return a new controller
   if (avoidCache) {
-    return newTorController();
+    return await newTorController();
   }
 
   // first check our cache and see if we already have one
@@ -769,10 +858,44 @@ var controller = function (onError, avoidCache) {
   }
 
   // create a new one and store in the map
-  cachedController = newTorController();
+  cachedController = await newTorController();
+  // overwrite the close() function to prevent consumers from closing a shared/cached controller
+  cachedController.close = () => {
+    throw new Error("May not close cached Tor Controller as it may be in use");
+  };
+
   tor.controllerCache.set(dest, cachedController);
   return cachedController;
 };
 
+// __wait_for_controller(avoidCache)__.
+// Same as controller() function, but explicitly waits until there is a tor daemon
+// to connect to (either launched by tor-launcher, or if we have an existing system
+// tor daemon)
+var wait_for_controller = async function(avoidCache) {
+  // if tor process is running (either ours or system) immediately return controller
+  if (!TorProtocolService.ownsTorDaemon ||
+      TorProtocolService.torProcessStatus == TorProcessStatus.Running) {
+      return await controller(avoidCache);
+  }
+
+  // otherwise we must wait for tor to finish launching before resolving
+  return new Promise((resolve, reject) => {
+    let observer = {
+      observe : async (subject, topic, data) => {
+        if (topic === TorTopics.ProcessIsReady) {
+          try {
+            resolve(await controller(avoidCache));
+          } catch (err) {
+            reject(err);
+          }
+          Services.obs.removeObserver(observer, TorTopics.ProcessIsReady);
+        }
+      },
+    };
+    Services.obs.addObserver(observer, TorTopics.ProcessIsReady);
+  });
+};
+
 // Export functions for external use.
-var EXPORTED_SYMBOLS = ["configureControlPortModule", "controller"];
+var EXPORTED_SYMBOLS = ["configureControlPortModule", "controller", "wait_for_controller"];



More information about the tbb-commits mailing list