diff options
author | Nick Mathewson <nickm@torproject.org> | 2009-12-18 22:33:02 -0500 |
---|---|---|
committer | Nick Mathewson <nickm@torproject.org> | 2009-12-18 22:33:02 -0500 |
commit | 05a2473b7f38d6c5cab0c98aec7ab61aec3ac6cc (patch) | |
tree | 84f3e62634ef7f331f44d0c0f5f98afd9bbbaee6 /src | |
parent | 7edae5898416dbd291d9e5e9f75e0b24153e73ea (diff) | |
parent | 5e2eaa37f1984e3de122568062d364283402bdf3 (diff) | |
download | tor-05a2473b7f38d6c5cab0c98aec7ab61aec3ac6cc.tar.gz tor-05a2473b7f38d6c5cab0c98aec7ab61aec3ac6cc.zip |
Merge branch 'ewma'
Diffstat (limited to 'src')
-rw-r--r-- | src/common/container.c | 100 | ||||
-rw-r--r-- | src/common/container.h | 11 | ||||
-rw-r--r-- | src/or/circuitlist.c | 16 | ||||
-rw-r--r-- | src/or/config.c | 6 | ||||
-rw-r--r-- | src/or/connection.c | 31 | ||||
-rw-r--r-- | src/or/dns.c | 10 | ||||
-rw-r--r-- | src/or/networkstatus.c | 1 | ||||
-rw-r--r-- | src/or/or.h | 63 | ||||
-rw-r--r-- | src/or/relay.c | 322 | ||||
-rw-r--r-- | src/test/test_containers.c | 110 |
10 files changed, 618 insertions, 52 deletions
diff --git a/src/common/container.c b/src/common/container.c index 7690b4c0ba..f452a51e42 100644 --- a/src/common/container.c +++ b/src/common/container.c @@ -605,6 +605,38 @@ smartlist_uniq_strings(smartlist_t *sl) /* Heap-based priority queue implementation for O(lg N) insert and remove. * Recall that the heap property is that, for every index I, h[I] < * H[LEFT_CHILD[I]] and h[I] < H[RIGHT_CHILD[I]]. + * + * For us to remove items other than the topmost item, each item must store + * its own index within the heap. When calling the pqueue functions, tell + * them about the offset of the field that stores the index within the item. + * + * Example: + * + * typedef struct timer_t { + * struct timeval tv; + * int heap_index; + * } timer_t; + * + * static int compare(const void *p1, const void *p2) { + * const timer_t *t1 = p1, *t2 = p2; + * if (t1->tv.tv_sec < t2->tv.tv_sec) { + * return -1; + * } else if (t1->tv.tv_sec > t2->tv.tv_sec) { + * return 1; + * } else { + * return t1->tv.tv_usec - t2->tv_usec; + * } + * } + * + * void timer_heap_insert(smartlist_t *heap, timer_t *timer) { + * smartlist_pqueue_add(heap, compare, STRUCT_OFFSET(timer_t, heap_index), + * timer); + * } + * + * void timer_heap_pop(smartlist_t *heap) { + * return smartlist_pqueue_pop(heap, compare, + * STRUCT_OFFSET(timer_t, heap_index)); + * } */ /* For a 1-indexed array, we would use LEFT_CHILD[x] = 2*x and RIGHT_CHILD[x] @@ -616,12 +648,22 @@ smartlist_uniq_strings(smartlist_t *sl) #define RIGHT_CHILD(i) ( 2*(i) + 2 ) #define PARENT(i) ( ((i)-1) / 2 ) +#define IDXP(p) ((int*)STRUCT_VAR_P(p, idx_field_offset)) + +#define UPDATE_IDX(i) do { \ + void *updated = sl->list[i]; \ + *IDXP(updated) = i; \ + } while (0) + +#define IDX_OF_ITEM(p) (*IDXP(p)) + /** Helper. <b>sl</b> may have at most one violation of the heap property: * the item at <b>idx</b> may be greater than one or both of its children. * Restore the heap property. */ static INLINE void smartlist_heapify(smartlist_t *sl, int (*compare)(const void *a, const void *b), + int idx_field_offset, int idx) { while (1) { @@ -644,21 +686,28 @@ smartlist_heapify(smartlist_t *sl, void *tmp = sl->list[idx]; sl->list[idx] = sl->list[best_idx]; sl->list[best_idx] = tmp; + UPDATE_IDX(idx); + UPDATE_IDX(best_idx); idx = best_idx; } } } -/** Insert <b>item</b> into the heap stored in <b>sl</b>, where order - * is determined by <b>compare</b>. */ +/** Insert <b>item</b> into the heap stored in <b>sl</b>, where order is + * determined by <b>compare</b> and the offset of the item in the heap is + * stored in an int-typed field at position <b>idx_field_offset</b> within + * item. + */ void smartlist_pqueue_add(smartlist_t *sl, int (*compare)(const void *a, const void *b), + int idx_field_offset, void *item) { int idx; smartlist_add(sl,item); + UPDATE_IDX(sl->num_used-1); for (idx = sl->num_used - 1; idx; ) { int parent = PARENT(idx); @@ -666,6 +715,8 @@ smartlist_pqueue_add(smartlist_t *sl, void *tmp = sl->list[parent]; sl->list[parent] = sl->list[idx]; sl->list[idx] = tmp; + UPDATE_IDX(parent); + UPDATE_IDX(idx); idx = parent; } else { return; @@ -674,32 +725,63 @@ smartlist_pqueue_add(smartlist_t *sl, } /** Remove and return the top-priority item from the heap stored in <b>sl</b>, - * where order is determined by <b>compare</b>. <b>sl</b> must not be - * empty. */ + * where order is determined by <b>compare</b> and the item's position is + * stored at position <b>idx_field_offset</b> within the item. <b>sl</b> must + * not be empty. */ void * smartlist_pqueue_pop(smartlist_t *sl, - int (*compare)(const void *a, const void *b)) + int (*compare)(const void *a, const void *b), + int idx_field_offset) { void *top; tor_assert(sl->num_used); top = sl->list[0]; + *IDXP(top)=-1; if (--sl->num_used) { sl->list[0] = sl->list[sl->num_used]; - smartlist_heapify(sl, compare, 0); + UPDATE_IDX(0); + smartlist_heapify(sl, compare, idx_field_offset, 0); } return top; } +/** Remove the item <b>item</b> from the heap stored in <b>sl</b>, + * where order is determined by <b>compare</b> and the item's position is + * stored at position <b>idx_field_offset</b> within the item. <b>sl</b> must + * not be empty. */ +void +smartlist_pqueue_remove(smartlist_t *sl, + int (*compare)(const void *a, const void *b), + int idx_field_offset, + void *item) +{ + int idx = IDX_OF_ITEM(item); + tor_assert(idx >= 0); + tor_assert(sl->list[idx] == item); + --sl->num_used; + *IDXP(item) = -1; + if (idx == sl->num_used) { + return; + } else { + sl->list[idx] = sl->list[sl->num_used]; + UPDATE_IDX(idx); + smartlist_heapify(sl, compare, idx_field_offset, idx); + } +} + /** Assert that the heap property is correctly maintained by the heap stored * in <b>sl</b>, where order is determined by <b>compare</b>. */ void smartlist_pqueue_assert_ok(smartlist_t *sl, - int (*compare)(const void *a, const void *b)) + int (*compare)(const void *a, const void *b), + int idx_field_offset) { int i; - for (i = sl->num_used - 1; i > 0; --i) { - tor_assert(compare(sl->list[PARENT(i)], sl->list[i]) <= 0); + for (i = sl->num_used - 1; i >= 0; --i) { + if (i>0) + tor_assert(compare(sl->list[PARENT(i)], sl->list[i]) <= 0); + tor_assert(IDX_OF_ITEM(sl->list[i]) == i); } } diff --git a/src/common/container.h b/src/common/container.h index 41c0c68705..8077d56ebd 100644 --- a/src/common/container.h +++ b/src/common/container.h @@ -118,11 +118,18 @@ int smartlist_bsearch_idx(const smartlist_t *sl, const void *key, void smartlist_pqueue_add(smartlist_t *sl, int (*compare)(const void *a, const void *b), + int idx_field_offset, void *item); void *smartlist_pqueue_pop(smartlist_t *sl, - int (*compare)(const void *a, const void *b)); + int (*compare)(const void *a, const void *b), + int idx_field_offset); +void smartlist_pqueue_remove(smartlist_t *sl, + int (*compare)(const void *a, const void *b), + int idx_field_offset, + void *item); void smartlist_pqueue_assert_ok(smartlist_t *sl, - int (*compare)(const void *a, const void *b)); + int (*compare)(const void *a, const void *b), + int idx_field_offset); #define SPLIT_SKIP_SPACE 0x01 #define SPLIT_IGNORE_BLANK 0x02 diff --git a/src/or/circuitlist.c b/src/or/circuitlist.c index 83cb75ea9f..24732f2aea 100644 --- a/src/or/circuitlist.c +++ b/src/or/circuitlist.c @@ -385,6 +385,12 @@ init_circuit_base(circuit_t *circ) circ->package_window = circuit_initial_package_window(); circ->deliver_window = CIRCWINDOW_START; + /* Initialize the cell_ewma_t structure */ + circ->n_cell_ewma.last_adjusted_tick = cell_ewma_get_tick(); + circ->n_cell_ewma.cell_count = 0.0; + circ->n_cell_ewma.heap_index = -1; + circ->n_cell_ewma.is_for_p_conn = 0; + circuit_add(circ); } @@ -432,6 +438,16 @@ or_circuit_new(circid_t p_circ_id, or_connection_t *p_conn) init_circuit_base(TO_CIRCUIT(circ)); + /* Initialize the cell_ewma_t structure */ + + /* Initialize the cell counts to 0 */ + circ->p_cell_ewma.cell_count = 0.0; + circ->p_cell_ewma.last_adjusted_tick = cell_ewma_get_tick(); + circ->p_cell_ewma.is_for_p_conn = 1; + + /* It's not in any heap yet. */ + circ->p_cell_ewma.heap_index = -1; + return circ; } diff --git a/src/or/config.c b/src/or/config.c index 8a207230d4..0c5da96dae 100644 --- a/src/or/config.c +++ b/src/or/config.c @@ -167,6 +167,7 @@ static config_var_t _option_vars[] = { V(CircuitBuildTimeout, INTERVAL, "0"), V(CircuitIdleTimeout, INTERVAL, "1 hour"), V(CircuitStreamTimeout, INTERVAL, "0"), + V(CircuitPriorityHalflife, DOUBLE, "-100.0"), /*negative:'Use default'*/ V(ClientDNSRejectInternalAddresses, BOOL,"1"), V(ClientOnly, BOOL, "0"), V(ConsensusParams, STRING, NULL), @@ -212,6 +213,7 @@ static config_var_t _option_vars[] = { V(ExitPolicyRejectPrivate, BOOL, "1"), V(ExitPortStatistics, BOOL, "0"), V(ExtraInfoStatistics, BOOL, "0"), + V(FallbackNetworkstatusFile, FILENAME, SHARE_DATADIR PATH_SEPARATOR "tor" PATH_SEPARATOR "fallback-consensus"), V(FascistFirewall, BOOL, "0"), @@ -355,6 +357,7 @@ static config_var_t _option_vars[] = { VAR("__HashedControlSessionPassword", LINELIST, HashedControlSessionPassword, NULL), V(MinUptimeHidServDirectoryV2, INTERVAL, "24 hours"), + { NULL, CONFIG_TYPE_OBSOLETE, 0, NULL } }; @@ -1406,6 +1409,9 @@ options_act(or_options_t *old_options) if (accounting_is_enabled(options)) configure_accounting(time(NULL)); + /* Change the cell EWMA settings */ + cell_ewma_set_scale_factor(options, networkstatus_get_latest_consensus()); + /* Check for transitions that need action. */ if (old_options) { if (options->UseEntryGuards && !old_options->UseEntryGuards) { diff --git a/src/or/connection.c b/src/or/connection.c index 8de7ad9e5a..9104f1bddd 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -180,6 +180,9 @@ or_connection_new(int socket_family) or_conn->timestamp_last_added_nonpadding = time(NULL); or_conn->next_circ_id = crypto_rand_int(1<<15); + or_conn->active_circuit_pqueue = smartlist_create(); + or_conn->active_circuit_pqueue_last_recalibrated = cell_ewma_get_tick(); + return or_conn; } @@ -375,6 +378,7 @@ _connection_free(connection_t *conn) or_conn->tls = NULL; or_handshake_state_free(or_conn->handshake_state); or_conn->handshake_state = NULL; + smartlist_free(or_conn->active_circuit_pqueue); tor_free(or_conn->nickname); } if (CONN_IS_EDGE(conn)) { @@ -2278,8 +2282,8 @@ connection_read_bucket_should_increase(or_connection_t *conn) * Mark the connection and return -1 if you want to close it, else * return 0. */ -int -connection_handle_read(connection_t *conn) +static int +connection_handle_read_impl(connection_t *conn) { int max_to_read=-1, try_to_read; size_t before, n_read = 0; @@ -2374,6 +2378,16 @@ loop_again: return 0; } +int +connection_handle_read(connection_t *conn) +{ + int res; + + tor_gettimeofday_cache_clear(); + res = connection_handle_read_impl(conn); + return res; +} + /** Pull in new bytes from conn-\>s or conn-\>linked_conn onto conn-\>inbuf, * either directly or via TLS. Reduce the token buckets by the number of bytes * read. @@ -2575,8 +2589,8 @@ connection_outbuf_too_full(connection_t *conn) * Mark the connection and return -1 if you want to close it, else * return 0. */ -int -connection_handle_write(connection_t *conn, int force) +static int +connection_handle_write_impl(connection_t *conn, int force) { int e; socklen_t len=(socklen_t)sizeof(e); @@ -2743,6 +2757,15 @@ connection_handle_write(connection_t *conn, int force) return 0; } +int +connection_handle_write(connection_t *conn, int force) +{ + int res; + tor_gettimeofday_cache_clear(); + res = connection_handle_write_impl(conn, force); + return res; +} + /** OpenSSL TLS record size is 16383; this is close. The goal here is to * push data out as soon as we know there's enough for a TLS record, so * during periods of high load we won't read entire megabytes from diff --git a/src/or/dns.c b/src/or/dns.c index 7d9c2d4159..08bff763a3 100644 --- a/src/or/dns.c +++ b/src/or/dns.c @@ -128,6 +128,8 @@ typedef struct cached_resolve_t { uint32_t ttl; /**< What TTL did the nameserver tell us? */ /** Connections that want to know when we get an answer for this resolve. */ pending_connection_t *pending_connections; + /** Position of this element in the heap*/ + int minheap_idx; } cached_resolve_t; static void purge_expired_resolves(time_t now); @@ -344,6 +346,7 @@ set_expiry(cached_resolve_t *resolve, time_t expires) resolve->expire = expires; smartlist_pqueue_add(cached_resolve_pqueue, _compare_cached_resolves_by_expiry, + STRUCT_OFFSET(cached_resolve_t, minheap_idx), resolve); } @@ -389,7 +392,8 @@ purge_expired_resolves(time_t now) if (resolve->expire > now) break; smartlist_pqueue_pop(cached_resolve_pqueue, - _compare_cached_resolves_by_expiry); + _compare_cached_resolves_by_expiry, + STRUCT_OFFSET(cached_resolve_t, minheap_idx)); if (resolve->state == CACHE_STATE_PENDING) { log_debug(LD_EXIT, @@ -751,6 +755,7 @@ dns_resolve_impl(edge_connection_t *exitconn, int is_resolve, resolve = tor_malloc_zero(sizeof(cached_resolve_t)); resolve->magic = CACHED_RESOLVE_MAGIC; resolve->state = CACHE_STATE_PENDING; + resolve->minheap_idx = -1; resolve->is_reverse = is_reverse; strlcpy(resolve->address, exitconn->_base.address, sizeof(resolve->address)); @@ -1736,7 +1741,8 @@ _assert_cache_ok(void) return; smartlist_pqueue_assert_ok(cached_resolve_pqueue, - _compare_cached_resolves_by_expiry); + _compare_cached_resolves_by_expiry, + STRUCT_OFFSET(cached_resolve_t, minheap_idx)); SMARTLIST_FOREACH(cached_resolve_pqueue, cached_resolve_t *, res, { diff --git a/src/or/networkstatus.c b/src/or/networkstatus.c index e1a42803fa..507875ab2a 100644 --- a/src/or/networkstatus.c +++ b/src/or/networkstatus.c @@ -1679,6 +1679,7 @@ networkstatus_set_current_consensus(const char *consensus, update_consensus_networkstatus_fetch_time(now); dirvote_recalculate_timing(get_options(), now); routerstatus_list_update_named_server_map(); + cell_ewma_set_scale_factor(get_options(), current_consensus); } if (!from_cache) { diff --git a/src/or/or.h b/src/or/or.h index 42b2ca08b3..52d92079bb 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -1075,6 +1075,17 @@ typedef struct or_connection_t { * free up on this connection's outbuf. Every time we pull cells from a * circuit, we advance this pointer to the next circuit in the ring. */ struct circuit_t *active_circuits; + /** Priority queue of cell_ewma_t for circuits with queued cells waiting for + * room to free up on this connection's outbuf. Kept in heap order + * according to EWMA. + * + * This is redundant with active_circuits; if we ever decide only to use the + * cell_ewma algorithm for choosing circuits, we can remove active_circuits. + */ + smartlist_t *active_circuit_pqueue; + /** The tick on which the cell_ewma_ts in active_circuit_pqueue last had + * their ewma values rescaled. */ + unsigned active_circuit_pqueue_last_recalibrated; struct or_connection_t *next_with_same_id; /**< Next connection with same * identity digest as this one. */ } or_connection_t; @@ -1992,6 +2003,29 @@ typedef struct { time_t expiry_time; } cpath_build_state_t; +/** + * The cell_ewma_t structure keeps track of how many cells a circuit has + * transferred recently. It keeps an EWMA (exponentially weighted moving + * average) of the number of cells flushed from the circuit queue onto a + * connection in connection_or_flush_from_first_active_circuit(). + */ +typedef struct { + /** The last 'tick' at which we recalibrated cell_count. + * + * A cell sent at exactly the start of this tick has weight 1.0. Cells sent + * since the start of this tick have weight greater than 1.0; ones sent + * earlier have less weight. */ + unsigned last_adjusted_tick; + /** The EWMA of the cell count. */ + double cell_count; + /** True iff this is the cell count for a circuit's previous + * connection. */ + unsigned int is_for_p_conn : 1; + /** The position of the circuit within the or connection's priority + * queue. */ + int heap_index; +} cell_ewma_t; + #define ORIGIN_CIRCUIT_MAGIC 0x35315243u #define OR_CIRCUIT_MAGIC 0x98ABC04Fu @@ -2081,6 +2115,11 @@ typedef struct circuit_t { /** Unique ID for measuring tunneled network status requests. */ uint64_t dirreq_id; + + /** The EWMA count for the number of cells flushed from the + * n_conn_cells queue. Used to determine which circuit to flush from next. + */ + cell_ewma_t n_cell_ewma; } circuit_t; /** Largest number of relay_early cells that we can send on a given @@ -2212,6 +2251,10 @@ typedef struct or_circuit_t { * exit-ward queues of this circuit; reset every time when writing * buffer stats to disk. */ uint64_t total_cell_waiting_time; + + /** The EWMA count for the number of cells flushed from the + * p_conn_cells queue. */ + cell_ewma_t p_cell_ewma; } or_circuit_t; /** Convert a circuit subtype to a circuit_t.*/ @@ -2743,6 +2786,21 @@ typedef struct { * to make this false. */ int ReloadTorrcOnSIGHUP; + /* The main parameter for picking circuits within a connection. + * + * If this value is positive, when picking a cell to relay on a connection, + * we always relay from the circuit whose weighted cell count is lowest. + * Cells are weighted exponentially such that if one cell is sent + * 'CircuitPriorityHalflife' seconds before another, it counts for half as + * much. + * + * If this value is zero, we're disabling the cell-EWMA algorithm. + * + * If this value is negative, we're using the default approach + * according to either Tor or a parameter set in the consensus. + */ + double CircuitPriorityHalflife; + } or_options_t; /** Persistent state for an onion router, as saved to disk. */ @@ -4453,6 +4511,9 @@ int append_address_to_payload(char *payload_out, const tor_addr_t *addr); const char *decode_address_from_payload(tor_addr_t *addr_out, const char *payload, int payload_len); +unsigned cell_ewma_get_tick(void); +void cell_ewma_set_scale_factor(or_options_t *options, + networkstatus_t *consensus); /********************************* rephist.c ***************************/ @@ -5135,5 +5196,7 @@ int rend_parse_introduction_points(rend_service_descriptor_t *parsed, size_t intro_points_encoded_size); int rend_parse_client_keys(strmap_t *parsed_clients, const char *str); +void tor_gettimeofday_cache_clear(void); + #endif diff --git a/src/or/relay.c b/src/or/relay.c index ac305ce3df..ae1b062cf6 100644 --- a/src/or/relay.c +++ b/src/or/relay.c @@ -10,6 +10,7 @@ * receiving from circuits, plus queuing on circuits. **/ +#include <math.h> #include "or.h" #include "mempool.h" @@ -35,6 +36,26 @@ circuit_resume_edge_reading_helper(edge_connection_t *conn, static int circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint); +/** Cache the current hi-res time; the cache gets reset when libevent + * calls us. */ + +static struct timeval cached_time_hires = {0, 0}; + +static void +tor_gettimeofday_cached(struct timeval *tv) +{ + if (cached_time_hires.tv_sec == 0) { + tor_gettimeofday(&cached_time_hires); + } + *tv = cached_time_hires; +} + +void +tor_gettimeofday_cache_clear(void) +{ + cached_time_hires.tv_sec = 0; +} + /** Stats: how many relay cells have originated at this hop, or have * been relayed onward (not recognized at this hop)? */ @@ -1633,7 +1654,7 @@ cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell) insertion_time_queue_t *it_queue = queue->insertion_times; if (!it_pool) it_pool = mp_pool_new(sizeof(insertion_time_elem_t), 1024); - tor_gettimeofday(&now); + tor_gettimeofday_cached(&now); #define SECONDS_IN_A_DAY 86400L added = (uint32_t)(((now.tv_sec % SECONDS_IN_A_DAY) * 100L) + ((uint32_t)now.tv_usec / (uint32_t)10000L)); @@ -1731,6 +1752,224 @@ prev_circ_on_conn_p(circuit_t *circ, or_connection_t *conn) } } +/** Helper for sorting cell_ewma_t values in their priority queue. */ +static int +compare_cell_ewma_counts(const void *p1, const void *p2) +{ + const cell_ewma_t *e1=p1, *e2=p2; + if (e1->cell_count < e2->cell_count) + return -1; + else if (e1->cell_count > e2->cell_count) + return 1; + else + return 0; +} + +/** Given a cell_ewma_t, return a pointer to the circuit containing it. */ +static circuit_t * +cell_ewma_to_circuit(cell_ewma_t *ewma) +{ + if (ewma->is_for_p_conn) { + /* This is an or_circuit_t's p_cell_ewma. */ + or_circuit_t *orcirc = SUBTYPE_P(ewma, or_circuit_t, p_cell_ewma); + return TO_CIRCUIT(orcirc); + } else { + /* This is some circuit's n_cell_ewma. */ + return SUBTYPE_P(ewma, circuit_t, n_cell_ewma); + } +} + +/* ==== Functions for scaling cell_ewma_t ==== + + When choosing which cells to relay first, we favor circuits that have been + quiet recently. This gives better latency on connections that aren't + pushing lots of data, and makes the network feel more interactive. + + Conceptually, we take an exponentially weighted mean average of the number + of cells a circuit has sent, and allow active circuits (those with cells to + relay) to send cells in reverse order of their exponentially-weighted mean + average (EWMA) cell count. [That is, a cell sent N seconds ago 'counts' + F^N times as much as a cell sent now, for 0<F<1.0, and we favor the + circuit that has sent the fewest cells] + + If 'double' had infinite precision, we could do this simply by counting a + cell sent at startup as having weight 1.0, and a cell sent N seconds later + as having weight F^-N. This way, we would never need to re-scale + any already-sent cells. + + To prevent double from overflowing, we could count a cell sent now as + having weight 1.0 and a cell sent N seconds ago as having weight F^N. + This, however, would mean we'd need to re-scale *ALL* old circuits every + time we wanted to send a cell. + + So as a compromise, we divide time into 'ticks' (currently, 10-second + increments) and say that a cell sent at the start of a current tick is + worth 1.0, a cell sent N seconds before the start of the current tick is + worth F^N, and a cell sent N seconds after the start of the current tick is + worth F^-N. This way we don't overflow, and we don't need to constantly + rescale. + */ + +/** How long does a tick last (seconds)? */ +#define EWMA_TICK_LEN 10 + +/** The default per-tick scale factor, if it hasn't been overridden by a + * consensus or a configuration setting. zero means "disabled". */ +#define EWMA_DEFAULT_HALFLIFE 0.0 + +/** Given a timeval <b>now</b>, compute the cell_ewma tick in which it occurs + * and the fraction of the tick that has elapsed between the start of the tick + * and <b>now</b>. Return the former and store the latter in + * *<b>remainder_out</b>. + * + * These tick values are not meant to be shared between Tor instances, or used + * for other purposes. */ +static unsigned +cell_ewma_tick_from_timeval(const struct timeval *now, + double *remainder_out) +{ + unsigned res = (unsigned) (now->tv_sec / EWMA_TICK_LEN); + /* rem */ + double rem = (now->tv_sec % EWMA_TICK_LEN) + + ((double)(now->tv_usec)) / 1.0e6; + *remainder_out = rem / EWMA_TICK_LEN; + return res; +} + +/** Compute and return the current cell_ewma tick. */ +unsigned +cell_ewma_get_tick(void) +{ + return ((unsigned)approx_time() / EWMA_TICK_LEN); +} + +/** The per-tick scale factor to be used when computing cell-count EWMA + * values. (A cell sent N ticks before the start of the current tick + * has value ewma_scale_factor ** N.) + */ +static double ewma_scale_factor = 0.1; +static int ewma_enabled = 0; + +#define EPSILON 0.00001 +#define LOG_ONEHALF -0.69314718055994529 + +/** Adjust the global cell scale factor based on <b>options</b> */ +void +cell_ewma_set_scale_factor(or_options_t *options, networkstatus_t *consensus) +{ + int32_t halflife_ms; + double halflife; + const char *source; + if (options && options->CircuitPriorityHalflife >= -EPSILON) { + halflife = options->CircuitPriorityHalflife; + source = "CircuitPriorityHalflife in configuration"; + } else if (consensus && + (halflife_ms = networkstatus_get_param( + consensus, "CircPriorityHalflifeMsec", -1) >= 0)) { + halflife = ((double)halflife_ms)/1000.0; + source = "CircPriorityHalflifeMsec in consensus"; + } else { + halflife = EWMA_DEFAULT_HALFLIFE; + source = "Default value"; + } + + if (halflife <= EPSILON) { + /* The cell EWMA algorithm is disabled. */ + ewma_scale_factor = 0.1; + ewma_enabled = 0; + log_info(LD_OR, + "Disabled cell_ewma algorithm because of value in %s", + source); + } else { + /* convert halflife into halflife-per-tick. */ + halflife /= EWMA_TICK_LEN; + /* compute per-tick scale factor. */ + ewma_scale_factor = exp( LOG_ONEHALF / halflife ); + ewma_enabled = 1; + log_info(LD_OR, + "Enabled cell_ewma algorithm because of value in %s; " + "scale factor is %lf per %d seconds", + source, ewma_scale_factor, EWMA_TICK_LEN); + } +} + +/** Return the multiplier necessary to convert the value of a cell sent in + * 'from_tick' to one sent in 'to_tick'. */ +static INLINE double +get_scale_factor(unsigned from_tick, unsigned to_tick) +{ + /* This math can wrap around, but that's okay: unsigned overflow is + well-defined */ + int diff = (int)(to_tick - from_tick); + return pow(ewma_scale_factor, diff); +} + +/** Adjust the cell count of <b>ewma</b> so that it is scaled with respect to + * <b>cur_tick</b> */ +static void +scale_single_cell_ewma(cell_ewma_t *ewma, unsigned cur_tick) +{ + double factor = get_scale_factor(ewma->last_adjusted_tick, cur_tick); + ewma->cell_count *= factor; + ewma->last_adjusted_tick = cur_tick; +} + +/** Adjust the cell count of every active circuit on <b>conn</b> so + * that they are scaled with respect to <b>cur_tick</b> */ +static void +scale_active_circuits(or_connection_t *conn, unsigned cur_tick) +{ + + double factor = get_scale_factor( + conn->active_circuit_pqueue_last_recalibrated, + cur_tick); + /** Ordinarily it isn't okay to change the value of an element in a heap, + * but it's okay here, since we are preserving the order. */ + SMARTLIST_FOREACH(conn->active_circuit_pqueue, cell_ewma_t *, e, { + tor_assert(e->last_adjusted_tick == + conn->active_circuit_pqueue_last_recalibrated); + e->cell_count *= factor; + e->last_adjusted_tick = cur_tick; + }); + conn->active_circuit_pqueue_last_recalibrated = cur_tick; +} + +/** Rescale <b>ewma</b> to the same scale as <b>conn</b>, and add it to + * <b>conn</b>'s priority queue of active circuits */ +static void +add_cell_ewma_to_conn(or_connection_t *conn, cell_ewma_t *ewma) +{ + tor_assert(ewma->heap_index == -1); + scale_single_cell_ewma(ewma, + conn->active_circuit_pqueue_last_recalibrated); + + smartlist_pqueue_add(conn->active_circuit_pqueue, + compare_cell_ewma_counts, + STRUCT_OFFSET(cell_ewma_t, heap_index), + ewma); +} + +/** Remove <b>ewma</b> from <b>conn</b>'s priority queue of active circuits */ +static void +remove_cell_ewma_from_conn(or_connection_t *conn, cell_ewma_t *ewma) +{ + tor_assert(ewma->heap_index != -1); + smartlist_pqueue_remove(conn->active_circuit_pqueue, + compare_cell_ewma_counts, + STRUCT_OFFSET(cell_ewma_t, heap_index), + ewma); +} + +/** Remove and return the first cell_ewma_t from conn's priority queue of + * active circuits. Requires that the priority queue is nonempty. */ +static cell_ewma_t * +pop_first_cell_ewma_from_conn(or_connection_t *conn) +{ + return smartlist_pqueue_pop(conn->active_circuit_pqueue, + compare_cell_ewma_counts, + STRUCT_OFFSET(cell_ewma_t, heap_index)); +} + /** Add <b>circ</b> to the list of circuits with pending cells on * <b>conn</b>. No effect if <b>circ</b> is already linked. */ void @@ -1744,6 +1983,8 @@ make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn) return; } + assert_active_circuits_ok_paranoid(conn); + if (! conn->active_circuits) { conn->active_circuits = circ; *prevp = *nextp = circ; @@ -1755,6 +1996,15 @@ make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn) *prev_circ_on_conn_p(head, conn) = circ; *prevp = old_tail; } + + if (circ->n_conn == conn) { + add_cell_ewma_to_conn(conn, &circ->n_cell_ewma); + } else { + or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); + tor_assert(conn == orcirc->p_conn); + add_cell_ewma_to_conn(conn, &orcirc->p_cell_ewma); + } + assert_active_circuits_ok_paranoid(conn); } @@ -1772,6 +2022,8 @@ make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn) return; } + assert_active_circuits_ok_paranoid(conn); + tor_assert(next && prev); tor_assert(*prev_circ_on_conn_p(next, conn) == circ); tor_assert(*next_circ_on_conn_p(prev, conn) == circ); @@ -1785,6 +2037,15 @@ make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn) conn->active_circuits = next; } *prevp = *nextp = NULL; + + if (circ->n_conn == conn) { + remove_cell_ewma_from_conn(conn, &circ->n_cell_ewma); + } else { + or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); + tor_assert(conn == orcirc->p_conn); + remove_cell_ewma_from_conn(conn, &orcirc->p_cell_ewma); + } + assert_active_circuits_ok_paranoid(conn); } @@ -1804,6 +2065,10 @@ connection_or_unlink_all_active_circs(or_connection_t *orconn) cur = next; } while (cur != head); orconn->active_circuits = NULL; + + SMARTLIST_FOREACH(orconn->active_circuit_pqueue, cell_ewma_t *, e, + e->heap_index = -1); + smartlist_clear(orconn->active_circuit_pqueue); } /** Block (if <b>block</b> is true) or unblock (if <b>block</b> is false) @@ -1857,9 +2122,35 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max, cell_queue_t *queue; circuit_t *circ; int streams_blocked; + + /* The current (hi-res) time */ + struct timeval now_hires; + + /* The EWMA cell counter for the circuit we're flushing. */ + cell_ewma_t *cell_ewma = NULL; + double ewma_increment = -1; + circ = conn->active_circuits; if (!circ) return 0; assert_active_circuits_ok_paranoid(conn); + + /* See if we're doing the ewma circuit selection algorithm. */ + if (ewma_enabled) { + unsigned tick; + double fractional_tick; + tor_gettimeofday_cached(&now_hires); + tick = cell_ewma_tick_from_timeval(&now_hires, &fractional_tick); + + if (tick != conn->active_circuit_pqueue_last_recalibrated) { + scale_active_circuits(conn, tick); + } + + ewma_increment = pow(ewma_scale_factor, -fractional_tick); + + cell_ewma = smartlist_get(conn->active_circuit_pqueue, 0); + circ = cell_ewma_to_circuit(cell_ewma); + } + if (circ->n_conn == conn) { queue = &circ->n_conn_cells; streams_blocked = circ->streams_blocked_on_n_conn; @@ -1879,7 +2170,7 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max, uint32_t flushed; uint32_t cell_waiting_time; insertion_time_queue_t *it_queue = queue->insertion_times; - tor_gettimeofday(&now); + tor_gettimeofday_cached(&now); flushed = (uint32_t)((now.tv_sec % SECONDS_IN_A_DAY) * 100L + (uint32_t)now.tv_usec / (uint32_t)10000L); if (!it_queue || !it_queue->first) { @@ -1915,6 +2206,16 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max, packed_cell_free_unchecked(cell); ++n_flushed; + if (cell_ewma) { + cell_ewma_t *tmp; + cell_ewma->cell_count += ewma_increment; + /* We pop and re-add the cell_ewma_t here, not above, since we need to + * re-add it immediately to keep the priority queue consistent with + * the linked-list implementation */ + tmp = pop_first_cell_ewma_from_conn(conn); + tor_assert(tmp == cell_ewma); + add_cell_ewma_to_conn(conn, cell_ewma); + } if (circ != conn->active_circuits) { /* If this happens, the current circuit just got made inactive by * a call in connection_write_to_buf(). That's nothing to worry about: @@ -1934,7 +2235,7 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max, if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE) set_streams_blocked_on_circ(circ, conn, 0); /* unblock streams */ - /* Did we just ran out of cells on this queue? */ + /* Did we just run out of cells on this circuit's queue? */ if (queue->n == 0) { log_debug(LD_GENERAL, "Made a circuit inactive."); make_circuit_inactive_on_conn(circ, conn); @@ -2057,16 +2358,31 @@ assert_active_circuits_ok(or_connection_t *orconn) { circuit_t *head = orconn->active_circuits; circuit_t *cur = head; + int n = 0; if (! head) return; do { circuit_t *next = *next_circ_on_conn_p(cur, orconn); circuit_t *prev = *prev_circ_on_conn_p(cur, orconn); + cell_ewma_t *ewma; tor_assert(next); tor_assert(prev); tor_assert(*next_circ_on_conn_p(prev, orconn) == cur); tor_assert(*prev_circ_on_conn_p(next, orconn) == cur); + if (orconn == cur->n_conn) { + ewma = &cur->n_cell_ewma; + tor_assert(!ewma->is_for_p_conn); + } else { + ewma = &TO_OR_CIRCUIT(cur)->p_cell_ewma; + tor_assert(ewma->is_for_p_conn); + } + tor_assert(ewma->heap_index != -1); + tor_assert(ewma == smartlist_get(orconn->active_circuit_pqueue, + ewma->heap_index)); + n++; cur = next; } while (cur != head); + + tor_assert(n == smartlist_len(orconn->active_circuit_pqueue)); } diff --git a/src/test/test_containers.c b/src/test/test_containers.c index fc48ac966a..efc879b8c2 100644 --- a/src/test/test_containers.c +++ b/src/test/test_containers.c @@ -515,11 +515,17 @@ test_container_digestset(void) smartlist_free(included); } -/** Helper: return a tristate based on comparing two strings. */ +typedef struct pq_entry_t { + const char *val; + int idx; +} pq_entry_t; + +/** Helper: return a tristate based on comparing two pq_entry_t values. */ static int -_compare_strings_for_pqueue(const void *s1, const void *s2) +_compare_strings_for_pqueue(const void *p1, const void *p2) { - return strcmp((const char*)s1, (const char*)s2); + const pq_entry_t *e1=p1, *e2=p2; + return strcmp(e1->val, e2->val); } /** Run unit tests for heap-based priority queue functions. */ @@ -528,50 +534,90 @@ test_container_pqueue(void) { smartlist_t *sl = smartlist_create(); int (*cmp)(const void *, const void*); -#define OK() smartlist_pqueue_assert_ok(sl, cmp) + const int offset = STRUCT_OFFSET(pq_entry_t, idx); +#define ENTRY(s) pq_entry_t s = { #s, -1 } + ENTRY(cows); + ENTRY(zebras); + ENTRY(fish); + ENTRY(frogs); + ENTRY(apples); + ENTRY(squid); + ENTRY(daschunds); + ENTRY(eggplants); + ENTRY(weissbier); + ENTRY(lobsters); + ENTRY(roquefort); + ENTRY(chinchillas); + ENTRY(fireflies); + +#define OK() smartlist_pqueue_assert_ok(sl, cmp, offset) cmp = _compare_strings_for_pqueue; - - smartlist_pqueue_add(sl, cmp, (char*)"cows"); - smartlist_pqueue_add(sl, cmp, (char*)"zebras"); - smartlist_pqueue_add(sl, cmp, (char*)"fish"); - smartlist_pqueue_add(sl, cmp, (char*)"frogs"); - smartlist_pqueue_add(sl, cmp, (char*)"apples"); - smartlist_pqueue_add(sl, cmp, (char*)"squid"); - smartlist_pqueue_add(sl, cmp, (char*)"daschunds"); - smartlist_pqueue_add(sl, cmp, (char*)"eggplants"); - smartlist_pqueue_add(sl, cmp, (char*)"weissbier"); - smartlist_pqueue_add(sl, cmp, (char*)"lobsters"); - smartlist_pqueue_add(sl, cmp, (char*)"roquefort"); + smartlist_pqueue_add(sl, cmp, offset, &cows); + smartlist_pqueue_add(sl, cmp, offset, &zebras); + smartlist_pqueue_add(sl, cmp, offset, &fish); + smartlist_pqueue_add(sl, cmp, offset, &frogs); + smartlist_pqueue_add(sl, cmp, offset, &apples); + smartlist_pqueue_add(sl, cmp, offset, &squid); + smartlist_pqueue_add(sl, cmp, offset, &daschunds); + smartlist_pqueue_add(sl, cmp, offset, &eggplants); + smartlist_pqueue_add(sl, cmp, offset, &weissbier); + smartlist_pqueue_add(sl, cmp, offset, &lobsters); + smartlist_pqueue_add(sl, cmp, offset, &roquefort); OK(); test_eq(smartlist_len(sl), 11); - test_streq(smartlist_get(sl, 0), "apples"); - test_streq(smartlist_pqueue_pop(sl, cmp), "apples"); + test_eq_ptr(smartlist_get(sl, 0), &apples); + test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &apples); test_eq(smartlist_len(sl), 10); OK(); - test_streq(smartlist_pqueue_pop(sl, cmp), "cows"); - test_streq(smartlist_pqueue_pop(sl, cmp), "daschunds"); - smartlist_pqueue_add(sl, cmp, (char*)"chinchillas"); + test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &cows); + test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &daschunds); + smartlist_pqueue_add(sl, cmp, offset, &chinchillas); + OK(); + smartlist_pqueue_add(sl, cmp, offset, &fireflies); + OK(); + test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &chinchillas); + test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &eggplants); + test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &fireflies); + OK(); + test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &fish); + test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &frogs); + test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &lobsters); + test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &roquefort); + OK(); + test_eq(smartlist_len(sl), 3); + test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &squid); + test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &weissbier); + test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &zebras); + test_eq(smartlist_len(sl), 0); OK(); - smartlist_pqueue_add(sl, cmp, (char*)"fireflies"); + + /* Now test remove. */ + smartlist_pqueue_add(sl, cmp, offset, &cows); + smartlist_pqueue_add(sl, cmp, offset, &fish); + smartlist_pqueue_add(sl, cmp, offset, &frogs); + smartlist_pqueue_add(sl, cmp, offset, &apples); + smartlist_pqueue_add(sl, cmp, offset, &squid); + smartlist_pqueue_add(sl, cmp, offset, &zebras); + test_eq(smartlist_len(sl), 6); OK(); - test_streq(smartlist_pqueue_pop(sl, cmp), "chinchillas"); - test_streq(smartlist_pqueue_pop(sl, cmp), "eggplants"); - test_streq(smartlist_pqueue_pop(sl, cmp), "fireflies"); + smartlist_pqueue_remove(sl, cmp, offset, &zebras); + test_eq(smartlist_len(sl), 5); OK(); - test_streq(smartlist_pqueue_pop(sl, cmp), "fish"); - test_streq(smartlist_pqueue_pop(sl, cmp), "frogs"); - test_streq(smartlist_pqueue_pop(sl, cmp), "lobsters"); - test_streq(smartlist_pqueue_pop(sl, cmp), "roquefort"); + smartlist_pqueue_remove(sl, cmp, offset, &cows); + test_eq(smartlist_len(sl), 4); OK(); + smartlist_pqueue_remove(sl, cmp, offset, &apples); test_eq(smartlist_len(sl), 3); - test_streq(smartlist_pqueue_pop(sl, cmp), "squid"); - test_streq(smartlist_pqueue_pop(sl, cmp), "weissbier"); - test_streq(smartlist_pqueue_pop(sl, cmp), "zebras"); + OK(); + test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &fish); + test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &frogs); + test_eq_ptr(smartlist_pqueue_pop(sl, cmp, offset), &squid); test_eq(smartlist_len(sl), 0); OK(); + #undef OK done: |