[or-cvs] r9904: Initial version of circuit-based cell queues. Instead of ham (in tor/trunk: . doc src/or)

nickm at seul.org nickm at seul.org
Mon Mar 26 14:08:09 UTC 2007


Author: nickm
Date: 2007-03-26 10:07:59 -0400 (Mon, 26 Mar 2007)
New Revision: 9904

Modified:
   tor/trunk/
   tor/trunk/ChangeLog
   tor/trunk/doc/TODO
   tor/trunk/src/or/circuitbuild.c
   tor/trunk/src/or/circuitlist.c
   tor/trunk/src/or/command.c
   tor/trunk/src/or/connection_or.c
   tor/trunk/src/or/or.h
   tor/trunk/src/or/relay.c
Log:
 r12651 at Kushana:  nickm | 2007-03-24 18:26:42 -0400
 Initial version of circuit-based cell queues.  Instead of hammering or_conns with piles of cells, queue cells on their corresponding circuits, and append them to the or_conn as needed.  This seems to work so far, but needs a bit more work.  This will break the memory-use-limitation patch for begin_dir conns: the solution will be a fun but fiddly.



Property changes on: tor/trunk
___________________________________________________________________
 svk:merge ticket from /tor/trunk [r12651] on c95137ef-5f19-0410-b913-86e773d04f59

Modified: tor/trunk/ChangeLog
===================================================================
--- tor/trunk/ChangeLog	2007-03-26 13:30:17 UTC (rev 9903)
+++ tor/trunk/ChangeLog	2007-03-26 14:07:59 UTC (rev 9904)
@@ -1,4 +1,12 @@
 Changes in version 0.2.0.1-alpha - 2007-??-??
+  o Major features:
+    - Change the way that Tor buffers data that it is waiting to write.
+      Instead of queueing data cells in an enormous ring buffer for each
+      client->OR or OR->OR connection, we now queue cells on a separate
+      queue for each circuit.  This lets us use less slack memory, and
+      will eventually let us be smarter about prioritizing different kinds
+      of traffic.
+
   o Security fixes:
     - Directory authorities now call routers stable if they have an
       uptime of at least 30 days, even if that's not the median uptime
@@ -57,8 +65,8 @@
       to 'getinfo addr-mappings/*'.
 
   o Code simplifications and refactoring
-    - Stop passing around crypt_path_t pointers that are implicit in other
-      procedure arguments.
+    - Stop passing around circuit_t and crypt_path_t pointers that are
+      implicit in other procedure arguments.
 
 
 Changes in version 0.1.2.12-rc - 2007-03-16

Modified: tor/trunk/doc/TODO
===================================================================
--- tor/trunk/doc/TODO	2007-03-26 13:30:17 UTC (rev 9903)
+++ tor/trunk/doc/TODO	2007-03-26 14:07:59 UTC (rev 9904)
@@ -61,11 +61,21 @@
       _on_ on a socks connection: have edge_connection_t and (say)
       dns_request_t both extend an edge_stream_t, and have p_streams and
       n_streams both be linked lists of edge_stream_t.
-    - Make cells get buffered on circuit, not on the or_conn.
-      - Don't move them into the target conn until there is space on the
+    . Make cells get buffered on circuit, not on the or_conn.
+      O Implement cell queues
+      o Keep doubly-linked list of active circuits on each or_conn.
+      o Put all relay data on the circuit cell queue, not on the outbuf.
+      o Don't move them into the target conn until there is space on the
         target conn's outbuf.
+      o When making a circuit active on a connection with an empty buf,
+        we need to "prime" the buffer, so that we can trigger the "I flushed
+        some" test.
+      - Change how directory-bridge-choking works: choke when circuit queue
+        is full, not when the orconn is "too full".
+      - Do we switch to arena-allocation for cells?
+      - Can we stop doing so many memcpys on cells?
       - Also, only package data from exitconns when there is space on the
-        target OR conn's outbuf.
+        target OR conn's outbuf?  or when the circuit is not too full.
       - MAYBE kill stalled circuits rather than stalled connections; consider
         anonymity implications.
     - Move all status info out of routerinfo into local_routerstatus.  Make

