[tor-commits] [tor/master] Use circuitmux_t in channels and when relaying cells

andrea at torproject.org andrea at torproject.org
Thu Oct 11 02:05:23 UTC 2012


commit b208539b8047a12fb2f1f794c9932fddd577dfdb
Author: Andrea Shepard <andrea at torproject.org>
Date:   Fri Sep 21 14:46:22 2012 -0700

    Use circuitmux_t in channels and when relaying cells
---
 src/or/channel.c       |   49 ++++++---
 src/or/channel.h       |   33 ++-----
 src/or/channeltls.c    |   17 ++-
 src/or/circuitlist.c   |   38 ++++++-
 src/or/connection_or.c |    2 +-
 src/or/or.h            |   42 ++++++--
 src/or/relay.c         |  283 +++++++++++++++++++++++-------------------------
 src/or/relay.h         |    7 +-
 8 files changed, 258 insertions(+), 213 deletions(-)

diff --git a/src/or/channel.c b/src/or/channel.c
index 334f843..8241556 100644
--- a/src/or/channel.c
+++ b/src/or/channel.c
@@ -19,6 +19,7 @@
 #include "circuitbuild.h"
 #include "circuitlist.h"
 #include "connection_or.h" /* For var_cell_free() */
+#include "circuitmux.h"
 #include "geoip.h"
 #include "nodelist.h"
 #include "relay.h"
@@ -813,9 +814,10 @@ channel_free(channel_t *chan)
 
   channel_clear_remote_end(chan);
 
