diff options
Diffstat (limited to 'src/or/circuitmux.c')
-rw-r--r-- | src/or/circuitmux.c | 296 |
1 files changed, 276 insertions, 20 deletions
diff --git a/src/or/circuitmux.c b/src/or/circuitmux.c index 545cfd0650..a77bffac90 100644 --- a/src/or/circuitmux.c +++ b/src/or/circuitmux.c @@ -1,4 +1,4 @@ -/* * Copyright (c) 2012-2013, The Tor Project, Inc. */ +/* * Copyright (c) 2012-2015, The Tor Project, Inc. */ /* See LICENSE for licensing information */ /** @@ -10,6 +10,7 @@ #include "channel.h" #include "circuitlist.h" #include "circuitmux.h" +#include "relay.h" /* * Private typedefs for circuitmux.c @@ -115,6 +116,22 @@ struct circuitmux_s { */ struct circuit_t *active_circuits_head, *active_circuits_tail; + /** List of queued destroy cells */ + cell_queue_t destroy_cell_queue; + /** Boolean: True iff the last cell to circuitmux_get_first_active_circuit + * returned the destroy queue. Used to force alternation between + * destroy/non-destroy cells. + * + * XXXX There is no reason to think that alternating is a particularly good + * approach -- it's just designed to prevent destroys from starving other + * cells completely. + */ + unsigned int last_cell_was_destroy : 1; + /** Destroy counter: increment this when a destroy gets queued, decrement + * when we unqueue it, so we can test to make sure they don't starve. + */ + int64_t destroy_ctr; + /* * Circuitmux policy; if this is non-NULL, it can override the built- * in round-robin active circuits behavior. This is how EWMA works in @@ -193,6 +210,11 @@ static void circuitmux_assert_okay_pass_one(circuitmux_t *cmux); static void circuitmux_assert_okay_pass_two(circuitmux_t *cmux); static void circuitmux_assert_okay_pass_three(circuitmux_t *cmux); +/* Static global variables */ + +/** Count the destroy balance to debug destroy queue logic */ +static int64_t global_destroy_ctr = 0; + /* Function definitions */ /** @@ -341,9 +363,9 @@ HT_HEAD(chanid_circid_muxinfo_map, chanid_circid_muxinfo_t); /* Emit a bunch of hash table stuff */ HT_PROTOTYPE(chanid_circid_muxinfo_map, chanid_circid_muxinfo_t, node, chanid_circid_entry_hash, chanid_circid_entries_eq); -HT_GENERATE(chanid_circid_muxinfo_map, chanid_circid_muxinfo_t, node, - chanid_circid_entry_hash, chanid_circid_entries_eq, 0.6, - malloc, realloc, free); +HT_GENERATE2(chanid_circid_muxinfo_map, chanid_circid_muxinfo_t, node, + chanid_circid_entry_hash, chanid_circid_entries_eq, 0.6, + tor_reallocarray_, tor_free_) /* * Circuitmux alloc/free functions @@ -361,16 +383,20 @@ circuitmux_alloc(void) rv = tor_malloc_zero(sizeof(*rv)); rv->chanid_circid_map = tor_malloc_zero(sizeof(*( rv->chanid_circid_map))); HT_INIT(chanid_circid_muxinfo_map, rv->chanid_circid_map); + cell_queue_init(&rv->destroy_cell_queue); return rv; } /** * Detach all circuits from a circuitmux (use before circuitmux_free()) + * + * If <b>detached_out</b> is non-NULL, add every detached circuit_t to + * detached_out. */ void -circuitmux_detach_all_circuits(circuitmux_t *cmux) +circuitmux_detach_all_circuits(circuitmux_t *cmux, smartlist_t *detached_out) { chanid_circid_muxinfo_t **i = NULL, *to_remove; channel_t *chan = NULL; @@ -386,7 +412,11 @@ circuitmux_detach_all_circuits(circuitmux_t *cmux) i = HT_START(chanid_circid_muxinfo_map, cmux->chanid_circid_map); while (i) { to_remove = *i; - if (to_remove) { + + if (! to_remove) { + log_warn(LD_BUG, "Somehow, an HT iterator gave us a NULL pointer."); + break; + } else { /* Find a channel and circuit */ chan = channel_find_by_global_id(to_remove->chan_id); if (chan) { @@ -407,6 +437,9 @@ circuitmux_detach_all_circuits(circuitmux_t *cmux) /* Clear n_mux */ circ->n_mux = NULL; + + if (detached_out) + smartlist_add(detached_out, circ); } else if (circ->magic == OR_CIRCUIT_MAGIC) { /* * Update active_circuits et al.; this does policy notifies, so @@ -422,6 +455,9 @@ circuitmux_detach_all_circuits(circuitmux_t *cmux) * so clear p_mux. */ TO_OR_CIRCUIT(circ)->p_mux = NULL; + + if (detached_out) + smartlist_add(detached_out, circ); } else { /* Complain and move on */ log_warn(LD_CIRC, @@ -476,6 +512,31 @@ circuitmux_detach_all_circuits(circuitmux_t *cmux) cmux->n_cells = 0; } +/** Reclaim all circuit IDs currently marked as unusable on <b>chan</b> because + * of pending destroy cells in <b>cmux</b>. + * + * This function must be called AFTER circuits are unlinked from the (channel, + * circuid-id) map with circuit_unlink_all_from_channel(), but before calling + * circuitmux_free(). + */ +void +circuitmux_mark_destroyed_circids_usable(circuitmux_t *cmux, channel_t *chan) +{ + packed_cell_t *cell; + int n_bad = 0; + TOR_SIMPLEQ_FOREACH(cell, &cmux->destroy_cell_queue.head, next) { + circid_t circid = 0; + if (packed_cell_is_destroy(chan, cell, &circid)) { + channel_mark_circid_usable(chan, circid); + } else { + ++n_bad; + } + } + if (n_bad) + log_warn(LD_BUG, "%d cell(s) on destroy queue did not look like a " + "DESTROY cell.", n_bad); +} + /** * Free a circuitmux_t; the circuits must be detached first with * circuitmux_detach_all_circuits(). @@ -508,6 +569,30 @@ circuitmux_free(circuitmux_t *cmux) tor_free(cmux->chanid_circid_map); } + /* + * We're throwing away some destroys; log the counter and + * adjust the global counter by the queue size. + */ + if (cmux->destroy_cell_queue.n > 0) { + cmux->destroy_ctr -= cmux->destroy_cell_queue.n; + global_destroy_ctr -= cmux->destroy_cell_queue.n; + log_debug(LD_CIRC, + "Freeing cmux at %p with %u queued destroys; the last cmux " + "destroy balance was "I64_FORMAT", global is "I64_FORMAT, + cmux, cmux->destroy_cell_queue.n, + I64_PRINTF_ARG(cmux->destroy_ctr), + I64_PRINTF_ARG(global_destroy_ctr)); + } else { + log_debug(LD_CIRC, + "Freeing cmux at %p with no queued destroys, the cmux destroy " + "balance was "I64_FORMAT", global is "I64_FORMAT, + cmux, + I64_PRINTF_ARG(cmux->destroy_ctr), + I64_PRINTF_ARG(global_destroy_ctr)); + } + + cell_queue_clear(&cmux->destroy_cell_queue); + tor_free(cmux); } @@ -536,8 +621,8 @@ circuitmux_clear_policy(circuitmux_t *cmux) * Return the policy currently installed on a circuitmux_t */ -const circuitmux_policy_t * -circuitmux_get_policy(circuitmux_t *cmux) +MOCK_IMPL(const circuitmux_policy_t *, +circuitmux_get_policy, (circuitmux_t *cmux)) { tor_assert(cmux); @@ -811,12 +896,12 @@ circuitmux_num_cells_for_circuit(circuitmux_t *cmux, circuit_t *circ) * Query total number of available cells on a circuitmux */ -unsigned int -circuitmux_num_cells(circuitmux_t *cmux) +MOCK_IMPL(unsigned int, +circuitmux_num_cells, (circuitmux_t *cmux)) { tor_assert(cmux); - return cmux->n_cells; + return cmux->n_cells + cmux->destroy_cell_queue.n; } /** @@ -851,9 +936,9 @@ circuitmux_num_circuits(circuitmux_t *cmux) * Attach a circuit to a circuitmux, for the specified direction. */ -void -circuitmux_attach_circuit(circuitmux_t *cmux, circuit_t *circ, - cell_direction_t direction) +MOCK_IMPL(void, +circuitmux_attach_circuit,(circuitmux_t *cmux, circuit_t *circ, + cell_direction_t direction)) { channel_t *chan = NULL; uint64_t channel_id; @@ -1000,15 +1085,18 @@ circuitmux_attach_circuit(circuitmux_t *cmux, circuit_t *circ, * no-op if not attached. */ -void -circuitmux_detach_circuit(circuitmux_t *cmux, circuit_t *circ) +MOCK_IMPL(void, +circuitmux_detach_circuit,(circuitmux_t *cmux, circuit_t *circ)) { chanid_circid_muxinfo_t search, *hashent = NULL; /* * Use this to keep track of whether we found it for n_chan or * p_chan for consistency checking. + * + * The 0 initializer is not a valid cell_direction_t value. + * We assert that it has been replaced with a valid value before it is used. */ - cell_direction_t last_searched_direction; + cell_direction_t last_searched_direction = 0; tor_assert(cmux); tor_assert(cmux->chanid_circid_map); @@ -1038,6 +1126,9 @@ circuitmux_detach_circuit(circuitmux_t *cmux, circuit_t *circ) } } + tor_assert(last_searched_direction == CELL_DIRECTION_OUT + || last_searched_direction == CELL_DIRECTION_IN); + /* * If hashent isn't NULL, we have a circuit to detach; don't remove it from * the map until later of circuitmux_make_circuit_inactive() breaks. @@ -1368,16 +1459,36 @@ circuitmux_set_num_cells(circuitmux_t *cmux, circuit_t *circ, /** * Pick a circuit to send from, using the active circuits list or a * circuitmux policy if one is available. This is called from channel.c. + * + * If we would rather send a destroy cell, return NULL and set + * *<b>destroy_queue_out</b> to the destroy queue. + * + * If we have nothing to send, set *<b>destroy_queue_out</b> to NULL and + * return NULL. */ circuit_t * -circuitmux_get_first_active_circuit(circuitmux_t *cmux) +circuitmux_get_first_active_circuit(circuitmux_t *cmux, + cell_queue_t **destroy_queue_out) { circuit_t *circ = NULL; tor_assert(cmux); + tor_assert(destroy_queue_out); + + *destroy_queue_out = NULL; + + if (cmux->destroy_cell_queue.n && + (!cmux->last_cell_was_destroy || cmux->n_active_circuits == 0)) { + /* We have destroy cells to send, and either we just sent a relay cell, + * or we have no relay cells to send. */ - if (cmux->n_active_circuits > 0) { + /* XXXX We should let the cmux policy have some say in this eventually. */ + /* XXXX Alternating is not a terribly brilliant approach here. */ + *destroy_queue_out = &cmux->destroy_cell_queue; + + cmux->last_cell_was_destroy = 1; + } else if (cmux->n_active_circuits > 0) { /* We also must have a cell available for this to be the case */ tor_assert(cmux->n_cells > 0); /* Do we have a policy-provided circuit selector? */ @@ -1389,7 +1500,11 @@ circuitmux_get_first_active_circuit(circuitmux_t *cmux) tor_assert(cmux->active_circuits_head); circ = cmux->active_circuits_head; } - } else tor_assert(cmux->n_cells == 0); + cmux->last_cell_was_destroy = 0; + } else { + tor_assert(cmux->n_cells == 0); + tor_assert(cmux->destroy_cell_queue.n == 0); + } return circ; } @@ -1463,6 +1578,26 @@ circuitmux_notify_xmit_cells(circuitmux_t *cmux, circuit_t *circ, circuitmux_assert_okay_paranoid(cmux); } +/** + * Notify the circuitmux that a destroy was sent, so we can update + * the counter. + */ + +void +circuitmux_notify_xmit_destroy(circuitmux_t *cmux) +{ + tor_assert(cmux); + + --(cmux->destroy_ctr); + --(global_destroy_ctr); + log_debug(LD_CIRC, + "Cmux at %p sent a destroy, cmux counter is now "I64_FORMAT", " + "global counter is now "I64_FORMAT, + cmux, + I64_PRINTF_ARG(cmux->destroy_ctr), + I64_PRINTF_ARG(global_destroy_ctr)); +} + /* * Circuitmux consistency checking assertions */ @@ -1743,3 +1878,124 @@ circuitmux_assert_okay_pass_three(circuitmux_t *cmux) } } +/*DOCDOC */ +void +circuitmux_append_destroy_cell(channel_t *chan, + circuitmux_t *cmux, + circid_t circ_id, + uint8_t reason) +{ + cell_t cell; + memset(&cell, 0, sizeof(cell_t)); + cell.circ_id = circ_id; + cell.command = CELL_DESTROY; + cell.payload[0] = (uint8_t) reason; + + cell_queue_append_packed_copy(NULL, &cmux->destroy_cell_queue, 0, &cell, + chan->wide_circ_ids, 0); + + /* Destroy entering the queue, update counters */ + ++(cmux->destroy_ctr); + ++global_destroy_ctr; + log_debug(LD_CIRC, + "Cmux at %p queued a destroy for circ %u, cmux counter is now " + I64_FORMAT", global counter is now "I64_FORMAT, + cmux, circ_id, + I64_PRINTF_ARG(cmux->destroy_ctr), + I64_PRINTF_ARG(global_destroy_ctr)); + + /* XXXX Duplicate code from append_cell_to_circuit_queue */ + if (!channel_has_queued_writes(chan)) { + /* There is no data at all waiting to be sent on the outbuf. Add a + * cell, so that we can notice when it gets flushed, flushed_some can + * get called, and we can start putting more data onto the buffer then. + */ + log_debug(LD_GENERAL, "Primed a buffer."); + channel_flush_from_first_active_circuit(chan, 1); + } +} + +/*DOCDOC; for debugging 12184. This runs slowly. */ +int64_t +circuitmux_count_queued_destroy_cells(const channel_t *chan, + const circuitmux_t *cmux) +{ + int64_t n_destroy_cells = cmux->destroy_ctr; + int64_t destroy_queue_size = cmux->destroy_cell_queue.n; + + int64_t manual_total = 0; + int64_t manual_total_in_map = 0; + packed_cell_t *cell; + + TOR_SIMPLEQ_FOREACH(cell, &cmux->destroy_cell_queue.head, next) { + circid_t id; + ++manual_total; + + id = packed_cell_get_circid(cell, chan->wide_circ_ids); + if (circuit_id_in_use_on_channel(id, (channel_t*)chan)) + ++manual_total_in_map; + } + + if (n_destroy_cells != destroy_queue_size || + n_destroy_cells != manual_total || + n_destroy_cells != manual_total_in_map) { + log_warn(LD_BUG, " Discrepancy in counts for queued destroy cells on " + "circuitmux. n="I64_FORMAT". queue_size="I64_FORMAT". " + "manual_total="I64_FORMAT". manual_total_in_map="I64_FORMAT".", + I64_PRINTF_ARG(n_destroy_cells), + I64_PRINTF_ARG(destroy_queue_size), + I64_PRINTF_ARG(manual_total), + I64_PRINTF_ARG(manual_total_in_map)); + } + + return n_destroy_cells; +} + +/** + * Compare cmuxes to see which is more preferred; return < 0 if + * cmux_1 has higher priority (i.e., cmux_1 < cmux_2 in the scheduler's + * sort order), > 0 if cmux_2 has higher priority, or 0 if they are + * equally preferred. + * + * If the cmuxes have different cmux policies or the policy does not + * support the cmp_cmux method, return 0. + */ + +MOCK_IMPL(int, +circuitmux_compare_muxes, (circuitmux_t *cmux_1, circuitmux_t *cmux_2)) +{ + const circuitmux_policy_t *policy; + + tor_assert(cmux_1); + tor_assert(cmux_2); + + if (cmux_1 == cmux_2) { + /* Equivalent because they're the same cmux */ + return 0; + } + + if (cmux_1->policy && cmux_2->policy) { + if (cmux_1->policy == cmux_2->policy) { + policy = cmux_1->policy; + + if (policy->cmp_cmux) { + /* Okay, we can compare! */ + return policy->cmp_cmux(cmux_1, cmux_1->policy_data, + cmux_2, cmux_2->policy_data); + } else { + /* + * Equivalent because the policy doesn't know how to compare between + * muxes. + */ + return 0; + } + } else { + /* Equivalent because they have different policies */ + return 0; + } + } else { + /* Equivalent because one or both are missing a policy */ + return 0; + } +} + |