Modified: tor/trunk/src/or/circuitbuild.c
===================================================================
--- tor/trunk/src/or/circuitbuild.c	2007-03-26 13:30:17 UTC (rev 9903)
+++ tor/trunk/src/or/circuitbuild.c	2007-03-26 14:07:59 UTC (rev 9904)
@@ -488,7 +488,7 @@
   cell.circ_id = circ->n_circ_id;
 
   memcpy(cell.payload, payload, ONIONSKIN_CHALLENGE_LEN);
-  connection_or_write_cell_to_buf(&cell, circ->n_conn);
+  append_cell_to_circuit_queue(circ, circ->n_conn, &cell, CELL_DIRECTION_OUT);
 
   /* mark it so it gets better rate limiting treatment. */
   circ->n_conn->client_used = 1;
@@ -650,7 +650,7 @@
       return - END_CIRC_REASON_INTERNAL;
     }
 
-    log_debug(LD_CIRC,"Sending extend relay cell.");
+    log_info(LD_CIRC,"Sending extend relay cell.");
     /* send it to hop->prev, because it will transfer
      * it to a create cell and then send to hop */
     if (relay_send_command_from_edge(0, TO_CIRCUIT(circ),
@@ -988,7 +988,8 @@
 
   circ->is_first_hop = (cell_type == CELL_CREATED_FAST);
 
-  connection_or_write_cell_to_buf(&cell, circ->p_conn);
+  append_cell_to_circuit_queue(TO_CIRCUIT(circ),
+                               circ->p_conn, &cell, CELL_DIRECTION_IN);
   log_debug(LD_CIRC,"Finished sending 'created' cell.");
 
   if (!is_local_IP(circ->p_conn->_base.addr) &&

Modified: tor/trunk/src/or/circuitlist.c
===================================================================
--- tor/trunk/src/or/circuitlist.c	2007-03-26 13:30:17 UTC (rev 9903)
+++ tor/trunk/src/or/circuitlist.c	2007-03-26 14:07:59 UTC (rev 9904)
@@ -71,10 +71,12 @@
  */
 orconn_circid_circuit_map_t *_last_circid_orconn_ent = NULL;
 
+/** DOCDOC */
 static void
 circuit_set_circid_orconn_helper(circuit_t *circ, uint16_t id,
                                  or_connection_t *conn,
-                                 uint16_t old_id, or_connection_t *old_conn)
+                                 uint16_t old_id, or_connection_t *old_conn,
+                                 int active)
 {
   orconn_circid_circuit_map_t search;
   orconn_circid_circuit_map_t *found;
@@ -96,6 +98,8 @@
       tor_free(found);
       --old_conn->n_circuits;
     }
+    if (active)
+      make_circuit_inactive_on_conn(circ,old_conn);
   }
 
   if (conn == NULL)
@@ -114,6 +118,9 @@
     found->circuit = circ;
     HT_INSERT(orconn_circid_map, &orconn_circid_circuit_map, found);
   }
+  if (active)
+    make_circuit_active_on_conn(circ,conn);
+
   ++conn->n_circuits;
 }
 
@@ -126,16 +133,18 @@
 {
   uint16_t old_id;
   or_connection_t *old_conn;
+  int active;
 
   old_id = circ->p_circ_id;
   old_conn = circ->p_conn;
   circ->p_circ_id = id;
   circ->p_conn = conn;
+  active = circ->p_conn_cells.n > 0;
 
   if (id == old_id && conn == old_conn)
     return;
   circuit_set_circid_orconn_helper(TO_CIRCUIT(circ), id, conn,
-                                   old_id, old_conn);
+                                   old_id, old_conn, active);
 }
 
 /** Set the n_conn field of a circuit <b>circ</b>, along
@@ -147,15 +156,17 @@
 {
   uint16_t old_id;
   or_connection_t *old_conn;
+  int active;
 
   old_id = circ->n_circ_id;
   old_conn = circ->n_conn;
   circ->n_circ_id = id;
   circ->n_conn = conn;
+  active = circ->n_conn_cells.n > 0;
 
   if (id == old_id && conn == old_conn)
     return;
-  circuit_set_circid_orconn_helper(circ, id, conn, old_id, old_conn);
+  circuit_set_circid_orconn_helper(circ, id, conn, old_id, old_conn, active);
 }
 
 /** Change the state of <b>circ</b> to <b>state</b>, adding it to or removing
@@ -380,12 +391,16 @@
       other->rend_splice = NULL;
     }
 
+    cell_queue_clear(&ocirc->p_conn_cells);
+
     tor_free(circ->onionskin);
 
     /* remove from map. */
     circuit_set_p_circid_orconn(ocirc, 0, NULL);
   }
 
