diff options
Diffstat (limited to 'src/core/or/relay.c')
-rw-r--r-- | src/core/or/relay.c | 85 |
1 files changed, 48 insertions, 37 deletions
diff --git a/src/core/or/relay.c b/src/core/or/relay.c index 38fb560e34..827f0c3e46 100644 --- a/src/core/or/relay.c +++ b/src/core/or/relay.c @@ -122,6 +122,8 @@ static int connection_edge_process_ordered_relay_cell(cell_t *cell, edge_connection_t *conn, crypt_path_t *layer_hint, relay_header_t *rh); +static void set_block_state_for_streams(edge_connection_t *stream_list, + int block, streamid_t stream_id); /** Stats: how many relay cells have originated at this hop, or have * been relayed onward (not recognized at this hop)? @@ -3005,41 +3007,46 @@ channel_unlink_all_circuits(channel_t *chan, smartlist_t *circuits_out) chan->num_p_circuits = 0; } -/** Block (if <b>block</b> is true) or unblock (if <b>block</b> is false) +/** + * Called when a circuit becomes blocked or unblocked due to the channel + * cell queue. + * + * Block (if <b>block</b> is true) or unblock (if <b>block</b> is false) * every edge connection that is using <b>circ</b> to write to <b>chan</b>, * and start or stop reading as appropriate. - * - * If <b>stream_id</b> is nonzero, block only the edge connection whose - * stream_id matches it. - * - * Returns the number of streams whose status we changed. */ -static int -set_streams_blocked_on_circ(circuit_t *circ, channel_t *chan, - int block, streamid_t stream_id) +static void +set_circuit_blocked_on_chan(circuit_t *circ, channel_t *chan, int block) { edge_connection_t *edge = NULL; - int n = 0; if (circ->n_chan == chan) { - circ->streams_blocked_on_n_chan = block; + circ->circuit_blocked_on_n_chan = block; if (CIRCUIT_IS_ORIGIN(circ)) edge = TO_ORIGIN_CIRCUIT(circ)->p_streams; } else { - circ->streams_blocked_on_p_chan = block; + circ->circuit_blocked_on_p_chan = block; tor_assert(!CIRCUIT_IS_ORIGIN(circ)); edge = TO_OR_CIRCUIT(circ)->n_streams; } - for (; edge; edge = edge->next_stream) { + set_block_state_for_streams(edge, block, 0); +} + +/** + * Helper function to block or unblock streams in a stream list. + * + * If <b>stream_id</id> is 0, apply the <b>block</b> state to all streams + * in the stream list. If it is non-zero, only apply to that specific stream. + */ +static void +set_block_state_for_streams(edge_connection_t *stream_list, int block, + streamid_t stream_id) +{ + for (edge_connection_t *edge = stream_list; edge; edge = edge->next_stream) { connection_t *conn = TO_CONN(edge); if (stream_id && edge->stream_id != stream_id) continue; - if (edge->edge_blocked_on_circ != block) { - ++n; - edge->edge_blocked_on_circ = block; - } - if (!conn->read_event) { /* This connection is a placeholder for something; probably a DNS * request. It can't actually stop or start reading.*/ @@ -3055,8 +3062,6 @@ set_streams_blocked_on_circ(circuit_t *circ, channel_t *chan, connection_start_reading(conn); } } - - return n; } /** Extract the command from a packed cell. */ @@ -3094,7 +3099,7 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max)) destroy_cell_queue_t *destroy_queue=NULL; circuit_t *circ; or_circuit_t *or_circ; - int streams_blocked; + int circ_blocked; packed_cell_t *cell; /* Get the cmux */ @@ -3134,12 +3139,12 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max)) if (circ->n_chan == chan) { queue = &circ->n_chan_cells; - streams_blocked = circ->streams_blocked_on_n_chan; + circ_blocked = circ->circuit_blocked_on_n_chan; } else { or_circ = TO_OR_CIRCUIT(circ); tor_assert(or_circ->p_chan == chan); queue = &TO_OR_CIRCUIT(circ)->p_chan_cells; - streams_blocked = circ->streams_blocked_on_p_chan; + circ_blocked = circ->circuit_blocked_on_p_chan; } /* Circuitmux told us this was active, so it should have cells. @@ -3240,8 +3245,8 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max)) /* Is the cell queue low enough to unblock all the streams that are waiting * to write to this circuit? */ - if (streams_blocked && queue->n <= cell_queue_lowwatermark()) - set_streams_blocked_on_circ(circ, chan, 0, 0); /* unblock streams */ + if (circ_blocked && queue->n <= cell_queue_lowwatermark()) + set_circuit_blocked_on_chan(circ, chan, 0); /* unblock streams */ /* If n_flushed < max still, loop around and pick another circuit */ } @@ -3346,9 +3351,10 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, streamid_t fromstream) { or_circuit_t *orcirc = NULL; + edge_connection_t *stream_list = NULL; cell_queue_t *queue; int32_t max_queue_size; - int streams_blocked; + int circ_blocked; int exitward; if (circ->marked_for_close) return; @@ -3356,13 +3362,16 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, exitward = (direction == CELL_DIRECTION_OUT); if (exitward) { queue = &circ->n_chan_cells; - streams_blocked = circ->streams_blocked_on_n_chan; + circ_blocked = circ->circuit_blocked_on_n_chan; max_queue_size = max_circuit_cell_queue_size_out; + if (CIRCUIT_IS_ORIGIN(circ)) + stream_list = TO_ORIGIN_CIRCUIT(circ)->p_streams; } else { orcirc = TO_OR_CIRCUIT(circ); queue = &orcirc->p_chan_cells; - streams_blocked = circ->streams_blocked_on_p_chan; + circ_blocked = circ->circuit_blocked_on_p_chan; max_queue_size = max_circuit_cell_queue_size; + stream_list = TO_OR_CIRCUIT(circ)->n_streams; } if (PREDICT_UNLIKELY(queue->n >= max_queue_size)) { @@ -3395,14 +3404,16 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, return; } - /* If we have too many cells on the circuit, we should stop reading from - * the edge streams for a while. */ - if (!streams_blocked && queue->n >= cell_queue_highwatermark()) - set_streams_blocked_on_circ(circ, chan, 1, 0); /* block streams */ + /* If we have too many cells on the circuit, note that it should + * be blocked from new cells. */ + if (!circ_blocked && queue->n >= cell_queue_highwatermark()) + set_circuit_blocked_on_chan(circ, chan, 1); - if (streams_blocked && fromstream) { - /* This edge connection is apparently not blocked; block it. */ - set_streams_blocked_on_circ(circ, chan, 1, fromstream); + if (circ_blocked && fromstream) { + /* This edge connection is apparently not blocked; this can happen for + * new streams on a blocked circuit, for their CONNECTED response. + * block it now. */ + set_block_state_for_streams(stream_list, 1, fromstream); } update_circuit_on_cmux(circ, direction); @@ -3508,8 +3519,8 @@ static int circuit_queue_streams_are_blocked(circuit_t *circ) { if (CIRCUIT_IS_ORIGIN(circ)) { - return circ->streams_blocked_on_n_chan; + return circ->circuit_blocked_on_n_chan; } else { - return circ->streams_blocked_on_p_chan; + return circ->circuit_blocked_on_p_chan; } } |