aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Perry <mikeperry-git@torproject.org>2023-01-20 19:14:33 +0000
committerMike Perry <mikeperry-git@torproject.org>2023-04-06 15:57:10 +0000
commit21c861bfa3188444798a35e21f26579dd910a452 (patch)
treebd09bd363fa67cee794aa1bc1a1e4268ae70155e
parenta4ee0c29ee52052f82692f0825a50e1a55e01e5c (diff)
downloadtor-21c861bfa3188444798a35e21f26579dd910a452.tar.gz
tor-21c861bfa3188444798a35e21f26579dd910a452.zip
Refactor stream blocking due to channel cell queues
Streams can get blocked on a circuit in two ways: 1. When the circuit package window is full 2. When the channel's cell queue is too high Conflux needs to decouple stream blocking from both of these conditions, because streams can continue on another circuit, even if the primary circuit is blocked for either of these cases. However, both conflux and congestion control need to know if the channel's cell queue hit the highwatermark and is still draining, because this condition is used by those components, independent of stream state. Therefore, this commit renames the 'streams_blocked_on_chan' variable to signify that it refers to the cell queue state, and also refactors the actual stream blocking bits out, so they can be handled separately if conflux is present.
-rw-r--r--src/core/mainloop/mainloop.c2
-rw-r--r--src/core/mainloop/mainloop.h2
-rw-r--r--src/core/or/circuit_st.h8
-rw-r--r--src/core/or/circuituse.c3
-rw-r--r--src/core/or/congestion_control_common.c4
-rw-r--r--src/core/or/edge_connection_st.h3
-rw-r--r--src/core/or/relay.c85
-rw-r--r--src/test/fakecircs.c4
8 files changed, 60 insertions, 51 deletions
diff --git a/src/core/mainloop/mainloop.c b/src/core/mainloop/mainloop.c
index a1ea32220a..e1c9786b2e 100644
--- a/src/core/mainloop/mainloop.c
+++ b/src/core/mainloop/mainloop.c
@@ -497,7 +497,7 @@ connection_watch_events(connection_t *conn, watchable_events_t events)
/** Return true iff <b>conn</b> is listening for read events. */
int
-connection_is_reading(connection_t *conn)
+connection_is_reading(const connection_t *conn)
{
tor_assert(conn);
diff --git a/src/core/mainloop/mainloop.h b/src/core/mainloop/mainloop.h
index 98d0b3a058..64782c1318 100644
--- a/src/core/mainloop/mainloop.h
+++ b/src/core/mainloop/mainloop.h
@@ -38,7 +38,7 @@ typedef enum watchable_events {
WRITE_EVENT=0x04 /**< We want to know when a connection is writable */
} watchable_events_t;
void connection_watch_events(connection_t *conn, watchable_events_t events);
-int connection_is_reading(connection_t *conn);
+int connection_is_reading(const connection_t *conn);
MOCK_DECL(void,connection_stop_reading,(connection_t *conn));
MOCK_DECL(void,connection_start_reading,(connection_t *conn));
diff --git a/src/core/or/circuit_st.h b/src/core/or/circuit_st.h
index 7f39c9337e..1afb4d4426 100644
--- a/src/core/or/circuit_st.h
+++ b/src/core/or/circuit_st.h
@@ -88,11 +88,11 @@ struct circuit_t {
extend_info_t *n_hop;
/** True iff we are waiting for n_chan_cells to become less full before
- * allowing p_streams to add any more cells. (Origin circuit only.) */
- unsigned int streams_blocked_on_n_chan : 1;
+ * allowing any more cells on this circuit. (Origin circuit only.) */
+ unsigned int circuit_blocked_on_n_chan : 1;
/** True iff we are waiting for p_chan_cells to become less full before
- * allowing n_streams to add any more cells. (OR circuit only.) */
- unsigned int streams_blocked_on_p_chan : 1;
+ * allowing any more cells on this circuit. (OR circuit only.) */
+ unsigned int circuit_blocked_on_p_chan : 1;
/** True iff we have queued a delete backwards on this circuit, but not put
* it on the output buffer. */
diff --git a/src/core/or/circuituse.c b/src/core/or/circuituse.c
index 9110252976..25401aea55 100644
--- a/src/core/or/circuituse.c
+++ b/src/core/or/circuituse.c
@@ -63,6 +63,7 @@
#include "lib/math/fp.h"
#include "lib/time/tvdiff.h"
#include "lib/trace/events.h"
+#include "src/core/mainloop/mainloop.h"
#include "core/or/cpath_build_state_st.h"
#include "feature/dircommon/dir_connection_st.h"
@@ -938,7 +939,7 @@ circuit_log_ancient_one_hop_circuits(int age)
c->marked_for_close,
c->hold_open_until_flushed ? "" : "not ",
conn->edge_has_sent_end ? "" : "not ",
- conn->edge_blocked_on_circ ? "Blocked" : "Not blocked");
+ connection_is_reading(c) ? "Not blocked" : "Blocked");
if (! c->linked_conn)
continue;
diff --git a/src/core/or/congestion_control_common.c b/src/core/or/congestion_control_common.c
index 920b57cf00..c7c950d0c8 100644
--- a/src/core/or/congestion_control_common.c
+++ b/src/core/or/congestion_control_common.c
@@ -954,11 +954,11 @@ congestion_control_update_circuit_bdp(congestion_control_t *cc,
if (CIRCUIT_IS_ORIGIN(circ)) {
/* origin circs use n_chan */
chan_q = circ->n_chan_cells.n;
- blocked_on_chan = circ->streams_blocked_on_n_chan;
+ blocked_on_chan = circ->circuit_blocked_on_n_chan;
} else {
/* Both onion services and exits use or_circuit and p_chan */
chan_q = CONST_TO_OR_CIRCUIT(circ)->p_chan_cells.n;
- blocked_on_chan = circ->streams_blocked_on_p_chan;
+ blocked_on_chan = circ->circuit_blocked_on_p_chan;
}
/* If we have no EWMA RTT, it is because monotime has been stalled
diff --git a/src/core/or/edge_connection_st.h b/src/core/or/edge_connection_st.h
index 942991f139..22f9040d15 100644
--- a/src/core/or/edge_connection_st.h
+++ b/src/core/or/edge_connection_st.h
@@ -66,9 +66,6 @@ struct edge_connection_t {
* connections. Set once we've set the stream end,
* and check in connection_about_to_close_connection().
*/
- /** True iff we've blocked reading until the circuit has fewer queued
- * cells. */
- unsigned int edge_blocked_on_circ:1;
/** Unique ID for directory requests; this used to be in connection_t, but
* that's going away and being used on channels instead. We still tag
diff --git a/src/core/or/relay.c b/src/core/or/relay.c
index 38fb560e34..827f0c3e46 100644
--- a/src/core/or/relay.c
+++ b/src/core/or/relay.c
@@ -122,6 +122,8 @@ static int connection_edge_process_ordered_relay_cell(cell_t *cell,
edge_connection_t *conn,
crypt_path_t *layer_hint,
relay_header_t *rh);
+static void set_block_state_for_streams(edge_connection_t *stream_list,
+ int block, streamid_t stream_id);
/** Stats: how many relay cells have originated at this hop, or have
* been relayed onward (not recognized at this hop)?
@@ -3005,41 +3007,46 @@ channel_unlink_all_circuits(channel_t *chan, smartlist_t *circuits_out)
chan->num_p_circuits = 0;
}
-/** Block (if <b>block</b> is true) or unblock (if <b>block</b> is false)
+/**
+ * Called when a circuit becomes blocked or unblocked due to the channel
+ * cell queue.
+ *
+ * Block (if <b>block</b> is true) or unblock (if <b>block</b> is false)
* every edge connection that is using <b>circ</b> to write to <b>chan</b>,
* and start or stop reading as appropriate.
- *
- * If <b>stream_id</b> is nonzero, block only the edge connection whose
- * stream_id matches it.
- *
- * Returns the number of streams whose status we changed.
*/
-static int
-set_streams_blocked_on_circ(circuit_t *circ, channel_t *chan,
- int block, streamid_t stream_id)
+static void
+set_circuit_blocked_on_chan(circuit_t *circ, channel_t *chan, int block)
{
edge_connection_t *edge = NULL;
- int n = 0;
if (circ->n_chan == chan) {
- circ->streams_blocked_on_n_chan = block;
+ circ->circuit_blocked_on_n_chan = block;
if (CIRCUIT_IS_ORIGIN(circ))
edge = TO_ORIGIN_CIRCUIT(circ)->p_streams;
} else {
- circ->streams_blocked_on_p_chan = block;
+ circ->circuit_blocked_on_p_chan = block;
tor_assert(!CIRCUIT_IS_ORIGIN(circ));
edge = TO_OR_CIRCUIT(circ)->n_streams;
}
- for (; edge; edge = edge->next_stream) {
+ set_block_state_for_streams(edge, block, 0);
+}
+
+/**
+ * Helper function to block or unblock streams in a stream list.
+ *
+ * If <b>stream_id</id> is 0, apply the <b>block</b> state to all streams
+ * in the stream list. If it is non-zero, only apply to that specific stream.
+ */
+static void
+set_block_state_for_streams(edge_connection_t *stream_list, int block,
+ streamid_t stream_id)
+{
+ for (edge_connection_t *edge = stream_list; edge; edge = edge->next_stream) {
connection_t *conn = TO_CONN(edge);
if (stream_id && edge->stream_id != stream_id)
continue;
- if (edge->edge_blocked_on_circ != block) {
- ++n;
- edge->edge_blocked_on_circ = block;
- }
-
if (!conn->read_event) {
/* This connection is a placeholder for something; probably a DNS
* request. It can't actually stop or start reading.*/
@@ -3055,8 +3062,6 @@ set_streams_blocked_on_circ(circuit_t *circ, channel_t *chan,
connection_start_reading(conn);
}
}
-
- return n;
}
/** Extract the command from a packed cell. */
@@ -3094,7 +3099,7 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max))
destroy_cell_queue_t *destroy_queue=NULL;
circuit_t *circ;
or_circuit_t *or_circ;
- int streams_blocked;
+ int circ_blocked;
packed_cell_t *cell;
/* Get the cmux */
@@ -3134,12 +3139,12 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max))
if (circ->n_chan == chan) {
queue = &circ->n_chan_cells;
- streams_blocked = circ->streams_blocked_on_n_chan;
+ circ_blocked = circ->circuit_blocked_on_n_chan;
} else {
or_circ = TO_OR_CIRCUIT(circ);
tor_assert(or_circ->p_chan == chan);
queue = &TO_OR_CIRCUIT(circ)->p_chan_cells;
- streams_blocked = circ->streams_blocked_on_p_chan;
+ circ_blocked = circ->circuit_blocked_on_p_chan;
}
/* Circuitmux told us this was active, so it should have cells.
@@ -3240,8 +3245,8 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max))
/* Is the cell queue low enough to unblock all the streams that are waiting
* to write to this circuit? */
- if (streams_blocked && queue->n <= cell_queue_lowwatermark())
- set_streams_blocked_on_circ(circ, chan, 0, 0); /* unblock streams */
+ if (circ_blocked && queue->n <= cell_queue_lowwatermark())
+ set_circuit_blocked_on_chan(circ, chan, 0); /* unblock streams */
/* If n_flushed < max still, loop around and pick another circuit */
}
@@ -3346,9 +3351,10 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
streamid_t fromstream)
{
or_circuit_t *orcirc = NULL;
+ edge_connection_t *stream_list = NULL;
cell_queue_t *queue;
int32_t max_queue_size;
- int streams_blocked;
+ int circ_blocked;
int exitward;
if (circ->marked_for_close)
return;
@@ -3356,13 +3362,16 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
exitward = (direction == CELL_DIRECTION_OUT);
if (exitward) {
queue = &circ->n_chan_cells;
- streams_blocked = circ->streams_blocked_on_n_chan;
+ circ_blocked = circ->circuit_blocked_on_n_chan;
max_queue_size = max_circuit_cell_queue_size_out;
+ if (CIRCUIT_IS_ORIGIN(circ))
+ stream_list = TO_ORIGIN_CIRCUIT(circ)->p_streams;
} else {
orcirc = TO_OR_CIRCUIT(circ);
queue = &orcirc->p_chan_cells;
- streams_blocked = circ->streams_blocked_on_p_chan;
+ circ_blocked = circ->circuit_blocked_on_p_chan;
max_queue_size = max_circuit_cell_queue_size;
+ stream_list = TO_OR_CIRCUIT(circ)->n_streams;
}
if (PREDICT_UNLIKELY(queue->n >= max_queue_size)) {
@@ -3395,14 +3404,16 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
return;
}
- /* If we have too many cells on the circuit, we should stop reading from
- * the edge streams for a while. */
- if (!streams_blocked && queue->n >= cell_queue_highwatermark())
- set_streams_blocked_on_circ(circ, chan, 1, 0); /* block streams */
+ /* If we have too many cells on the circuit, note that it should
+ * be blocked from new cells. */
+ if (!circ_blocked && queue->n >= cell_queue_highwatermark())
+ set_circuit_blocked_on_chan(circ, chan, 1);
- if (streams_blocked && fromstream) {
- /* This edge connection is apparently not blocked; block it. */
- set_streams_blocked_on_circ(circ, chan, 1, fromstream);
+ if (circ_blocked && fromstream) {
+ /* This edge connection is apparently not blocked; this can happen for
+ * new streams on a blocked circuit, for their CONNECTED response.
+ * block it now. */
+ set_block_state_for_streams(stream_list, 1, fromstream);
}
update_circuit_on_cmux(circ, direction);
@@ -3508,8 +3519,8 @@ static int
circuit_queue_streams_are_blocked(circuit_t *circ)
{
if (CIRCUIT_IS_ORIGIN(circ)) {
- return circ->streams_blocked_on_n_chan;
+ return circ->circuit_blocked_on_n_chan;
} else {
- return circ->streams_blocked_on_p_chan;
+ return circ->circuit_blocked_on_p_chan;
}
}
diff --git a/src/test/fakecircs.c b/src/test/fakecircs.c
index cca3b43483..caeacd84ef 100644
--- a/src/test/fakecircs.c
+++ b/src/test/fakecircs.c
@@ -41,8 +41,8 @@ new_fake_orcirc(channel_t *nchan, channel_t *pchan)
cell_queue_init(&(circ->n_chan_cells));
circ->n_hop = NULL;
- circ->streams_blocked_on_n_chan = 0;
- circ->streams_blocked_on_p_chan = 0;
+ circ->circuit_blocked_on_n_chan = 0;
+ circ->circuit_blocked_on_p_chan = 0;
circ->n_delete_pending = 0;
circ->p_delete_pending = 0;
circ->received_destroy = 0;