+  cell_queue_clear(&circ->n_conn_cells);
+
   /* Remove from map. */
   circuit_set_n_circid_orconn(circ, 0, NULL);
 
@@ -637,6 +652,9 @@
 circuit_unlink_all_from_or_conn(or_connection_t *conn, int reason)
 {
   circuit_t *circ;
+
+  connection_or_unlink_all_active_circs(conn);
+
   for (circ = global_circuitlist; circ; circ = circ->next) {
     int mark = 0;
     if (circ->n_conn == conn) {

Modified: tor/trunk/src/or/command.c
===================================================================
--- tor/trunk/src/or/command.c	2007-03-26 13:30:17 UTC (rev 9903)
+++ tor/trunk/src/or/command.c	2007-03-26 14:07:59 UTC (rev 9904)
@@ -305,7 +305,7 @@
 command_process_relay_cell(cell_t *cell, or_connection_t *conn)
 {
   circuit_t *circ;
-  int reason;
+  int reason, direction;
 
   circ = circuit_get_by_circid_orconn(cell->circ_id, conn);
 
@@ -323,23 +323,16 @@
   }
 
   if (!CIRCUIT_IS_ORIGIN(circ) &&
-      cell->circ_id == TO_OR_CIRCUIT(circ)->p_circ_id) {
-    /* it's an outgoing cell */
-    if ((reason = circuit_receive_relay_cell(cell, circ,
-                                             CELL_DIRECTION_OUT)) < 0) {
-      log_fn(LOG_PROTOCOL_WARN,LD_PROTOCOL,"circuit_receive_relay_cell "
-             "(forward) failed. Closing.");
-      circuit_mark_for_close(circ, -reason);
-      return;
-    }
-  } else { /* it's an ingoing cell */
-    if ((reason = circuit_receive_relay_cell(cell, circ,
-                                             CELL_DIRECTION_IN)) < 0) {
-      log_fn(LOG_PROTOCOL_WARN,LD_PROTOCOL,"circuit_receive_relay_cell "
-             "(backward) failed. Closing.");
-      circuit_mark_for_close(circ, -reason);
-      return;
-    }
+      cell->circ_id == TO_OR_CIRCUIT(circ)->p_circ_id)
+    direction = CELL_DIRECTION_OUT;
+  else
+    direction = CELL_DIRECTION_IN;
+
+  if ((reason = circuit_receive_relay_cell(cell, circ, direction)) < 0) {
+    log_fn(LOG_PROTOCOL_WARN,LD_PROTOCOL,"circuit_receive_relay_cell "
+           "(%s) failed. Closing.",
+           direction==CELL_DIRECTION_OUT?"forward":"backward");
+    circuit_mark_for_close(circ, -reason);
   }
 }
 

Modified: tor/trunk/src/or/connection_or.c
===================================================================
--- tor/trunk/src/or/connection_or.c	2007-03-26 13:30:17 UTC (rev 9903)
+++ tor/trunk/src/or/connection_or.c	2007-03-26 14:07:59 UTC (rev 9904)
@@ -232,7 +232,8 @@
   }
 }
 
