diff options
Diffstat (limited to 'src/or/relay.c')
-rw-r--r-- | src/or/relay.c | 751 |
1 files changed, 682 insertions, 69 deletions
diff --git a/src/or/relay.c b/src/or/relay.c index 02671473e6..c951cab560 100644 --- a/src/or/relay.c +++ b/src/or/relay.c @@ -10,8 +10,26 @@ * receiving from circuits, plus queuing on circuits. **/ +#include <math.h> #include "or.h" +#include "buffers.h" +#include "circuitbuild.h" +#include "circuitlist.h" +#include "config.h" +#include "connection.h" +#include "connection_edge.h" +#include "connection_or.h" +#include "control.h" +#include "geoip.h" +#include "main.h" #include "mempool.h" +#include "networkstatus.h" +#include "policies.h" +#include "reasons.h" +#include "relay.h" +#include "rendcommon.h" +#include "routerlist.h" +#include "routerparse.h" static int relay_crypt(circuit_t *circ, cell_t *cell, cell_direction_t cell_direction, @@ -20,20 +38,46 @@ static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell, cell_direction_t cell_direction, crypt_path_t *layer_hint); -static int -connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, - edge_connection_t *conn, - crypt_path_t *layer_hint); -static void -circuit_consider_sending_sendme(circuit_t *circ, crypt_path_t *layer_hint); +static int connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, + edge_connection_t *conn, + crypt_path_t *layer_hint); +static void circuit_consider_sending_sendme(circuit_t *circ, + crypt_path_t *layer_hint); +static void circuit_resume_edge_reading(circuit_t *circ, + crypt_path_t *layer_hint); +static int circuit_resume_edge_reading_helper(edge_connection_t *conn, + circuit_t *circ, + crypt_path_t *layer_hint); +static int circuit_consider_stop_edge_reading(circuit_t *circ, + crypt_path_t *layer_hint); +static int circuit_queue_streams_are_blocked(circuit_t *circ); + +/** Cache the current hi-res time; the cache gets reset when libevent + * calls us. */ + +static struct timeval cached_time_hires = {0, 0}; + +/** Stop reading on edge connections when we have this many cells + * waiting on the appropriate queue. */ +#define CELL_QUEUE_HIGHWATER_SIZE 256 +/** Start reading from edge connections again when we get down to this many + * cells. */ +#define CELL_QUEUE_LOWWATER_SIZE 64 + static void -circuit_resume_edge_reading(circuit_t *circ, crypt_path_t *layer_hint); -static int -circuit_resume_edge_reading_helper(edge_connection_t *conn, - circuit_t *circ, - crypt_path_t *layer_hint); -static int -circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint); +tor_gettimeofday_cached(struct timeval *tv) +{ + if (cached_time_hires.tv_sec == 0) { + tor_gettimeofday(&cached_time_hires); + } + *tv = cached_time_hires; +} + +void +tor_gettimeofday_cache_clear(void) +{ + cached_time_hires.tv_sec = 0; +} /** Stats: how many relay cells have originated at this hop, or have * been relayed onward (not recognized at this hop)? @@ -230,7 +274,7 @@ circuit_receive_relay_cell(cell_t *cell, circuit_t *circ, * we might kill the circ before we relay * the cells. */ - append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction); + append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction, 0); return 0; } @@ -327,7 +371,7 @@ relay_crypt(circuit_t *circ, cell_t *cell, cell_direction_t cell_direction, static int circuit_package_relay_cell(cell_t *cell, circuit_t *circ, cell_direction_t cell_direction, - crypt_path_t *layer_hint) + crypt_path_t *layer_hint, streamid_t on_stream) { or_connection_t *conn; /* where to send the cell */ @@ -371,7 +415,7 @@ circuit_package_relay_cell(cell_t *cell, circuit_t *circ, } ++stats_n_relay_cells_relayed; - append_cell_to_circuit_queue(circ, conn, cell, cell_direction); + append_cell_to_circuit_queue(circ, conn, cell, cell_direction, on_stream); return 0; } @@ -496,7 +540,7 @@ relay_command_to_string(uint8_t command) * return 0. */ int -relay_send_command_from_edge(uint16_t stream_id, circuit_t *circ, +relay_send_command_from_edge(streamid_t stream_id, circuit_t *circ, uint8_t relay_command, const char *payload, size_t payload_len, crypt_path_t *cpath_layer) { @@ -531,6 +575,12 @@ relay_send_command_from_edge(uint16_t stream_id, circuit_t *circ, log_debug(LD_OR,"delivering %d cell %s.", relay_command, cell_direction == CELL_DIRECTION_OUT ? "forward" : "backward"); + /* If we are sending an END cell and this circuit is used for a tunneled + * directory request, advance its state. */ + if (relay_command == RELAY_COMMAND_END && circ->dirreq_id) + geoip_change_dirreq_state(circ->dirreq_id, DIRREQ_TUNNELED, + DIRREQ_END_CELL_SENT); + if (cell_direction == CELL_DIRECTION_OUT && circ->n_conn) { /* if we're using relaybandwidthrate, this conn wants priority */ circ->n_conn->client_used = approx_time(); @@ -540,17 +590,11 @@ relay_send_command_from_edge(uint16_t stream_id, circuit_t *circ, origin_circuit_t *origin_circ = TO_ORIGIN_CIRCUIT(circ); if (origin_circ->remaining_relay_early_cells > 0 && (relay_command == RELAY_COMMAND_EXTEND || - (cpath_layer != origin_circ->cpath && - !CIRCUIT_PURPOSE_IS_ESTABLISHED_REND(circ->purpose)))) { - /* If we've got any relay_early cells left, and we're sending - * an extend cell or (we're not talking to the first hop and we're - * not talking to a rendezvous circuit), use one of them. - * Don't worry about the conn protocol version: + cpath_layer != origin_circ->cpath)) { + /* If we've got any relay_early cells left and (we're sending + * an extend cell or we're not talking to the first hop), use + * one of them. Don't worry about the conn protocol version: * append_cell_to_circuit_queue will fix it up. */ - /* XXX For now, clients don't use RELAY_EARLY cells when sending - * relay cells on rendezvous circuits. See bug 1038. Eventually, - * we can take this behavior away in favor of having clients avoid - * rendezvous points running 0.2.1.3-alpha through 0.2.1.18. -RD */ cell.command = CELL_RELAY_EARLY; --origin_circ->remaining_relay_early_cells; log_debug(LD_OR, "Sending a RELAY_EARLY cell; %d remaining.", @@ -578,8 +622,8 @@ relay_send_command_from_edge(uint16_t stream_id, circuit_t *circ, } } - if (circuit_package_relay_cell(&cell, circ, cell_direction, cpath_layer) - < 0) { + if (circuit_package_relay_cell(&cell, circ, cell_direction, cpath_layer, + stream_id) < 0) { log_warn(LD_BUG,"circuit_package_relay_cell failed. Closing."); circuit_mark_for_close(circ, END_CIRC_REASON_INTERNAL); return -1; @@ -901,7 +945,7 @@ connection_edge_process_relay_cell_not_open( } /* handle anything that might have queued */ - if (connection_edge_package_raw_inbuf(conn, 1) < 0) { + if (connection_edge_package_raw_inbuf(conn, 1, NULL) < 0) { /* (We already sent an end cell if possible) */ connection_mark_for_close(TO_CONN(conn)); return 0; @@ -999,7 +1043,8 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, relay_header_unpack(&rh, cell->payload); // log_fn(LOG_DEBUG,"command %d stream %d", rh.command, rh.stream_id); num_seen++; - log_debug(domain, "Now seen %d relay cells here.", num_seen); + log_debug(domain, "Now seen %d relay cells here (command %d, stream %d).", + num_seen, rh.command, rh.stream_id); if (rh.length > RELAY_PAYLOAD_SIZE) { log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL, @@ -1038,6 +1083,16 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, "Begin cell for known stream. Dropping."); return 0; } + if (rh.command == RELAY_COMMAND_BEGIN_DIR) { + /* Assign this circuit and its app-ward OR connection a unique ID, + * so that we can measure download times. The local edge and dir + * connection will be assigned the same ID when they are created + * and linked. */ + static uint64_t next_id = 0; + circ->dirreq_id = ++next_id; + TO_CONN(TO_OR_CIRCUIT(circ)->p_conn)->dirreq_id = circ->dirreq_id; + } + return connection_exit_begin_conn(cell, circ); case RELAY_COMMAND_DATA: ++stats_n_data_cells_received; @@ -1131,6 +1186,7 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, } if (circ->n_conn) { uint8_t trunc_reason = *(uint8_t*)(cell->payload + RELAY_HEADER_SIZE); + circuit_clear_cell_queue(circ, circ->n_conn); connection_or_send_destroy(circ->n_circ_id, circ->n_conn, trunc_reason); circuit_set_n_circid_orconn(circ, 0, NULL); @@ -1179,9 +1235,13 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, conn->package_window += STREAMWINDOW_INCREMENT; log_debug(domain,"stream-level sendme, packagewindow now %d.", conn->package_window); + if (circuit_queue_streams_are_blocked(circ)) { + /* Still waiting for queue to flush; don't touch conn */ + return 0; + } connection_start_reading(TO_CONN(conn)); /* handle whatever might still be on the inbuf */ - if (connection_edge_package_raw_inbuf(conn, 1) < 0) { + if (connection_edge_package_raw_inbuf(conn, 1, NULL) < 0) { /* (We already sent an end cell if possible) */ connection_mark_for_close(TO_CONN(conn)); return 0; @@ -1247,15 +1307,19 @@ uint64_t stats_n_data_cells_received = 0; * ever received were completely full of data. */ uint64_t stats_n_data_bytes_received = 0; -/** While conn->inbuf has an entire relay payload of bytes on it, - * and the appropriate package windows aren't empty, grab a cell - * and send it down the circuit. +/** If <b>conn</b> has an entire relay payload of bytes on its inbuf (or + * <b>package_partial</b> is true), and the appropriate package windows aren't + * empty, grab a cell and send it down the circuit. + * + * If *<b>max_cells</b> is given, package no more than max_cells. Decrement + * *<b>max_cells</b> by the number of cells packaged. * * Return -1 (and send a RELAY_COMMAND_END cell if necessary) if conn should * be marked for close, else return 0. */ int -connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial) +connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial, + int *max_cells) { size_t amount_to_process, length; char payload[CELL_PAYLOAD_SIZE]; @@ -1271,7 +1335,10 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial) return 0; } -repeat_connection_edge_package_raw_inbuf: + if (max_cells && *max_cells <= 0) + return 0; + + repeat_connection_edge_package_raw_inbuf: circ = circuit_get_by_edge_conn(conn); if (!circ) { @@ -1332,6 +1399,12 @@ repeat_connection_edge_package_raw_inbuf: } log_debug(domain,"conn->package_window is now %d",conn->package_window); + if (max_cells) { + *max_cells -= 1; + if (*max_cells <= 0) + return 0; + } + /* handle more if there's more, or return 0 if there isn't */ goto repeat_connection_edge_package_raw_inbuf; } @@ -1379,7 +1452,10 @@ connection_edge_consider_sending_sendme(edge_connection_t *conn) static void circuit_resume_edge_reading(circuit_t *circ, crypt_path_t *layer_hint) { - + if (circuit_queue_streams_are_blocked(circ)) { + log_debug(layer_hint?LD_APP:LD_EXIT,"Too big queue, no resuming"); + return; + } log_debug(layer_hint?LD_APP:LD_EXIT,"resuming"); if (CIRCUIT_IS_ORIGIN(circ)) @@ -1395,31 +1471,136 @@ circuit_resume_edge_reading(circuit_t *circ, crypt_path_t *layer_hint) * of a linked list of edge streams that should each be considered. */ static int -circuit_resume_edge_reading_helper(edge_connection_t *conn, +circuit_resume_edge_reading_helper(edge_connection_t *first_conn, circuit_t *circ, crypt_path_t *layer_hint) { - for ( ; conn; conn=conn->next_stream) { - if (conn->_base.marked_for_close) + edge_connection_t *conn; + int n_packaging_streams, n_streams_left; + int packaged_this_round; + int cells_on_queue; + int cells_per_conn; + edge_connection_t *chosen_stream = NULL; + + /* How many cells do we have space for? It will be the minimum of + * the number needed to exhaust the package window, and the minimum + * needed to fill the cell queue. */ + int max_to_package = circ->package_window; + if (CIRCUIT_IS_ORIGIN(circ)) { + cells_on_queue = circ->n_conn_cells.n; + } else { + or_circuit_t *or_circ = TO_OR_CIRCUIT(circ); + cells_on_queue = or_circ->p_conn_cells.n; + } + if (CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue < max_to_package) + max_to_package = CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue; + + /* Once we used to start listening on the streams in the order they + * appeared in the linked list. That leads to starvation on the + * streams that appeared later on the list, since the first streams + * would always get to read first. Instead, we just pick a random + * stream on the list, and enable reading for streams starting at that + * point (and wrapping around as if the list were circular). It would + * probably be better to actually remember which streams we've + * serviced in the past, but this is simple and effective. */ + + /* Select a stream uniformly at random from the linked list. We + * don't need cryptographic randomness here. */ + { + int num_streams = 0; + for (conn = first_conn; conn; conn = conn->next_stream) { + num_streams++; + if ((tor_weak_random() % num_streams)==0) + chosen_stream = conn; + /* Invariant: chosen_stream has been chosen uniformly at random from + * among the first num_streams streams on first_conn. */ + } + } + + /* Count how many non-marked streams there are that have anything on + * their inbuf, and enable reading on all of the connections. */ + n_packaging_streams = 0; + /* Activate reading starting from the chosen stream */ + for (conn=chosen_stream; conn; conn = conn->next_stream) { + /* Start reading for the streams starting from here */ + if (conn->_base.marked_for_close || conn->package_window <= 0) continue; - if ((!layer_hint && conn->package_window > 0) || - (layer_hint && conn->package_window > 0 && - conn->cpath_layer == layer_hint)) { + if (!layer_hint || conn->cpath_layer == layer_hint) { connection_start_reading(TO_CONN(conn)); + + if (buf_datalen(conn->_base.inbuf) > 0) + ++n_packaging_streams; + } + } + /* Go back and do the ones we skipped, circular-style */ + for (conn = first_conn; conn != chosen_stream; conn = conn->next_stream) { + if (conn->_base.marked_for_close || conn->package_window <= 0) + continue; + if (!layer_hint || conn->cpath_layer == layer_hint) { + connection_start_reading(TO_CONN(conn)); + + if (buf_datalen(conn->_base.inbuf) > 0) + ++n_packaging_streams; + } + } + + if (n_packaging_streams == 0) /* avoid divide-by-zero */ + return 0; + + again: + + cells_per_conn = CEIL_DIV(max_to_package, n_packaging_streams); + + packaged_this_round = 0; + n_streams_left = 0; + + /* Iterate over all connections. Package up to cells_per_conn cells on + * each. Update packaged_this_round with the total number of cells + * packaged, and n_streams_left with the number that still have data to + * package. + */ + for (conn=first_conn; conn; conn=conn->next_stream) { + if (conn->_base.marked_for_close || conn->package_window <= 0) + continue; + if (!layer_hint || conn->cpath_layer == layer_hint) { + int n = cells_per_conn, r; /* handle whatever might still be on the inbuf */ - if (connection_edge_package_raw_inbuf(conn, 1)<0) { - /* (We already sent an end cell if possible) */ + r = connection_edge_package_raw_inbuf(conn, 1, &n); + + /* Note how many we packaged */ + packaged_this_round += (cells_per_conn-n); + + if (r<0) { + /* Problem while packaging. (We already sent an end cell if + * possible) */ connection_mark_for_close(TO_CONN(conn)); continue; } + /* If there's still data to read, we'll be coming back to this stream. */ + if (buf_datalen(conn->_base.inbuf)) + ++n_streams_left; + /* If the circuit won't accept any more data, return without looking * at any more of the streams. Any connections that should be stopped * have already been stopped by connection_edge_package_raw_inbuf. */ if (circuit_consider_stop_edge_reading(circ, layer_hint)) return -1; + /* XXXX should we also stop immediately if we fill up the cell queue? + * Probably. */ } } + + /* If we made progress, and we are willing to package more, and there are + * any streams left that want to package stuff... try again! + */ + if (packaged_this_round && packaged_this_round < max_to_package && + n_streams_left) { + max_to_package -= packaged_this_round; + n_packaging_streams = n_streams_left; + goto again; + } + return 0; } @@ -1488,13 +1669,6 @@ circuit_consider_sending_sendme(circuit_t *circ, crypt_path_t *layer_hint) } } -/** Stop reading on edge connections when we have this many cells - * waiting on the appropriate queue. */ -#define CELL_QUEUE_HIGHWATER_SIZE 256 -/** Start reading from edge connections again when we get down to this many - * cells. */ -#define CELL_QUEUE_LOWWATER_SIZE 64 - #ifdef ACTIVE_CIRCUITS_PARANOIA #define assert_active_circuits_ok_paranoid(conn) \ assert_active_circuits_ok(conn) @@ -1508,6 +1682,10 @@ static int total_cells_allocated = 0; /** A memory pool to allocate packed_cell_t objects. */ static mp_pool_t *cell_pool = NULL; +/** Memory pool to allocate insertion_time_elem_t objects used for cell + * statistics. */ +static mp_pool_t *it_pool = NULL; + /** Allocate structures to hold cells. */ void init_cell_pool(void) @@ -1516,7 +1694,8 @@ init_cell_pool(void) cell_pool = mp_pool_new(sizeof(packed_cell_t), 128*1024); } -/** Free all storage used to hold cells. */ +/** Free all storage used to hold cells (and insertion times if we measure + * cell statistics). */ void free_cell_pool(void) { @@ -1525,6 +1704,10 @@ free_cell_pool(void) mp_pool_destroy(cell_pool); cell_pool = NULL; } + if (it_pool) { + mp_pool_destroy(it_pool); + it_pool = NULL; + } } /** Free excess storage in cell pool. */ @@ -1537,7 +1720,7 @@ clean_cell_pool(void) /** Release storage held by <b>cell</b>. */ static INLINE void -packed_cell_free(packed_cell_t *cell) +packed_cell_free_unchecked(packed_cell_t *cell) { --total_cells_allocated; mp_pool_release(cell); @@ -1599,7 +1782,38 @@ cell_queue_append(cell_queue_t *queue, packed_cell_t *cell) void cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell) { - cell_queue_append(queue, packed_cell_copy(cell)); + packed_cell_t *copy = packed_cell_copy(cell); + /* Remember the time when this cell was put in the queue. */ + if (get_options()->CellStatistics) { + struct timeval now; + uint32_t added; + insertion_time_queue_t *it_queue = queue->insertion_times; + if (!it_pool) + it_pool = mp_pool_new(sizeof(insertion_time_elem_t), 1024); + tor_gettimeofday_cached(&now); +#define SECONDS_IN_A_DAY 86400L + added = (uint32_t)(((now.tv_sec % SECONDS_IN_A_DAY) * 100L) + + ((uint32_t)now.tv_usec / (uint32_t)10000L)); + if (!it_queue) { + it_queue = tor_malloc_zero(sizeof(insertion_time_queue_t)); + queue->insertion_times = it_queue; + } + if (it_queue->last && it_queue->last->insertion_time == added) { + it_queue->last->counter++; + } else { + insertion_time_elem_t *elem = mp_pool_get(it_pool); + elem->next = NULL; + elem->insertion_time = added; + elem->counter = 1; + if (it_queue->last) { + it_queue->last->next = elem; + it_queue->last = elem; + } else { + it_queue->first = it_queue->last = elem; + } + } + } + cell_queue_append(queue, copy); } /** Remove and free every cell in <b>queue</b>. */ @@ -1610,11 +1824,19 @@ cell_queue_clear(cell_queue_t *queue) cell = queue->head; while (cell) { next = cell->next; - packed_cell_free(cell); + packed_cell_free_unchecked(cell); cell = next; } queue->head = queue->tail = NULL; queue->n = 0; + if (queue->insertion_times) { + while (queue->insertion_times->first) { + insertion_time_elem_t *elem = queue->insertion_times->first; + queue->insertion_times->first = elem->next; + mp_pool_release(elem); + } + tor_free(queue->insertion_times); + } } /** Extract and return the cell at the head of <b>queue</b>; return NULL if @@ -1666,8 +1888,226 @@ prev_circ_on_conn_p(circuit_t *circ, or_connection_t *conn) } } +/** Helper for sorting cell_ewma_t values in their priority queue. */ +static int +compare_cell_ewma_counts(const void *p1, const void *p2) +{ + const cell_ewma_t *e1=p1, *e2=p2; + if (e1->cell_count < e2->cell_count) + return -1; + else if (e1->cell_count > e2->cell_count) + return 1; + else + return 0; +} + +/** Given a cell_ewma_t, return a pointer to the circuit containing it. */ +static circuit_t * +cell_ewma_to_circuit(cell_ewma_t *ewma) +{ + if (ewma->is_for_p_conn) { + /* This is an or_circuit_t's p_cell_ewma. */ + or_circuit_t *orcirc = SUBTYPE_P(ewma, or_circuit_t, p_cell_ewma); + return TO_CIRCUIT(orcirc); + } else { + /* This is some circuit's n_cell_ewma. */ + return SUBTYPE_P(ewma, circuit_t, n_cell_ewma); + } +} + +/* ==== Functions for scaling cell_ewma_t ==== + + When choosing which cells to relay first, we favor circuits that have been + quiet recently. This gives better latency on connections that aren't + pushing lots of data, and makes the network feel more interactive. + + Conceptually, we take an exponentially weighted mean average of the number + of cells a circuit has sent, and allow active circuits (those with cells to + relay) to send cells in reverse order of their exponentially-weighted mean + average (EWMA) cell count. [That is, a cell sent N seconds ago 'counts' + F^N times as much as a cell sent now, for 0<F<1.0, and we favor the + circuit that has sent the fewest cells] + + If 'double' had infinite precision, we could do this simply by counting a + cell sent at startup as having weight 1.0, and a cell sent N seconds later + as having weight F^-N. This way, we would never need to re-scale + any already-sent cells. + + To prevent double from overflowing, we could count a cell sent now as + having weight 1.0 and a cell sent N seconds ago as having weight F^N. + This, however, would mean we'd need to re-scale *ALL* old circuits every + time we wanted to send a cell. + + So as a compromise, we divide time into 'ticks' (currently, 10-second + increments) and say that a cell sent at the start of a current tick is + worth 1.0, a cell sent N seconds before the start of the current tick is + worth F^N, and a cell sent N seconds after the start of the current tick is + worth F^-N. This way we don't overflow, and we don't need to constantly + rescale. + */ + +/** How long does a tick last (seconds)? */ +#define EWMA_TICK_LEN 10 + +/** The default per-tick scale factor, if it hasn't been overridden by a + * consensus or a configuration setting. zero means "disabled". */ +#define EWMA_DEFAULT_HALFLIFE 0.0 + +/** Given a timeval <b>now</b>, compute the cell_ewma tick in which it occurs + * and the fraction of the tick that has elapsed between the start of the tick + * and <b>now</b>. Return the former and store the latter in + * *<b>remainder_out</b>. + * + * These tick values are not meant to be shared between Tor instances, or used + * for other purposes. */ +static unsigned +cell_ewma_tick_from_timeval(const struct timeval *now, + double *remainder_out) +{ + unsigned res = (unsigned) (now->tv_sec / EWMA_TICK_LEN); + /* rem */ + double rem = (now->tv_sec % EWMA_TICK_LEN) + + ((double)(now->tv_usec)) / 1.0e6; + *remainder_out = rem / EWMA_TICK_LEN; + return res; +} + +/** Compute and return the current cell_ewma tick. */ +unsigned +cell_ewma_get_tick(void) +{ + return ((unsigned)approx_time() / EWMA_TICK_LEN); +} + +/** The per-tick scale factor to be used when computing cell-count EWMA + * values. (A cell sent N ticks before the start of the current tick + * has value ewma_scale_factor ** N.) + */ +static double ewma_scale_factor = 0.1; +static int ewma_enabled = 0; + +#define EPSILON 0.00001 +#define LOG_ONEHALF -0.69314718055994529 + +/** Adjust the global cell scale factor based on <b>options</b> */ +void +cell_ewma_set_scale_factor(or_options_t *options, networkstatus_t *consensus) +{ + int32_t halflife_ms; + double halflife; + const char *source; + if (options && options->CircuitPriorityHalflife >= -EPSILON) { + halflife = options->CircuitPriorityHalflife; + source = "CircuitPriorityHalflife in configuration"; + } else if (consensus && + (halflife_ms = networkstatus_get_param( + consensus, "CircuitPriorityHalflifeMsec", -1)) >= 0) { + halflife = ((double)halflife_ms)/1000.0; + source = "CircuitPriorityHalflifeMsec in consensus"; + } else { + halflife = EWMA_DEFAULT_HALFLIFE; + source = "Default value"; + } + + if (halflife <= EPSILON) { + /* The cell EWMA algorithm is disabled. */ + ewma_scale_factor = 0.1; + ewma_enabled = 0; + log_info(LD_OR, + "Disabled cell_ewma algorithm because of value in %s", + source); + } else { + /* convert halflife into halflife-per-tick. */ + halflife /= EWMA_TICK_LEN; + /* compute per-tick scale factor. */ + ewma_scale_factor = exp( LOG_ONEHALF / halflife ); + ewma_enabled = 1; + log_info(LD_OR, + "Enabled cell_ewma algorithm because of value in %s; " + "scale factor is %lf per %d seconds", + source, ewma_scale_factor, EWMA_TICK_LEN); + } +} + +/** Return the multiplier necessary to convert the value of a cell sent in + * 'from_tick' to one sent in 'to_tick'. */ +static INLINE double +get_scale_factor(unsigned from_tick, unsigned to_tick) +{ + /* This math can wrap around, but that's okay: unsigned overflow is + well-defined */ + int diff = (int)(to_tick - from_tick); + return pow(ewma_scale_factor, diff); +} + +/** Adjust the cell count of <b>ewma</b> so that it is scaled with respect to + * <b>cur_tick</b> */ +static void +scale_single_cell_ewma(cell_ewma_t *ewma, unsigned cur_tick) +{ + double factor = get_scale_factor(ewma->last_adjusted_tick, cur_tick); + ewma->cell_count *= factor; + ewma->last_adjusted_tick = cur_tick; +} + +/** Adjust the cell count of every active circuit on <b>conn</b> so + * that they are scaled with respect to <b>cur_tick</b> */ +static void +scale_active_circuits(or_connection_t *conn, unsigned cur_tick) +{ + + double factor = get_scale_factor( + conn->active_circuit_pqueue_last_recalibrated, + cur_tick); + /** Ordinarily it isn't okay to change the value of an element in a heap, + * but it's okay here, since we are preserving the order. */ + SMARTLIST_FOREACH(conn->active_circuit_pqueue, cell_ewma_t *, e, { + tor_assert(e->last_adjusted_tick == + conn->active_circuit_pqueue_last_recalibrated); + e->cell_count *= factor; + e->last_adjusted_tick = cur_tick; + }); + conn->active_circuit_pqueue_last_recalibrated = cur_tick; +} + +/** Rescale <b>ewma</b> to the same scale as <b>conn</b>, and add it to + * <b>conn</b>'s priority queue of active circuits */ +static void +add_cell_ewma_to_conn(or_connection_t *conn, cell_ewma_t *ewma) +{ + tor_assert(ewma->heap_index == -1); + scale_single_cell_ewma(ewma, + conn->active_circuit_pqueue_last_recalibrated); + + smartlist_pqueue_add(conn->active_circuit_pqueue, + compare_cell_ewma_counts, + STRUCT_OFFSET(cell_ewma_t, heap_index), + ewma); +} + +/** Remove <b>ewma</b> from <b>conn</b>'s priority queue of active circuits */ +static void +remove_cell_ewma_from_conn(or_connection_t *conn, cell_ewma_t *ewma) +{ + tor_assert(ewma->heap_index != -1); + smartlist_pqueue_remove(conn->active_circuit_pqueue, + compare_cell_ewma_counts, + STRUCT_OFFSET(cell_ewma_t, heap_index), + ewma); +} + +/** Remove and return the first cell_ewma_t from conn's priority queue of + * active circuits. Requires that the priority queue is nonempty. */ +static cell_ewma_t * +pop_first_cell_ewma_from_conn(or_connection_t *conn) +{ + return smartlist_pqueue_pop(conn->active_circuit_pqueue, + compare_cell_ewma_counts, + STRUCT_OFFSET(cell_ewma_t, heap_index)); +} + /** Add <b>circ</b> to the list of circuits with pending cells on - * <b>conn</b>. No effect if <b>circ</b> is already unlinked. */ + * <b>conn</b>. No effect if <b>circ</b> is already linked. */ void make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn) { @@ -1679,6 +2119,8 @@ make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn) return; } + assert_active_circuits_ok_paranoid(conn); + if (! conn->active_circuits) { conn->active_circuits = circ; *prevp = *nextp = circ; @@ -1690,10 +2132,19 @@ make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn) *prev_circ_on_conn_p(head, conn) = circ; *prevp = old_tail; } + + if (circ->n_conn == conn) { + add_cell_ewma_to_conn(conn, &circ->n_cell_ewma); + } else { + or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); + tor_assert(conn == orcirc->p_conn); + add_cell_ewma_to_conn(conn, &orcirc->p_cell_ewma); + } + assert_active_circuits_ok_paranoid(conn); } -/** Remove <b>circ</b> to the list of circuits with pending cells on +/** Remove <b>circ</b> from the list of circuits with pending cells on * <b>conn</b>. No effect if <b>circ</b> is already unlinked. */ void make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn) @@ -1707,6 +2158,8 @@ make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn) return; } + assert_active_circuits_ok_paranoid(conn); + tor_assert(next && prev); tor_assert(*prev_circ_on_conn_p(next, conn) == circ); tor_assert(*next_circ_on_conn_p(prev, conn) == circ); @@ -1720,6 +2173,15 @@ make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn) conn->active_circuits = next; } *prevp = *nextp = NULL; + + if (circ->n_conn == conn) { + remove_cell_ewma_from_conn(conn, &circ->n_cell_ewma); + } else { + or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); + tor_assert(conn == orcirc->p_conn); + remove_cell_ewma_from_conn(conn, &orcirc->p_cell_ewma); + } + assert_active_circuits_ok_paranoid(conn); } @@ -1739,16 +2201,27 @@ connection_or_unlink_all_active_circs(or_connection_t *orconn) cur = next; } while (cur != head); orconn->active_circuits = NULL; + + SMARTLIST_FOREACH(orconn->active_circuit_pqueue, cell_ewma_t *, e, + e->heap_index = -1); + smartlist_clear(orconn->active_circuit_pqueue); } /** Block (if <b>block</b> is true) or unblock (if <b>block</b> is false) * every edge connection that is using <b>circ</b> to write to <b>orconn</b>, - * and start or stop reading as appropriate. */ -static void + * and start or stop reading as appropriate. + * + * If <b>stream_id</b> is nonzero, block only the edge connection whose + * stream_id matches it. + * + * Returns the number of streams whose status we changed. + */ +static int set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn, - int block) + int block, streamid_t stream_id) { edge_connection_t *edge = NULL; + int n = 0; if (circ->n_conn == orconn) { circ->streams_blocked_on_n_conn = block; if (CIRCUIT_IS_ORIGIN(circ)) @@ -1761,7 +2234,13 @@ set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn, for (; edge; edge = edge->next_stream) { connection_t *conn = TO_CONN(edge); - edge->edge_blocked_on_circ = block; + if (stream_id && edge->stream_id != stream_id) + continue; + + if (edge->edge_blocked_on_circ != block) { + ++n; + edge->edge_blocked_on_circ = block; + } if (!conn->read_event) { /* This connection is a placeholder for something; probably a DNS @@ -1778,10 +2257,12 @@ set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn, connection_start_reading(conn); } } + + return n; } /** Pull as many cells as possible (but no more than <b>max</b>) from the - * queue of the first active circuit on <b>conn</b>, and write then to + * queue of the first active circuit on <b>conn</b>, and write them to * <b>conn</b>->outbuf. Return the number of cells written. Advance * the active circuit pointer to the next active circuit in the ring. */ int @@ -1792,9 +2273,35 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max, cell_queue_t *queue; circuit_t *circ; int streams_blocked; + + /* The current (hi-res) time */ + struct timeval now_hires; + + /* The EWMA cell counter for the circuit we're flushing. */ + cell_ewma_t *cell_ewma = NULL; + double ewma_increment = -1; + circ = conn->active_circuits; if (!circ) return 0; assert_active_circuits_ok_paranoid(conn); + + /* See if we're doing the ewma circuit selection algorithm. */ + if (ewma_enabled) { + unsigned tick; + double fractional_tick; + tor_gettimeofday_cached(&now_hires); + tick = cell_ewma_tick_from_timeval(&now_hires, &fractional_tick); + + if (tick != conn->active_circuit_pqueue_last_recalibrated) { + scale_active_circuits(conn, tick); + } + + ewma_increment = pow(ewma_scale_factor, -fractional_tick); + + cell_ewma = smartlist_get(conn->active_circuit_pqueue, 0); + circ = cell_ewma_to_circuit(cell_ewma); + } + if (circ->n_conn == conn) { queue = &circ->n_conn_cells; streams_blocked = circ->streams_blocked_on_n_conn; @@ -1808,10 +2315,60 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max, packed_cell_t *cell = cell_queue_pop(queue); tor_assert(*next_circ_on_conn_p(circ,conn)); + /* Calculate the exact time that this cell has spent in the queue. */ + if (get_options()->CellStatistics && !CIRCUIT_IS_ORIGIN(circ)) { + struct timeval now; + uint32_t flushed; + uint32_t cell_waiting_time; + insertion_time_queue_t *it_queue = queue->insertion_times; + tor_gettimeofday_cached(&now); + flushed = (uint32_t)((now.tv_sec % SECONDS_IN_A_DAY) * 100L + + (uint32_t)now.tv_usec / (uint32_t)10000L); + if (!it_queue || !it_queue->first) { + log_info(LD_GENERAL, "Cannot determine insertion time of cell. " + "Looks like the CellStatistics option was " + "recently enabled."); + } else { + or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); + insertion_time_elem_t *elem = it_queue->first; + cell_waiting_time = + (uint32_t)((flushed * 10L + SECONDS_IN_A_DAY * 1000L - + elem->insertion_time * 10L) % + (SECONDS_IN_A_DAY * 1000L)); +#undef SECONDS_IN_A_DAY + elem->counter--; + if (elem->counter < 1) { + it_queue->first = elem->next; + if (elem == it_queue->last) + it_queue->last = NULL; + mp_pool_release(elem); + } + orcirc->total_cell_waiting_time += cell_waiting_time; + orcirc->processed_cells++; + } + } + + /* If we just flushed our queue and this circuit is used for a + * tunneled directory request, possibly advance its state. */ + if (queue->n == 0 && TO_CONN(conn)->dirreq_id) + geoip_change_dirreq_state(TO_CONN(conn)->dirreq_id, + DIRREQ_TUNNELED, + DIRREQ_CIRC_QUEUE_FLUSHED); + connection_write_to_buf(cell->body, CELL_NETWORK_SIZE, TO_CONN(conn)); - packed_cell_free(cell); + packed_cell_free_unchecked(cell); ++n_flushed; + if (cell_ewma) { + cell_ewma_t *tmp; + cell_ewma->cell_count += ewma_increment; + /* We pop and re-add the cell_ewma_t here, not above, since we need to + * re-add it immediately to keep the priority queue consistent with + * the linked-list implementation */ + tmp = pop_first_cell_ewma_from_conn(conn); + tor_assert(tmp == cell_ewma); + add_cell_ewma_to_conn(conn, cell_ewma); + } if (circ != conn->active_circuits) { /* If this happens, the current circuit just got made inactive by * a call in connection_write_to_buf(). That's nothing to worry about: @@ -1829,9 +2386,9 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max, /* Is the cell queue low enough to unblock all the streams that are waiting * to write to this circuit? */ if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE) - set_streams_blocked_on_circ(circ, conn, 0); /* unblock streams */ + set_streams_blocked_on_circ(circ, conn, 0, 0); /* unblock streams */ - /* Did we just ran out of cells on this queue? */ + /* Did we just run out of cells on this circuit's queue? */ if (queue->n == 0) { log_debug(LD_GENERAL, "Made a circuit inactive."); make_circuit_inactive_on_conn(circ, conn); @@ -1846,10 +2403,14 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max, * transmitting in <b>direction</b>. */ void append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn, - cell_t *cell, cell_direction_t direction) + cell_t *cell, cell_direction_t direction, + streamid_t fromstream) { cell_queue_t *queue; int streams_blocked; + if (circ->marked_for_close) + return; + if (direction == CELL_DIRECTION_OUT) { queue = &circ->n_conn_cells; streams_blocked = circ->streams_blocked_on_n_conn; @@ -1868,7 +2429,12 @@ append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn, /* If we have too many cells on the circuit, we should stop reading from * the edge streams for a while. */ if (!streams_blocked && queue->n >= CELL_QUEUE_HIGHWATER_SIZE) - set_streams_blocked_on_circ(circ, orconn, 1); /* block streams */ + set_streams_blocked_on_circ(circ, orconn, 1, 0); /* block streams */ + + if (streams_blocked && fromstream) { + /* This edge connection is apparently not blocked; block it. */ + set_streams_blocked_on_circ(circ, orconn, 1, fromstream); + } if (queue->n == 1) { /* This was the first cell added to the queue. We need to make this @@ -1947,6 +2513,25 @@ decode_address_from_payload(tor_addr_t *addr_out, const uint8_t *payload, return payload + 2 + payload[1]; } +/** Remove all the cells queued on <b>circ</b> for <b>orconn</b>. */ +void +circuit_clear_cell_queue(circuit_t *circ, or_connection_t *orconn) +{ + cell_queue_t *queue; + if (circ->n_conn == orconn) { + queue = &circ->n_conn_cells; + } else { + or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); + tor_assert(orcirc->p_conn == orconn); + queue = &orcirc->p_conn_cells; + } + + if (queue->n) + make_circuit_inactive_on_conn(circ,orconn); + + cell_queue_clear(queue); +} + /** Fail with an assert if the active circuits ring on <b>orconn</b> is * corrupt. */ void @@ -1954,16 +2539,44 @@ assert_active_circuits_ok(or_connection_t *orconn) { circuit_t *head = orconn->active_circuits; circuit_t *cur = head; + int n = 0; if (! head) return; do { circuit_t *next = *next_circ_on_conn_p(cur, orconn); circuit_t *prev = *prev_circ_on_conn_p(cur, orconn); + cell_ewma_t *ewma; tor_assert(next); tor_assert(prev); tor_assert(*next_circ_on_conn_p(prev, orconn) == cur); tor_assert(*prev_circ_on_conn_p(next, orconn) == cur); + if (orconn == cur->n_conn) { + ewma = &cur->n_cell_ewma; + tor_assert(!ewma->is_for_p_conn); + } else { + ewma = &TO_OR_CIRCUIT(cur)->p_cell_ewma; + tor_assert(ewma->is_for_p_conn); + } + tor_assert(ewma->heap_index != -1); + tor_assert(ewma == smartlist_get(orconn->active_circuit_pqueue, + ewma->heap_index)); + n++; cur = next; } while (cur != head); + + tor_assert(n == smartlist_len(orconn->active_circuit_pqueue)); +} + +/** Return 1 if we shouldn't restart reading on this circuit, even if + * we get a SENDME. Else return 0. +*/ +static int +circuit_queue_streams_are_blocked(circuit_t *circ) +{ + if (CIRCUIT_IS_ORIGIN(circ)) { + return circ->streams_blocked_on_n_conn; + } else { + return circ->streams_blocked_on_p_conn; + } } |