aboutsummaryrefslogtreecommitdiff
path: root/src/core/or/relay.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/or/relay.c')
-rw-r--r--src/core/or/relay.c85
1 files changed, 48 insertions, 37 deletions
diff --git a/src/core/or/relay.c b/src/core/or/relay.c
index 38fb560e34..827f0c3e46 100644
--- a/src/core/or/relay.c
+++ b/src/core/or/relay.c
@@ -122,6 +122,8 @@ static int connection_edge_process_ordered_relay_cell(cell_t *cell,
edge_connection_t *conn,
crypt_path_t *layer_hint,
relay_header_t *rh);
+static void set_block_state_for_streams(edge_connection_t *stream_list,
+ int block, streamid_t stream_id);
/** Stats: how many relay cells have originated at this hop, or have
* been relayed onward (not recognized at this hop)?
@@ -3005,41 +3007,46 @@ channel_unlink_all_circuits(channel_t *chan, smartlist_t *circuits_out)
chan->num_p_circuits = 0;
}
-/** Block (if <b>block</b> is true) or unblock (if <b>block</b> is false)
+/**
+ * Called when a circuit becomes blocked or unblocked due to the channel
+ * cell queue.
+ *
+ * Block (if <b>block</b> is true) or unblock (if <b>block</b> is false)
* every edge connection that is using <b>circ</b> to write to <b>chan</b>,
* and start or stop reading as appropriate.
- *
- * If <b>stream_id</b> is nonzero, block only the edge connection whose
- * stream_id matches it.
- *
- * Returns the number of streams whose status we changed.
*/
-static int
-set_streams_blocked_on_circ(circuit_t *circ, channel_t *chan,
- int block, streamid_t stream_id)
+static void
+set_circuit_blocked_on_chan(circuit_t *circ, channel_t *chan, int block)
{
edge_connection_t *edge = NULL;
- int n = 0;
if (circ->n_chan == chan) {
- circ->streams_blocked_on_n_chan = block;
+ circ->circuit_blocked_on_n_chan = block;
if (CIRCUIT_IS_ORIGIN(circ))
edge = TO_ORIGIN_CIRCUIT(circ)->p_streams;
} else {
- circ->streams_blocked_on_p_chan = block;
+ circ->circuit_blocked_on_p_chan = block;
tor_assert(!CIRCUIT_IS_ORIGIN(circ));
edge = TO_OR_CIRCUIT(circ)->n_streams;
}
- for (; edge; edge = edge->next_stream) {
+ set_block_state_for_streams(edge, block, 0);
+}
+
+/**
+ * Helper function to block or unblock streams in a stream list.
+ *
+ * If <b>stream_id</id> is 0, apply the <b>block</b> state to all streams
+ * in the stream list. If it is non-zero, only apply to that specific stream.
+ */
+static void
+set_block_state_for_streams(edge_connection_t *stream_list, int block,
+ streamid_t stream_id)
+{
+ for (edge_connection_t *edge = stream_list; edge; edge = edge->next_stream) {
connection_t *conn = TO_CONN(edge);
if (stream_id && edge->stream_id != stream_id)
continue;
- if (edge->edge_blocked_on_circ != block) {
- ++n;
- edge->edge_blocked_on_circ = block;
- }
-
if (!conn->read_event) {
/* This connection is a placeholder for something; probably a DNS
* request. It can't actually stop or start reading.*/
@@ -3055,8 +3062,6 @@ set_streams_blocked_on_circ(circuit_t *circ, channel_t *chan,
connection_start_reading(conn);
}
}
-
- return n;
}
/** Extract the command from a packed cell. */
@@ -3094,7 +3099,7 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max))
destroy_cell_queue_t *destroy_queue=NULL;
circuit_t *circ;
or_circuit_t *or_circ;
- int streams_blocked;
+ int circ_blocked;
packed_cell_t *cell;
/* Get the cmux */
@@ -3134,12 +3139,12 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max))
if (circ->n_chan == chan) {
queue = &circ->n_chan_cells;
- streams_blocked = circ->streams_blocked_on_n_chan;
+ circ_blocked = circ->circuit_blocked_on_n_chan;
} else {
or_circ = TO_OR_CIRCUIT(circ);
tor_assert(or_circ->p_chan == chan);
queue = &TO_OR_CIRCUIT(circ)->p_chan_cells;
- streams_blocked = circ->streams_blocked_on_p_chan;
+ circ_blocked = circ->circuit_blocked_on_p_chan;
}
/* Circuitmux told us this was active, so it should have cells.
@@ -3240,8 +3245,8 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max))
/* Is the cell queue low enough to unblock all the streams that are waiting
* to write to this circuit? */
- if (streams_blocked && queue->n <= cell_queue_lowwatermark())
- set_streams_blocked_on_circ(circ, chan, 0, 0); /* unblock streams */
+ if (circ_blocked && queue->n <= cell_queue_lowwatermark())
+ set_circuit_blocked_on_chan(circ, chan, 0); /* unblock streams */
/* If n_flushed < max still, loop around and pick another circuit */
}
@@ -3346,9 +3351,10 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
streamid_t fromstream)
{
or_circuit_t *orcirc = NULL;
+ edge_connection_t *stream_list = NULL;
cell_queue_t *queue;
int32_t max_queue_size;
- int streams_blocked;
+ int circ_blocked;
int exitward;
if (circ->marked_for_close)
return;
@@ -3356,13 +3362,16 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
exitward = (direction == CELL_DIRECTION_OUT);
if (exitward) {
queue = &circ->n_chan_cells;
- streams_blocked = circ->streams_blocked_on_n_chan;
+ circ_blocked = circ->circuit_blocked_on_n_chan;
max_queue_size = max_circuit_cell_queue_size_out;
+ if (CIRCUIT_IS_ORIGIN(circ))
+ stream_list = TO_ORIGIN_CIRCUIT(circ)->p_streams;
} else {
orcirc = TO_OR_CIRCUIT(circ);
queue = &orcirc->p_chan_cells;
- streams_blocked = circ->streams_blocked_on_p_chan;
+ circ_blocked = circ->circuit_blocked_on_p_chan;
max_queue_size = max_circuit_cell_queue_size;
+ stream_list = TO_OR_CIRCUIT(circ)->n_streams;
}
if (PREDICT_UNLIKELY(queue->n >= max_queue_size)) {
@@ -3395,14 +3404,16 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
return;
}
- /* If we have too many cells on the circuit, we should stop reading from
- * the edge streams for a while. */
- if (!streams_blocked && queue->n >= cell_queue_highwatermark())
- set_streams_blocked_on_circ(circ, chan, 1, 0); /* block streams */
+ /* If we have too many cells on the circuit, note that it should
+ * be blocked from new cells. */
+ if (!circ_blocked && queue->n >= cell_queue_highwatermark())
+ set_circuit_blocked_on_chan(circ, chan, 1);
- if (streams_blocked && fromstream) {
- /* This edge connection is apparently not blocked; block it. */
- set_streams_blocked_on_circ(circ, chan, 1, fromstream);
+ if (circ_blocked && fromstream) {
+ /* This edge connection is apparently not blocked; this can happen for
+ * new streams on a blocked circuit, for their CONNECTED response.
+ * block it now. */
+ set_block_state_for_streams(stream_list, 1, fromstream);
}
update_circuit_on_cmux(circ, direction);
@@ -3508,8 +3519,8 @@ static int
circuit_queue_streams_are_blocked(circuit_t *circ)
{
if (CIRCUIT_IS_ORIGIN(circ)) {
- return circ->streams_blocked_on_n_chan;
+ return circ->circuit_blocked_on_n_chan;
} else {
- return circ->streams_blocked_on_p_chan;
+ return circ->circuit_blocked_on_p_chan;
}
}