aboutsummaryrefslogtreecommitdiff
path: root/src/or/relay.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/or/relay.c')
-rw-r--r--src/or/relay.c751
1 files changed, 682 insertions, 69 deletions
diff --git a/src/or/relay.c b/src/or/relay.c
index b40d7ad244..a6c25062a3 100644
--- a/src/or/relay.c
+++ b/src/or/relay.c
@@ -10,8 +10,26 @@
* receiving from circuits, plus queuing on circuits.
**/
+#include <math.h>
#include "or.h"
+#include "buffers.h"
+#include "circuitbuild.h"
+#include "circuitlist.h"
+#include "config.h"
+#include "connection.h"
+#include "connection_edge.h"
+#include "connection_or.h"
+#include "control.h"
+#include "geoip.h"
+#include "main.h"
#include "mempool.h"
+#include "networkstatus.h"
+#include "policies.h"
+#include "reasons.h"
+#include "relay.h"
+#include "rendcommon.h"
+#include "routerlist.h"
+#include "routerparse.h"
static int relay_crypt(circuit_t *circ, cell_t *cell,
cell_direction_t cell_direction,
@@ -20,20 +38,46 @@ static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell,
cell_direction_t cell_direction,
crypt_path_t *layer_hint);
-static int
-connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
- edge_connection_t *conn,
- crypt_path_t *layer_hint);
-static void
-circuit_consider_sending_sendme(circuit_t *circ, crypt_path_t *layer_hint);
+static int connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
+ edge_connection_t *conn,
+ crypt_path_t *layer_hint);
+static void circuit_consider_sending_sendme(circuit_t *circ,
+ crypt_path_t *layer_hint);
+static void circuit_resume_edge_reading(circuit_t *circ,
+ crypt_path_t *layer_hint);
+static int circuit_resume_edge_reading_helper(edge_connection_t *conn,
+ circuit_t *circ,
+ crypt_path_t *layer_hint);
+static int circuit_consider_stop_edge_reading(circuit_t *circ,
+ crypt_path_t *layer_hint);
+static int circuit_queue_streams_are_blocked(circuit_t *circ);
+
+/** Cache the current hi-res time; the cache gets reset when libevent
+ * calls us. */
+
+static struct timeval cached_time_hires = {0, 0};
+
+/** Stop reading on edge connections when we have this many cells
+ * waiting on the appropriate queue. */
+#define CELL_QUEUE_HIGHWATER_SIZE 256
+/** Start reading from edge connections again when we get down to this many
+ * cells. */
+#define CELL_QUEUE_LOWWATER_SIZE 64
+
static void
-circuit_resume_edge_reading(circuit_t *circ, crypt_path_t *layer_hint);
-static int
-circuit_resume_edge_reading_helper(edge_connection_t *conn,
- circuit_t *circ,
- crypt_path_t *layer_hint);
-static int
-circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint);
+tor_gettimeofday_cached(struct timeval *tv)
+{
+ if (cached_time_hires.tv_sec == 0) {
+ tor_gettimeofday(&cached_time_hires);
+ }
+ *tv = cached_time_hires;
+}
+
+void
+tor_gettimeofday_cache_clear(void)
+{
+ cached_time_hires.tv_sec = 0;
+}
/** Stats: how many relay cells have originated at this hop, or have
* been relayed onward (not recognized at this hop)?
@@ -230,7 +274,7 @@ circuit_receive_relay_cell(cell_t *cell, circuit_t *circ,
* we might kill the circ before we relay
* the cells. */
- append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction);
+ append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction, 0);
return 0;
}
@@ -327,7 +371,7 @@ relay_crypt(circuit_t *circ, cell_t *cell, cell_direction_t cell_direction,
static int
circuit_package_relay_cell(cell_t *cell, circuit_t *circ,
cell_direction_t cell_direction,
- crypt_path_t *layer_hint)
+ crypt_path_t *layer_hint, streamid_t on_stream)
{
or_connection_t *conn; /* where to send the cell */
@@ -371,7 +415,7 @@ circuit_package_relay_cell(cell_t *cell, circuit_t *circ,
}
++stats_n_relay_cells_relayed;
- append_cell_to_circuit_queue(circ, conn, cell, cell_direction);
+ append_cell_to_circuit_queue(circ, conn, cell, cell_direction, on_stream);
return 0;
}
@@ -496,7 +540,7 @@ relay_command_to_string(uint8_t command)
* return 0.
*/
int
-relay_send_command_from_edge(uint16_t stream_id, circuit_t *circ,
+relay_send_command_from_edge(streamid_t stream_id, circuit_t *circ,
uint8_t relay_command, const char *payload,
size_t payload_len, crypt_path_t *cpath_layer)
{
@@ -531,6 +575,12 @@ relay_send_command_from_edge(uint16_t stream_id, circuit_t *circ,
log_debug(LD_OR,"delivering %d cell %s.", relay_command,
cell_direction == CELL_DIRECTION_OUT ? "forward" : "backward");
+ /* If we are sending an END cell and this circuit is used for a tunneled
+ * directory request, advance its state. */
+ if (relay_command == RELAY_COMMAND_END && circ->dirreq_id)
+ geoip_change_dirreq_state(circ->dirreq_id, DIRREQ_TUNNELED,
+ DIRREQ_END_CELL_SENT);
+
if (cell_direction == CELL_DIRECTION_OUT && circ->n_conn) {
/* if we're using relaybandwidthrate, this conn wants priority */
circ->n_conn->client_used = approx_time();
@@ -540,17 +590,11 @@ relay_send_command_from_edge(uint16_t stream_id, circuit_t *circ,
origin_circuit_t *origin_circ = TO_ORIGIN_CIRCUIT(circ);
if (origin_circ->remaining_relay_early_cells > 0 &&
(relay_command == RELAY_COMMAND_EXTEND ||
- (cpath_layer != origin_circ->cpath &&
- !CIRCUIT_PURPOSE_IS_ESTABLISHED_REND(circ->purpose)))) {
- /* If we've got any relay_early cells left, and we're sending
- * an extend cell or (we're not talking to the first hop and we're
- * not talking to a rendezvous circuit), use one of them.
- * Don't worry about the conn protocol version:
+ cpath_layer != origin_circ->cpath)) {
+ /* If we've got any relay_early cells left and (we're sending
+ * an extend cell or we're not talking to the first hop), use
+ * one of them. Don't worry about the conn protocol version:
* append_cell_to_circuit_queue will fix it up. */
- /* XXX For now, clients don't use RELAY_EARLY cells when sending
- * relay cells on rendezvous circuits. See bug 1038. Eventually,
- * we can take this behavior away in favor of having clients avoid
- * rendezvous points running 0.2.1.3-alpha through 0.2.1.18. -RD */
cell.command = CELL_RELAY_EARLY;
--origin_circ->remaining_relay_early_cells;
log_debug(LD_OR, "Sending a RELAY_EARLY cell; %d remaining.",
@@ -578,8 +622,8 @@ relay_send_command_from_edge(uint16_t stream_id, circuit_t *circ,
}
}
- if (circuit_package_relay_cell(&cell, circ, cell_direction, cpath_layer)
- < 0) {
+ if (circuit_package_relay_cell(&cell, circ, cell_direction, cpath_layer,
+ stream_id) < 0) {
log_warn(LD_BUG,"circuit_package_relay_cell failed. Closing.");
circuit_mark_for_close(circ, END_CIRC_REASON_INTERNAL);
return -1;
@@ -901,7 +945,7 @@ connection_edge_process_relay_cell_not_open(
}
/* handle anything that might have queued */
- if (connection_edge_package_raw_inbuf(conn, 1) < 0) {
+ if (connection_edge_package_raw_inbuf(conn, 1, NULL) < 0) {
/* (We already sent an end cell if possible) */
connection_mark_for_close(TO_CONN(conn));
return 0;
@@ -999,7 +1043,8 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
relay_header_unpack(&rh, cell->payload);
// log_fn(LOG_DEBUG,"command %d stream %d", rh.command, rh.stream_id);
num_seen++;
- log_debug(domain, "Now seen %d relay cells here.", num_seen);
+ log_debug(domain, "Now seen %d relay cells here (command %d, stream %d).",
+ num_seen, rh.command, rh.stream_id);
if (rh.length > RELAY_PAYLOAD_SIZE) {
log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
@@ -1038,6 +1083,16 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
"Begin cell for known stream. Dropping.");
return 0;
}
+ if (rh.command == RELAY_COMMAND_BEGIN_DIR) {
+ /* Assign this circuit and its app-ward OR connection a unique ID,
+ * so that we can measure download times. The local edge and dir
+ * connection will be assigned the same ID when they are created
+ * and linked. */
+ static uint64_t next_id = 0;
+ circ->dirreq_id = ++next_id;
+ TO_CONN(TO_OR_CIRCUIT(circ)->p_conn)->dirreq_id = circ->dirreq_id;
+ }
+
return connection_exit_begin_conn(cell, circ);
case RELAY_COMMAND_DATA:
++stats_n_data_cells_received;
@@ -1131,6 +1186,7 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
}
if (circ->n_conn) {
uint8_t trunc_reason = *(uint8_t*)(cell->payload + RELAY_HEADER_SIZE);
+ circuit_clear_cell_queue(circ, circ->n_conn);
connection_or_send_destroy(circ->n_circ_id, circ->n_conn,
trunc_reason);
circuit_set_n_circid_orconn(circ, 0, NULL);
@@ -1179,9 +1235,13 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
conn->package_window += STREAMWINDOW_INCREMENT;
log_debug(domain,"stream-level sendme, packagewindow now %d.",
conn->package_window);
+ if (circuit_queue_streams_are_blocked(circ)) {
+ /* Still waiting for queue to flush; don't touch conn */
+ return 0;
+ }
connection_start_reading(TO_CONN(conn));
/* handle whatever might still be on the inbuf */
- if (connection_edge_package_raw_inbuf(conn, 1) < 0) {
+ if (connection_edge_package_raw_inbuf(conn, 1, NULL) < 0) {
/* (We already sent an end cell if possible) */
connection_mark_for_close(TO_CONN(conn));
return 0;
@@ -1247,15 +1307,19 @@ uint64_t stats_n_data_cells_received = 0;
* ever received were completely full of data. */
uint64_t stats_n_data_bytes_received = 0;
-/** While conn->inbuf has an entire relay payload of bytes on it,
- * and the appropriate package windows aren't empty, grab a cell
- * and send it down the circuit.
+/** If <b>conn</b> has an entire relay payload of bytes on its inbuf (or
+ * <b>package_partial</b> is true), and the appropriate package windows aren't
+ * empty, grab a cell and send it down the circuit.
+ *
+ * If *<b>max_cells</b> is given, package no more than max_cells. Decrement
+ * *<b>max_cells</b> by the number of cells packaged.
*
* Return -1 (and send a RELAY_COMMAND_END cell if necessary) if conn should
* be marked for close, else return 0.
*/
int
-connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial)
+connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial,
+ int *max_cells)
{
size_t amount_to_process, length;
char payload[CELL_PAYLOAD_SIZE];
@@ -1271,7 +1335,10 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial)
return 0;
}
-repeat_connection_edge_package_raw_inbuf:
+ if (max_cells && *max_cells <= 0)
+ return 0;
+
+ repeat_connection_edge_package_raw_inbuf:
circ = circuit_get_by_edge_conn(conn);
if (!circ) {
@@ -1332,6 +1399,12 @@ repeat_connection_edge_package_raw_inbuf:
}
log_debug(domain,"conn->package_window is now %d",conn->package_window);
+ if (max_cells) {
+ *max_cells -= 1;
+ if (*max_cells <= 0)
+ return 0;
+ }
+
/* handle more if there's more, or return 0 if there isn't */
goto repeat_connection_edge_package_raw_inbuf;
}
@@ -1379,7 +1452,10 @@ connection_edge_consider_sending_sendme(edge_connection_t *conn)
static void
circuit_resume_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
{
-
+ if (circuit_queue_streams_are_blocked(circ)) {
+ log_debug(layer_hint?LD_APP:LD_EXIT,"Too big queue, no resuming");
+ return;
+ }
log_debug(layer_hint?LD_APP:LD_EXIT,"resuming");
if (CIRCUIT_IS_ORIGIN(circ))
@@ -1395,31 +1471,136 @@ circuit_resume_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
* of a linked list of edge streams that should each be considered.
*/
static int
-circuit_resume_edge_reading_helper(edge_connection_t *conn,
+circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
circuit_t *circ,
crypt_path_t *layer_hint)
{
- for ( ; conn; conn=conn->next_stream) {
- if (conn->_base.marked_for_close)
+ edge_connection_t *conn;
+ int n_packaging_streams, n_streams_left;
+ int packaged_this_round;
+ int cells_on_queue;
+ int cells_per_conn;
+ edge_connection_t *chosen_stream = NULL;
+
+ /* How many cells do we have space for? It will be the minimum of
+ * the number needed to exhaust the package window, and the minimum
+ * needed to fill the cell queue. */
+ int max_to_package = circ->package_window;
+ if (CIRCUIT_IS_ORIGIN(circ)) {
+ cells_on_queue = circ->n_conn_cells.n;
+ } else {
+ or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
+ cells_on_queue = or_circ->p_conn_cells.n;
+ }
+ if (CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue < max_to_package)
+ max_to_package = CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue;
+
+ /* Once we used to start listening on the streams in the order they
+ * appeared in the linked list. That leads to starvation on the
+ * streams that appeared later on the list, since the first streams
+ * would always get to read first. Instead, we just pick a random
+ * stream on the list, and enable reading for streams starting at that
+ * point (and wrapping around as if the list were circular). It would
+ * probably be better to actually remember which streams we've
+ * serviced in the past, but this is simple and effective. */
+
+ /* Select a stream uniformly at random from the linked list. We
+ * don't need cryptographic randomness here. */
+ {
+ int num_streams = 0;
+ for (conn = first_conn; conn; conn = conn->next_stream) {
+ num_streams++;
+ if ((tor_weak_random() % num_streams)==0)
+ chosen_stream = conn;
+ /* Invariant: chosen_stream has been chosen uniformly at random from
+ * among the first num_streams streams on first_conn. */
+ }
+ }
+
+ /* Count how many non-marked streams there are that have anything on
+ * their inbuf, and enable reading on all of the connections. */
+ n_packaging_streams = 0;
+ /* Activate reading starting from the chosen stream */
+ for (conn=chosen_stream; conn; conn = conn->next_stream) {
+ /* Start reading for the streams starting from here */
+ if (conn->_base.marked_for_close || conn->package_window <= 0)
continue;
- if ((!layer_hint && conn->package_window > 0) ||
- (layer_hint && conn->package_window > 0 &&
- conn->cpath_layer == layer_hint)) {
+ if (!layer_hint || conn->cpath_layer == layer_hint) {
connection_start_reading(TO_CONN(conn));
+
+ if (buf_datalen(conn->_base.inbuf) > 0)
+ ++n_packaging_streams;
+ }
+ }
+ /* Go back and do the ones we skipped, circular-style */
+ for (conn = first_conn; conn != chosen_stream; conn = conn->next_stream) {
+ if (conn->_base.marked_for_close || conn->package_window <= 0)
+ continue;
+ if (!layer_hint || conn->cpath_layer == layer_hint) {
+ connection_start_reading(TO_CONN(conn));
+
+ if (buf_datalen(conn->_base.inbuf) > 0)
+ ++n_packaging_streams;
+ }
+ }
+
+ if (n_packaging_streams == 0) /* avoid divide-by-zero */
+ return 0;
+
+ again:
+
+ cells_per_conn = CEIL_DIV(max_to_package, n_packaging_streams);
+
+ packaged_this_round = 0;
+ n_streams_left = 0;
+
+ /* Iterate over all connections. Package up to cells_per_conn cells on
+ * each. Update packaged_this_round with the total number of cells
+ * packaged, and n_streams_left with the number that still have data to
+ * package.
+ */
+ for (conn=first_conn; conn; conn=conn->next_stream) {
+ if (conn->_base.marked_for_close || conn->package_window <= 0)
+ continue;
+ if (!layer_hint || conn->cpath_layer == layer_hint) {
+ int n = cells_per_conn, r;
/* handle whatever might still be on the inbuf */
- if (connection_edge_package_raw_inbuf(conn, 1)<0) {
- /* (We already sent an end cell if possible) */
+ r = connection_edge_package_raw_inbuf(conn, 1, &n);
+
+ /* Note how many we packaged */
+ packaged_this_round += (cells_per_conn-n);
+
+ if (r<0) {
+ /* Problem while packaging. (We already sent an end cell if
+ * possible) */
connection_mark_for_close(TO_CONN(conn));
continue;
}
+ /* If there's still data to read, we'll be coming back to this stream. */
+ if (buf_datalen(conn->_base.inbuf))
+ ++n_streams_left;
+
/* If the circuit won't accept any more data, return without looking
* at any more of the streams. Any connections that should be stopped
* have already been stopped by connection_edge_package_raw_inbuf. */
if (circuit_consider_stop_edge_reading(circ, layer_hint))
return -1;
+ /* XXXX should we also stop immediately if we fill up the cell queue?
+ * Probably. */
}
}
+
+ /* If we made progress, and we are willing to package more, and there are
+ * any streams left that want to package stuff... try again!
+ */
+ if (packaged_this_round && packaged_this_round < max_to_package &&
+ n_streams_left) {
+ max_to_package -= packaged_this_round;
+ n_packaging_streams = n_streams_left;
+ goto again;
+ }
+
return 0;
}
@@ -1488,13 +1669,6 @@ circuit_consider_sending_sendme(circuit_t *circ, crypt_path_t *layer_hint)
}
}
-/** Stop reading on edge connections when we have this many cells
- * waiting on the appropriate queue. */
-#define CELL_QUEUE_HIGHWATER_SIZE 256
-/** Start reading from edge connections again when we get down to this many
- * cells. */
-#define CELL_QUEUE_LOWWATER_SIZE 64
-
#ifdef ACTIVE_CIRCUITS_PARANOIA
#define assert_active_circuits_ok_paranoid(conn) \
assert_active_circuits_ok(conn)
@@ -1508,6 +1682,10 @@ static int total_cells_allocated = 0;
/** A memory pool to allocate packed_cell_t objects. */
static mp_pool_t *cell_pool = NULL;
+/** Memory pool to allocate insertion_time_elem_t objects used for cell
+ * statistics. */
+static mp_pool_t *it_pool = NULL;
+
/** Allocate structures to hold cells. */
void
init_cell_pool(void)
@@ -1516,7 +1694,8 @@ init_cell_pool(void)
cell_pool = mp_pool_new(sizeof(packed_cell_t), 128*1024);
}
-/** Free all storage used to hold cells. */
+/** Free all storage used to hold cells (and insertion times if we measure
+ * cell statistics). */
void
free_cell_pool(void)
{
@@ -1525,6 +1704,10 @@ free_cell_pool(void)
mp_pool_destroy(cell_pool);
cell_pool = NULL;
}
+ if (it_pool) {
+ mp_pool_destroy(it_pool);
+ it_pool = NULL;
+ }
}
/** Free excess storage in cell pool. */
@@ -1537,7 +1720,7 @@ clean_cell_pool(void)
/** Release storage held by <b>cell</b>. */
static INLINE void
-packed_cell_free(packed_cell_t *cell)
+packed_cell_free_unchecked(packed_cell_t *cell)
{
--total_cells_allocated;
mp_pool_release(cell);
@@ -1599,7 +1782,38 @@ cell_queue_append(cell_queue_t *queue, packed_cell_t *cell)
void
cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell)
{
- cell_queue_append(queue, packed_cell_copy(cell));
+ packed_cell_t *copy = packed_cell_copy(cell);
+ /* Remember the time when this cell was put in the queue. */
+ if (get_options()->CellStatistics) {
+ struct timeval now;
+ uint32_t added;
+ insertion_time_queue_t *it_queue = queue->insertion_times;
+ if (!it_pool)
+ it_pool = mp_pool_new(sizeof(insertion_time_elem_t), 1024);
+ tor_gettimeofday_cached(&now);
+#define SECONDS_IN_A_DAY 86400L
+ added = (uint32_t)(((now.tv_sec % SECONDS_IN_A_DAY) * 100L)
+ + ((uint32_t)now.tv_usec / (uint32_t)10000L));
+ if (!it_queue) {
+ it_queue = tor_malloc_zero(sizeof(insertion_time_queue_t));
+ queue->insertion_times = it_queue;
+ }
+ if (it_queue->last && it_queue->last->insertion_time == added) {
+ it_queue->last->counter++;
+ } else {
+ insertion_time_elem_t *elem = mp_pool_get(it_pool);
+ elem->next = NULL;
+ elem->insertion_time = added;
+ elem->counter = 1;
+ if (it_queue->last) {
+ it_queue->last->next = elem;
+ it_queue->last = elem;
+ } else {
+ it_queue->first = it_queue->last = elem;
+ }
+ }
+ }
+ cell_queue_append(queue, copy);
}
/** Remove and free every cell in <b>queue</b>. */
@@ -1610,11 +1824,19 @@ cell_queue_clear(cell_queue_t *queue)
cell = queue->head;
while (cell) {
next = cell->next;
- packed_cell_free(cell);
+ packed_cell_free_unchecked(cell);
cell = next;
}
queue->head = queue->tail = NULL;
queue->n = 0;
+ if (queue->insertion_times) {
+ while (queue->insertion_times->first) {
+ insertion_time_elem_t *elem = queue->insertion_times->first;
+ queue->insertion_times->first = elem->next;
+ mp_pool_release(elem);
+ }
+ tor_free(queue->insertion_times);
+ }
}
/** Extract and return the cell at the head of <b>queue</b>; return NULL if
@@ -1666,8 +1888,226 @@ prev_circ_on_conn_p(circuit_t *circ, or_connection_t *conn)
}
}
+/** Helper for sorting cell_ewma_t values in their priority queue. */
+static int
+compare_cell_ewma_counts(const void *p1, const void *p2)
+{
+ const cell_ewma_t *e1=p1, *e2=p2;
+ if (e1->cell_count < e2->cell_count)
+ return -1;
+ else if (e1->cell_count > e2->cell_count)
+ return 1;
+ else
+ return 0;
+}
+
+/** Given a cell_ewma_t, return a pointer to the circuit containing it. */
+static circuit_t *
+cell_ewma_to_circuit(cell_ewma_t *ewma)
+{
+ if (ewma->is_for_p_conn) {
+ /* This is an or_circuit_t's p_cell_ewma. */
+ or_circuit_t *orcirc = SUBTYPE_P(ewma, or_circuit_t, p_cell_ewma);
+ return TO_CIRCUIT(orcirc);
+ } else {
+ /* This is some circuit's n_cell_ewma. */
+ return SUBTYPE_P(ewma, circuit_t, n_cell_ewma);
+ }
+}
+
+/* ==== Functions for scaling cell_ewma_t ====
+
+ When choosing which cells to relay first, we favor circuits that have been
+ quiet recently. This gives better latency on connections that aren't
+ pushing lots of data, and makes the network feel more interactive.
+
+ Conceptually, we take an exponentially weighted mean average of the number
+ of cells a circuit has sent, and allow active circuits (those with cells to
+ relay) to send cells in reverse order of their exponentially-weighted mean
+ average (EWMA) cell count. [That is, a cell sent N seconds ago 'counts'
+ F^N times as much as a cell sent now, for 0<F<1.0, and we favor the
+ circuit that has sent the fewest cells]
+
+ If 'double' had infinite precision, we could do this simply by counting a
+ cell sent at startup as having weight 1.0, and a cell sent N seconds later
+ as having weight F^-N. This way, we would never need to re-scale
+ any already-sent cells.
+
+ To prevent double from overflowing, we could count a cell sent now as
+ having weight 1.0 and a cell sent N seconds ago as having weight F^N.
+ This, however, would mean we'd need to re-scale *ALL* old circuits every
+ time we wanted to send a cell.
+
+ So as a compromise, we divide time into 'ticks' (currently, 10-second
+ increments) and say that a cell sent at the start of a current tick is
+ worth 1.0, a cell sent N seconds before the start of the current tick is
+ worth F^N, and a cell sent N seconds after the start of the current tick is
+ worth F^-N. This way we don't overflow, and we don't need to constantly
+ rescale.
+ */
+
+/** How long does a tick last (seconds)? */
+#define EWMA_TICK_LEN 10
+
+/** The default per-tick scale factor, if it hasn't been overridden by a
+ * consensus or a configuration setting. zero means "disabled". */
+#define EWMA_DEFAULT_HALFLIFE 0.0
+
+/** Given a timeval <b>now</b>, compute the cell_ewma tick in which it occurs
+ * and the fraction of the tick that has elapsed between the start of the tick
+ * and <b>now</b>. Return the former and store the latter in
+ * *<b>remainder_out</b>.
+ *
+ * These tick values are not meant to be shared between Tor instances, or used
+ * for other purposes. */
+static unsigned
+cell_ewma_tick_from_timeval(const struct timeval *now,
+ double *remainder_out)
+{
+ unsigned res = (unsigned) (now->tv_sec / EWMA_TICK_LEN);
+ /* rem */
+ double rem = (now->tv_sec % EWMA_TICK_LEN) +
+ ((double)(now->tv_usec)) / 1.0e6;
+ *remainder_out = rem / EWMA_TICK_LEN;
+ return res;
+}
+
+/** Compute and return the current cell_ewma tick. */
+unsigned
+cell_ewma_get_tick(void)
+{
+ return ((unsigned)approx_time() / EWMA_TICK_LEN);
+}
+
+/** The per-tick scale factor to be used when computing cell-count EWMA
+ * values. (A cell sent N ticks before the start of the current tick
+ * has value ewma_scale_factor ** N.)
+ */
+static double ewma_scale_factor = 0.1;
+static int ewma_enabled = 0;
+
+#define EPSILON 0.00001
+#define LOG_ONEHALF -0.69314718055994529
+
+/** Adjust the global cell scale factor based on <b>options</b> */
+void
+cell_ewma_set_scale_factor(or_options_t *options, networkstatus_t *consensus)
+{
+ int32_t halflife_ms;
+ double halflife;
+ const char *source;
+ if (options && options->CircuitPriorityHalflife >= -EPSILON) {
+ halflife = options->CircuitPriorityHalflife;
+ source = "CircuitPriorityHalflife in configuration";
+ } else if (consensus && (halflife_ms = networkstatus_get_param(
+ consensus, "CircuitPriorityHalflifeMsec",
+ -1, -1, INT32_MAX)) >= 0) {
+ halflife = ((double)halflife_ms)/1000.0;
+ source = "CircuitPriorityHalflifeMsec in consensus";
+ } else {
+ halflife = EWMA_DEFAULT_HALFLIFE;
+ source = "Default value";
+ }
+
+ if (halflife <= EPSILON) {
+ /* The cell EWMA algorithm is disabled. */
+ ewma_scale_factor = 0.1;
+ ewma_enabled = 0;
+ log_info(LD_OR,
+ "Disabled cell_ewma algorithm because of value in %s",
+ source);
+ } else {
+ /* convert halflife into halflife-per-tick. */
+ halflife /= EWMA_TICK_LEN;
+ /* compute per-tick scale factor. */
+ ewma_scale_factor = exp( LOG_ONEHALF / halflife );
+ ewma_enabled = 1;
+ log_info(LD_OR,
+ "Enabled cell_ewma algorithm because of value in %s; "
+ "scale factor is %lf per %d seconds",
+ source, ewma_scale_factor, EWMA_TICK_LEN);
+ }
+}
+
+/** Return the multiplier necessary to convert the value of a cell sent in
+ * 'from_tick' to one sent in 'to_tick'. */
+static INLINE double
+get_scale_factor(unsigned from_tick, unsigned to_tick)
+{
+ /* This math can wrap around, but that's okay: unsigned overflow is
+ well-defined */
+ int diff = (int)(to_tick - from_tick);
+ return pow(ewma_scale_factor, diff);
+}
+
+/** Adjust the cell count of <b>ewma</b> so that it is scaled with respect to
+ * <b>cur_tick</b> */
+static void
+scale_single_cell_ewma(cell_ewma_t *ewma, unsigned cur_tick)
+{
+ double factor = get_scale_factor(ewma->last_adjusted_tick, cur_tick);
+ ewma->cell_count *= factor;
+ ewma->last_adjusted_tick = cur_tick;
+}
+
+/** Adjust the cell count of every active circuit on <b>conn</b> so
+ * that they are scaled with respect to <b>cur_tick</b> */
+static void
+scale_active_circuits(or_connection_t *conn, unsigned cur_tick)
+{
+
+ double factor = get_scale_factor(
+ conn->active_circuit_pqueue_last_recalibrated,
+ cur_tick);
+ /** Ordinarily it isn't okay to change the value of an element in a heap,
+ * but it's okay here, since we are preserving the order. */
+ SMARTLIST_FOREACH(conn->active_circuit_pqueue, cell_ewma_t *, e, {
+ tor_assert(e->last_adjusted_tick ==
+ conn->active_circuit_pqueue_last_recalibrated);
+ e->cell_count *= factor;
+ e->last_adjusted_tick = cur_tick;
+ });
+ conn->active_circuit_pqueue_last_recalibrated = cur_tick;
+}
+
+/** Rescale <b>ewma</b> to the same scale as <b>conn</b>, and add it to
+ * <b>conn</b>'s priority queue of active circuits */
+static void
+add_cell_ewma_to_conn(or_connection_t *conn, cell_ewma_t *ewma)
+{
+ tor_assert(ewma->heap_index == -1);
+ scale_single_cell_ewma(ewma,
+ conn->active_circuit_pqueue_last_recalibrated);
+
+ smartlist_pqueue_add(conn->active_circuit_pqueue,
+ compare_cell_ewma_counts,
+ STRUCT_OFFSET(cell_ewma_t, heap_index),
+ ewma);
+}
+
+/** Remove <b>ewma</b> from <b>conn</b>'s priority queue of active circuits */
+static void
+remove_cell_ewma_from_conn(or_connection_t *conn, cell_ewma_t *ewma)
+{
+ tor_assert(ewma->heap_index != -1);
+ smartlist_pqueue_remove(conn->active_circuit_pqueue,
+ compare_cell_ewma_counts,
+ STRUCT_OFFSET(cell_ewma_t, heap_index),
+ ewma);
+}
+
+/** Remove and return the first cell_ewma_t from conn's priority queue of
+ * active circuits. Requires that the priority queue is nonempty. */
+static cell_ewma_t *
+pop_first_cell_ewma_from_conn(or_connection_t *conn)
+{
+ return smartlist_pqueue_pop(conn->active_circuit_pqueue,
+ compare_cell_ewma_counts,
+ STRUCT_OFFSET(cell_ewma_t, heap_index));
+}
+
/** Add <b>circ</b> to the list of circuits with pending cells on
- * <b>conn</b>. No effect if <b>circ</b> is already unlinked. */
+ * <b>conn</b>. No effect if <b>circ</b> is already linked. */
void
make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn)
{
@@ -1679,6 +2119,8 @@ make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn)
return;
}
+ assert_active_circuits_ok_paranoid(conn);
+
if (! conn->active_circuits) {
conn->active_circuits = circ;
*prevp = *nextp = circ;
@@ -1690,10 +2132,19 @@ make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn)
*prev_circ_on_conn_p(head, conn) = circ;
*prevp = old_tail;
}
+
+ if (circ->n_conn == conn) {
+ add_cell_ewma_to_conn(conn, &circ->n_cell_ewma);
+ } else {
+ or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
+ tor_assert(conn == orcirc->p_conn);
+ add_cell_ewma_to_conn(conn, &orcirc->p_cell_ewma);
+ }
+
assert_active_circuits_ok_paranoid(conn);
}
-/** Remove <b>circ</b> to the list of circuits with pending cells on
+/** Remove <b>circ</b> from the list of circuits with pending cells on
* <b>conn</b>. No effect if <b>circ</b> is already unlinked. */
void
make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn)
@@ -1707,6 +2158,8 @@ make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn)
return;
}
+ assert_active_circuits_ok_paranoid(conn);
+
tor_assert(next && prev);
tor_assert(*prev_circ_on_conn_p(next, conn) == circ);
tor_assert(*next_circ_on_conn_p(prev, conn) == circ);
@@ -1720,6 +2173,15 @@ make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn)
conn->active_circuits = next;
}
*prevp = *nextp = NULL;
+
+ if (circ->n_conn == conn) {
+ remove_cell_ewma_from_conn(conn, &circ->n_cell_ewma);
+ } else {
+ or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
+ tor_assert(conn == orcirc->p_conn);
+ remove_cell_ewma_from_conn(conn, &orcirc->p_cell_ewma);
+ }
+
assert_active_circuits_ok_paranoid(conn);
}
@@ -1739,16 +2201,27 @@ connection_or_unlink_all_active_circs(or_connection_t *orconn)
cur = next;
} while (cur != head);
orconn->active_circuits = NULL;
+
+ SMARTLIST_FOREACH(orconn->active_circuit_pqueue, cell_ewma_t *, e,
+ e->heap_index = -1);
+ smartlist_clear(orconn->active_circuit_pqueue);
}
/** Block (if <b>block</b> is true) or unblock (if <b>block</b> is false)
* every edge connection that is using <b>circ</b> to write to <b>orconn</b>,
- * and start or stop reading as appropriate. */
-static void
+ * and start or stop reading as appropriate.
+ *
+ * If <b>stream_id</b> is nonzero, block only the edge connection whose
+ * stream_id matches it.
+ *
+ * Returns the number of streams whose status we changed.
+ */
+static int
set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn,
- int block)
+ int block, streamid_t stream_id)
{
edge_connection_t *edge = NULL;
+ int n = 0;
if (circ->n_conn == orconn) {
circ->streams_blocked_on_n_conn = block;
if (CIRCUIT_IS_ORIGIN(circ))
@@ -1761,7 +2234,13 @@ set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn,
for (; edge; edge = edge->next_stream) {
connection_t *conn = TO_CONN(edge);
- edge->edge_blocked_on_circ = block;
+ if (stream_id && edge->stream_id != stream_id)
+ continue;
+
+ if (edge->edge_blocked_on_circ != block) {
+ ++n;
+ edge->edge_blocked_on_circ = block;
+ }
if (!conn->read_event) {
/* This connection is a placeholder for something; probably a DNS
@@ -1778,10 +2257,12 @@ set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn,
connection_start_reading(conn);
}
}
+
+ return n;
}
/** Pull as many cells as possible (but no more than <b>max</b>) from the
- * queue of the first active circuit on <b>conn</b>, and write then to
+ * queue of the first active circuit on <b>conn</b>, and write them to
* <b>conn</b>-&gt;outbuf. Return the number of cells written. Advance
* the active circuit pointer to the next active circuit in the ring. */
int
@@ -1792,9 +2273,35 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
cell_queue_t *queue;
circuit_t *circ;
int streams_blocked;
+
+ /* The current (hi-res) time */
+ struct timeval now_hires;
+
+ /* The EWMA cell counter for the circuit we're flushing. */
+ cell_ewma_t *cell_ewma = NULL;
+ double ewma_increment = -1;
+
circ = conn->active_circuits;
if (!circ) return 0;
assert_active_circuits_ok_paranoid(conn);
+
+ /* See if we're doing the ewma circuit selection algorithm. */
+ if (ewma_enabled) {
+ unsigned tick;
+ double fractional_tick;
+ tor_gettimeofday_cached(&now_hires);
+ tick = cell_ewma_tick_from_timeval(&now_hires, &fractional_tick);
+
+ if (tick != conn->active_circuit_pqueue_last_recalibrated) {
+ scale_active_circuits(conn, tick);
+ }
+
+ ewma_increment = pow(ewma_scale_factor, -fractional_tick);
+
+ cell_ewma = smartlist_get(conn->active_circuit_pqueue, 0);
+ circ = cell_ewma_to_circuit(cell_ewma);
+ }
+
if (circ->n_conn == conn) {
queue = &circ->n_conn_cells;
streams_blocked = circ->streams_blocked_on_n_conn;
@@ -1808,10 +2315,60 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
packed_cell_t *cell = cell_queue_pop(queue);
tor_assert(*next_circ_on_conn_p(circ,conn));
+ /* Calculate the exact time that this cell has spent in the queue. */
+ if (get_options()->CellStatistics && !CIRCUIT_IS_ORIGIN(circ)) {
+ struct timeval now;
+ uint32_t flushed;
+ uint32_t cell_waiting_time;
+ insertion_time_queue_t *it_queue = queue->insertion_times;
+ tor_gettimeofday_cached(&now);
+ flushed = (uint32_t)((now.tv_sec % SECONDS_IN_A_DAY) * 100L +
+ (uint32_t)now.tv_usec / (uint32_t)10000L);
+ if (!it_queue || !it_queue->first) {
+ log_info(LD_GENERAL, "Cannot determine insertion time of cell. "
+ "Looks like the CellStatistics option was "
+ "recently enabled.");
+ } else {
+ or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
+ insertion_time_elem_t *elem = it_queue->first;
+ cell_waiting_time =
+ (uint32_t)((flushed * 10L + SECONDS_IN_A_DAY * 1000L -
+ elem->insertion_time * 10L) %
+ (SECONDS_IN_A_DAY * 1000L));
+#undef SECONDS_IN_A_DAY
+ elem->counter--;
+ if (elem->counter < 1) {
+ it_queue->first = elem->next;
+ if (elem == it_queue->last)
+ it_queue->last = NULL;
+ mp_pool_release(elem);
+ }
+ orcirc->total_cell_waiting_time += cell_waiting_time;
+ orcirc->processed_cells++;
+ }
+ }
+
+ /* If we just flushed our queue and this circuit is used for a
+ * tunneled directory request, possibly advance its state. */
+ if (queue->n == 0 && TO_CONN(conn)->dirreq_id)
+ geoip_change_dirreq_state(TO_CONN(conn)->dirreq_id,
+ DIRREQ_TUNNELED,
+ DIRREQ_CIRC_QUEUE_FLUSHED);
+
connection_write_to_buf(cell->body, CELL_NETWORK_SIZE, TO_CONN(conn));
- packed_cell_free(cell);
+ packed_cell_free_unchecked(cell);
++n_flushed;
+ if (cell_ewma) {
+ cell_ewma_t *tmp;
+ cell_ewma->cell_count += ewma_increment;
+ /* We pop and re-add the cell_ewma_t here, not above, since we need to
+ * re-add it immediately to keep the priority queue consistent with
+ * the linked-list implementation */
+ tmp = pop_first_cell_ewma_from_conn(conn);
+ tor_assert(tmp == cell_ewma);
+ add_cell_ewma_to_conn(conn, cell_ewma);
+ }
if (circ != conn->active_circuits) {
/* If this happens, the current circuit just got made inactive by
* a call in connection_write_to_buf(). That's nothing to worry about:
@@ -1829,9 +2386,9 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
/* Is the cell queue low enough to unblock all the streams that are waiting
* to write to this circuit? */
if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE)
- set_streams_blocked_on_circ(circ, conn, 0); /* unblock streams */
+ set_streams_blocked_on_circ(circ, conn, 0, 0); /* unblock streams */
- /* Did we just ran out of cells on this queue? */
+ /* Did we just run out of cells on this circuit's queue? */
if (queue->n == 0) {
log_debug(LD_GENERAL, "Made a circuit inactive.");
make_circuit_inactive_on_conn(circ, conn);
@@ -1846,10 +2403,14 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
* transmitting in <b>direction</b>. */
void
append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
- cell_t *cell, cell_direction_t direction)
+ cell_t *cell, cell_direction_t direction,
+ streamid_t fromstream)
{
cell_queue_t *queue;
int streams_blocked;
+ if (circ->marked_for_close)
+ return;
+
if (direction == CELL_DIRECTION_OUT) {
queue = &circ->n_conn_cells;
streams_blocked = circ->streams_blocked_on_n_conn;
@@ -1868,7 +2429,12 @@ append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
/* If we have too many cells on the circuit, we should stop reading from
* the edge streams for a while. */
if (!streams_blocked && queue->n >= CELL_QUEUE_HIGHWATER_SIZE)
- set_streams_blocked_on_circ(circ, orconn, 1); /* block streams */
+ set_streams_blocked_on_circ(circ, orconn, 1, 0); /* block streams */
+
+ if (streams_blocked && fromstream) {
+ /* This edge connection is apparently not blocked; block it. */
+ set_streams_blocked_on_circ(circ, orconn, 1, fromstream);
+ }
if (queue->n == 1) {
/* This was the first cell added to the queue. We need to make this
@@ -1947,6 +2513,25 @@ decode_address_from_payload(tor_addr_t *addr_out, const uint8_t *payload,
return payload + 2 + payload[1];
}
+/** Remove all the cells queued on <b>circ</b> for <b>orconn</b>. */
+void
+circuit_clear_cell_queue(circuit_t *circ, or_connection_t *orconn)
+{
+ cell_queue_t *queue;
+ if (circ->n_conn == orconn) {
+ queue = &circ->n_conn_cells;
+ } else {
+ or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
+ tor_assert(orcirc->p_conn == orconn);
+ queue = &orcirc->p_conn_cells;
+ }
+
+ if (queue->n)
+ make_circuit_inactive_on_conn(circ,orconn);
+
+ cell_queue_clear(queue);
+}
+
/** Fail with an assert if the active circuits ring on <b>orconn</b> is
* corrupt. */
void
@@ -1954,16 +2539,44 @@ assert_active_circuits_ok(or_connection_t *orconn)
{
circuit_t *head = orconn->active_circuits;
circuit_t *cur = head;
+ int n = 0;
if (! head)
return;
do {
circuit_t *next = *next_circ_on_conn_p(cur, orconn);
circuit_t *prev = *prev_circ_on_conn_p(cur, orconn);
+ cell_ewma_t *ewma;
tor_assert(next);
tor_assert(prev);
tor_assert(*next_circ_on_conn_p(prev, orconn) == cur);
tor_assert(*prev_circ_on_conn_p(next, orconn) == cur);
+ if (orconn == cur->n_conn) {
+ ewma = &cur->n_cell_ewma;
+ tor_assert(!ewma->is_for_p_conn);
+ } else {
+ ewma = &TO_OR_CIRCUIT(cur)->p_cell_ewma;
+ tor_assert(ewma->is_for_p_conn);
+ }
+ tor_assert(ewma->heap_index != -1);
+ tor_assert(ewma == smartlist_get(orconn->active_circuit_pqueue,
+ ewma->heap_index));
+ n++;
cur = next;
} while (cur != head);
+
+ tor_assert(n == smartlist_len(orconn->active_circuit_pqueue));
+}
+
+/** Return 1 if we shouldn't restart reading on this circuit, even if
+ * we get a SENDME. Else return 0.
+*/
+static int
+circuit_queue_streams_are_blocked(circuit_t *circ)
+{
+ if (CIRCUIT_IS_ORIGIN(circ)) {
+ return circ->streams_blocked_on_n_conn;
+ } else {
+ return circ->streams_blocked_on_p_conn;
+ }
}