-/** Called whenever we have flushed some data on an or_conn. */
+/** Called whenever we have flushed some data on an or_conn: add more data
+ * from active circuits. */
 int
 connection_or_flushed_some(or_connection_t *conn)
 {
@@ -240,6 +241,15 @@
       connection_or_empty_enough_for_dirserv_data(conn)) {
     connection_dirserv_stop_blocking_all_on_or_conn(conn);
   }
+  if (buf_datalen(conn->_base.outbuf) < 16*1024) {
+    int n = (32*1024 - buf_datalen(conn->_base.outbuf)) / CELL_NETWORK_SIZE;
+    while (conn->active_circuits && n > 0) {
+      int flushed;
+      log_info(LD_GENERAL, "Loop, n==%d",n);
+      flushed = connection_or_flush_from_first_active_circuit(conn, 1);
+      n -= flushed;
+    }
+  }
   return 0;
 }
 
@@ -784,26 +794,27 @@
   cell.command = CELL_DESTROY;
   cell.payload[0] = (uint8_t) reason;
   log_debug(LD_OR,"Sending destroy (circID %d).", circ_id);
+  /* XXXX clear the rest of the cell queue on the circuit. {cells} */
   connection_or_write_cell_to_buf(&cell, conn);
   return 0;
 }
 
 /** A high waterlevel for whether to refill this OR connection
  * with more directory information, if any is pending. */
-#define BUF_FULLNESS_THRESHOLD (128*1024)
+#define DIR_BUF_FULLNESS_THRESHOLD (128*1024)
 /** A bottom waterlevel for whether to refill this OR connection
  * with more directory information, if any is pending. We don't want
  * to make this too low, since we already run the risk of starving
  * the pending dir connections if the OR conn is frequently busy with
  * other things. */
-#define BUF_EMPTINESS_THRESHOLD (96*1024)
+#define DIR_BUF_EMPTINESS_THRESHOLD (96*1024)
 
 /** Return true iff there is so much data waiting to be flushed on <b>conn</b>
  * that we should stop writing directory data to it. */
 int
 connection_or_too_full_for_dirserv_data(or_connection_t *conn)
 {
-  return buf_datalen(conn->_base.outbuf) > BUF_FULLNESS_THRESHOLD;
+  return buf_datalen(conn->_base.outbuf) > DIR_BUF_FULLNESS_THRESHOLD;
 }
 
 /** Return true iff there is no longer so much data waiting to be flushed on
@@ -814,6 +825,6 @@
   /* Note that the threshold to stop writing is a bit higher than the
    * threshold to start again: this should (with any luck) keep us from
    * flapping about indefinitely. */
-  return buf_datalen(conn->_base.outbuf) < BUF_EMPTINESS_THRESHOLD;
+  return buf_datalen(conn->_base.outbuf) < DIR_BUF_EMPTINESS_THRESHOLD;
 }
 

Modified: tor/trunk/src/or/or.h
===================================================================
--- tor/trunk/src/or/or.h	2007-03-26 13:30:17 UTC (rev 9903)
+++ tor/trunk/src/or/or.h	2007-03-26 14:07:59 UTC (rev 9904)
@@ -674,15 +674,24 @@
 /** Largest number of bytes that can fit in a relay cell payload. */
 #define RELAY_PAYLOAD_SIZE (CELL_PAYLOAD_SIZE-RELAY_HEADER_SIZE)
 
+typedef struct cell_t cell_t;
 /** Parsed onion routing cell.  All communication between nodes
  * is via cells. */
-typedef struct {
+struct cell_t {
+  struct cell_t *next; /**< Next cell queued on a this circuit. */
   uint16_t circ_id; /**< Circuit which received the cell. */
   uint8_t command; /**< Type of the cell: one of PADDING, CREATE, RELAY,
                     * or DESTROY. */
   char payload[CELL_PAYLOAD_SIZE]; /**< Cell body. */
-} cell_t;
+};
 
