[tor-commits] [stegotorus/master] Defer connection and circuit deallocation to a stable point.

zwol at torproject.org zwol at torproject.org
Fri Jul 20 23:17:08 UTC 2012


commit f4baab1f5bbd3ae63b457ba82b7d2bea7b54c3cd
Author: Zack Weinberg <zackw at cmu.edu>
Date:   Mon Jun 25 17:24:03 2012 -0700

    Defer connection and circuit deallocation to a stable point.
    
    We were crashing because libevent had internally queued up connection
    and/or circuit events, and would go ahead and fire them even though the
    associated objects had already been deallocated.  Closing a connection
    or circuit now only disables its events and moves it to a set of
    closed objects; a low-priority cleanup event deallocates everything
    in that set after the current batch of events has been processed.
    With deallocation deferred in this way, the crash doesn't happen.
    
    As a consequence we can no longer test protocol code from the unit
    test harness; this is no great loss as we were only testing the null
    protocol that way, and the integration tests cover it adequately.
    
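    Also fixes a bug in the integration test suite, where we could fail to
    wait for a subprocess: check_completion now cancels the timeout timer
    only if the subprocess has already exited, and calls wait() instead
    of poll() before building its report.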
---
 Makefile.am                   |    5 +-
 src/audit-globals.sh          |    7 +-
 src/connections.cc            |  222 ++++++++++++++++++++++++++++++-----------
 src/connections.h             |   42 ++++++--
 src/main.cc                   |   21 ++--
 src/main.h                    |    9 --
 src/network.cc                |   35 ++++---
 src/protocol.h                |    2 +
 src/protocol/chop.cc          |   79 ++++++++------
 src/protocol/null.cc          |   41 +++-----
 src/test/itestlib.py          |    5 +-
 src/test/unittest.cc          |   99 +------------------
 src/test/unittest.h           |   40 --------
 src/test/unittest_config.cc   |   87 ----------------
 src/test/unittest_transfer.cc |  114 ---------------------
 15 files changed, 292 insertions(+), 516 deletions(-)

diff --git a/Makefile.am b/Makefile.am
index 513e848..174e09c 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -56,11 +56,9 @@ stegotorus_SOURCES = \
 UTGROUPS = \
 	src/test/unittest_base64.cc \
 	src/test/unittest_compression.cc \
-	src/test/unittest_config.cc \
 	src/test/unittest_crypt.cc \
 	src/test/unittest_pdfsteg.cc \
-	src/test/unittest_socks.cc \
-	src/test/unittest_transfer.cc
+	src/test/unittest_socks.cc
 
 unittests_SOURCES = \
 	src/test/tinytest.cc \
@@ -75,7 +73,6 @@ noinst_HEADERS = \
 	src/connections.h \
 	src/crypt.h \
 	src/listener.h \
-	src/main.h \
 	src/protocol.h \
 	src/rng.h \
 	src/socks.h \
diff --git a/src/audit-globals.sh b/src/audit-globals.sh
index 7c28f21..2512bb6 100644
--- a/src/audit-globals.sh
+++ b/src/audit-globals.sh
@@ -29,12 +29,7 @@ sed '
 
   /^compression ZLIB_CEILING$/d
   /^compression ZLIB_UINT_MAX$/d
-  /^connections circuits$/d
-  /^connections closing_all_connections$/d
-  /^connections connections$/d
-  /^connections last_ckt_serial$/d
-  /^connections last_conn_serial$/d
-  /^connections shutting_down$/d
+  /^connections cgs$/d
   /^crypt bctx$/d
   /^crypt crypto_initialized$/d
   /^crypt crypto_errs_initialized$/d
diff --git a/src/connections.cc b/src/connections.cc
index 505299a..99957fe 100644
--- a/src/connections.cc
+++ b/src/connections.cc
@@ -5,7 +5,6 @@
 
 #include "util.h"
 #include "connections.h"
-#include "main.h"
 #include "protocol.h"
 #include "socks.h"
 
@@ -16,75 +15,147 @@
 
 using std::tr1::unordered_set;
 
-/** All active connections.  */
-static unordered_set<conn_t *> connections;
+static void close_cleanup_cb(evutil_socket_t, short, void *);
 
-/** All active circuits.  */
-static unordered_set<circuit_t *> circuits;
+namespace {
+struct conn_global_state
+{
+  /** All active connections.  */
+  unordered_set<conn_t *> connections;
+
+  /** Connections which are to be deallocated after we return to the
+      event loop. */
+  unordered_set<conn_t *> closed_connections;
+
+  /** All active circuits.  */
+  unordered_set<circuit_t *> circuits;
+
+  /** Circuits which are to be deallocated after we return to the
+      event loop. */
+  unordered_set<circuit_t *> closed_circuits;
+
+  /** The one and only event base used by this program.
+      Not owned by this object. */
+  struct event_base *the_event_base;
+
+  /** Low-priority event which fires when there are connections or
+      circuits waiting to be deallocated, and all other pending events
+      have been processed.  This ensures that we don't deallocate
+      connections that have pending events. */
+  struct event *close_cleanup;
+
+  /** Most recently assigned serial numbers for connections and circuits.
+      Note that serial number 0 is never used. These are only used for
+      debugging messages, so we don't worry about them wrapping around. */
+  unsigned int last_conn_serial;
+  unsigned int last_ckt_serial;
+
+  /** True when stegotorus is shutting down: no further connections or
+      circuits may be created, and we break out of the event loop when
+      the last one (of either) is closed. */
+  bool shutting_down;
+
+  conn_global_state(struct event_base *evbase);
+  ~conn_global_state();
+};
+
+conn_global_state::conn_global_state(struct event_base *evbase)
+  : the_event_base(evbase),
+    close_cleanup(0),
+    last_conn_serial(0), last_ckt_serial(0),
+    shutting_down(false)
+{
+  close_cleanup = evtimer_new(evbase, close_cleanup_cb, this);
+  log_assert(close_cleanup);
+  if (event_priority_set(close_cleanup, 1))
+    log_abort("failed to demote priority of close-cleanup event");
+}
 
-/** Most recently assigned serial numbers for connections and circuits.
-    Note that serial number 0 is never used. These are only used for
-    debugging messages, so we don't worry about them wrapping around. */
-static unsigned int last_conn_serial = 0;
-static unsigned int last_ckt_serial = 0;
+conn_global_state::~conn_global_state()
+{
+  log_assert(shutting_down);
+  log_assert(connections.empty());
+  log_assert(closed_connections.empty());
+  log_assert(circuits.empty());
+  log_assert(closed_circuits.empty());
 
-/** True when stegotorus is shutting down: no further connections or
-    circuits may be created, and we break out of the event loop when
-    the last one (of either) is closed. */
-static bool shutting_down;
+  event_free(close_cleanup);
+}
 
-/** True in the middle of a barbaric connection shutdown; prevents
-    maybe_finish_shutdown from shutting down too early. */
-static bool closing_all_connections;
+} // anonymous namespace
 
 static void
-maybe_finish_shutdown(void)
+close_cleanup_cb(evutil_socket_t, short, void *arg)
 {
-  if (!shutting_down || closing_all_connections ||
-      !circuits.empty() || !connections.empty())
+  conn_global_state *cgs = (conn_global_state *)arg;
+
+  if (!cgs->closed_circuits.empty()) {
+    unordered_set<circuit_t *> v;
+    v.swap(cgs->closed_circuits);
+    for (unordered_set<circuit_t *>::iterator i = v.begin();
+         i != v.end(); i++)
+      delete *i;
+  }
+  if (!cgs->closed_connections.empty()) {
+    unordered_set<conn_t *> v;
+    v.swap(cgs->closed_connections);
+    for (unordered_set<conn_t *>::iterator i = v.begin();
+         i != v.end(); i++)
+      delete *i;
+  }
+
+  if (!cgs->shutting_down ||
+      !cgs->circuits.empty() ||
+      !cgs->connections.empty())
     return;
 
-  finish_shutdown();
+  log_debug("finishing shutdown");
+  event_base_loopexit(cgs->the_event_base, NULL);
+  delete cgs;
+  cgs = 0;
+}
+
+static conn_global_state *cgs;
+
+void
+conn_global_init(struct event_base *evbase)
+{
+  cgs = new conn_global_state(evbase);
 }
 
 void
 conn_start_shutdown(int barbaric)
 {
-  shutting_down = true;
+  cgs->shutting_down = true;
 
   if (barbaric) {
-    closing_all_connections = true;
-
-    if (!circuits.empty()) {
+    if (!cgs->circuits.empty()) {
       unordered_set<circuit_t *> v;
-      v.swap(circuits);
+      v.swap(cgs->circuits);
       for (unordered_set<circuit_t *>::iterator i = v.begin();
            i != v.end(); i++)
-        delete *i;
+        (*i)->close();
     }
-    if (!connections.empty()) {
+    if (!cgs->connections.empty()) {
       unordered_set<conn_t *> v;
-      v.swap(connections);
+      v.swap(cgs->connections);
       for (unordered_set<conn_t *>::iterator i = v.begin();
            i != v.end(); i++)
-        delete *i;
+        (*i)->close();
     }
-    closing_all_connections = false;
   }
-
-  maybe_finish_shutdown();
 }
 
 size_t
 conn_count(void)
 {
-  return connections.size();
+  return cgs->connections.size();
 }
 
 size_t
 circuit_count(void)
 {
-  return circuits.size();
+  return cgs->circuits.size();
 }
 
 /**
@@ -96,13 +167,13 @@ conn_create(config_t *cfg, size_t index,
 {
   conn_t *conn;
 
-  log_assert(!shutting_down);
+  log_assert(!cgs->shutting_down);
 
   conn = cfg->conn_create(index);
   conn->buffer = buf;
   conn->peername = peername;
-  conn->serial = ++last_conn_serial;
-  connections.insert(conn);
+  conn->serial = ++cgs->last_conn_serial;
+  cgs->connections.insert(conn);
   log_debug(conn, "new connection");
   return conn;
 }
@@ -112,16 +183,32 @@ conn_create(config_t *cfg, size_t index,
 */
 conn_t::~conn_t()
 {
-  connections.erase(this);
-  log_debug(this, "closing connection; %lu remaining",
-            (unsigned long) connections.size());
-
   if (this->peername)
     free((void *)this->peername);
   if (this->buffer)
     bufferevent_free(this->buffer);
+}
 
-  maybe_finish_shutdown();
+void
+conn_t::close()
+{
+  log_debug(this, "closing connection; %lu remaining",
+            (unsigned long) cgs->connections.size());
+  if (this->buffer)
+    bufferevent_disable(this->buffer, EV_READ|EV_WRITE);
+
+  bool need_event_add =
+    cgs->closed_connections.empty() && cgs->closed_circuits.empty();
+
+  cgs->connections.erase(this);
+  cgs->closed_connections.insert(this);
+
+  if (need_event_add) {
+    struct timeval tv;
+    tv.tv_sec = 0;
+    tv.tv_usec = 0;
+    event_add(cgs->close_cleanup, &tv);
+  }
 }
 
 /** Potentially called during connection construction or destruction. */
@@ -173,11 +260,7 @@ axe_timer_cb(evutil_socket_t, short, void *arg)
   circuit_t *ckt = (circuit_t *)arg;
   log_warn(ckt, "timeout waiting for new connections");
 
-  if (ckt->connected &&
-      evbuffer_get_length(bufferevent_get_output(ckt->up_buffer)) > 0)
-    circuit_do_flush(ckt);
-  else
-    delete ckt;
+  circuit_do_flush(ckt);
 }
 
 circuit_t *
@@ -185,25 +268,21 @@ circuit_create(config_t *cfg, size_t index)
 {
   circuit_t *ckt;
 
-  log_assert(!shutting_down);
+  log_assert(!cgs->shutting_down);
 
   ckt = cfg->circuit_create(index);
-  ckt->serial = ++last_ckt_serial;
+  ckt->serial = ++cgs->last_ckt_serial;
 
   if (cfg->mode == LSN_SOCKS_CLIENT)
     ckt->socks_state = socks_state_new();
 
-  circuits.insert(ckt);
+  cgs->circuits.insert(ckt);
   log_debug(ckt, "new circuit");
   return ckt;
 }
 
 circuit_t::~circuit_t()
 {
-  circuits.erase(this);
-  log_debug(this, "closing circuit; %lu remaining",
-            (unsigned long)circuits.size());
-
   if (this->up_buffer)
     bufferevent_free(this->up_buffer);
   if (this->up_peer)
@@ -214,8 +293,33 @@ circuit_t::~circuit_t()
     event_free(this->flush_timer);
   if (this->axe_timer)
     event_free(this->axe_timer);
+}
+
+void
+circuit_t::close()
+{
+  log_debug(this, "closing circuit; %lu remaining",
+            (unsigned long)cgs->circuits.size());
+
+  if (this->up_buffer)
+    bufferevent_disable(this->up_buffer, EV_READ|EV_WRITE);
+  if (this->flush_timer)
+    event_del(this->flush_timer);
+  if (this->axe_timer)
+    event_del(this->axe_timer);
+
+  bool need_event_add =
+    cgs->closed_connections.empty() && cgs->closed_circuits.empty();
 
-  maybe_finish_shutdown();
+  cgs->circuits.erase(this);
+  cgs->closed_circuits.insert(this);
+
+  if (need_event_add) {
+    struct timeval tv;
+    tv.tv_sec = 0;
+    tv.tv_usec = 0;
+    event_add(cgs->close_cleanup, &tv);
+  }
 }
 
 config_t *
@@ -241,7 +345,7 @@ circuit_send(circuit_t *ckt)
 {
   if (ckt->send()) {
     log_info(ckt, "error during transmit");
-    delete ckt;
+    ckt->close();
   }
 }
 
@@ -256,10 +360,10 @@ circuit_send_eof(circuit_t *ckt)
   ckt->pending_read_eof = true;
   if (ckt->socks_state) {
     log_debug(ckt, "EOF during SOCKS phase");
-    delete ckt;
+    ckt->close();
   } else if (ckt->send_eof()) {
     log_info(ckt, "error during transmit");
-    delete ckt;
+    ckt->close();
   }
 }
 
diff --git a/src/connections.h b/src/connections.h
index ef51c63..39f9984 100644
--- a/src/connections.h
+++ b/src/connections.h
@@ -35,11 +35,15 @@ struct conn_t {
     , pending_write_eof(false)
   {}
 
-  /** Close and deallocate a connection.  If the connection is part of a
-      circuit, disconnect it from the circuit; this may cause the circuit
-      to close as well. */
+  /** Deallocate a connection.  Normally should not be invoked directly,
+      use close() instead. */
   virtual ~conn_t();
 
+  /** Close a connection and schedule it for deallocation.  If the
+      connection is part of a circuit, disconnect it from the circuit;
+      this may cause the circuit to close as well. */
+  virtual void close();
+
   /** Return the upstream circuit for this connection, if there is one.
       NOTE: this is *not* a pure virtual method because it can be called
       legitimately after the subclass destructor has run. */
@@ -53,15 +57,23 @@ struct conn_t {
   struct evbuffer *outbound()
   { return this->buffer ? bufferevent_get_output(this->buffer) : 0; }
 
-  /** Create an upstream circuit for this connection, if it is
-      possible to do so without receiving data from the downstream
-      peer.  If data must be received first, this method should do
-      nothing (but return success), and the |recv| method is
-      responsible for creating the upstream circuit when appropriate.
-      Must return 0 on success, -1 on failure. */
+  /** Called immediately after the TCP handshake completes, for
+      incoming connections to server mode.
+
+      If it is possible to do so without receiving data from the
+      downstream peer, create an upstream circuit for this connection
+      here.  If data must be received first, this method should do
+      nothing (but return success), and the |recv| method should
+      create the upstream circuit when appropriate.  */
   virtual int maybe_open_upstream() = 0;
 
-  /** Perform a connection handshake. Not all protocols have a handshake. */
+  /** Called immediately after the TCP handshake completes, for
+      outgoing connections from client mode.
+
+      If it is necessary to transmit something immediately on new
+      connections, do so from this method.  (It may be more
+      appropriate to wait until the first time the associated circuit
+      wishes to transmit data on this connection.)  */
   virtual int handshake() = 0;
 
   /** Receive data from 'source' and pass it upstream (to the circuit). */
@@ -92,6 +104,9 @@ struct conn_t {
   virtual void transmit_soon(unsigned long timeout) = 0;
 };
 
+/** Prepare global connection-related state.  Succeeds or crashes.  */
+void conn_global_init(struct event_base *);
+
 /** When all currently-open connections and circuits are closed, stop
     the main event loop and exit the program.  If 'barbaric' is true,
     forcibly close them all now, then stop the event loop.
@@ -149,8 +164,15 @@ struct circuit_t {
     , pending_read_eof(false)
     , pending_write_eof(false)
   {}
+
+  /** Deallocate a circuit.  Normally should not be invoked directly,
+      use close() instead.  */
   virtual ~circuit_t();
 
+  /** Close a circuit and schedule it for deallocation.  Will also
+      disconnect and close all connections that belong to this circuit. */
+  virtual void close();
+
   /** Return the configuration that this circuit belongs to. */
   virtual config_t *cfg() const;
 
diff --git a/src/main.cc b/src/main.cc
index 42687e1..56e3d81 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -4,7 +4,6 @@
  */
 
 #include "util.h"
-#include "main.h"
 
 #include "connections.h"
 #include "crypt.h"
@@ -35,7 +34,6 @@
 using std::vector;
 using std::string;
 
-static struct event_base *the_event_base;
 static bool allow_kq = false;
 static bool daemon_mode = false;
 static string pidfile_name;
@@ -65,15 +63,6 @@ start_shutdown(int barbaric, const char *label)
   conn_start_shutdown(barbaric); /* possibly break existing connections */
 }
 
-/** Stop stegotorus's event loop. Final cleanup happens in main().
-    Called by conn_start_shutdown and/or conn_free (see connections.c). */
-void
-finish_shutdown(void)
-{
-  log_debug("finishing shutdown");
-  event_base_loopexit(the_event_base, NULL);
-}
-
 /**
    This is called when we receive an asynchronous signal.
    It figures out the signal type and acts accordingly.
@@ -440,10 +429,18 @@ main(int, const char *const *argv)
   /* Possibly worth doing in the future: activating Windows IOCP and
      telling it how many CPUs to use. */
 
-  the_event_base = event_base_new_with_config(evcfg);
+  struct event_base *the_event_base = event_base_new_with_config(evcfg);
   if (!the_event_base)
     log_abort("failed to initialize networking (evbase)");
 
+  /* Most events are processed at the default priority (0), but
+     connection cleanup events are processed at low priority (1)
+     to ensure that all pending I/O is handled first.  */
+  if (event_base_priority_init(the_event_base, 2))
+    log_abort("failed to initialize networking (priority queues)");
+
+  conn_global_init(the_event_base);
+
   /* ASN should this happen only when SOCKS is enabled? */
   if (init_evdns_base(the_event_base))
     log_abort("failed to initialize DNS resolver");
diff --git a/src/main.h b/src/main.h
deleted file mode 100644
index 5786569..0000000
--- a/src/main.h
+++ /dev/null
@@ -1,9 +0,0 @@
-/* Copyright 2011 Nick Mathewson, George Kadianakis
- * See LICENSE for other credits and copying information
- */
-#ifndef MAIN_H
-#define MAIN_H
-
-void finish_shutdown(void);
-
-#endif
diff --git a/src/network.cc b/src/network.cc
index 71d5b16..7a8f4ce 100644
--- a/src/network.cc
+++ b/src/network.cc
@@ -220,14 +220,14 @@ server_listener_cb(struct evconnlistener *, evutil_socket_t fd,
   /* If appropriate at this point, connect to upstream. */
   if (conn->maybe_open_upstream() < 0) {
     log_debug(conn, "error opening upstream circuit");
-    delete conn;
+    conn->close();
     return;
   }
 
   /* Queue handshake, if any. */
   if (conn->handshake() < 0) {
     log_debug(conn, "error during handshake");
-    delete conn;
+    conn->close();
     return;
   }
 
@@ -270,7 +270,7 @@ socks_read_cb(struct bufferevent *bev, void *arg)
   if (socks_ret == SOCKS_INCOMPLETE)
     return; /* need to read more data. */
   else if (socks_ret == SOCKS_BROKEN)
-    delete ckt; /* XXXX send socks reply */
+    ckt->close(); /* XXXX send socks reply */
   else if (socks_ret == SOCKS_CMD_NOT_CONNECT) {
     bufferevent_enable(bev, EV_WRITE);
     bufferevent_disable(bev, EV_READ);
@@ -314,7 +314,7 @@ downstream_read_cb(struct bufferevent *bev, void *arg)
 
   if (down->recv()) {
     log_debug(down, "error during receive");
-    delete down;
+    down->close();
   }
 }
 
@@ -346,9 +346,9 @@ upstream_event_cb(struct bufferevent *, short what, void *arg)
       /* Upstream is done sending us data. */
       circuit_send_eof(ckt);
       if (ckt->read_eof && ckt->write_eof)
-        delete ckt;
+        ckt->close();
     } else {
-      delete ckt;
+      ckt->close();
     }
   } else {
     /* We should never get BEV_EVENT_CONNECTED here.
@@ -390,9 +390,9 @@ downstream_event_cb(struct bufferevent *bev, short what, void *arg)
       /* Peer is done sending us data. */
       conn->recv_eof();
       if (conn->read_eof && conn->write_eof)
-        delete conn;
+        conn->close();
     } else {
-      delete conn;
+      conn->close();
     }
   } else {
     /* We should never get BEV_EVENT_CONNECTED here.
@@ -423,7 +423,7 @@ upstream_flush_cb(struct bufferevent *bev, void *arg)
       ckt->write_eof = true;
     }
     if (ckt->read_eof && ckt->write_eof)
-      delete ckt;
+      ckt->close();
   }
 }
 
@@ -451,7 +451,7 @@ downstream_flush_cb(struct bufferevent *bev, void *arg)
       conn->write_eof = true;
     }
     if (conn->read_eof && conn->write_eof)
-      delete conn;
+      conn->close();
   }
 }
 
@@ -515,7 +515,7 @@ downstream_connect_cb(struct bufferevent *bev, short what, void *arg)
     /* Queue handshake, if any. */
     if (conn->handshake() < 0) {
       log_debug(conn, "error during handshake");
-      delete conn;
+      conn->close();
       return;
     }
 
@@ -567,7 +567,7 @@ downstream_socks_connect_cb(struct bufferevent *bev, short what, void *arg)
       socks_send_reply(socks, bufferevent_get_output(ckt->up_buffer), err);
       circuit_do_flush(ckt);
     } else {
-      delete ckt;
+      ckt->close();
     }
     return;
   }
@@ -604,7 +604,7 @@ downstream_socks_connect_cb(struct bufferevent *bev, short what, void *arg)
     /* Queue handshake, if any. */
     if (conn->handshake()) {
       log_debug(conn, "error during handshake");
-      delete conn;
+      conn->close();
       return;
     }
 
@@ -726,11 +726,11 @@ create_outbound_connections(circuit_t *ckt, bool is_socks)
 
   if (n == 0) {
     log_warn(ckt, "no target addresses available");
-    delete ckt;
+    ckt->close();
   }
   if (any_successes == 0) {
     log_warn(ckt, "no outbound connections were successful");
-    delete ckt;
+    ckt->close();
   }
 }
 
@@ -785,7 +785,7 @@ create_outbound_connections_socks(circuit_t *ckt)
 
  failure:
   /* XXXX send socks reply */
-  delete ckt;
+  ckt->close();
   if (buf)
     bufferevent_free(buf);
 }
@@ -815,6 +815,7 @@ conn_do_flush(conn_t *conn)
   if (remain == 0)
     downstream_flush_cb(conn->buffer, conn);
   else
-    log_debug(conn, "flushing %lu bytes to peer [enabled=%x]", (unsigned long)remain,
+    log_debug(conn, "flushing %lu bytes to peer [enabled=%x]",
+              (unsigned long)remain,
               bufferevent_get_enabled(conn->buffer));
 }
diff --git a/src/protocol.h b/src/protocol.h
index a2e467d..874c7ac 100644
--- a/src/protocol.h
+++ b/src/protocol.h
@@ -128,6 +128,7 @@ extern const proto_module *const supported_protos[];
 #define CONN_DECLARE_METHODS(mod)                       \
   mod##_conn_t();                                       \
   virtual ~mod##_conn_t();                              \
+  virtual void close();                                 \
   virtual circuit_t *circuit() const;                   \
   virtual int  maybe_open_upstream();                   \
   virtual int  handshake();                             \
@@ -149,6 +150,7 @@ extern const proto_module *const supported_protos[];
 #define CIRCUIT_DECLARE_METHODS(mod)            \
   mod##_circuit_t();                            \
   virtual ~mod##_circuit_t();                   \
+  virtual void close();                         \
   virtual config_t *cfg() const;                \
   virtual void add_downstream(conn_t *);        \
   virtual void drop_downstream(conn_t *);       \
diff --git a/src/protocol/chop.cc b/src/protocol/chop.cc
index bb8e906..9109295 100644
--- a/src/protocol/chop.cc
+++ b/src/protocol/chop.cc
@@ -612,15 +612,15 @@ chop_circuit_t::chop_circuit_t()
 
 chop_circuit_t::~chop_circuit_t()
 {
-  // Attempt to prevent events from firing on partially or completely
-  // torn down circuits.  (This shouldn't happen, but it seems to.)
-  if (this->up_buffer)
-    bufferevent_disable(this->up_buffer, EV_READ|EV_WRITE);
-  if (this->flush_timer)
-    event_del(this->flush_timer);
-  if (this->axe_timer)
-    event_del(this->axe_timer);
+  delete send_crypt;
+  delete send_hdr_crypt;
+  delete recv_crypt;
+  delete recv_hdr_crypt;
+}
 
+void
+chop_circuit_t::close()
+{
   if (!sent_fin || !received_fin || !upstream_eof) {
     log_warn(this, "destroying active circuit: fin%c%c eof%c ds=%lu",
              sent_fin ? '+' : '-', received_fin ? '+' : '-',
@@ -632,16 +632,9 @@ chop_circuit_t::~chop_circuit_t()
        i != downstreams.end(); i++) {
     chop_conn_t *conn = *i;
     conn->upstream = NULL;
-    if (evbuffer_get_length(conn->outbound()) > 0)
-      conn_do_flush(conn);
-    else
-      delete conn;
+    conn_do_flush(conn);
   }
-
-  delete send_crypt;
-  delete send_hdr_crypt;
-  delete recv_crypt;
-  delete recv_hdr_crypt;
+  downstreams.clear();
 
   // The IDs for old circuits are preserved for a while (at present,
   // indefinitely; FIXME: purge them on a timer) against the
@@ -657,6 +650,8 @@ chop_circuit_t::~chop_circuit_t()
   log_assert(out != config->circuits.end());
   log_assert(out->second == this);
   out->second = NULL;
+
+  circuit_t::close();
 }
 
 config_t *
@@ -707,12 +702,7 @@ chop_circuit_t::drop_downstream(chop_conn_t *conn)
   // to enable further transmissions from the server.
   if (downstreams.empty()) {
     if (sent_fin && received_fin) {
-      if (evbuffer_get_length(bufferevent_get_output(up_buffer)) > 0)
-        // this may already have happened, but there's no harm in
-        // doing it again
-        circuit_do_flush(this);
-      else
-        delete this;
+      circuit_do_flush(this);
     } else if (config->mode == LSN_SIMPLE_SERVER) {
       circuit_arm_axe_timer(this, axe_interval());
     } else {
@@ -1126,6 +1116,7 @@ chop_circuit_t::check_for_eof()
   // If we're at EOF both ways, close all connections, sending first
   // if necessary.
   if (sent_fin && received_fin) {
+    log_debug(this, "sent and received FIN");
     circuit_disarm_flush_timer(this);
     for (unordered_set<chop_conn_t *>::iterator i = downstreams.begin();
          i != downstreams.end(); i++) {
@@ -1138,8 +1129,12 @@ chop_circuit_t::check_for_eof()
 
   // If we're the client we have to keep trying to talk as long as we
   // haven't both sent and received a FIN, or we might deadlock.
-  else if (config->mode != LSN_SIMPLE_SERVER)
+  else if (config->mode != LSN_SIMPLE_SERVER) {
+    log_debug(this, "client arming flush timer%s%s",
+              sent_fin ? " (sent FIN)" : "",
+              received_fin ? " (received FIN)": "");
     circuit_arm_flush_timer(this, flush_interval());
+  }
 
   return 0;
 }
@@ -1167,20 +1162,25 @@ chop_conn_t::chop_conn_t()
 
 chop_conn_t::~chop_conn_t()
 {
-  // Attempt to prevent events from firing on partially or completely
-  // torn down connections.  (This shouldn't happen, but it seems to.)
-  if (this->buffer)
-    bufferevent_disable(this->buffer, EV_READ|EV_WRITE);
-
   if (this->must_send_timer)
     event_free(this->must_send_timer);
-  if (upstream)
-    upstream->drop_downstream(this);
   if (steg)
     delete steg;
   evbuffer_free(recv_pending);
 }
 
+void
+chop_conn_t::close()
+{
+  if (this->must_send_timer)
+    event_del(this->must_send_timer);
+
+  if (upstream)
+    upstream->drop_downstream(this);
+
+  conn_t::close();
+}
+
 circuit_t *
 chop_conn_t::circuit() const
 {
@@ -1266,7 +1266,7 @@ chop_conn_t::recv_handshake()
     }
     if (circuit_open_upstream(ck)) {
       log_warn(this, "failed to begin upstream connection");
-      delete ck;
+      ck->close();
       return -1;
     }
     log_debug(this, "created new circuit to %s", ck->up_peer);
@@ -1290,7 +1290,19 @@ chop_conn_t::recv()
     return 0;
 
   if (!upstream) {
-    // Try to receive a handshake.
+    if (config->mode != LSN_SIMPLE_SERVER) {
+      // We're the client.  Client connections start out attached to a
+      // circuit; therefore this is a server-to-client message that
+      // crossed with the teardown of the circuit it belonged to, and
+      // we don't have the decryption keys for it anymore.
+      // By construction it must be chaff, so just throw it away.
+      log_debug(this, "discarding chaff after circuit closed");
+      log_assert(!must_send_p());
+      conn_do_flush(this);
+      return 0;
+    }
+
+    // We're the server. Try to receive a handshake.
     if (recv_handshake())
       return -1;
 
@@ -1304,7 +1316,6 @@ chop_conn_t::recv()
     // connection, possibly sending a response if the cover protocol
     // requires one.
     if (!upstream) {
-      evbuffer_drain(recv_pending, evbuffer_get_length(recv_pending));
       if (must_send_p())
         send();
       conn_do_flush(this);
diff --git a/src/protocol/null.cc b/src/protocol/null.cc
index e195cc9..461fffe 100644
--- a/src/protocol/null.cc
+++ b/src/protocol/null.cc
@@ -135,24 +135,20 @@ null_circuit_t::null_circuit_t()
 
 null_circuit_t::~null_circuit_t()
 {
-  // Attempt to prevent events from firing on partially or completely
-  // torn down circuits.  (This shouldn't happen, but it seems to.)
-  if (this->up_buffer)
-    bufferevent_disable(this->up_buffer, EV_READ|EV_WRITE);
-  if (this->flush_timer)
-    event_del(this->flush_timer);
-  if (this->axe_timer)
-    event_del(this->axe_timer);
+}
 
+void
+null_circuit_t::close()
+{
   if (downstream) {
     /* break the circular reference before deallocating the
        downstream connection */
     downstream->upstream = NULL;
-    if (evbuffer_get_length(downstream->outbound()) > 0)
-      conn_do_flush(downstream);
-    else
-      delete downstream;
+    conn_do_flush(downstream);
+    downstream = NULL;
   }
+
+  circuit_t::close();
 }
 
 config_t *
@@ -192,13 +188,7 @@ null_circuit_t::drop_downstream(conn_t *cn)
             this->serial, conn->serial, conn->peername);
   this->downstream = NULL;
   conn->upstream = NULL;
-
-  if (evbuffer_get_length(bufferevent_get_output(this->up_buffer)) > 0)
-    /* this may already have happened, but there's no harm in
-       doing it again */
-    circuit_do_flush(this);
-  else
-    delete this;
+  circuit_do_flush(this);
 }
 
 /* Send data from the upstream buffer. */
@@ -206,7 +196,8 @@ int
 null_circuit_t::send()
 {
   log_debug(this, "sending %lu bytes",
-            (unsigned long)evbuffer_get_length(bufferevent_get_input(this->up_buffer)));
+            (unsigned long)
+            evbuffer_get_length(bufferevent_get_input(this->up_buffer)));
 
   return evbuffer_add_buffer(this->downstream->outbound(),
                              bufferevent_get_input(this->up_buffer));
@@ -242,13 +233,15 @@ null_conn_t::null_conn_t()
 
 null_conn_t::~null_conn_t()
 {
-  // Attempt to prevent events from firing on partially or completely
-  // torn down connections.  (This shouldn't happen, but it seems to.)
-  if (this->buffer)
-    bufferevent_disable(this->buffer, EV_READ|EV_WRITE);
+}
 
+void
+null_conn_t::close()
+{
   if (this->upstream)
     this->upstream->drop_downstream(this);
+
+  conn_t::close();
 }
 
 /* Only used by connection callbacks */
diff --git a/src/test/itestlib.py b/src/test/itestlib.py
index 7378c3d..6906e5c 100644
--- a/src/test/itestlib.py
+++ b/src/test/itestlib.py
@@ -97,9 +97,10 @@ class Stegotorus(subprocess.Popen):
     def check_completion(self, label, force_stderr=False):
         self.stdin.close()
         self.communicator.join()
-        self.timeout.cancel()
+        if self.poll() is not None:
+            self.timeout.cancel()
         self.timeout.join()
-        self.poll()
+        self.wait()
 
         report = ""
 
diff --git a/src/test/unittest.cc b/src/test/unittest.cc
index 958c82c..08383dc 100644
--- a/src/test/unittest.cc
+++ b/src/test/unittest.cc
@@ -4,95 +4,8 @@
  */
 
 #include "util.h"
-#include "unittest.h"
-#include "main.h"
-
 #include "crypt.h"
-#include "connections.h"
-#include "protocol.h"
-
-#include <event2/event.h>
-#include <event2/bufferevent.h>
-
-/* Generic test fixture for protocol tests (currently used by transfer). */
-
-static void *
-setup_proto_test_state(const struct testcase_t *tcase)
-{
-  struct proto_test_state *s =
-    (struct proto_test_state *)xzalloc(sizeof(struct proto_test_state));
-  const struct proto_test_args *args =
-    (struct proto_test_args *)tcase->setup_data;
-  struct bufferevent *pairs[3][2];
-  int i;
-
-  s->args = args;
-  s->base = event_base_new();
-
-  for (i = 0; i < 3; i++) {
-    bufferevent_pair_new(s->base, 0, pairs[i]);
-    bufferevent_enable(pairs[i][0], EV_READ|EV_WRITE);
-    bufferevent_enable(pairs[i][1], EV_READ|EV_WRITE);
-  }
-
-  s->cfg_client = config_create(args->nopts_client, args->opts_client);
-  s->cfg_server = config_create(args->nopts_server, args->opts_server);
-  s->cfg_client->base = s->base;
-  s->cfg_server->base = s->base;
-
-  s->conn_client = conn_create(s->cfg_client, 0, pairs[0][0],
-                               xstrdup("to-server"));
-  s->conn_server = conn_create(s->cfg_server, 0, pairs[0][1],
-                               xstrdup("to-client"));
-
-  s->buf_client = pairs[1][0];
-  s->buf_server = pairs[2][0];
-
-  s->ckt_client = circuit_create(s->cfg_client, 0);
-  s->ckt_server = circuit_create(s->cfg_server, 0);
-
-  circuit_add_upstream(s->ckt_client, pairs[1][1],
-                       xstrdup("to-harness-client"));
-  circuit_add_upstream(s->ckt_server, pairs[2][1],
-                       xstrdup("to-harness-server"));
-
-  s->ckt_client->add_downstream(s->conn_client);
-  s->ckt_server->add_downstream(s->conn_server);
-
-  return s;
-}
-
-static int
-cleanup_proto_test_state(const struct testcase_t *, void *state)
-{
-  struct proto_test_state *s = (struct proto_test_state *)state;
-
-  /* We don't want to trigger circuit_*_shutdown, so dissociate the circuits
-     from their connections and close each separately. */
-  s->ckt_client->drop_downstream(s->conn_client);
-  s->ckt_server->drop_downstream(s->conn_server);
-
-  delete s->conn_client;
-  delete s->conn_server;
-
-  delete s->cfg_client;
-  delete s->cfg_server;
-
-  bufferevent_free(s->buf_client);
-  bufferevent_free(s->buf_server);
-  event_base_free(s->base);
-
-  free(state);
-  return 1;
-}
-
-const struct testcase_setup_t proto_test_fixture =
-  { setup_proto_test_state, cleanup_proto_test_state };
-
-void
-finish_shutdown(void)
-{
-}
+#include "unittest.h"
 
 int
 main(int argc, const char **argv)
@@ -110,18 +23,8 @@ main(int argc, const char **argv)
 
   init_crypto();
 
-  /* Ugly method to fix a Windows problem:
-     http://archives.seul.org/libevent/users/Oct-2010/msg00049.html */
-#ifdef _WIN32
-  {
-    WSADATA wsaData;
-    WSAStartup(0x101, &wsaData);
-  }
-#endif
-
   rv = tinytest_main(argc, argv, unittest_groups);
 
-  conn_start_shutdown(1);
   free_crypto();
 
   return rv;
diff --git a/src/test/unittest.h b/src/test/unittest.h
index 81c873a..b767d0b 100644
--- a/src/test/unittest.h
+++ b/src/test/unittest.h
@@ -8,46 +8,6 @@
 
 #include "tinytest_macros.h"
 
-/* Test fixture shared by most protocol tests. */
-
-struct proto_test_state
-{
-  struct event_base *base;
-  struct bufferevent *buf_client;
-  struct bufferevent *buf_server;
-
-  config_t *cfg_client;
-  config_t *cfg_server;
-
-  circuit_t *ckt_client;
-  circuit_t *ckt_server;
-
-  conn_t *conn_client;
-  conn_t *conn_server;
-
-  const struct proto_test_args *args;
-};
-
-extern const struct testcase_setup_t proto_test_fixture;
-
-/* Any test case that uses the above fixture must provide one of these
-   as its setup_data. */
-struct proto_test_args
-{
-  /* These fields are mandatory. */
-  size_t nopts_client;
-  size_t nopts_server;
-  const char *const *opts_client;
-  const char *const *opts_server;
-
-  /* These fields are only used by "transfer" test cases and may be 0/NULL
-     otherwise. */
-  size_t len_c2s_on_wire;
-  size_t len_s2c_on_wire;
-  const char *c2s_on_wire;
-  const char *s2c_on_wire;
-};
-
 /* Master group list - defined in unitgrplist.c (which is generated). */
 extern const struct testgroup_t unittest_groups[];
 
diff --git a/src/test/unittest_config.cc b/src/test/unittest_config.cc
deleted file mode 100644
index 72b39bd..0000000
--- a/src/test/unittest_config.cc
+++ /dev/null
@@ -1,87 +0,0 @@
-/* Copyright 2011 Nick Mathewson, George Kadianakis
- * Copyright 2011 SRI International
- * See LICENSE for other credits and copying information
- */
-
-#include "util.h"
-#include "unittest.h"
-
-#include "protocol.h"
-
-struct option_parsing_case {
-  config_t *result;
-  short should_succeed;
-  short n_opts;
-  const char *const opts[6];
-};
-
-static void
-test_config(void *cases)
-{
-  struct option_parsing_case *c;
-  for (c = (struct option_parsing_case *)cases; c->n_opts; c++) {
-    c->result = config_create(c->n_opts, c->opts);
-    if (c->should_succeed)
-      tt_ptr_op(c->result, !=, NULL);
-    else
-      tt_ptr_op(c->result, ==, NULL);
-  }
- end:;
-}
-
-static void *
-setup_test_config(const struct testcase_t *tc)
-{
-  /* Suppress logs for the duration of this test. */
-  log_set_method(LOG_METHOD_NULL, NULL);
-
-  /* Forward the test data to the actual test function. */
-  return tc->setup_data;
-}
-
-static int
-cleanup_test_config(const struct testcase_t *, void *state)
-{
-  struct option_parsing_case *c;
-  for (c = (struct option_parsing_case *)state; c->n_opts; c++)
-    if (c->result)
-      delete c->result;
-
-  /* Reactivate logging */
-  log_set_method(LOG_METHOD_STDERR, NULL);
-  return 1;
-}
-
-static const struct testcase_setup_t config_fixture =
-  { setup_test_config, cleanup_test_config };
-
-static struct option_parsing_case oc_null[] = {
-  /* wrong number of options */
-  { 0, 0, 1, {"null"} },
-  { 0, 0, 2, {"null", "client"} },
-  { 0, 0, 3, {"null", "client", "127.0.0.1:5552"} },
-  { 0, 0, 3, {"null", "server", "127.0.0.1:5552"} },
-  { 0, 0, 4, {"null", "socks", "127.0.0.1:5552", "192.168.1.99:11253"} },
-  /* unrecognized mode */
-  { 0, 0, 3, {"null", "floodcontrol", "127.0.0.1:5552" } },
-  { 0, 0, 4, {"null", "--frobozz", "client", "127.0.0.1:5552"} },
-  { 0, 0, 4, {"null", "client", "--frobozz", "127.0.0.1:5552"} },
-  /* bad address */
-  { 0, 0, 3, {"null", "socks", "@:5552"} },
-  { 0, 0, 3, {"null", "socks", "127.0.0.1:notanumber"} },
-  /* should succeed */
-  { 0, 1, 4, {"null", "client", "127.0.0.1:5552", "192.168.1.99:11253" } },
-  { 0, 1, 4, {"null", "client", "127.0.0.1", "192.168.1.99:11253" } },
-  { 0, 1, 4, {"null", "server", "127.0.0.1:5552", "192.168.1.99:11253" } },
-  { 0, 1, 3, {"null", "socks", "127.0.0.1:5552" } },
-
-  { 0, 0, 0, {0} }
-};
-
-#define T(name) \
-  { #name, test_config, 0, &config_fixture, oc_##name }
-
-struct testcase_t config_tests[] = {
-  T(null),
-  END_OF_TESTCASES
-};
diff --git a/src/test/unittest_transfer.cc b/src/test/unittest_transfer.cc
deleted file mode 100644
index de17ed0..0000000
--- a/src/test/unittest_transfer.cc
+++ /dev/null
@@ -1,114 +0,0 @@
-/* Copyright 2011 Nick Mathewson, George Kadianakis
- * Copyright 2011 SRI International
- * See LICENSE for other credits and copying information
- */
-
-#include "util.h"
-#include "unittest.h"
-#include "connections.h"
-
-#include <event2/buffer.h>
-
-static const char msg1[] =
-  "this is a 54-byte message passed from client to server";
-static const char msg2[] =
-  "this is a 55-byte message passed from server to client!";
-
-#define SLEN(s) (sizeof(s)-1)
-
-static void
-test_transfer(void *state)
-{
-  struct proto_test_state *s = (struct proto_test_state *)state;
-  const struct proto_test_args *a = s->args;
-
-  /* Handshake */
-  tt_int_op(0, ==, s->conn_client->handshake());
-  tt_int_op(0, ==, s->conn_server->recv());
-  tt_int_op(0, ==, s->conn_server->handshake());
-  tt_int_op(0, ==, s->conn_client->recv());
-  /* End of Handshake */
-
-  /* client -> server */
-  evbuffer_add(bufferevent_get_output(s->buf_client), msg1, SLEN(msg1));
-  circuit_send(s->ckt_client);
-  tt_int_op(0, ==, evbuffer_get_length(bufferevent_get_output(s->buf_client)));
-  tt_int_op(a->len_c2s_on_wire, ==,
-            evbuffer_get_length(s->conn_server->inbound()));
-  tt_mem_op(a->c2s_on_wire, ==,
-            evbuffer_pullup(s->conn_server->inbound(),
-                            a->len_c2s_on_wire),
-            a->len_c2s_on_wire);
-
-  s->conn_server->recv();
-  tt_int_op(0, ==, evbuffer_get_length(s->conn_server->inbound()));
-  tt_int_op(SLEN(msg1), ==,
-            evbuffer_get_length(bufferevent_get_input(s->buf_server)));
-  tt_mem_op(msg1, ==,
-            evbuffer_pullup(bufferevent_get_input(s->buf_server), SLEN(msg1)),
-            SLEN(msg1));
-
-  /* server -> client */
-  evbuffer_add(bufferevent_get_output(s->buf_server), msg2, SLEN(msg2));
-  circuit_send(s->ckt_server);
-  tt_int_op(0, ==, evbuffer_get_length(bufferevent_get_output(s->buf_server)));
-  tt_int_op(a->len_s2c_on_wire, ==,
-            evbuffer_get_length(s->conn_client->inbound()));
-  tt_mem_op(a->s2c_on_wire, ==,
-            evbuffer_pullup(s->conn_client->inbound(),
-                            a->len_s2c_on_wire),
-            a->len_s2c_on_wire);
-
-  s->conn_client->recv();
-  tt_int_op(0, ==, evbuffer_get_length(s->conn_client->inbound()));
-  tt_int_op(SLEN(msg2), ==,
-            evbuffer_get_length(bufferevent_get_input(s->buf_client)));
-  tt_mem_op(msg2, ==,
-            evbuffer_pullup(bufferevent_get_input(s->buf_client), SLEN(msg2)),
-            SLEN(msg2));
-
- end:;
-}
-
-#define enc1_null msg1
-#define enc2_null msg2
-
-#if 0 /* temporarily disabled - causes crashes */
-static const char enc1_s_x_http[] =
-    "GET /003600007468697320697320612035342d62797465206d6573736167652070617"
-    "37365642066726f6d20636c69656e7420746f2073657276657200== HTTP/1.1\r\n"
-    "Host: to-server\r\n"
-    "Connection: close\r\n\r\n";
-static const char enc2_s_x_http[] =
-    "HTTP/1.1 200 OK\r\n"
-    "Expires: Thu, 01 Jan 1970 00:00:00 GMT\r\n"
-    "Cache-Control: no-store\r\n"
-    "Connection: close\r\n"
-    "Content-Type: application/octet-stream\r\n"
-    "Content-Length: 60\r\n\r\n"
-    "\x00\x37\x00\x00"
-    "this is a 55-byte message passed from server to client!\x00";
-#endif
-
-static const char *const o_client_null[] =
-  {"null", "socks", "127.0.0.1:1800"};
-
-static const char *const o_server_null[] =
-  {"null", "server", "127.0.0.1:1800", "127.0.0.1:1801"};
-
-#define TA(name)                                                \
-  static const struct proto_test_args tr_##name##_args =        \
-    { ALEN(o_client_##name), ALEN(o_server_##name),             \
-      o_client_##name, o_server_##name,                         \
-      SLEN(enc1_##name), SLEN(enc2_##name),                     \
-      enc1_##name, enc2_##name }
-
-TA(null);
-
-#define T(name) \
-  { #name, test_transfer, 0, &proto_test_fixture, (void *)&tr_##name##_args }
-
-struct testcase_t transfer_tests[] = {
-  T(null),
-  END_OF_TESTCASES
-};





More information about the tor-commits mailing list