summaryrefslogtreecommitdiff
path: root/src/or
diff options
context:
space:
mode:
authorNick Mathewson <nickm@torproject.org>2013-06-13 10:31:02 -0400
committerNick Mathewson <nickm@torproject.org>2013-06-13 10:31:02 -0400
commit4b781e24fb920ad4cd2268b609e4b4b0bd5adb7d (patch)
tree81cf1e73e3ecb06333cbc22d80ab045bcea348ba /src/or
parent29498491432040168c5df7091097a7b8a9c9ad86 (diff)
parente61df2ec651345f1c46777105bbae69916402ecd (diff)
downloadtor-4b781e24fb920ad4cd2268b609e4b4b0bd5adb7d.tar.gz
tor-4b781e24fb920ad4cd2268b609e4b4b0bd5adb7d.zip
Merge remote-tracking branch 'public/bug7912_squashed'
Diffstat (limited to 'src/or')
-rw-r--r--src/or/channel.c58
-rw-r--r--src/or/circuitlist.c150
-rw-r--r--src/or/circuitlist.h5
-rw-r--r--src/or/circuitmux.c135
-rw-r--r--src/or/circuitmux.h8
-rw-r--r--src/or/or.h7
-rw-r--r--src/or/relay.c21
-rw-r--r--src/or/relay.h2
8 files changed, 354 insertions, 32 deletions
diff --git a/src/or/channel.c b/src/or/channel.c
index 4e9086f2e6..33a32102ee 100644
--- a/src/or/channel.c
+++ b/src/or/channel.c
@@ -122,6 +122,8 @@ static cell_queue_entry_t *
cell_queue_entry_new_fixed(cell_t *cell);
static cell_queue_entry_t *
cell_queue_entry_new_var(var_cell_t *var_cell);
+static int is_destroy_cell(channel_t *chan,
+ const cell_queue_entry_t *q, circid_t *circid_out);
/* Functions to maintain the digest map */
static void channel_add_to_digest_map(channel_t *chan);
@@ -1685,6 +1687,13 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
chan->timestamp_last_added_nonpadding = approx_time();
}
+ {
+ circid_t circ_id;
+ if (is_destroy_cell(chan, q, &circ_id)) {
+ channel_note_destroy_not_pending(chan, circ_id);
+ }
+ }
+
/* Can we send it right out? If so, try */
if (TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue) &&
chan->state == CHANNEL_STATE_OPEN) {
@@ -2607,6 +2616,42 @@ channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell)
}
}
+/** DOCDOC */
+static int
+is_destroy_cell(channel_t *chan,
+ const cell_queue_entry_t *q, circid_t *circid_out)
+{
+ *circid_out = 0;
+ switch (q->type) {
+ case CELL_QUEUE_FIXED:
+ if (q->u.fixed.cell->command == CELL_DESTROY) {
+ *circid_out = q->u.fixed.cell->circ_id;
+ return 1;
+ }
+ break;
+ case CELL_QUEUE_VAR:
+ if (q->u.var.var_cell->command == CELL_DESTROY) {
+ *circid_out = q->u.var.var_cell->circ_id;
+ return 1;
+ }
+ break;
+ case CELL_QUEUE_PACKED:
+ if (chan->wide_circ_ids) {
+ if (q->u.packed.packed_cell->body[4] == CELL_DESTROY) {
+ *circid_out = ntohl(get_uint32(q->u.packed.packed_cell->body));
+ return 1;
+ }
+ } else {
+ if (q->u.packed.packed_cell->body[2] == CELL_DESTROY) {
+ *circid_out = ntohs(get_uint16(q->u.packed.packed_cell->body));
+ return 1;
+ }
+ }
+ break;
+ }
+ return 0;
+}
+
/**
* Send destroy cell on a channel
*
@@ -2618,25 +2663,20 @@ channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell)
int
channel_send_destroy(circid_t circ_id, channel_t *chan, int reason)
{
- cell_t cell;
-
tor_assert(chan);
/* Check to make sure we can send on this channel first */
if (!(chan->state == CHANNEL_STATE_CLOSING ||
chan->state == CHANNEL_STATE_CLOSED ||
- chan->state == CHANNEL_STATE_ERROR)) {
- memset(&cell, 0, sizeof(cell_t));
- cell.circ_id = circ_id;
- cell.command = CELL_DESTROY;
- cell.payload[0] = (uint8_t) reason;
+ chan->state == CHANNEL_STATE_ERROR) &&
+ chan->cmux) {
+ channel_note_destroy_pending(chan, circ_id);
+ circuitmux_append_destroy_cell(chan, chan->cmux, circ_id, reason);
log_debug(LD_OR,
"Sending destroy (circID %u) on channel %p "
"(global ID " U64_FORMAT ")",
(unsigned)circ_id, chan,
U64_PRINTF_ARG(chan->global_identifier));
-
- channel_write_cell(chan, &cell);
} else {
log_warn(LD_BUG,
"Someone called channel_send_destroy() for circID %u "
diff --git a/src/or/circuitlist.c b/src/or/circuitlist.c
index 1903fbe2eb..e50ab603e9 100644
--- a/src/or/circuitlist.c
+++ b/src/or/circuitlist.c
@@ -207,18 +207,123 @@ circuit_set_circid_chan_helper(circuit_t *circ, int direction,
}
}
+/** Mark that circuit id <b>id</b> shouldn't be used on channel <b>chan</b>,
+ * even if there is no circuit on the channel. We use this to keep the
+ * circuit id from getting re-used while we have queued but not yet sent
+ * a destroy cell. */
+void
+channel_mark_circid_unusable(channel_t *chan, circid_t id)
+{
+ chan_circid_circuit_map_t search;
+ chan_circid_circuit_map_t *ent;
+
+ /* See if there's an entry there. That wouldn't be good. */
+ memset(&search, 0, sizeof(search));
+ search.chan = chan;
+ search.circ_id = id;
+ ent = HT_FIND(chan_circid_map, &chan_circid_map, &search);
+
+ if (ent && ent->circuit) {
+ /* we have a problem. */
+ log_warn(LD_BUG, "Tried to mark %u unusable on %p, but there was already "
+ "a circuit there.", (unsigned)id, chan);
+ } else if (ent) {
+ /* It's already marked. */
+ } else {
+ ent = tor_malloc_zero(sizeof(chan_circid_circuit_map_t));
+ ent->chan = chan;
+ ent->circ_id = id;
+ /* leave circuit at NULL */
+ HT_INSERT(chan_circid_map, &chan_circid_map, ent);
+ }
+}
+
+/** Mark that a circuit id <b>id</b> can be used again on <b>chan</b>.
+ * We use this to re-enable the circuit ID after we've sent a destroy cell.
+ */
+void
+channel_mark_circid_usable(channel_t *chan, circid_t id)
+{
+ chan_circid_circuit_map_t search;
+ chan_circid_circuit_map_t *ent;
+
+ /* See if there's an entry there. That wouldn't be good. */
+ memset(&search, 0, sizeof(search));
+ search.chan = chan;
+ search.circ_id = id;
+ ent = HT_REMOVE(chan_circid_map, &chan_circid_map, &search);
+ if (ent && ent->circuit) {
+ log_warn(LD_BUG, "Tried to mark %u usable on %p, but there was already "
+ "a circuit there.", (unsigned)id, chan);
+ return;
+ }
+ if (_last_circid_chan_ent == ent)
+ _last_circid_chan_ent = NULL;
+ tor_free(ent);
+}
+
+/** Called to indicate that a DESTROY is pending on <b>chan</b> with
+ * circuit ID <b>id</b>, but hasn't been sent yet. */
+void
+channel_note_destroy_pending(channel_t *chan, circid_t id)
+{
+ circuit_t *circ = circuit_get_by_circid_channel_even_if_marked(id,chan);
+ if (circ) {
+ if (circ->n_chan == chan && circ->n_circ_id == id) {
+ circ->n_delete_pending = 1;
+ } else {
+ or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
+ if (orcirc->p_chan == chan && orcirc->p_circ_id == id) {
+ circ->p_delete_pending = 1;
+ }
+ }
+ return;
+ }
+ channel_mark_circid_unusable(chan, id);
+}
+
+/** Called to indicate that a DESTROY is no longer pending on <b>chan</b> with
+ * circuit ID <b>id</b> -- typically, because it has been sent. */
+void
+channel_note_destroy_not_pending(channel_t *chan, circid_t id)
+{
+ circuit_t *circ = circuit_get_by_circid_channel_even_if_marked(id,chan);
+ if (circ) {
+ if (circ->n_chan == chan && circ->n_circ_id == id) {
+ circ->n_delete_pending = 0;
+ } else {
+ or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
+ if (orcirc->p_chan == chan && orcirc->p_circ_id == id) {
+ circ->p_delete_pending = 0;
+ }
+ }
+ /* XXXX this shouldn't happen; log a bug here. */
+ return;
+ }
+ channel_mark_circid_usable(chan, id);
+}
+
/** Set the p_conn field of a circuit <b>circ</b>, along
* with the corresponding circuit ID, and add the circuit as appropriate
* to the (chan,id)-\>circuit map. */
void
-circuit_set_p_circid_chan(or_circuit_t *circ, circid_t id,
+circuit_set_p_circid_chan(or_circuit_t *or_circ, circid_t id,
channel_t *chan)
{
- circuit_set_circid_chan_helper(TO_CIRCUIT(circ), CELL_DIRECTION_IN,
- id, chan);
+ circuit_t *circ = TO_CIRCUIT(or_circ);
+ channel_t *old_chan = or_circ->p_chan;
+ circid_t old_id = or_circ->p_circ_id;
+
+ circuit_set_circid_chan_helper(circ, CELL_DIRECTION_IN, id, chan);
if (chan)
- tor_assert(bool_eq(circ->p_chan_cells.n, circ->next_active_on_p_chan));
+ tor_assert(bool_eq(or_circ->p_chan_cells.n,
+ or_circ->next_active_on_p_chan));
+
+ if (circ->p_delete_pending && old_chan) {
+ channel_mark_circid_unusable(old_chan, old_id);
+ circ->p_delete_pending = 0;
+ }
}
/** Set the n_conn field of a circuit <b>circ</b>, along
@@ -228,10 +333,18 @@ void
circuit_set_n_circid_chan(circuit_t *circ, circid_t id,
channel_t *chan)
{
+ channel_t *old_chan = circ->n_chan;
+ circid_t old_id = circ->n_circ_id;
+
circuit_set_circid_chan_helper(circ, CELL_DIRECTION_OUT, id, chan);
if (chan)
tor_assert(bool_eq(circ->n_chan_cells.n, circ->next_active_on_n_chan));
+
+ if (circ->n_delete_pending && old_chan) {
+ channel_mark_circid_unusable(old_chan, old_id);
+ circ->n_delete_pending = 0;
+ }
}
/** Change the state of <b>circ</b> to <b>state</b>, adding it to or removing
@@ -928,9 +1041,13 @@ circuit_get_by_global_id(uint32_t id)
* - circ-\>n_circ_id or circ-\>p_circ_id is equal to <b>circ_id</b>, and
* - circ is attached to <b>chan</b>, either as p_chan or n_chan.
* Return NULL if no such circuit exists.
+ *
+ * If <b>found_entry_out</b> is provided, set it to true if we have a
+ * placeholder entry for circid/chan, and leave it unset otherwise.
*/
static INLINE circuit_t *
-circuit_get_by_circid_channel_impl(circid_t circ_id, channel_t *chan)
+circuit_get_by_circid_channel_impl(circid_t circ_id, channel_t *chan,
+ int *found_entry_out)
{
chan_circid_circuit_map_t search;
chan_circid_circuit_map_t *found;
@@ -951,15 +1068,21 @@ circuit_get_by_circid_channel_impl(circid_t circ_id, channel_t *chan)
" circ_id %u, channel ID " U64_FORMAT " (%p)",
found->circuit, (unsigned)circ_id,
U64_PRINTF_ARG(chan->global_identifier), chan);
+ if (found_entry_out)
+ *found_entry_out = 1;
return found->circuit;
}
log_debug(LD_CIRC,
- "circuit_get_by_circid_channel_impl() found nothing for"
+ "circuit_get_by_circid_channel_impl() found %s for"
" circ_id %u, channel ID " U64_FORMAT " (%p)",
+ found ? "placeholder" : "nothing",
(unsigned)circ_id,
U64_PRINTF_ARG(chan->global_identifier), chan);
+ if (found_entry_out)
+ *found_entry_out = found ? 1 : 0;
+
return NULL;
/* The rest of this checks for bugs. Disabled by default. */
/* We comment it out because coverity complains otherwise.
@@ -993,7 +1116,7 @@ circuit_get_by_circid_channel_impl(circid_t circ_id, channel_t *chan)
circuit_t *
circuit_get_by_circid_channel(circid_t circ_id, channel_t *chan)
{
- circuit_t *circ = circuit_get_by_circid_channel_impl(circ_id, chan);
+ circuit_t *circ = circuit_get_by_circid_channel_impl(circ_id, chan, NULL);
if (!circ || circ->marked_for_close)
return NULL;
else
@@ -1009,7 +1132,7 @@ circuit_t *
circuit_get_by_circid_channel_even_if_marked(circid_t circ_id,
channel_t *chan)
{
- return circuit_get_by_circid_channel_impl(circ_id, chan);
+ return circuit_get_by_circid_channel_impl(circ_id, chan, NULL);
}
/** Return true iff the circuit ID <b>circ_id</b> is currently used by a
@@ -1017,7 +1140,9 @@ circuit_get_by_circid_channel_even_if_marked(circid_t circ_id,
int
circuit_id_in_use_on_channel(circid_t circ_id, channel_t *chan)
{
- return circuit_get_by_circid_channel_impl(circ_id, chan) != NULL;
+ int found = 0;
+ return circuit_get_by_circid_channel_impl(circ_id, chan, &found) != NULL
+ || found;
}
/** Return the circuit that a given edge connection is using. */
@@ -1585,15 +1710,16 @@ assert_circuit_ok(const circuit_t *c)
/* We use the _impl variant here to make sure we don't fail on marked
* circuits, which would not be returned by the regular function. */
circuit_t *c2 = circuit_get_by_circid_channel_impl(c->n_circ_id,
- c->n_chan);
+ c->n_chan, NULL);
tor_assert(c == c2);
}
}
if (or_circ && or_circ->p_chan) {
if (or_circ->p_circ_id) {
/* ibid */
- circuit_t *c2 = circuit_get_by_circid_channel_impl(or_circ->p_circ_id,
- or_circ->p_chan);
+ circuit_t *c2 =
+ circuit_get_by_circid_channel_impl(or_circ->p_circ_id,
+ or_circ->p_chan, NULL);
tor_assert(c == c2);
}
}
diff --git a/src/or/circuitlist.h b/src/or/circuitlist.h
index d67f80b065..94887d5faf 100644
--- a/src/or/circuitlist.h
+++ b/src/or/circuitlist.h
@@ -23,6 +23,8 @@ void circuit_set_p_circid_chan(or_circuit_t *circ, circid_t id,
channel_t *chan);
void circuit_set_n_circid_chan(circuit_t *circ, circid_t id,
channel_t *chan);
+void channel_mark_circid_unusable(channel_t *chan, circid_t id);
+void channel_mark_circid_usable(channel_t *chan, circid_t id);
void circuit_set_state(circuit_t *circ, uint8_t state);
void circuit_close_all_marked(void);
int32_t circuit_initial_package_window(void);
@@ -62,5 +64,8 @@ void assert_cpath_layer_ok(const crypt_path_t *cp);
void assert_circuit_ok(const circuit_t *c);
void circuit_free_all(void);
+void channel_note_destroy_pending(channel_t *chan, circid_t id);
+void channel_note_destroy_not_pending(channel_t *chan, circid_t id);
+
#endif
diff --git a/src/or/circuitmux.c b/src/or/circuitmux.c
index 545cfd0650..f579c3ead1 100644
--- a/src/or/circuitmux.c
+++ b/src/or/circuitmux.c
@@ -10,6 +10,7 @@
#include "channel.h"
#include "circuitlist.h"
#include "circuitmux.h"
+#include "relay.h"
/*
* Private typedefs for circuitmux.c
@@ -115,6 +116,22 @@ struct circuitmux_s {
*/
struct circuit_t *active_circuits_head, *active_circuits_tail;
+ /** List of queued destroy cells */
+ cell_queue_t destroy_cell_queue;
+ /** Boolean: True iff the last cell to circuitmux_get_first_active_circuit
+ * returned the destroy queue. Used to force alternation between
+ * destroy/non-destroy cells.
+ *
+ * XXXX There is no reason to think that alternating is a particularly good
+ * approach -- it's just designed to prevent destroys from starving other
+ * cells completely.
+ */
+ unsigned int last_cell_was_destroy : 1;
+ /** Destroy counter: increment this when a destroy gets queued, decrement
+ * when we unqueue it, so we can test to make sure they don't starve.
+ */
+ int64_t destroy_ctr;
+
/*
* Circuitmux policy; if this is non-NULL, it can override the built-
* in round-robin active circuits behavior. This is how EWMA works in
@@ -193,6 +210,11 @@ static void circuitmux_assert_okay_pass_one(circuitmux_t *cmux);
static void circuitmux_assert_okay_pass_two(circuitmux_t *cmux);
static void circuitmux_assert_okay_pass_three(circuitmux_t *cmux);
+/* Static global variables */
+
+/** Count the destroy balance to debug destroy queue logic */
+static int64_t global_destroy_ctr = 0;
+
/* Function definitions */
/**
@@ -508,6 +530,30 @@ circuitmux_free(circuitmux_t *cmux)
tor_free(cmux->chanid_circid_map);
}
+ /*
+ * We're throwing away some destroys; log the counter and
+ * adjust the global counter by the queue size.
+ */
+ if (cmux->destroy_cell_queue.n > 0) {
+ cmux->destroy_ctr -= cmux->destroy_cell_queue.n;
+ global_destroy_ctr -= cmux->destroy_cell_queue.n;
+ log_debug(LD_CIRC,
+ "Freeing cmux at %p with %u queued destroys; the last cmux "
+ "destroy balance was "I64_FORMAT", global is "I64_FORMAT,
+ cmux, cmux->destroy_cell_queue.n,
+ I64_PRINTF_ARG(cmux->destroy_ctr),
+ I64_PRINTF_ARG(global_destroy_ctr));
+ } else {
+ log_debug(LD_CIRC,
+ "Freeing cmux at %p with no queued destroys, the cmux destroy "
+ "balance was "I64_FORMAT", global is "I64_FORMAT,
+ cmux,
+ I64_PRINTF_ARG(cmux->destroy_ctr),
+ I64_PRINTF_ARG(global_destroy_ctr));
+ }
+
+ cell_queue_clear(&cmux->destroy_cell_queue);
+
tor_free(cmux);
}
@@ -816,7 +862,7 @@ circuitmux_num_cells(circuitmux_t *cmux)
{
tor_assert(cmux);
- return cmux->n_cells;
+ return cmux->n_cells + cmux->destroy_cell_queue.n;
}
/**
@@ -1368,16 +1414,36 @@ circuitmux_set_num_cells(circuitmux_t *cmux, circuit_t *circ,
/**
* Pick a circuit to send from, using the active circuits list or a
* circuitmux policy if one is available. This is called from channel.c.
+ *
+ * If we would rather send a destroy cell, return NULL and set
+ * *<b>destroy_queue_out</b> to the destroy queue.
+ *
+ * If we have nothing to send, set *<b>destroy_queue_out</b> to NULL and
+ * return NULL.
*/
circuit_t *
-circuitmux_get_first_active_circuit(circuitmux_t *cmux)
+circuitmux_get_first_active_circuit(circuitmux_t *cmux,
+ cell_queue_t **destroy_queue_out)
{
circuit_t *circ = NULL;
tor_assert(cmux);
+ tor_assert(destroy_queue_out);
+
+ *destroy_queue_out = NULL;
+
+ if (cmux->destroy_cell_queue.n &&
+ (!cmux->last_cell_was_destroy || cmux->n_active_circuits == 0)) {
+ /* We have destroy cells to send, and either we just sent a relay cell,
+ * or we have no relay cells to send. */
- if (cmux->n_active_circuits > 0) {
+ /* XXXX We should let the cmux policy have some say in this eventually. */
+ /* XXXX Alternating is not a terribly brilliant approach here. */
+ *destroy_queue_out = &cmux->destroy_cell_queue;
+
+ cmux->last_cell_was_destroy = 1;
+ } else if (cmux->n_active_circuits > 0) {
/* We also must have a cell available for this to be the case */
tor_assert(cmux->n_cells > 0);
/* Do we have a policy-provided circuit selector? */
@@ -1389,7 +1455,11 @@ circuitmux_get_first_active_circuit(circuitmux_t *cmux)
tor_assert(cmux->active_circuits_head);
circ = cmux->active_circuits_head;
}
- } else tor_assert(cmux->n_cells == 0);
+ cmux->last_cell_was_destroy = 0;
+ } else {
+ tor_assert(cmux->n_cells == 0);
+ tor_assert(cmux->destroy_cell_queue.n == 0);
+ }
return circ;
}
@@ -1463,6 +1533,26 @@ circuitmux_notify_xmit_cells(circuitmux_t *cmux, circuit_t *circ,
circuitmux_assert_okay_paranoid(cmux);
}
+/**
+ * Notify the circuitmux that a destroy was sent, so we can update
+ * the counter.
+ */
+
+void
+circuitmux_notify_xmit_destroy(circuitmux_t *cmux)
+{
+ tor_assert(cmux);
+
+ --(cmux->destroy_ctr);
+ --(global_destroy_ctr);
+ log_debug(LD_CIRC,
+ "Cmux at %p sent a destroy, cmux counter is now "I64_FORMAT", "
+ "global counter is now "I64_FORMAT,
+ cmux,
+ I64_PRINTF_ARG(cmux->destroy_ctr),
+ I64_PRINTF_ARG(global_destroy_ctr));
+}
+
/*
* Circuitmux consistency checking assertions
*/
@@ -1743,3 +1833,40 @@ circuitmux_assert_okay_pass_three(circuitmux_t *cmux)
}
}
+/*DOCDOC */
+void
+circuitmux_append_destroy_cell(channel_t *chan,
+ circuitmux_t *cmux,
+ circid_t circ_id,
+ uint8_t reason)
+{
+ cell_t cell;
+ memset(&cell, 0, sizeof(cell_t));
+ cell.circ_id = circ_id;
+ cell.command = CELL_DESTROY;
+ cell.payload[0] = (uint8_t) reason;
+
+ cell_queue_append_packed_copy(&cmux->destroy_cell_queue, &cell,
+ chan->wide_circ_ids, 0);
+
+ /* Destroy entering the queue, update counters */
+ ++(cmux->destroy_ctr);
+ ++global_destroy_ctr;
+ log_debug(LD_CIRC,
+ "Cmux at %p queued a destroy for circ %u, cmux counter is now "
+ I64_FORMAT", global counter is now "I64_FORMAT,
+ cmux, circ_id,
+ I64_PRINTF_ARG(cmux->destroy_ctr),
+ I64_PRINTF_ARG(global_destroy_ctr));
+
+ /* XXXX Duplicate code from append_cell_to_circuit_queue */
+ if (!channel_has_queued_writes(chan)) {
+ /* There is no data at all waiting to be sent on the outbuf. Add a
+ * cell, so that we can notice when it gets flushed, flushed_some can
+ * get called, and we can start putting more data onto the buffer then.
+ */
+ log_debug(LD_GENERAL, "Primed a buffer.");
+ channel_flush_from_first_active_circuit(chan, 1);
+ }
+}
+
diff --git a/src/or/circuitmux.h b/src/or/circuitmux.h
index 25644ffab7..9ff29de70a 100644
--- a/src/or/circuitmux.h
+++ b/src/or/circuitmux.h
@@ -120,9 +120,11 @@ unsigned int circuitmux_num_circuits(circuitmux_t *cmux);
unsigned int circuitmux_num_active_circuits(circuitmux_t *cmux);
/* Channel interface */
-circuit_t * circuitmux_get_first_active_circuit(circuitmux_t *cmux);
+circuit_t * circuitmux_get_first_active_circuit(circuitmux_t *cmux,
+ cell_queue_t **destroy_queue_out);
void circuitmux_notify_xmit_cells(circuitmux_t *cmux, circuit_t *circ,
unsigned int n_cells);
+void circuitmux_notify_xmit_destroy(circuitmux_t *cmux);
/* Circuit interface */
void circuitmux_attach_circuit(circuitmux_t *cmux, circuit_t *circ,
@@ -132,5 +134,9 @@ void circuitmux_clear_num_cells(circuitmux_t *cmux, circuit_t *circ);
void circuitmux_set_num_cells(circuitmux_t *cmux, circuit_t *circ,
unsigned int n_cells);
+void circuitmux_append_destroy_cell(channel_t *chan,
+ circuitmux_t *cmux, circid_t circ_id,
+ uint8_t reason);
+
#endif /* TOR_CIRCUITMUX_H */
diff --git a/src/or/or.h b/src/or/or.h
index daff6de933..0eebe395d9 100644
--- a/src/or/or.h
+++ b/src/or/or.h
@@ -2777,6 +2777,13 @@ typedef struct circuit_t {
* allowing n_streams to add any more cells. (OR circuit only.) */
unsigned int streams_blocked_on_p_chan : 1;
+ /** True iff we have queued a delete backwards on this circuit, but not put
+ * it on the output buffer. */
+ unsigned int p_delete_pending : 1;
+ /** True iff we have queued a delete forwards on this circuit, but not put
+ * it on the output buffer. */
+ unsigned int n_delete_pending : 1;
+
uint8_t state; /**< Current status of this circuit. */
uint8_t purpose; /**< Why are we creating this circuit? */
diff --git a/src/or/relay.c b/src/or/relay.c
index cef138e721..f0861f2f7c 100644
--- a/src/or/relay.c
+++ b/src/or/relay.c
@@ -2148,11 +2148,11 @@ cell_queue_append(cell_queue_t *queue, packed_cell_t *cell)
/** Append a newly allocated copy of <b>cell</b> to the end of <b>queue</b> */
void
cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell,
- int wide_circ_ids)
+ int wide_circ_ids, int use_stats)
{
packed_cell_t *copy = packed_cell_copy(cell, wide_circ_ids);
/* Remember the time when this cell was put in the queue. */
- if (get_options()->CellStatistics) {
+ if (get_options()->CellStatistics && use_stats) {
struct timeval now;
uint32_t added;
insertion_time_queue_t *it_queue = queue->insertion_times;
@@ -2347,7 +2347,7 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max)
{
circuitmux_t *cmux = NULL;
int n_flushed = 0;
- cell_queue_t *queue;
+ cell_queue_t *queue, *destroy_queue=NULL;
circuit_t *circ;
or_circuit_t *or_circ;
int streams_blocked;
@@ -2360,7 +2360,18 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max)
/* Main loop: pick a circuit, send a cell, update the cmux */
while (n_flushed < max) {
- circ = circuitmux_get_first_active_circuit(cmux);
+ circ = circuitmux_get_first_active_circuit(cmux, &destroy_queue);
+ if (destroy_queue) {
+ /* this code is duplicated from some of the logic below. Ugly! XXXX */
+ tor_assert(destroy_queue->n > 0);
+ cell = cell_queue_pop(destroy_queue);
+ channel_write_packed_cell(chan, cell);
+ /* Update the cmux destroy counter */
+ circuitmux_notify_xmit_destroy(cmux);
+ cell = NULL;
+ ++n_flushed;
+ continue;
+ }
/* If it returns NULL, no cells left to send */
if (!circ) break;
assert_cmux_ok_paranoid(chan);
@@ -2482,7 +2493,7 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
streams_blocked = circ->streams_blocked_on_p_chan;
}
- cell_queue_append_packed_copy(queue, cell, chan->wide_circ_ids);
+ cell_queue_append_packed_copy(queue, cell, chan->wide_circ_ids, 1);
/* If we have too many cells on the circuit, we should stop reading from
* the edge streams for a while. */
diff --git a/src/or/relay.h b/src/or/relay.h
index 229fb4f737..c5f0df2f0e 100644
--- a/src/or/relay.h
+++ b/src/or/relay.h
@@ -53,7 +53,7 @@ void packed_cell_free(packed_cell_t *cell);
void cell_queue_clear(cell_queue_t *queue);
void cell_queue_append(cell_queue_t *queue, packed_cell_t *cell);
void cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell,
- int wide_circ_ids);
+ int wide_circ_ids, int use_stats);
void append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
cell_t *cell, cell_direction_t direction,