summaryrefslogtreecommitdiff
path: root/src/or/circuitmux.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/or/circuitmux.c')
-rw-r--r--src/or/circuitmux.c171
1 files changed, 162 insertions, 9 deletions
diff --git a/src/or/circuitmux.c b/src/or/circuitmux.c
index 545cfd0650..f2af943937 100644
--- a/src/or/circuitmux.c
+++ b/src/or/circuitmux.c
@@ -10,6 +10,7 @@
#include "channel.h"
#include "circuitlist.h"
#include "circuitmux.h"
+#include "relay.h"
/*
* Private typedefs for circuitmux.c
@@ -115,6 +116,22 @@ struct circuitmux_s {
*/
struct circuit_t *active_circuits_head, *active_circuits_tail;
+ /** List of queued destroy cells */
+ cell_queue_t destroy_cell_queue;
+ /** Boolean: True iff the last cell to circuitmux_get_first_active_circuit
+ * returned the destroy queue. Used to force alternation between
+ * destroy/non-destroy cells.
+ *
+ * XXXX There is no reason to think that alternating is a particularly good
+ * approach -- it's just designed to prevent destroys from starving other
+ * cells completely.
+ */
+ unsigned int last_cell_was_destroy : 1;
+ /** Destroy counter: increment this when a destroy gets queued, decrement
+ * when we unqueue it, so we can test to make sure they don't starve.
+ */
+ int64_t destroy_ctr;
+
/*
* Circuitmux policy; if this is non-NULL, it can override the built-
* in round-robin active circuits behavior. This is how EWMA works in
@@ -193,6 +210,11 @@ static void circuitmux_assert_okay_pass_one(circuitmux_t *cmux);
static void circuitmux_assert_okay_pass_two(circuitmux_t *cmux);
static void circuitmux_assert_okay_pass_three(circuitmux_t *cmux);
+/* Static global variables */
+
+/** Count the destroy balance to debug destroy queue logic */
+static int64_t global_destroy_ctr = 0;
+
/* Function definitions */
/**
@@ -361,6 +383,7 @@ circuitmux_alloc(void)
rv = tor_malloc_zero(sizeof(*rv));
rv->chanid_circid_map = tor_malloc_zero(sizeof(*( rv->chanid_circid_map)));
HT_INIT(chanid_circid_muxinfo_map, rv->chanid_circid_map);
+ cell_queue_init(&rv->destroy_cell_queue);
return rv;
}
@@ -476,6 +499,31 @@ circuitmux_detach_all_circuits(circuitmux_t *cmux)
cmux->n_cells = 0;
}
+/** Reclaim all circuit IDs currently marked as unusable on <b>chan</b> because
+ * of pending destroy cells in <b>cmux</b>.
+ *
+ * This function must be called AFTER circuits are unlinked from the (channel,
+ * circuid-id) map with circuit_unlink_all_from_channel(), but before calling
+ * circuitmux_free().
+ */
+void
+circuitmux_mark_destroyed_circids_usable(circuitmux_t *cmux, channel_t *chan)
+{
+ packed_cell_t *cell;
+ int n_bad = 0;
+ TOR_SIMPLEQ_FOREACH(cell, &cmux->destroy_cell_queue.head, next) {
+ circid_t circid = 0;
+ if (packed_cell_is_destroy(chan, cell, &circid)) {
+ channel_mark_circid_usable(chan, circid);
+ } else {
+ ++n_bad;
+ }
+ }
+ if (n_bad)
+ log_warn(LD_BUG, "%d cell(s) on destroy queue did not look like a "
+ "DESTROY cell.", n_bad);
+}
+
/**
* Free a circuitmux_t; the circuits must be detached first with
* circuitmux_detach_all_circuits().
@@ -508,6 +556,30 @@ circuitmux_free(circuitmux_t *cmux)
tor_free(cmux->chanid_circid_map);
}
+ /*
+ * We're throwing away some destroys; log the counter and
+ * adjust the global counter by the queue size.
+ */
+ if (cmux->destroy_cell_queue.n > 0) {
+ cmux->destroy_ctr -= cmux->destroy_cell_queue.n;
+ global_destroy_ctr -= cmux->destroy_cell_queue.n;
+ log_debug(LD_CIRC,
+ "Freeing cmux at %p with %u queued destroys; the last cmux "
+ "destroy balance was "I64_FORMAT", global is "I64_FORMAT,
+ cmux, cmux->destroy_cell_queue.n,
+ I64_PRINTF_ARG(cmux->destroy_ctr),
+ I64_PRINTF_ARG(global_destroy_ctr));
+ } else {
+ log_debug(LD_CIRC,
+ "Freeing cmux at %p with no queued destroys, the cmux destroy "
+ "balance was "I64_FORMAT", global is "I64_FORMAT,
+ cmux,
+ I64_PRINTF_ARG(cmux->destroy_ctr),
+ I64_PRINTF_ARG(global_destroy_ctr));
+ }
+
+ cell_queue_clear(&cmux->destroy_cell_queue);
+
tor_free(cmux);
}
@@ -816,7 +888,7 @@ circuitmux_num_cells(circuitmux_t *cmux)
{
tor_assert(cmux);
- return cmux->n_cells;
+ return cmux->n_cells + cmux->destroy_cell_queue.n;
}
/**
@@ -851,9 +923,9 @@ circuitmux_num_circuits(circuitmux_t *cmux)
* Attach a circuit to a circuitmux, for the specified direction.
*/
-void
-circuitmux_attach_circuit(circuitmux_t *cmux, circuit_t *circ,
- cell_direction_t direction)
+MOCK_IMPL(void,
+circuitmux_attach_circuit,(circuitmux_t *cmux, circuit_t *circ,
+ cell_direction_t direction))
{
channel_t *chan = NULL;
uint64_t channel_id;
@@ -1000,8 +1072,8 @@ circuitmux_attach_circuit(circuitmux_t *cmux, circuit_t *circ,
* no-op if not attached.
*/
-void
-circuitmux_detach_circuit(circuitmux_t *cmux, circuit_t *circ)
+MOCK_IMPL(void,
+circuitmux_detach_circuit,(circuitmux_t *cmux, circuit_t *circ))
{
chanid_circid_muxinfo_t search, *hashent = NULL;
/*
@@ -1368,16 +1440,36 @@ circuitmux_set_num_cells(circuitmux_t *cmux, circuit_t *circ,
/**
* Pick a circuit to send from, using the active circuits list or a
* circuitmux policy if one is available. This is called from channel.c.
+ *
+ * If we would rather send a destroy cell, return NULL and set
+ * *<b>destroy_queue_out</b> to the destroy queue.
+ *
+ * If we have nothing to send, set *<b>destroy_queue_out</b> to NULL and
+ * return NULL.
*/
circuit_t *
-circuitmux_get_first_active_circuit(circuitmux_t *cmux)
+circuitmux_get_first_active_circuit(circuitmux_t *cmux,
+ cell_queue_t **destroy_queue_out)
{
circuit_t *circ = NULL;
tor_assert(cmux);
+ tor_assert(destroy_queue_out);
+
+ *destroy_queue_out = NULL;
+
+ if (cmux->destroy_cell_queue.n &&
+ (!cmux->last_cell_was_destroy || cmux->n_active_circuits == 0)) {
+ /* We have destroy cells to send, and either we just sent a relay cell,
+ * or we have no relay cells to send. */
+
+ /* XXXX We should let the cmux policy have some say in this eventually. */
+ /* XXXX Alternating is not a terribly brilliant approach here. */
+ *destroy_queue_out = &cmux->destroy_cell_queue;
- if (cmux->n_active_circuits > 0) {
+ cmux->last_cell_was_destroy = 1;
+ } else if (cmux->n_active_circuits > 0) {
/* We also must have a cell available for this to be the case */
tor_assert(cmux->n_cells > 0);
/* Do we have a policy-provided circuit selector? */
@@ -1389,7 +1481,11 @@ circuitmux_get_first_active_circuit(circuitmux_t *cmux)
tor_assert(cmux->active_circuits_head);
circ = cmux->active_circuits_head;
}
- } else tor_assert(cmux->n_cells == 0);
+ cmux->last_cell_was_destroy = 0;
+ } else {
+ tor_assert(cmux->n_cells == 0);
+ tor_assert(cmux->destroy_cell_queue.n == 0);
+ }
return circ;
}
@@ -1463,6 +1559,26 @@ circuitmux_notify_xmit_cells(circuitmux_t *cmux, circuit_t *circ,
circuitmux_assert_okay_paranoid(cmux);
}
+/**
+ * Notify the circuitmux that a destroy was sent, so we can update
+ * the counter.
+ */
+
+void
+circuitmux_notify_xmit_destroy(circuitmux_t *cmux)
+{
+ tor_assert(cmux);
+
+ --(cmux->destroy_ctr);
+ --(global_destroy_ctr);
+ log_debug(LD_CIRC,
+ "Cmux at %p sent a destroy, cmux counter is now "I64_FORMAT", "
+ "global counter is now "I64_FORMAT,
+ cmux,
+ I64_PRINTF_ARG(cmux->destroy_ctr),
+ I64_PRINTF_ARG(global_destroy_ctr));
+}
+
/*
* Circuitmux consistency checking assertions
*/
@@ -1743,3 +1859,40 @@ circuitmux_assert_okay_pass_three(circuitmux_t *cmux)
}
}
+/*DOCDOC */
+void
+circuitmux_append_destroy_cell(channel_t *chan,
+ circuitmux_t *cmux,
+ circid_t circ_id,
+ uint8_t reason)
+{
+ cell_t cell;
+ memset(&cell, 0, sizeof(cell_t));
+ cell.circ_id = circ_id;
+ cell.command = CELL_DESTROY;
+ cell.payload[0] = (uint8_t) reason;
+
+ cell_queue_append_packed_copy(NULL, &cmux->destroy_cell_queue, 0, &cell,
+ chan->wide_circ_ids, 0);
+
+ /* Destroy entering the queue, update counters */
+ ++(cmux->destroy_ctr);
+ ++global_destroy_ctr;
+ log_debug(LD_CIRC,
+ "Cmux at %p queued a destroy for circ %u, cmux counter is now "
+ I64_FORMAT", global counter is now "I64_FORMAT,
+ cmux, circ_id,
+ I64_PRINTF_ARG(cmux->destroy_ctr),
+ I64_PRINTF_ARG(global_destroy_ctr));
+
+ /* XXXX Duplicate code from append_cell_to_circuit_queue */
+ if (!channel_has_queued_writes(chan)) {
+ /* There is no data at all waiting to be sent on the outbuf. Add a
+ * cell, so that we can notice when it gets flushed, flushed_some can
+ * get called, and we can start putting more data onto the buffer then.
+ */
+ log_debug(LD_GENERAL, "Primed a buffer.");
+ channel_flush_from_first_active_circuit(chan, 1);
+ }
+}
+