-  if (chan->active_circuit_pqueue) {
-    smartlist_free(chan->active_circuit_pqueue);
-    chan->active_circuit_pqueue = NULL;
+  if (chan->cmux) {
+    circuitmux_detach_all_circuits(chan->cmux);
+    circuitmux_free(chan->cmux);
+    chan->cmux = NULL;
   }
 
   /* We're in CLOSED or ERROR, so the cell queue is already empty */
@@ -866,7 +868,6 @@ channel_force_free(channel_t *chan)
   if (chan->free) chan->free(chan);
 
   channel_clear_remote_end(chan);
-  smartlist_free(chan->active_circuit_pqueue);
 
   /* We might still have a cell queue; kill it */
   if (chan->incoming_queue) {
@@ -2031,12 +2032,13 @@ channel_flush_some_cells(channel_t *chan, ssize_t num_cells)
         (unlimited ? -1 : num_cells - flushed));
     if (!unlimited && num_cells <= flushed) goto done;
 
-    if (chan->active_circuits) {
+    if (circuitmux_num_cells(chan->cmux) > 0) {
       /* Try to get more cells from any active circuits */
-      num_cells_from_circs =
-        channel_flush_from_first_active_circuit(chan,
-            (unlimited ? MAX_CELLS_TO_GET_FROM_CIRCUITS_FOR_UNLIMITED :
-                         (num_cells - flushed)));
+      num_cells_from_circs = channel_flush_from_first_active_circuit(
+          chan,
+          (unlimited ?
+             MAX_CELLS_TO_GET_FROM_CIRCUITS_FOR_UNLIMITED :
+             (num_cells - flushed)));
 
       /* If it claims we got some, process the queue again */
       if (num_cells_from_circs > 0) {
@@ -2227,7 +2229,7 @@ channel_more_to_flush(channel_t *chan)
       smartlist_len(chan->incoming_queue) > 0) return 1;
 
   /* Check if any circuits would like to queue some */
-  if (chan->active_circuits) return 1;
+  if (circuitmux_num_cells(chan->cmux) > 0) return 1;
 
   /* Else no */
   return 0;
@@ -2935,8 +2937,8 @@ channel_is_better(time_t now, channel_t *a, channel_t *b,
    * one that has no circuits is in its grace period.
    */
 
-  a_has_circs = (a->n_circuits > 0);
-  b_has_circs = (b->n_circuits > 0);
+  a_has_circs = (channel_num_circuits(a) > 0);
+  b_has_circs = (channel_num_circuits(b) > 0);
   a_grace = (forgive_new_connections &&
              (now < channel_when_created(a) + NEW_CHAN_GRACE_PERIOD));
   b_grace = (forgive_new_connections &&
@@ -3223,9 +3225,10 @@ channel_dump_statistics(channel_t *chan, int severity)
       " * Channel " U64_FORMAT " has %d active circuits out of"
       " %d in total",
       U64_PRINTF_ARG(chan->global_identifier),
-      (chan->active_circuit_pqueue != NULL) ?
-        smartlist_len(chan->active_circuit_pqueue) : 0,
-      chan->n_circuits);
+      (chan->cmux != NULL) ?
+         circuitmux_num_active_circuits(chan->cmux) : 0,
+      (chan->cmux != NULL) ?
+         circuitmux_num_circuits(chan->cmux) : 0);
 
   /* Describe timestamps */
   log(severity, LD_GENERAL,
@@ -4008,6 +4011,22 @@ channel_matches_target_addr_for_extend(channel_t *chan,
 }
 
 /**
+ * Return the total number of circuits used by a channel
+ *
+ * @param chan Channel to query
+ * @return Number of circuits using this as n_chan or p_chan
+ */
+
+unsigned int
+channel_num_circuits(channel_t *chan)
+{
+  tor_assert(chan);
+
+  return chan->num_n_circuits +
+         chan->num_p_circuits;
+}
+
+/**
  * Set up circuit ID generation
  *
  * This is called when setting up a channel and replaces the old
diff --git a/src/or/channel.h b/src/or/channel.h
index 27fba8f..4d3db41 100644
--- a/src/or/channel.h
+++ b/src/or/channel.h
@@ -10,6 +10,7 @@
 #define _TOR_CHANNEL_H
 
 #include "or.h"
+#include "circuitmux.h"
 
 /* Channel handler function pointer typedefs */
 typedef void (*channel_listener_fn_ptr)(channel_listener_t *, channel_t *);
@@ -99,7 +100,7 @@ struct channel_s {
   int (*matches_target)(channel_t *, const tor_addr_t *);
   /* Write a cell to an open channel */
   int (*write_cell)(channel_t *, cell_t *);
-  /* Write a packed cell to an open channel */
+   /* Write a packed cell to an open channel */
   int (*write_packed_cell)(channel_t *, packed_cell_t *);
   /* Write a variable-length cell to an open channel */
   int (*write_var_cell)(channel_t *, var_cell_t *);
@@ -124,29 +125,8 @@ struct channel_s {
   /* List of queued outgoing cells */
   smartlist_t *outgoing_queue;
 
-  /* Circuit stuff for use by relay.c */
-
-  /*
-   * Double-linked ring of circuits with queued cells waiting for room to
-   * free up on this connection's outbuf.  Every time we pull cells from
-   * a circuit, we advance this pointer to the next circuit in the ring.
-   */
-  struct circuit_t *active_circuits;
-  /*
-   * Priority queue of cell_ewma_t for circuits with queued cells waiting
-   * for room to free up on this connection's outbuf.  Kept in heap order
-   * according to EWMA.
-   *
-   * This is redundant with active_circuits; if we ever decide only to use
-   * the cell_ewma algorithm for choosing circuits, we can remove
-   * active_circuits.
-   */
-  smartlist_t *active_circuit_pqueue;
-  /*
-   * The tick on which the cell_ewma_ts in active_circuit_pqueue last had
-   * their ewma values rescaled.
-   */
-  unsigned active_circuit_pqueue_last_recalibrated;
+  /* Circuit mux for circuits sending on this channel */
+  circuitmux_t *cmux;
 
   /* Circuit ID generation stuff for use by circuitbuild.c */
 
@@ -161,8 +141,8 @@ struct channel_s {
    */
   circid_t next_circ_id;
 
-  /* How many circuits use this connection as p_chan or n_chan? */
-  int n_circuits;
+  /* For how many circuits are we n_chan?  What about p_chan? */
+  unsigned int num_n_circuits, num_p_circuits;
 
   /*
    * True iff this channel shouldn't get any new circs attached to it,
@@ -456,6 +436,7 @@ void channel_mark_client(channel_t *chan);
 int channel_matches_extend_info(channel_t *chan, extend_info_t *extend_info);
 int channel_matches_target_addr_for_extend(channel_t *chan,
                                            const tor_addr_t *target);
+unsigned int channel_num_circuits(channel_t *chan);
 void channel_set_circid_type(channel_t *chan, crypto_pk_t *identity_rcvd);
 void channel_timestamp_client(channel_t *chan);
 
diff --git a/src/or/channeltls.c b/src/or/channeltls.c
index 5d6a7a9..036d14f 100644
--- a/src/or/channeltls.c
+++ b/src/or/channeltls.c
@@ -16,6 +16,7 @@
 #include "or.h"
 #include "channel.h"
 #include "channeltls.h"
+#include "circuitmux.h"
 #include "config.h"
 #include "connection.h"
 #include "connection_or.h"
@@ -127,8 +128,11 @@ channel_tls_connect(const tor_addr_t *addr, uint16_t port,
   if (is_local_addr(addr)) channel_mark_local(chan);
   channel_mark_outgoing(chan);
 
-  chan->active_circuit_pqueue = smartlist_new();
-  chan->active_circuit_pqueue_last_recalibrated = cell_ewma_get_tick();
+  chan->cmux = circuitmux_alloc();
+  /* TODO get rid of this and set policy once we have them
+  chan->cmux->active_circuit_pqueue_last_recalibrated =
+    cell_ewma_get_tick();
+   */
 
   /* Set up or_connection stuff */
   tlschan->conn = connection_or_connect(addr, port, id_digest, tlschan);
@@ -146,7 +150,7 @@ channel_tls_connect(const tor_addr_t *addr, uint16_t port,
   goto done;
 
  err:
-  smartlist_free(chan->active_circuit_pqueue);
+  circuitmux_free(chan->cmux);
   tor_free(tlschan);
   chan = NULL;
 
@@ -260,8 +264,11 @@ channel_tls_handle_incoming(or_connection_t *orconn)
   if (is_local_addr(&(TO_CONN(orconn)->addr))) channel_mark_local(chan);
   channel_mark_incoming(chan);
 
-  chan->active_circuit_pqueue = smartlist_new();
-  chan->active_circuit_pqueue_last_recalibrated = cell_ewma_get_tick();
+  chan->cmux = circuitmux_alloc();
+  /* TODO set cmux policy 
+  chan->active_circuit_pqueue_last_recalibrated =
+    cell_ewma_get_tick();
+   */
 
   /* If we got one, we should register it */
   if (chan) channel_register(chan);
diff --git a/src/or/circuitlist.c b/src/or/circuitlist.c
index cf6020d..bec3dc8 100644
--- a/src/or/circuitlist.c
+++ b/src/or/circuitlist.c
@@ -134,10 +134,20 @@ circuit_set_circid_chan_helper(circuit_t *circ, int direction,
     found = HT_REMOVE(chan_circid_map, &chan_circid_map, &search);
     if (found) {
       tor_free(found);
-      --old_chan->n_circuits;
+      if (direction == CELL_DIRECTION_OUT) {
+        /* One fewer circuits use old_chan as n_chan */
+        --(old_chan->num_n_circuits);
+      } else {
+        /* One fewer circuits use old_chan as p_chan */
+        --(old_chan->num_p_circuits);
+      }
+    }
+
+    /* If we're changing channels, detach the circuit */
+    if (old_chan != chan) {
+      tor_assert(old_chan->cmux);
+      circuitmux_detach_circuit(old_chan->cmux, circ);
     }
-    if (was_active && old_chan != chan)
-      make_circuit_inactive_on_chan(circ, old_chan);
   }
 
   /* Change the values only after we have possibly made the circuit inactive
@@ -161,10 +171,26 @@ circuit_set_circid_chan_helper(circuit_t *circ, int direction,
     found->circuit = circ;
     HT_INSERT(chan_circid_map, &chan_circid_map, found);
   }
+
+  /* Attach to the circuitmux if we're changing channels */
+  if (old_chan != chan) {
+    tor_assert(chan->cmux);
+    circuitmux_attach_circuit(chan->cmux, circ, direction);
+  }
+
+  /*
+   * This is a no-op if we have no cells, but if we do it marks us active to
+   * the circuitmux
+   */
   if (make_active && old_chan != chan)
-    make_circuit_active_on_chan(circ,chan);
+    update_circuit_on_cmux(circ, direction);
 
-  ++chan->n_circuits;
+  /* Adjust circuit counts on new channel */
+  if (direction == CELL_DIRECTION_OUT) {
+    ++chan->num_n_circuits;
+  } else {
+    ++chan->num_p_circuits;
+  }
 }
 
 /** Set the p_conn field of a circuit <b>circ</b>, along
@@ -994,7 +1020,7 @@ circuit_unlink_all_from_channel(channel_t *chan, int reason)
 {
   circuit_t *circ;
 
-  channel_unlink_all_active_circs(chan);
+  channel_unlink_all_circuits(chan);
 
   for (circ = global_circuitlist; circ; circ = circ->next) {
     int mark = 0;
diff --git a/src/or/connection_or.c b/src/or/connection_or.c
index bf69711..f143e9b 100644
--- a/src/or/connection_or.c
+++ b/src/or/connection_or.c
@@ -336,7 +336,7 @@ connection_or_get_num_circuits(or_connection_t *conn)
   tor_assert(conn);
 
   if (conn->chan) {
-    return TLS_CHAN_TO_BASE(conn->chan)->n_circuits;
+    return channel_num_circuits(TLS_CHAN_TO_BASE(conn->chan));
   } else return 0;
 }
 
diff --git a/src/or/or.h b/src/or/or.h
index 87ee7bb..a9b0361 100644
--- a/src/or/or.h
+++ b/src/or/or.h
@@ -2634,8 +2634,6 @@ typedef struct circuit_t {
   uint32_t magic; /**< For memory and type debugging: must equal
                    * ORIGIN_CIRCUIT_MAGIC or OR_CIRCUIT_MAGIC. */
 
-  /** Queue of cells waiting to be transmitted on n_conn. */
-  cell_queue_t n_chan_cells;
   /** The channel that is next in this circuit. */
   channel_t *n_chan;
 
@@ -2643,13 +2641,36 @@ typedef struct circuit_t {
    * The circuit_id used in the next (forward) hop of this circuit;
    * this is unique to n_chan, but this ordered pair is globally
    * unique:
+<<<<<<< HEAD
    *
+=======
+   * 
+>>>>>>> f1e8169... Use circuitmux_t in channels and when relaying cells
    * (n_chan->global_identifier, n_circ_id)
    */
   circid_t n_circ_id;
 
-  /** The hop to which we want to extend this circuit.  Should be NULL if
-   * the circuit has attached to a connection. */
+  /**
+   * Circuit mux associated with n_chan to which this circuit is attached;
+   * NULL if we have no n_chan.
+   */
+  circuitmux_t *mux;
+
+  /** Queue of cells waiting to be transmitted on n_chan */
+  cell_queue_t n_chan_cells;
+
+  /**
+   * The hop to which we want to extend this circuit.  Should be NULL if
+   * the circuit has attached to a connection.
+   *
+   * TODO:
+   *  - If this is NULL, we have extended.  Is it true that if this is
+   *    NULL then n_chan is not NULL?
+   *  - If n_chan is NULL, then what is n_circ_id?
+   *  - It doesn't matter, because we'll only ever attach to a circuitmux_t
+   *    when n_chan is not NULL, and that's what needs to use a unique ID
+   *    for circuits.
+   */
   extend_info_t *n_hop;
 
   /** True iff we are waiting for n_chan_cells to become less full before
@@ -2701,6 +2722,15 @@ typedef struct circuit_t {
   const char *marked_for_close_file; /**< For debugging: in which file was this
                                       * circuit marked for close? */
 
+  /** Unique ID for measuring tunneled network status requests. */
+  uint64_t dirreq_id;
+
+  /** TODO is this *all* circuits or all circuits on n_chan? */
+  struct circuit_t *next; /**< Next circuit in linked list of all circuits. */
+
+  /** TODO all this from here on down should go away in favor of
+   * circuitmux_t.
+   */
   /** Next circuit in the doubly-linked ring of circuits waiting to add
    * cells to n_conn.  NULL if we have no cells pending, or if we're not
    * linked to an OR connection. */
@@ -2709,10 +2739,6 @@ typedef struct circuit_t {
    * cells to n_conn.  NULL if we have no cells pending, or if we're not
    * linked to an OR connection. */
   struct circuit_t *prev_active_on_n_chan;
-  struct circuit_t *next; /**< Next circuit in linked list of all circuits. */
-
-  /** Unique ID for measuring tunneled network status requests. */
-  uint64_t dirreq_id;
 
   /** The EWMA count for the number of cells flushed from the
    * n_chan_cells queue.  Used to determine which circuit to flush from next.
diff --git a/src/or/relay.c b/src/or/relay.c
index 60f696c..f079243 100644
--- a/src/or/relay.c
+++ b/src/or/relay.c
@@ -1779,10 +1779,10 @@ circuit_consider_sending_sendme(circuit_t *circ, crypt_path_t *layer_hint)
 }
 
 #ifdef ACTIVE_CIRCUITS_PARANOIA
-#define assert_active_circuits_ok_paranoid(conn) \
-     assert_active_circuits_ok(conn)
+#define assert_cmux_ok_paranoid(chan) \
+     assert_cmux_okay(chan)
 #else
-#define assert_active_circuits_ok_paranoid(conn)
+#define assert_cmux_ok_paranoid(chan)
 #endif
 
 /** The total number of cells we have allocated from the memory pool. */
@@ -2004,6 +2004,7 @@ prev_circ_on_chan_p(circuit_t *circ, channel_t *chan)
   }
 }
 
+#if 0
 /** Helper for sorting cell_ewma_t values in their priority queue. */
 static int
 compare_cell_ewma_counts(const void *p1, const void *p2)
@@ -2240,122 +2241,61 @@ pop_first_cell_ewma_from_chan(channel_t *chan)
                               compare_cell_ewma_counts,
                               STRUCT_OFFSET(cell_ewma_t, heap_index));
 }
+#endif
 
-/** Add <b>circ</b> to the list of circuits with pending cells on
- * <b>chan</b>.  No effect if <b>circ</b> is already linked. */
+/**
+ * Update the number of cells available on the circuit's n_chan or p_chan's
+ * circuit mux.
+ */
 void
-make_circuit_active_on_chan(circuit_t *circ, channel_t *chan)
+update_circuit_on_cmux(circuit_t *circ, cell_direction_t direction)
 {
-  circuit_t **nextp = NULL, **prevp = NULL;
+  channel_t *chan = NULL;
+  or_circuit_t *or_circ = NULL;
+  circuitmux_t *cmux = NULL;
 
-  tor_assert(chan);
   tor_assert(circ);
 
-  nextp = next_circ_on_chan_p(circ, chan);
-  prevp = prev_circ_on_chan_p(circ, chan);
-
-  if (*nextp && *prevp) {
-    /* Already active. */
-    return;
-  }
-
-  assert_active_circuits_ok_paranoid(chan);
-
-  if (!(chan->active_circuits)) {
-    chan->active_circuits = circ;
-    *prevp = *nextp = circ;
-  } else {
-    circuit_t *head = chan->active_circuits;
-    circuit_t *old_tail = *prev_circ_on_chan_p(head, chan);
-    *next_circ_on_chan_p(old_tail, chan) = circ;
-    *nextp = head;
-    *prev_circ_on_chan_p(head, chan) = circ;
-    *prevp = old_tail;
-  }
-
-  if (circ->n_chan == chan) {
-    add_cell_ewma_to_chan(chan, &circ->n_cell_ewma);
+  /* Okay, get the channel */
+  if (direction == CELL_DIRECTION_OUT) {
+    chan = circ->n_chan;
   } else {
-    or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
-    tor_assert(chan == orcirc->p_chan);
-    add_cell_ewma_to_chan(chan, &orcirc->p_cell_ewma);
+    or_circ = TO_OR_CIRCUIT(circ);
+    chan = or_circ->p_chan;
   }
 
-  assert_active_circuits_ok_paranoid(chan);
-}
-
-/** Remove <b>circ</b> from the list of circuits with pending cells on
- * <b>chan</b>.  No effect if <b>circ</b> is already unlinked. */
-void
-make_circuit_inactive_on_chan(circuit_t *circ, channel_t *chan)
-{
-  circuit_t **nextp = NULL, **prevp = NULL;
-  circuit_t *next = NULL, *prev = NULL;
-
   tor_assert(chan);
-  tor_assert(circ);
+  tor_assert(chan->cmux);
 
-  nextp = next_circ_on_chan_p(circ, chan);
-  prevp = prev_circ_on_chan_p(circ, chan);
-  next = *nextp;
-  prev = *prevp;
+  /* Now get the cmux */
+  cmux = chan->cmux;
 
-  if (!next && !prev) {
-    /* Already inactive. */
-    return;
-  }
-
-  assert_active_circuits_ok_paranoid(chan);
-
-  tor_assert(next && prev);
-  tor_assert(*prev_circ_on_chan_p(next, chan) == circ);
-  tor_assert(*next_circ_on_chan_p(prev, chan) == circ);
+  /* Cmux sanity check */
+  tor_assert(circuitmux_is_circuit_attached(cmux, circ));
+  tor_assert(circuitmux_attached_circuit_direction(cmux, circ) == direction);
 
-  if (next == circ) {
-    chan->active_circuits = NULL;
-  } else {
-    *prev_circ_on_chan_p(next, chan) = prev;
-    *next_circ_on_chan_p(prev, chan) = next;
-    if (chan->active_circuits == circ)
-      chan->active_circuits = next;
-  }
-  *prevp = *nextp = NULL;
+  assert_cmux_ok_paranoid(chan);
 
-  if (circ->n_chan == chan) {
-    remove_cell_ewma_from_chan(chan, &circ->n_cell_ewma);
+  /* Update the number of cells we have for the circuit mux */
+  if (direction == CELL_DIRECTION_OUT) {
+    circuitmux_set_num_cells(cmux, circ, circ->n_chan_cells.n);
   } else {
-    or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
-    tor_assert(chan == orcirc->p_chan);
-    remove_cell_ewma_from_chan(chan, &orcirc->p_cell_ewma);
+    circuitmux_set_num_cells(cmux, circ, or_circ->p_chan_cells.n);
   }
 
-  assert_active_circuits_ok_paranoid(chan);
+  assert_cmux_ok_paranoid(chan);
 }
 
-/** Remove all circuits from the list of circuits with pending cells on
- * <b>chan</b>. */
+/** Remove all circuits from the cmux on <b>chan</b>. */
 void
-channel_unlink_all_active_circs(channel_t *chan)
+channel_unlink_all_circuits(channel_t *chan)
 {
-  circuit_t *head = NULL, *cur = NULL;
-
   tor_assert(chan);
+  tor_assert(chan->cmux);
 
-  cur = head = chan->active_circuits;
-  if (! head)
-    return;
-  do {
-    circuit_t *next = *next_circ_on_chan_p(cur, chan);
-    *prev_circ_on_chan_p(cur, chan) = NULL;
-    *next_circ_on_chan_p(cur, chan) = NULL;
-    cur = next;
-  } while (cur != head);
-  chan->active_circuits = NULL;
-
-  SMARTLIST_FOREACH(chan->active_circuit_pqueue,
-                    cell_ewma_t *, e,
-                    e->heap_index = -1);
-  smartlist_clear(chan->active_circuit_pqueue);
+  circuitmux_detach_all_circuits(chan->cmux);
+  chan->num_n_circuits = 0;
+  chan->num_p_circuits = 0;
 }
 
 /** Block (if <b>block</b> is true) or unblock (if <b>block</b> is false)
@@ -2419,53 +2359,71 @@ set_streams_blocked_on_circ(circuit_t *circ, channel_t *chan,
 int
 channel_flush_from_first_active_circuit(channel_t *chan, int max)
 {
-  int n_flushed;
+  circuitmux_t *cmux = NULL;
+  int n_flushed = 0;
   cell_queue_t *queue;
   circuit_t *circ;
+  or_circuit_t *or_circ;
   int streams_blocked;
+  packed_cell_t *cell;
 
+#if 0
   /* The current (hi-res) time */
   struct timeval now_hires;
 
   /* The EWMA cell counter for the circuit we're flushing. */
   cell_ewma_t *cell_ewma = NULL;
   double ewma_increment = -1;
+#endif
 
+  /* Get the cmux */
   tor_assert(chan);
+  tor_assert(chan->cmux);
+  cmux = chan->cmux;
+
+  /* Main loop: pick a circuit, send a cell, update the cmux */
+  while (n_flushed < max) {
+    circ = circuitmux_get_first_active_circuit(cmux);
+    /* If it returns NULL, no cells left to send */
+    if (!circ) break;
+    assert_cmux_ok_paranoid(chan);
+
+#if 0
+    /* This will go in circuitmux_get_first_active_circuit() */
+    /* See if we're doing the ewma circuit selection algorithm. */
+    if (ewma_enabled) {
+      unsigned tick;
+      double fractional_tick;
+      tor_gettimeofday_cached(&now_hires);
+      tick = cell_ewma_tick_from_timeval(&now_hires, &fractional_tick);
+
+      if (tick != chan->active_circuit_pqueue_last_recalibrated) {
+        scale_active_circuits(chan, tick);
+      }
 
-  circ = chan->active_circuits;
-  if (!circ) return 0;
-  assert_active_circuits_ok_paranoid(chan);
-
-  /* See if we're doing the ewma circuit selection algorithm. */
-  if (ewma_enabled) {
-    unsigned tick;
-    double fractional_tick;
-    tor_gettimeofday_cached(&now_hires);
-    tick = cell_ewma_tick_from_timeval(&now_hires, &fractional_tick);
+      ewma_increment = pow(ewma_scale_factor, -fractional_tick);
 
-    if (tick != chan->active_circuit_pqueue_last_recalibrated) {
-      scale_active_circuits(chan, tick);
+      cell_ewma = smartlist_get(chan->active_circuit_pqueue, 0);
+      circ = cell_ewma_to_circuit(cell_ewma);
     }
+#endif
 
-    ewma_increment = pow(ewma_scale_factor, -fractional_tick);
-
-    cell_ewma = smartlist_get(chan->active_circuit_pqueue, 0);
-    circ = cell_ewma_to_circuit(cell_ewma);
-  }
-
-  if (circ->n_chan == chan) {
-    queue = &circ->n_chan_cells;
-    streams_blocked = circ->streams_blocked_on_n_chan;
-  } else {
-    queue = &TO_OR_CIRCUIT(circ)->p_chan_cells;
-    streams_blocked = circ->streams_blocked_on_p_chan;
-  }
-  tor_assert(*next_circ_on_chan_p(circ, chan));
+    if (circ->n_chan == chan) {
+      queue = &circ->n_chan_cells;
+      streams_blocked = circ->streams_blocked_on_n_chan;
+    } else {
+      or_circ = TO_OR_CIRCUIT(circ);
+      tor_assert(or_circ->p_chan == chan);
+      queue = &TO_OR_CIRCUIT(circ)->p_chan_cells;
+      streams_blocked = circ->streams_blocked_on_p_chan;
+    }
 
-  for (n_flushed = 0; n_flushed < max && queue->head; ) {
-    packed_cell_t *cell = cell_queue_pop(queue);
-    tor_assert(*next_circ_on_chan_p(circ, chan));
+    /*
+     * Get just one cell here; once we've sent it, that can change the circuit
+     * selection, so we have to loop around for another even if this circuit
+     * has more than one.
+     */
+    cell = cell_queue_pop(queue);
 
     /* Calculate the exact time that this cell has spent in the queue. */
     if (get_options()->CellStatistics && !CIRCUIT_IS_ORIGIN(circ)) {
@@ -2481,8 +2439,8 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max)
                              "Looks like the CellStatistics option was "
                              "recently enabled.");
       } else {
-        or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
         insertion_time_elem_t *elem = it_queue->first;
+        or_circ = TO_OR_CIRCUIT(circ);
         cell_waiting_time =
             (uint32_t)((flushed * 10L + SECONDS_IN_A_DAY * 1000L -
                         elem->insertion_time * 10L) %
@@ -2495,8 +2453,8 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max)
             it_queue->last = NULL;
           mp_pool_release(elem);
         }
-        orcirc->total_cell_waiting_time += cell_waiting_time;
-        orcirc->processed_cells++;
+        or_circ->total_cell_waiting_time += cell_waiting_time;
+        or_circ->processed_cells++;
       }
     }
 
@@ -2507,14 +2465,34 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max)
                                 DIRREQ_TUNNELED,
                                 DIRREQ_CIRC_QUEUE_FLUSHED);
 
+    /* Now send the cell */
     channel_write_packed_cell(chan, cell);
+    cell = NULL;
+
     /*
      * Don't packed_cell_free_unchecked(cell) here because the channel will
      * do so when it gets out of the channel queue (probably already did, in
      * which case that was an immediate double-free bug).
      */
 
+    /* Update the counter */
     ++n_flushed;
+
+    /*
+     * Now update the cmux; tell it we've just sent a cell, and how many
+     * we have left.
+     */
+    circuitmux_notify_xmit_cells(cmux, circ, 1);
+    circuitmux_set_num_cells(cmux, circ, queue->n);
+    if (queue->n == 0)
+      log_debug(LD_GENERAL, "Made a circuit inactive.");
+
+    /* Is the cell queue low enough to unblock all the streams that are waiting
+     * to write to this circuit? */
+    if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE)
+      set_streams_blocked_on_circ(circ, chan, 0, 0); /* unblock streams */
+
+#if 0
     if (cell_ewma) {
       cell_ewma_t *tmp;
       cell_ewma->cell_count += ewma_increment;
@@ -2534,22 +2512,13 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max)
       assert_active_circuits_ok_paranoid(chan);
       goto done;
     }
-  }
-  tor_assert(*next_circ_on_chan_p(circ, chan));
-  assert_active_circuits_ok_paranoid(chan);
-  chan->active_circuits = *next_circ_on_chan_p(circ, chan);
-
-  /* Is the cell queue low enough to unblock all the streams that are waiting
-   * to write to this circuit? */
-  if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE)
-    set_streams_blocked_on_circ(circ, chan, 0, 0); /* unblock streams */
+#endif
 