+/** DOCDOC */
+typedef struct cell_queue_t {
+  cell_t *head;
+  cell_t *tail;
+  int n;
+} cell_queue_t;
+
 /** Beginning of a RELAY cell payload. */
 typedef struct {
   uint8_t command; /**< The end-to-end relay command. */
@@ -806,6 +815,7 @@
                     * bandwidthburst. (OPEN ORs only) */
   int n_circuits; /**< How many circuits use this connection as p_conn or
                    * n_conn ? */
+  struct circuit_t *active_circuits; /**< DOCDOC */
   struct or_connection_t *next_with_same_id; /**< Next connection with same
                                               * identity digest as this one. */
   /** Linked list of bridged dirserver connections that can't write until
@@ -1374,6 +1384,8 @@
   uint32_t magic; /**< For memory and type debugging: must equal
                    * ORIGIN_CIRCUIT_MAGIC or OR_CIRCUIT_MAGIC. */
 
+  /** DOCDOC */
+  cell_queue_t n_conn_cells;
   /** The OR connection that is next in this circuit. */
   or_connection_t *n_conn;
   /** The identity hash of n_conn. */
@@ -1413,6 +1425,8 @@
   const char *marked_for_close_file; /**< For debugging: in which file was this
                                       * circuit marked for close? */
 
+  struct circuit_t *next_active_on_n_conn; /**< DOCDOC */
+  struct circuit_t *prev_active_on_n_conn; /**< DOCDOC */
   struct circuit_t *next; /**< Next circuit in linked list. */
 } circuit_t;
 
@@ -1467,8 +1481,13 @@
 typedef struct or_circuit_t {
   circuit_t _base;
 
+  struct circuit_t *next_active_on_p_conn; /**< DOCDOC */
+  struct circuit_t *prev_active_on_p_conn; /**< DOCDOC */
+
   /** The circuit_id used in the previous (backward) hop of this circuit. */
   circid_t p_circ_id;
+  /** DOCDOC */
+  cell_queue_t p_conn_cells;
   /** The OR connection that is previous in this circuit. */
   or_connection_t *p_conn;
   /** Linked list of Exit streams associated with this circuit. */
@@ -2631,6 +2650,19 @@
 extern uint64_t stats_n_data_cells_received;
 extern uint64_t stats_n_data_bytes_received;
 
+void cell_queue_clear(cell_queue_t *queue);
+void cell_queue_append(cell_queue_t *queue, cell_t *cell);
+void cell_queue_append_copy(cell_queue_t *queue, const cell_t *cell);
+
+void append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
+                                  cell_t *cell, int direction);
+void connection_or_unlink_all_active_circs(or_connection_t *conn);
+int connection_or_flush_from_first_active_circuit(or_connection_t *conn,
+                                                  int max);
+void assert_active_circuits_ok(or_connection_t *orconn);
+void make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn);
+void make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn);
+
 /********************************* rephist.c ***************************/
 
 void rep_hist_init(void);

Modified: tor/trunk/src/or/relay.c
===================================================================
--- tor/trunk/src/or/relay.c	2007-03-26 13:30:17 UTC (rev 9903)
+++ tor/trunk/src/or/relay.c	2007-03-26 14:07:59 UTC (rev 9904)
@@ -9,7 +9,7 @@
 /**
  * \file relay.c
  * \brief Handle relay cell encryption/decryption, plus packaging and
- * receiving from circuits.
+ *    receiving from circuits, plus queueing on circuits.
  **/
 
 #include "or.h"
@@ -137,7 +137,7 @@
  *    a conn that the cell is intended for, and deliver it to
  *    connection_edge.
  *  - Else connection_or_write_cell_to_buf to the conn on the other
- *    side of the circuit.
+ *    side of the circuit.DOCDOC
  *
  * Return -reason on failure.
  */
@@ -225,8 +225,12 @@
   }
 
   log_debug(LD_OR,"Passing on unrecognized cell.");
-  ++stats_n_relay_cells_relayed;
-  connection_or_write_cell_to_buf(cell, or_conn);
+
+  ++stats_n_relay_cells_relayed; /* XXXX no longer quite accurate {cells}
+                                  * we might kill the circ before we relay
+                                  * the cells. */
+
+  append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction);
   return 0;
 }
 
