summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Mathewson <nickm@torproject.org>2007-03-26 14:07:59 +0000
committerNick Mathewson <nickm@torproject.org>2007-03-26 14:07:59 +0000
commit38c0bb3a99956a0ff2e570bd8f2b900d46741992 (patch)
tree9a447e7b431abdc819d3ba1f30755f5b40678c79
parent6e51bdd5e4d5ab39fc1dca12e468b9a1c573cc1b (diff)
downloadtor-38c0bb3a99956a0ff2e570bd8f2b900d46741992.tar.gz
tor-38c0bb3a99956a0ff2e570bd8f2b900d46741992.zip
r12651@Kushana: nickm | 2007-03-24 18:26:42 -0400
Initial version of circuit-based cell queues. Instead of hammering or_conns with piles of cells, queue cells on their corresponding circuits, and append them to the or_conn as needed. This seems to work so far, but needs a bit more work. This will break the memory-use-limitation patch for begin_dir conns: the solution will be a fun but fiddly. svn:r9904
-rw-r--r--ChangeLog12
-rw-r--r--doc/TODO16
-rw-r--r--src/or/circuitbuild.c7
-rw-r--r--src/or/circuitlist.c24
-rw-r--r--src/or/command.c29
-rw-r--r--src/or/connection_or.c21
-rw-r--r--src/or/or.h36
-rw-r--r--src/or/relay.c248
8 files changed, 351 insertions, 42 deletions
diff --git a/ChangeLog b/ChangeLog
index 0c7f3816d3..130d0e2321 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,4 +1,12 @@
Changes in version 0.2.0.1-alpha - 2007-??-??
+ o Major features:
+ - Change the way that Tor buffers data that it is waiting to write.
+ Instead of queueing data cells in an enormous ring buffer for each
+ client->OR or OR->OR connection, we now queue cells on a separate
+ queue for each circuit. This lets us use less slack memory, and
+ will eventually let us be smarter about prioritizing different kinds
+ of traffic.
+
o Security fixes:
- Directory authorities now call routers stable if they have an
uptime of at least 30 days, even if that's not the median uptime
@@ -57,8 +65,8 @@ Changes in version 0.2.0.1-alpha - 2007-??-??
to 'getinfo addr-mappings/*'.
o Code simplifications and refactoring
- - Stop passing around crypt_path_t pointers that are implicit in other
- procedure arguments.
+ - Stop passing around circuit_t and crypt_path_t pointers that are
+ implicit in other procedure arguments.
Changes in version 0.1.2.12-rc - 2007-03-16
diff --git a/doc/TODO b/doc/TODO
index b16b451a01..84b00ae29b 100644
--- a/doc/TODO
+++ b/doc/TODO
@@ -61,11 +61,21 @@ Things we'd like to do in 0.2.0.x:
_on_ on a socks connection: have edge_connection_t and (say)
dns_request_t both extend an edge_stream_t, and have p_streams and
n_streams both be linked lists of edge_stream_t.
- - Make cells get buffered on circuit, not on the or_conn.
- - Don't move them into the target conn until there is space on the
+ . Make cells get buffered on circuit, not on the or_conn.
+ O Implement cell queues
+ o Keep doubly-linked list of active circuits on each or_conn.
+ o Put all relay data on the circuit cell queue, not on the outbuf.
+ o Don't move them into the target conn until there is space on the
target conn's outbuf.
+ o When making a circuit active on a connection with an empty buf,
+ we need to "prime" the buffer, so that we can trigger the "I flushed
+ some" test.
+ - Change how directory-bridge-choking works: choke when circuit queue
+ is full, not when the orconn is "too full".
+ - Do we switch to arena-allocation for cells?
+ - Can we stop doing so many memcpys on cells?
- Also, only package data from exitconns when there is space on the
- target OR conn's outbuf.
+ target OR conn's outbuf? or when the circuit is not too full.
- MAYBE kill stalled circuits rather than stalled connections; consider
anonymity implications.
- Move all status info out of routerinfo into local_routerstatus. Make
diff --git a/src/or/circuitbuild.c b/src/or/circuitbuild.c
index c7ac520059..7d1c5d9c4f 100644
--- a/src/or/circuitbuild.c
+++ b/src/or/circuitbuild.c
@@ -488,7 +488,7 @@ circuit_deliver_create_cell(circuit_t *circ, uint8_t cell_type,
cell.circ_id = circ->n_circ_id;
memcpy(cell.payload, payload, ONIONSKIN_CHALLENGE_LEN);
- connection_or_write_cell_to_buf(&cell, circ->n_conn);
+ append_cell_to_circuit_queue(circ, circ->n_conn, &cell, CELL_DIRECTION_OUT);
/* mark it so it gets better rate limiting treatment. */
circ->n_conn->client_used = 1;
@@ -650,7 +650,7 @@ circuit_send_next_onion_skin(origin_circuit_t *circ)
return - END_CIRC_REASON_INTERNAL;
}
- log_debug(LD_CIRC,"Sending extend relay cell.");
+ log_info(LD_CIRC,"Sending extend relay cell.");
/* send it to hop->prev, because it will transfer
* it to a create cell and then send to hop */
if (relay_send_command_from_edge(0, TO_CIRCUIT(circ),
@@ -988,7 +988,8 @@ onionskin_answer(or_circuit_t *circ, uint8_t cell_type, const char *payload,
circ->is_first_hop = (cell_type == CELL_CREATED_FAST);
- connection_or_write_cell_to_buf(&cell, circ->p_conn);
+ append_cell_to_circuit_queue(TO_CIRCUIT(circ),
+ circ->p_conn, &cell, CELL_DIRECTION_IN);
log_debug(LD_CIRC,"Finished sending 'created' cell.");
if (!is_local_IP(circ->p_conn->_base.addr) &&
diff --git a/src/or/circuitlist.c b/src/or/circuitlist.c
index 8ed5c16a98..f76d2cba1b 100644
--- a/src/or/circuitlist.c
+++ b/src/or/circuitlist.c
@@ -71,10 +71,12 @@ HT_GENERATE(orconn_circid_map, orconn_circid_circuit_map_t, node,
*/
orconn_circid_circuit_map_t *_last_circid_orconn_ent = NULL;
+/** DOCDOC */
static void
circuit_set_circid_orconn_helper(circuit_t *circ, uint16_t id,
or_connection_t *conn,
- uint16_t old_id, or_connection_t *old_conn)
+ uint16_t old_id, or_connection_t *old_conn,
+ int active)
{
orconn_circid_circuit_map_t search;
orconn_circid_circuit_map_t *found;
@@ -96,6 +98,8 @@ circuit_set_circid_orconn_helper(circuit_t *circ, uint16_t id,
tor_free(found);
--old_conn->n_circuits;
}
+ if (active)
+ make_circuit_inactive_on_conn(circ,old_conn);
}
if (conn == NULL)
@@ -114,6 +118,9 @@ circuit_set_circid_orconn_helper(circuit_t *circ, uint16_t id,
found->circuit = circ;
HT_INSERT(orconn_circid_map, &orconn_circid_circuit_map, found);
}
+ if (active)
+ make_circuit_active_on_conn(circ,conn);
+
++conn->n_circuits;
}
@@ -126,16 +133,18 @@ circuit_set_p_circid_orconn(or_circuit_t *circ, uint16_t id,
{
uint16_t old_id;
or_connection_t *old_conn;
+ int active;
old_id = circ->p_circ_id;
old_conn = circ->p_conn;
circ->p_circ_id = id;
circ->p_conn = conn;
+ active = circ->p_conn_cells.n > 0;
if (id == old_id && conn == old_conn)
return;
circuit_set_circid_orconn_helper(TO_CIRCUIT(circ), id, conn,
- old_id, old_conn);
+ old_id, old_conn, active);
}
/** Set the n_conn field of a circuit <b>circ</b>, along
@@ -147,15 +156,17 @@ circuit_set_n_circid_orconn(circuit_t *circ, uint16_t id,
{
uint16_t old_id;
or_connection_t *old_conn;
+ int active;
old_id = circ->n_circ_id;
old_conn = circ->n_conn;
circ->n_circ_id = id;
circ->n_conn = conn;
+ active = circ->n_conn_cells.n > 0;
if (id == old_id && conn == old_conn)
return;
- circuit_set_circid_orconn_helper(circ, id, conn, old_id, old_conn);
+ circuit_set_circid_orconn_helper(circ, id, conn, old_id, old_conn, active);
}
/** Change the state of <b>circ</b> to <b>state</b>, adding it to or removing
@@ -380,12 +391,16 @@ circuit_free(circuit_t *circ)
other->rend_splice = NULL;
}
+ cell_queue_clear(&ocirc->p_conn_cells);
+
tor_free(circ->onionskin);
/* remove from map. */
circuit_set_p_circid_orconn(ocirc, 0, NULL);
}
+ cell_queue_clear(&circ->n_conn_cells);
+
/* Remove from map. */
circuit_set_n_circid_orconn(circ, 0, NULL);
@@ -637,6 +652,9 @@ void
circuit_unlink_all_from_or_conn(or_connection_t *conn, int reason)
{
circuit_t *circ;
+
+ connection_or_unlink_all_active_circs(conn);
+
for (circ = global_circuitlist; circ; circ = circ->next) {
int mark = 0;
if (circ->n_conn == conn) {
diff --git a/src/or/command.c b/src/or/command.c
index e278725e8f..381ca3756a 100644
--- a/src/or/command.c
+++ b/src/or/command.c
@@ -305,7 +305,7 @@ static void
command_process_relay_cell(cell_t *cell, or_connection_t *conn)
{
circuit_t *circ;
- int reason;
+ int reason, direction;
circ = circuit_get_by_circid_orconn(cell->circ_id, conn);
@@ -323,23 +323,16 @@ command_process_relay_cell(cell_t *cell, or_connection_t *conn)
}
if (!CIRCUIT_IS_ORIGIN(circ) &&
- cell->circ_id == TO_OR_CIRCUIT(circ)->p_circ_id) {
- /* it's an outgoing cell */
- if ((reason = circuit_receive_relay_cell(cell, circ,
- CELL_DIRECTION_OUT)) < 0) {
- log_fn(LOG_PROTOCOL_WARN,LD_PROTOCOL,"circuit_receive_relay_cell "
- "(forward) failed. Closing.");
- circuit_mark_for_close(circ, -reason);
- return;
- }
- } else { /* it's an ingoing cell */
- if ((reason = circuit_receive_relay_cell(cell, circ,
- CELL_DIRECTION_IN)) < 0) {
- log_fn(LOG_PROTOCOL_WARN,LD_PROTOCOL,"circuit_receive_relay_cell "
- "(backward) failed. Closing.");
- circuit_mark_for_close(circ, -reason);
- return;
- }
+ cell->circ_id == TO_OR_CIRCUIT(circ)->p_circ_id)
+ direction = CELL_DIRECTION_OUT;
+ else
+ direction = CELL_DIRECTION_IN;
+
+ if ((reason = circuit_receive_relay_cell(cell, circ, direction)) < 0) {
+ log_fn(LOG_PROTOCOL_WARN,LD_PROTOCOL,"circuit_receive_relay_cell "
+ "(%s) failed. Closing.",
+ direction==CELL_DIRECTION_OUT?"forward":"backward");
+ circuit_mark_for_close(circ, -reason);
}
}
diff --git a/src/or/connection_or.c b/src/or/connection_or.c
index d5874f6aa6..8ebb6b4753 100644
--- a/src/or/connection_or.c
+++ b/src/or/connection_or.c
@@ -232,7 +232,8 @@ connection_or_process_inbuf(or_connection_t *conn)
}
}
-/** Called whenever we have flushed some data on an or_conn. */
+/** Called whenever we have flushed some data on an or_conn: add more data
+ * from active circuits. */
int
connection_or_flushed_some(or_connection_t *conn)
{
@@ -240,6 +241,15 @@ connection_or_flushed_some(or_connection_t *conn)
connection_or_empty_enough_for_dirserv_data(conn)) {
connection_dirserv_stop_blocking_all_on_or_conn(conn);
}
+ if (buf_datalen(conn->_base.outbuf) < 16*1024) {
+ int n = (32*1024 - buf_datalen(conn->_base.outbuf)) / CELL_NETWORK_SIZE;
+ while (conn->active_circuits && n > 0) {
+ int flushed;
+ log_info(LD_GENERAL, "Loop, n==%d",n);
+ flushed = connection_or_flush_from_first_active_circuit(conn, 1);
+ n -= flushed;
+ }
+ }
return 0;
}
@@ -784,26 +794,27 @@ connection_or_send_destroy(uint16_t circ_id, or_connection_t *conn, int reason)
cell.command = CELL_DESTROY;
cell.payload[0] = (uint8_t) reason;
log_debug(LD_OR,"Sending destroy (circID %d).", circ_id);
+ /* XXXX clear the rest of the cell queue on the circuit. {cells} */
connection_or_write_cell_to_buf(&cell, conn);
return 0;
}
/** A high waterlevel for whether to refill this OR connection
* with more directory information, if any is pending. */
-#define BUF_FULLNESS_THRESHOLD (128*1024)
+#define DIR_BUF_FULLNESS_THRESHOLD (128*1024)
/** A bottom waterlevel for whether to refill this OR connection
* with more directory information, if any is pending. We don't want
* to make this too low, since we already run the risk of starving
* the pending dir connections if the OR conn is frequently busy with
* other things. */
-#define BUF_EMPTINESS_THRESHOLD (96*1024)
+#define DIR_BUF_EMPTINESS_THRESHOLD (96*1024)
/** Return true iff there is so much data waiting to be flushed on <b>conn</b>
* that we should stop writing directory data to it. */
int
connection_or_too_full_for_dirserv_data(or_connection_t *conn)
{
- return buf_datalen(conn->_base.outbuf) > BUF_FULLNESS_THRESHOLD;
+ return buf_datalen(conn->_base.outbuf) > DIR_BUF_FULLNESS_THRESHOLD;
}
/** Return true iff there is no longer so much data waiting to be flushed on
@@ -814,6 +825,6 @@ connection_or_empty_enough_for_dirserv_data(or_connection_t *conn)
/* Note that the threshold to stop writing is a bit higher than the
* threshold to start again: this should (with any luck) keep us from
* flapping about indefinitely. */
- return buf_datalen(conn->_base.outbuf) < BUF_EMPTINESS_THRESHOLD;
+ return buf_datalen(conn->_base.outbuf) < DIR_BUF_EMPTINESS_THRESHOLD;
}
diff --git a/src/or/or.h b/src/or/or.h
index 0201607c94..a3a10cf98c 100644
--- a/src/or/or.h
+++ b/src/or/or.h
@@ -674,14 +674,23 @@ typedef enum {
/** Largest number of bytes that can fit in a relay cell payload. */
#define RELAY_PAYLOAD_SIZE (CELL_PAYLOAD_SIZE-RELAY_HEADER_SIZE)
+typedef struct cell_t cell_t;
/** Parsed onion routing cell. All communication between nodes
* is via cells. */
-typedef struct {
+struct cell_t {
+ struct cell_t *next; /**< Next cell queued on a this circuit. */
uint16_t circ_id; /**< Circuit which received the cell. */
uint8_t command; /**< Type of the cell: one of PADDING, CREATE, RELAY,
* or DESTROY. */
char payload[CELL_PAYLOAD_SIZE]; /**< Cell body. */
-} cell_t;
+};
+
+/** DOCDOC */
+typedef struct cell_queue_t {
+ cell_t *head;
+ cell_t *tail;
+ int n;
+} cell_queue_t;
/** Beginning of a RELAY cell payload. */
typedef struct {
@@ -806,6 +815,7 @@ typedef struct or_connection_t {
* bandwidthburst. (OPEN ORs only) */
int n_circuits; /**< How many circuits use this connection as p_conn or
* n_conn ? */
+ struct circuit_t *active_circuits; /**< DOCDOC */
struct or_connection_t *next_with_same_id; /**< Next connection with same
* identity digest as this one. */
/** Linked list of bridged dirserver connections that can't write until
@@ -1374,6 +1384,8 @@ typedef struct circuit_t {
uint32_t magic; /**< For memory and type debugging: must equal
* ORIGIN_CIRCUIT_MAGIC or OR_CIRCUIT_MAGIC. */
+ /** DOCDOC */
+ cell_queue_t n_conn_cells;
/** The OR connection that is next in this circuit. */
or_connection_t *n_conn;
/** The identity hash of n_conn. */
@@ -1413,6 +1425,8 @@ typedef struct circuit_t {
const char *marked_for_close_file; /**< For debugging: in which file was this
* circuit marked for close? */
+ struct circuit_t *next_active_on_n_conn; /**< DOCDOC */
+ struct circuit_t *prev_active_on_n_conn; /**< DOCDOC */
struct circuit_t *next; /**< Next circuit in linked list. */
} circuit_t;
@@ -1467,8 +1481,13 @@ typedef struct origin_circuit_t {
typedef struct or_circuit_t {
circuit_t _base;
+ struct circuit_t *next_active_on_p_conn; /**< DOCDOC */
+ struct circuit_t *prev_active_on_p_conn; /**< DOCDOC */
+
/** The circuit_id used in the previous (backward) hop of this circuit. */
circid_t p_circ_id;
+ /** DOCDOC */
+ cell_queue_t p_conn_cells;
/** The OR connection that is previous in this circuit. */
or_connection_t *p_conn;
/** Linked list of Exit streams associated with this circuit. */
@@ -2631,6 +2650,19 @@ extern uint64_t stats_n_data_bytes_packaged;
extern uint64_t stats_n_data_cells_received;
extern uint64_t stats_n_data_bytes_received;
+void cell_queue_clear(cell_queue_t *queue);
+void cell_queue_append(cell_queue_t *queue, cell_t *cell);
+void cell_queue_append_copy(cell_queue_t *queue, const cell_t *cell);
+
+void append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
+ cell_t *cell, int direction);
+void connection_or_unlink_all_active_circs(or_connection_t *conn);
+int connection_or_flush_from_first_active_circuit(or_connection_t *conn,
+ int max);
+void assert_active_circuits_ok(or_connection_t *orconn);
+void make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn);
+void make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn);
+
/********************************* rephist.c ***************************/
void rep_hist_init(void);
diff --git a/src/or/relay.c b/src/or/relay.c
index 0a4a190d03..f3063ddd4f 100644
--- a/src/or/relay.c
+++ b/src/or/relay.c
@@ -9,7 +9,7 @@ const char relay_c_id[] =
/**
* \file relay.c
* \brief Handle relay cell encryption/decryption, plus packaging and
- * receiving from circuits.
+ * receiving from circuits, plus queueing on circuits.
**/
#include "or.h"
@@ -137,7 +137,7 @@ relay_crypt_one_payload(crypto_cipher_env_t *cipher, char *in,
* a conn that the cell is intended for, and deliver it to
* connection_edge.
* - Else connection_or_write_cell_to_buf to the conn on the other
- * side of the circuit.
+ * side of the circuit.DOCDOC
*
* Return -reason on failure.
*/
@@ -225,8 +225,12 @@ circuit_receive_relay_cell(cell_t *cell, circuit_t *circ, int cell_direction)
}
log_debug(LD_OR,"Passing on unrecognized cell.");
- ++stats_n_relay_cells_relayed;
- connection_or_write_cell_to_buf(cell, or_conn);
+
+ ++stats_n_relay_cells_relayed; /* XXXX no longer quite accurate {cells}
+ * we might kill the circ before we relay
+ * the cells. */
+
+ append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction);
return 0;
}
@@ -318,7 +322,7 @@ relay_crypt(circuit_t *circ, cell_t *cell, int cell_direction,
/** Package a relay cell from an edge:
* - Encrypt it to the right layer
- * - connection_or_write_cell_to_buf to the right conn
+ * - connection_or_write_cell_to_buf to the right conn DOCDOC
*/
static int
circuit_package_relay_cell(cell_t *cell, circuit_t *circ,
@@ -364,7 +368,8 @@ circuit_package_relay_cell(cell_t *cell, circuit_t *circ,
return -1;
}
++stats_n_relay_cells_relayed;
- connection_or_write_cell_to_buf(cell, conn);
+
+ append_cell_to_circuit_queue(circ, conn, cell, cell_direction);
return 0;
}
@@ -1457,3 +1462,234 @@ circuit_consider_sending_sendme(circuit_t *circ, crypt_path_t *layer_hint)
}
}
+/** DOCDOC */
+static INLINE void
+cell_free(cell_t *cell)
+{
+ tor_free(cell);
+}
+
+/** DOCDOC */
+static INLINE cell_t *
+cell_copy(const cell_t *cell)
+{
+ cell_t *c = tor_malloc(sizeof(cell_t));
+ memcpy(c, cell, sizeof(cell_t));
+ c->next = NULL;
+ return c;
+}
+
+/** DOCDOC */
+void
+cell_queue_append(cell_queue_t *queue, cell_t *cell)
+{
+ if (queue->tail) {
+ tor_assert(!queue->tail->next);
+ queue->tail->next = cell;
+ } else {
+ queue->head = cell;
+ }
+ queue->tail = cell;
+ cell->next = NULL;
+ ++queue->n;
+}
+
+/** DOCDOC */
+void
+cell_queue_append_copy(cell_queue_t *queue, const cell_t *cell)
+{
+ cell_queue_append(queue, cell_copy(cell));
+}
+
+/** DOCDOC */
+void
+cell_queue_clear(cell_queue_t *queue)
+{
+ cell_t *cell, *next;
+ cell = queue->head;
+ while (cell) {
+ next = cell->next;
+ cell_free(cell);
+ cell = next;
+ }
+ queue->head = queue->tail = NULL;
+ queue->n = 0;
+}
+
+/** DOCDOC */
+static INLINE cell_t *
+cell_queue_pop(cell_queue_t *queue)
+{
+ cell_t *cell = queue->head;
+ if (!cell)
+ return NULL;
+ queue->head = cell->next;
+ if (cell == queue->tail) {
+ tor_assert(!queue->head);
+ queue->tail = NULL;
+ }
+ --queue->n;
+ return cell;
+}
+
+static INLINE circuit_t **
+next_circ_on_conn_p(circuit_t *circ, or_connection_t *conn)
+{
+ if (conn == circ->n_conn) {
+ return &circ->next_active_on_n_conn;
+ } else {
+ or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
+ tor_assert(conn == orcirc->p_conn);
+ return &orcirc->next_active_on_p_conn;
+ }
+}
+
+/** DOCDOC */
+static INLINE circuit_t **
+prev_circ_on_conn_p(circuit_t *circ, or_connection_t *conn)
+{
+ if (conn == circ->n_conn) {
+ return &circ->prev_active_on_n_conn;
+ } else {
+ or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
+ tor_assert(conn == orcirc->p_conn);
+ return &orcirc->prev_active_on_p_conn;
+ }
+}
+
+/** DOCDOC */
+void
+make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn)
+{
+ if (! conn->active_circuits) {
+ conn->active_circuits = circ;
+ *prev_circ_on_conn_p(circ, conn) = circ;
+ *next_circ_on_conn_p(circ, conn) = circ;
+ } else {
+ circuit_t *head = conn->active_circuits;
+ circuit_t *old_tail = *prev_circ_on_conn_p(head, conn);
+ *next_circ_on_conn_p(old_tail, conn) = circ;
+ *next_circ_on_conn_p(circ, conn) = head;
+ *prev_circ_on_conn_p(head, conn) = circ;
+ *prev_circ_on_conn_p(circ, conn) = old_tail;
+ }
+}
+
+/** DOCDOC */
+void
+make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn)
+{
+ // XXXX add some assert.
+ circuit_t *next = *next_circ_on_conn_p(circ, conn);
+ circuit_t *prev = *next_circ_on_conn_p(circ, conn);
+ if (next == circ) {
+ conn->active_circuits = NULL;
+ } else {
+ *prev_circ_on_conn_p(next, conn) = prev;
+ *next_circ_on_conn_p(prev, conn) = next;
+ if (conn->active_circuits == circ)
+ conn->active_circuits = next;
+ }
+ *prev_circ_on_conn_p(circ, conn) = NULL;
+ *next_circ_on_conn_p(circ, conn) = NULL;
+}
+
+/** DOCDOC */
+void
+connection_or_unlink_all_active_circs(or_connection_t *orconn)
+{
+ circuit_t *head = orconn->active_circuits;
+ circuit_t *cur = head;
+ if (! head)
+ return;
+ do {
+ circuit_t *next = *next_circ_on_conn_p(cur, orconn);
+ *prev_circ_on_conn_p(cur, orconn) = NULL;
+ *next_circ_on_conn_p(cur, orconn) = NULL;
+ cur = next;
+ } while (cur != head);
+ orconn->active_circuits = NULL;
+}
+
+/** DOCDOC */
+int
+connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max)
+{
+ int n_flushed;
+ cell_queue_t *queue;
+ circuit_t *circ;
+ circ = conn->active_circuits;
+ if (!circ) return 0;
+ if (circ->n_conn == conn)
+ queue = &circ->n_conn_cells;
+ else
+ queue = &TO_OR_CIRCUIT(circ)->p_conn_cells;
+
+ for (n_flushed = 0; n_flushed < max && queue->head; ++n_flushed) {
+ cell_t *cell = cell_queue_pop(queue);
+
+ connection_or_write_cell_to_buf(cell, conn);
+ cell_free(cell);
+ log_info(LD_GENERAL, "flushed a cell; n ==%d", queue->n);
+ ++n_flushed;
+ }
+ conn->active_circuits = *next_circ_on_conn_p(circ, conn);
+ if (queue->n == 0) {
+ log_info(LD_GENERAL, "Made a circuit inactive.");
+ make_circuit_inactive_on_conn(circ, conn);
+ }
+ return n_flushed;
+}
+
+/** DOCDOC */
+void
+append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
+ cell_t *cell, int direction)
+{
+ cell_queue_t *queue;
+ if (direction == CELL_DIRECTION_OUT) {
+ queue = &circ->n_conn_cells;
+ } else {
+ or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
+ queue = &orcirc->p_conn_cells;
+ }
+
+ cell_queue_append_copy(queue, cell);
+
+ log_info(LD_GENERAL, "Added a cell; n ==%d", queue->n);
+
+ if (queue->n == 1) {
+ /* This was the first cell added to the queue. We need to make this
+ * circuit active. */
+ log_info(LD_GENERAL, "Made a circuit active.");
+ make_circuit_active_on_conn(circ, orconn);
+ }
+
+ if (! buf_datalen(orconn->_base.outbuf)) {
+ /* XXXX Should this be a "<16K"? {cells} */
+ /* There is no data at all waiting to be sent on the outbuf. Add a
+ * cell, so that we can notice when it gets flushed, flushed_some can
+ * get called, and we can start putting more data onto the buffer then.
+ */
+ log_info(LD_GENERAL, "Primed a buffer.");
+ connection_or_flush_from_first_active_circuit(orconn, 1);
+ }
+}
+
+void
+assert_active_circuits_ok(or_connection_t *orconn)
+{
+ circuit_t *head = orconn->active_circuits;
+ circuit_t *cur = head;
+ if (! head)
+ return;
+ do {
+ circuit_t *next = *next_circ_on_conn_p(cur, orconn);
+ circuit_t *prev = *prev_circ_on_conn_p(cur, orconn);
+ tor_assert(next);
+ tor_assert(prev);
+ tor_assert(*next_circ_on_conn_p(prev, orconn) == cur);
+ tor_assert(*prev_circ_on_conn_p(next, orconn) == cur);
+ cur = next;
+ } while (cur != head);
+}