-  /* Did we just run out of cells on this circuit's queue? */
-  if (queue->n == 0) {
-    log_debug(LD_GENERAL, "Made a circuit inactive.");
-    make_circuit_inactive_on_chan(circ, chan);
+    /* If n_flushed < max still, loop around and pick another circuit */
   }
- done:
+
+  /* Okay, we're done sending now */
+  assert_cmux_ok_paranoid(chan);
 
   return n_flushed;
 }
@@ -2587,11 +2556,11 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
     set_streams_blocked_on_circ(circ, chan, 1, fromstream);
   }
 
+  update_circuit_on_cmux(circ, direction);
   if (queue->n == 1) {
-    /* This was the first cell added to the queue.  We need to make this
+    /* This was the first cell added to the queue.  We just made this
      * circuit active. */
     log_debug(LD_GENERAL, "Made a circuit active.");
-    make_circuit_active_on_chan(circ, chan);
   }
 
   if (!channel_has_queued_writes(chan)) {
@@ -2669,20 +2638,37 @@ void
 circuit_clear_cell_queue(circuit_t *circ, channel_t *chan)
 {
   cell_queue_t *queue;
+  cell_direction_t direction;
+
   if (circ->n_chan == chan) {
     queue = &circ->n_chan_cells;
+    direction = CELL_DIRECTION_OUT;
   } else {
     or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
     tor_assert(orcirc->p_chan == chan);
     queue = &orcirc->p_chan_cells;
+    direction = CELL_DIRECTION_IN;
   }
 
-  if (queue->n)
-    make_circuit_inactive_on_chan(circ, chan);
-
+  /* Clear the queue */
   cell_queue_clear(queue);
+
+  /* Update the cell counter in the cmux */
+  update_circuit_on_cmux(circ, direction);
+}
+
+/** Fail with an assert if the circuit mux on chan is corrupt
+ */
+void
+assert_circuit_mux_okay(channel_t *chan)
+{
+  tor_assert(chan);
+  tor_assert(chan->cmux);
+
+  circuitmux_assert_okay(chan->cmux);
 }
 
+#if 0
 /** Fail with an assert if the active circuits ring on <b>orconn</b> is
  * corrupt.  */
 void
@@ -2721,6 +2707,7 @@ assert_active_circuits_ok(channel_t *chan)
 
   tor_assert(n == smartlist_len(chan->active_circuit_pqueue));
 }
+#endif
 
 /** Return 1 if we shouldn't restart reading on this circuit, even if
  * we get a SENDME.  Else return 0.
diff --git a/src/or/relay.h b/src/or/relay.h
index 7f96d59..ef5074b 100644
--- a/src/or/relay.h
+++ b/src/or/relay.h
@@ -51,11 +51,10 @@ void cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell);
 void append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
                                   cell_t *cell, cell_direction_t direction,
                                   streamid_t fromstream);
-void channel_unlink_all_active_circs(channel_t *chan);
+void channel_unlink_all_circuits(channel_t *chan);
 int channel_flush_from_first_active_circuit(channel_t *chan, int max);
-void assert_active_circuits_ok(channel_t *chan);
-void make_circuit_inactive_on_chan(circuit_t *circ, channel_t *chan);
-void make_circuit_active_on_chan(circuit_t *circ, channel_t *chan);
+void assert_circuit_mux_okay(channel_t *chan);
+void update_circuit_on_cmux(circuit_t *circ, cell_direction_t direction);
 
 int append_address_to_payload(uint8_t *payload_out, const tor_addr_t *addr);
 const uint8_t *decode_address_from_payload(tor_addr_t *addr_out,





More information about the tor-commits mailing list