@@ -318,7 +322,7 @@
 
 /** Package a relay cell from an edge:
  *  - Encrypt it to the right layer
- *  - connection_or_write_cell_to_buf to the right conn
+ *  - connection_or_write_cell_to_buf to the right conn DOCDOC
  */
 static int
 circuit_package_relay_cell(cell_t *cell, circuit_t *circ,
@@ -364,7 +368,8 @@
       return -1;
   }
   ++stats_n_relay_cells_relayed;
-  connection_or_write_cell_to_buf(cell, conn);
+
+  append_cell_to_circuit_queue(circ, conn, cell, cell_direction);
   return 0;
 }
 
@@ -1457,3 +1462,234 @@
   }
 }
 
+/** DOCDOC */
+static INLINE void
+cell_free(cell_t *cell)
+{
+  tor_free(cell);
+}
+
+/** DOCDOC */
+static INLINE cell_t *
+cell_copy(const cell_t *cell)
+{
+  cell_t *c = tor_malloc(sizeof(cell_t));
+  memcpy(c, cell, sizeof(cell_t));
+  c->next = NULL;
+  return c;
+}
+
+/** DOCDOC */
+void
+cell_queue_append(cell_queue_t *queue, cell_t *cell)
+{
+  if (queue->tail) {
+    tor_assert(!queue->tail->next);
+    queue->tail->next = cell;
+  } else {
+    queue->head = cell;
+  }
+  queue->tail = cell;
+  cell->next = NULL;
+  ++queue->n;
+}
+
+/** DOCDOC */
+void
+cell_queue_append_copy(cell_queue_t *queue, const cell_t *cell)
+{
+  cell_queue_append(queue, cell_copy(cell));
+}
+
+/** DOCDOC */
+void
+cell_queue_clear(cell_queue_t *queue)
+{
+  cell_t *cell, *next;
+  cell = queue->head;
+  while (cell) {
+    next = cell->next;
+    cell_free(cell);
+    cell = next;
+  }
+  queue->head = queue->tail = NULL;
+  queue->n = 0;
+}
+
+/** DOCDOC */
+static INLINE cell_t *
+cell_queue_pop(cell_queue_t *queue)
+{
+  cell_t *cell = queue->head;
+  if (!cell)
+    return NULL;
+  queue->head = cell->next;
+  if (cell == queue->tail) {
+    tor_assert(!queue->head);
+    queue->tail = NULL;
+  }
+  --queue->n;
+  return cell;
+}
+
+static INLINE circuit_t **
+next_circ_on_conn_p(circuit_t *circ, or_connection_t *conn)
+{
+  if (conn == circ->n_conn) {
+    return &circ->next_active_on_n_conn;
+  } else {
+    or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
+    tor_assert(conn == orcirc->p_conn);
+    return &orcirc->next_active_on_p_conn;
+  }
+}
+
+/** DOCDOC */
+static INLINE circuit_t **
+prev_circ_on_conn_p(circuit_t *circ, or_connection_t *conn)
+{
+  if (conn == circ->n_conn) {
+    return &circ->prev_active_on_n_conn;
+  } else {
+    or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
+    tor_assert(conn == orcirc->p_conn);
+    return &orcirc->prev_active_on_p_conn;
+  }
+}
+
+/** DOCDOC */
+void
+make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn)
+{
+  if (! conn->active_circuits) {
+    conn->active_circuits = circ;
+    *prev_circ_on_conn_p(circ, conn) = circ;
+    *next_circ_on_conn_p(circ, conn) = circ;
+  } else {
+    circuit_t *head = conn->active_circuits;
+    circuit_t *old_tail = *prev_circ_on_conn_p(head, conn);
+    *next_circ_on_conn_p(old_tail, conn) = circ;
+    *next_circ_on_conn_p(circ, conn) = head;
+    *prev_circ_on_conn_p(head, conn) = circ;
+    *prev_circ_on_conn_p(circ, conn) = old_tail;
+  }
+}
+
+/** DOCDOC */
+void
+make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn)
+{
+  // XXXX add some assert.
+  circuit_t *next = *next_circ_on_conn_p(circ, conn);
+  circuit_t *prev = *next_circ_on_conn_p(circ, conn);
+  if (next == circ) {
+    conn->active_circuits = NULL;
+  } else {
+    *prev_circ_on_conn_p(next, conn) = prev;
+    *next_circ_on_conn_p(prev, conn) = next;
+    if (conn->active_circuits == circ)
+      conn->active_circuits = next;
+  }
+  *prev_circ_on_conn_p(circ, conn) = NULL;
+  *next_circ_on_conn_p(circ, conn) = NULL;
+}
+
+/** DOCDOC */
+void
+connection_or_unlink_all_active_circs(or_connection_t *orconn)
+{
+  circuit_t *head = orconn->active_circuits;
+  circuit_t *cur = head;
+  if (! head)
+    return;
+  do {
+    circuit_t *next = *next_circ_on_conn_p(cur, orconn);
+    *prev_circ_on_conn_p(cur, orconn) = NULL;
+    *next_circ_on_conn_p(cur, orconn) = NULL;
+    cur = next;
+  } while (cur != head);
+  orconn->active_circuits = NULL;
+}
+
+/** DOCDOC */
+int
+connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max)
+{
+  int n_flushed;
+  cell_queue_t *queue;
+  circuit_t *circ;
+  circ = conn->active_circuits;
+  if (!circ) return 0;
+  if (circ->n_conn == conn)
+    queue = &circ->n_conn_cells;
+  else
+    queue = &TO_OR_CIRCUIT(circ)->p_conn_cells;
+
+  for (n_flushed = 0; n_flushed < max && queue->head; ++n_flushed) {
+    cell_t *cell = cell_queue_pop(queue);
+
+    connection_or_write_cell_to_buf(cell, conn);
+    cell_free(cell);
+    log_info(LD_GENERAL, "flushed a cell; n ==%d", queue->n);
+    ++n_flushed;
+  }
+  conn->active_circuits = *next_circ_on_conn_p(circ, conn);
+  if (queue->n == 0) {
+    log_info(LD_GENERAL, "Made a circuit inactive.");
+    make_circuit_inactive_on_conn(circ, conn);
+  }
+  return n_flushed;
+}
+
+/** DOCDOC */
+void
+append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
+                             cell_t *cell, int direction)
+{
+  cell_queue_t *queue;
+  if (direction == CELL_DIRECTION_OUT) {
+    queue = &circ->n_conn_cells;
+  } else {
+    or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
+    queue = &orcirc->p_conn_cells;
+  }
+
+  cell_queue_append_copy(queue, cell);
+
+  log_info(LD_GENERAL, "Added a cell; n ==%d", queue->n);
+
+  if (queue->n == 1) {
+    /* This was the first cell added to the queue.  We need to make this
+     * circuit active. */
+    log_info(LD_GENERAL, "Made a circuit active.");
+    make_circuit_active_on_conn(circ, orconn);
+  }
+
+  if (! buf_datalen(orconn->_base.outbuf)) {
+    /* XXXX Should this be a "<16K"? {cells} */
+    /* There is no data at all waiting to be sent on the outbuf.  Add a
+     * cell, so that we can notice when it gets flushed, flushed_some can
+     * get called, and we can start putting more data onto the buffer then.
+     */
+    log_info(LD_GENERAL, "Primed a buffer.");
+    connection_or_flush_from_first_active_circuit(orconn, 1);
+  }
+}
+
+void
+assert_active_circuits_ok(or_connection_t *orconn)
+{
+  circuit_t *head = orconn->active_circuits;
+  circuit_t *cur = head;
+  if (! head)
+    return;
+  do {
+    circuit_t *next = *next_circ_on_conn_p(cur, orconn);
+    circuit_t *prev = *prev_circ_on_conn_p(cur, orconn);
+    tor_assert(next);
+    tor_assert(prev);
+    tor_assert(*next_circ_on_conn_p(prev, orconn) == cur);
+    tor_assert(*prev_circ_on_conn_p(next, orconn) == cur);
+    cur = next;
+  } while (cur != head);
+}



More information about the tor-commits mailing list