diff options
author | Nick Mathewson <nickm@torproject.org> | 2010-09-08 10:49:24 -0400 |
---|---|---|
committer | Nick Mathewson <nickm@torproject.org> | 2010-09-08 10:49:24 -0400 |
commit | aa42f941dc58ff7554e31dc6f15e4d23f639f351 (patch) | |
tree | 820fd82c59c1ab5cdbd01faaaf2a668d96417de4 /src/or | |
parent | 669fd05ed8cf7b90bb9e43dbbbbd35f40bf05fa0 (diff) | |
parent | 296a7d83880d75296d6665295f9fc4cb41cb63d8 (diff) | |
download | tor-aa42f941dc58ff7554e31dc6f15e4d23f639f351.tar.gz tor-aa42f941dc58ff7554e31dc6f15e4d23f639f351.zip |
Merge branch 'bug1653'
Diffstat (limited to 'src/or')
-rw-r--r-- | src/or/circuitbuild.c | 4 | ||||
-rw-r--r-- | src/or/relay.c | 67 | ||||
-rw-r--r-- | src/or/relay.h | 3 |
3 files changed, 58 insertions, 16 deletions
diff --git a/src/or/circuitbuild.c b/src/or/circuitbuild.c index e5e7d22195..5567b246ab 100644 --- a/src/or/circuitbuild.c +++ b/src/or/circuitbuild.c @@ -1752,7 +1752,7 @@ circuit_deliver_create_cell(circuit_t *circ, uint8_t cell_type, cell.circ_id = circ->n_circ_id; memcpy(cell.payload, payload, ONIONSKIN_CHALLENGE_LEN); - append_cell_to_circuit_queue(circ, circ->n_conn, &cell, CELL_DIRECTION_OUT); + append_cell_to_circuit_queue(circ, circ->n_conn, &cell, CELL_DIRECTION_OUT, 0); if (CIRCUIT_IS_ORIGIN(circ)) { /* mark it so it gets better rate limiting treatment. */ @@ -2329,7 +2329,7 @@ onionskin_answer(or_circuit_t *circ, uint8_t cell_type, const char *payload, circ->is_first_hop = (cell_type == CELL_CREATED_FAST); append_cell_to_circuit_queue(TO_CIRCUIT(circ), - circ->p_conn, &cell, CELL_DIRECTION_IN); + circ->p_conn, &cell, CELL_DIRECTION_IN, 0); log_debug(LD_CIRC,"Finished sending 'created' cell."); if (!is_local_addr(&circ->p_conn->_base.addr) && diff --git a/src/or/relay.c b/src/or/relay.c index b85cc84c52..d0986c8d4e 100644 --- a/src/or/relay.c +++ b/src/or/relay.c @@ -52,6 +52,7 @@ circuit_resume_edge_reading_helper(edge_connection_t *conn, crypt_path_t *layer_hint); static int circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint); +static int circuit_queue_streams_are_blocked(circuit_t *circ); /** Cache the current hi-res time; the cache gets reset when libevent * calls us. */ @@ -268,7 +269,7 @@ circuit_receive_relay_cell(cell_t *cell, circuit_t *circ, * we might kill the circ before we relay * the cells. */ - append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction); + append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction, 0); return 0; } @@ -365,7 +366,7 @@ relay_crypt(circuit_t *circ, cell_t *cell, cell_direction_t cell_direction, static int circuit_package_relay_cell(cell_t *cell, circuit_t *circ, cell_direction_t cell_direction, - crypt_path_t *layer_hint) + crypt_path_t *layer_hint, uint16_t on_stream) { or_connection_t *conn; /* where to send the cell */ @@ -409,7 +410,7 @@ circuit_package_relay_cell(cell_t *cell, circuit_t *circ, } ++stats_n_relay_cells_relayed; - append_cell_to_circuit_queue(circ, conn, cell, cell_direction); + append_cell_to_circuit_queue(circ, conn, cell, cell_direction, on_stream); return 0; } @@ -624,8 +625,8 @@ relay_send_command_from_edge(uint16_t stream_id, circuit_t *circ, } } - if (circuit_package_relay_cell(&cell, circ, cell_direction, cpath_layer) - < 0) { + if (circuit_package_relay_cell(&cell, circ, cell_direction, cpath_layer, + stream_id) < 0) { log_warn(LD_BUG,"circuit_package_relay_cell failed. Closing."); circuit_mark_for_close(circ, END_CIRC_REASON_INTERNAL); return -1; @@ -1236,6 +1237,10 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, conn->package_window += STREAMWINDOW_INCREMENT; log_debug(domain,"stream-level sendme, packagewindow now %d.", conn->package_window); + if (circuit_queue_streams_are_blocked(circ)) { + /* Still waiting for queue to flush; don't touch conn */ + return 0; + } connection_start_reading(TO_CONN(conn)); /* handle whatever might still be on the inbuf */ if (connection_edge_package_raw_inbuf(conn, 1) < 0) { @@ -1436,7 +1441,10 @@ connection_edge_consider_sending_sendme(edge_connection_t *conn) static void circuit_resume_edge_reading(circuit_t *circ, crypt_path_t *layer_hint) { - + if (circuit_queue_streams_are_blocked(circ)) { + log_debug(layer_hint?LD_APP:LD_EXIT,"Too big queue, no resuming"); + return; + } log_debug(layer_hint?LD_APP:LD_EXIT,"resuming"); if (CIRCUIT_IS_ORIGIN(circ)) @@ -2092,12 +2100,19 @@ connection_or_unlink_all_active_circs(or_connection_t *orconn) /** Block (if <b>block</b> is true) or unblock (if <b>block</b> is false) * every edge connection that is using <b>circ</b> to write to <b>orconn</b>, - * and start or stop reading as appropriate. */ -static void + * and start or stop reading as appropriate. + * + * If <b>stream_id</b> is nonzero, block only the edge connection whose + * stream_id matches it. + * + * Returns the number of streams whose status we changed. + */ +static int set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn, - int block) + int block, uint16_t stream_id) { edge_connection_t *edge = NULL; + int n = 0; if (circ->n_conn == orconn) { circ->streams_blocked_on_n_conn = block; if (CIRCUIT_IS_ORIGIN(circ)) @@ -2110,7 +2125,13 @@ set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn, for (; edge; edge = edge->next_stream) { connection_t *conn = TO_CONN(edge); - edge->edge_blocked_on_circ = block; + if (stream_id && edge->stream_id != stream_id) + continue; + + if (edge->edge_blocked_on_circ != block) { + ++n; + edge->edge_blocked_on_circ = block; + } if (!conn->read_event) { /* This connection is a placeholder for something; probably a DNS @@ -2127,6 +2148,8 @@ set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn, connection_start_reading(conn); } } + + return n; } /** Pull as many cells as possible (but no more than <b>max</b>) from the @@ -2252,7 +2275,7 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max, /* Is the cell queue low enough to unblock all the streams that are waiting * to write to this circuit? */ if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE) - set_streams_blocked_on_circ(circ, conn, 0); /* unblock streams */ + set_streams_blocked_on_circ(circ, conn, 0, 0); /* unblock streams */ /* Did we just run out of cells on this circuit's queue? */ if (queue->n == 0) { @@ -2269,7 +2292,8 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max, * transmitting in <b>direction</b>. */ void append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn, - cell_t *cell, cell_direction_t direction) + cell_t *cell, cell_direction_t direction, + uint16_t fromstream) { cell_queue_t *queue; int streams_blocked; @@ -2291,7 +2315,12 @@ append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn, /* If we have too many cells on the circuit, we should stop reading from * the edge streams for a while. */ if (!streams_blocked && queue->n >= CELL_QUEUE_HIGHWATER_SIZE) - set_streams_blocked_on_circ(circ, orconn, 1); /* block streams */ + set_streams_blocked_on_circ(circ, orconn, 1, 0); /* block streams */ + + if (streams_blocked && fromstream) { + /* This edge connection is apparently not blocked; block it. */ + set_streams_blocked_on_circ(circ, orconn, 1, fromstream); + } if (queue->n == 1) { /* This was the first cell added to the queue. We need to make this @@ -2405,3 +2434,15 @@ assert_active_circuits_ok(or_connection_t *orconn) tor_assert(n == smartlist_len(orconn->active_circuit_pqueue)); } +/** Return 1 if we shouldn't restart reading on this circuit, even if + * we get a SENDME. Else return 0. +*/ +static int +circuit_queue_streams_are_blocked(circuit_t *circ) +{ + if (CIRCUIT_IS_ORIGIN(circ)) { + return circ->streams_blocked_on_n_conn; + } else { + return circ->streams_blocked_on_p_conn; + } +} diff --git a/src/or/relay.h b/src/or/relay.h index 73855a52bf..088ef3228a 100644 --- a/src/or/relay.h +++ b/src/or/relay.h @@ -45,7 +45,8 @@ void cell_queue_append(cell_queue_t *queue, packed_cell_t *cell); void cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell); void append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn, - cell_t *cell, cell_direction_t direction); + cell_t *cell, cell_direction_t direction, + uint16_t fromstream); void connection_or_unlink_all_active_circs(or_connection_t *conn); int connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max, time_t now); |