aboutsummaryrefslogtreecommitdiff
path: root/src/core/or/relay.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/or/relay.c')
-rw-r--r--src/core/or/relay.c127
1 files changed, 74 insertions, 53 deletions
diff --git a/src/core/or/relay.c b/src/core/or/relay.c
index 827f0c3e46..58e48df902 100644
--- a/src/core/or/relay.c
+++ b/src/core/or/relay.c
@@ -122,7 +122,8 @@ static int connection_edge_process_ordered_relay_cell(cell_t *cell,
edge_connection_t *conn,
crypt_path_t *layer_hint,
relay_header_t *rh);
-static void set_block_state_for_streams(edge_connection_t *stream_list,
+static void set_block_state_for_streams(circuit_t *circ,
+ edge_connection_t *stream_list,
int block, streamid_t stream_id);
/** Stats: how many relay cells have originated at this hop, or have
@@ -455,7 +456,7 @@ relay_lookup_conn(circuit_t *circ, cell_t *cell,
tmpconn=tmpconn->next_stream) {
if (rh.stream_id == tmpconn->stream_id &&
!tmpconn->base_.marked_for_close &&
- tmpconn->cpath_layer == layer_hint) {
+ edge_uses_cpath(tmpconn, layer_hint)) {
log_debug(LD_APP,"found conn for stream %d.", rh.stream_id);
return tmpconn;
}
@@ -1549,25 +1550,6 @@ connection_edge_process_relay_cell_not_open(
// return -1;
}
-/**
- * Return true iff our decryption layer_hint is from the last hop
- * in a circuit.
- */
-static bool
-relay_crypt_from_last_hop(origin_circuit_t *circ, crypt_path_t *layer_hint)
-{
- tor_assert(circ);
- tor_assert(layer_hint);
- tor_assert(circ->cpath);
-
- if (layer_hint != circ->cpath->prev) {
- log_fn(LOG_PROTOCOL_WARN, LD_CIRC,
- "Got unexpected relay data from intermediate hop");
- return false;
- }
- return true;
-}
-
/** Process a SENDME cell that arrived on <b>circ</b>. If it is a stream level
* cell, it is destined for the given <b>conn</b>. If it is a circuit level
* cell, it is destined for the <b>layer_hint</b>. The <b>domain</b> is the
@@ -2454,6 +2436,15 @@ circuit_resume_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
log_debug(layer_hint?LD_APP:LD_EXIT,"Too big queue, no resuming");
return;
}
+
+ /* If we have a conflux negotiated, and it still can't send on
+ * any circuit, then do not resume sending. */
+ if (circ->conflux && !conflux_can_send(circ->conflux)) {
+ log_debug(layer_hint?LD_APP:LD_EXIT,
+ "Conflux can't send, not resuming edges");
+ return;
+ }
+
log_debug(layer_hint?LD_APP:LD_EXIT,"resuming");
if (CIRCUIT_IS_ORIGIN(circ))
@@ -2487,20 +2478,6 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
return 0;
}
- /* How many cells do we have space for? It will be the minimum of
- * the number needed to exhaust the package window, and the minimum
- * needed to fill the cell queue. */
-
- max_to_package = congestion_control_get_package_window(circ, layer_hint);
- if (CIRCUIT_IS_ORIGIN(circ)) {
- cells_on_queue = circ->n_chan_cells.n;
- } else {
- or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
- cells_on_queue = or_circ->p_chan_cells.n;
- }
- if (cell_queue_highwatermark() - cells_on_queue < max_to_package)
- max_to_package = cell_queue_highwatermark() - cells_on_queue;
-
/* Once we used to start listening on the streams in the order they
* appeared in the linked list. That leads to starvation on the
* streams that appeared later on the list, since the first streams
@@ -2539,11 +2516,13 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
/* Activate reading starting from the chosen stream */
for (conn=chosen_stream; conn; conn = conn->next_stream) {
/* Start reading for the streams starting from here */
- if (conn->base_.marked_for_close || conn->package_window <= 0 ||
- conn->xoff_received)
+ if (conn->base_.marked_for_close || conn->package_window <= 0)
continue;
- if (!layer_hint || conn->cpath_layer == layer_hint) {
- connection_start_reading(TO_CONN(conn));
+
+ if (edge_uses_cpath(conn, layer_hint)) {
+ if (!conn->xoff_received) {
+ connection_start_reading(TO_CONN(conn));
+ }
if (connection_get_inbuf_len(TO_CONN(conn)) > 0)
++n_packaging_streams;
@@ -2551,11 +2530,13 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
}
/* Go back and do the ones we skipped, circular-style */
for (conn = first_conn; conn != chosen_stream; conn = conn->next_stream) {
- if (conn->base_.marked_for_close || conn->package_window <= 0 ||
- conn->xoff_received)
+ if (conn->base_.marked_for_close || conn->package_window <= 0)
continue;
- if (!layer_hint || conn->cpath_layer == layer_hint) {
- connection_start_reading(TO_CONN(conn));
+
+ if (edge_uses_cpath(conn, layer_hint)) {
+ if (!conn->xoff_received) {
+ connection_start_reading(TO_CONN(conn));
+ }
if (connection_get_inbuf_len(TO_CONN(conn)) > 0)
++n_packaging_streams;
@@ -2567,6 +2548,32 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
again:
+ /* If we're using conflux, the circuit we decide to send on may change
+ * after we're sending. Get it again, and re-check package windows
+ * for it */
+ if (circ->conflux) {
+ if (circuit_consider_stop_edge_reading(circ, layer_hint))
+ return -1;
+
+ circ = conflux_decide_next_circ(circ->conflux);
+
+ /* Get the destination layer hint for this circuit */
+ layer_hint = conflux_get_destination_hop(circ);
+ }
+
+ /* How many cells do we have space for? It will be the minimum of
+ * the number needed to exhaust the package window, and the minimum
+ * needed to fill the cell queue. */
+ max_to_package = congestion_control_get_package_window(circ, layer_hint);
+ if (CIRCUIT_IS_ORIGIN(circ)) {
+ cells_on_queue = circ->n_chan_cells.n;
+ } else {
+ or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
+ cells_on_queue = or_circ->p_chan_cells.n;
+ }
+ if (cell_queue_highwatermark() - cells_on_queue < max_to_package)
+ max_to_package = cell_queue_highwatermark() - cells_on_queue;
+
cells_per_conn = CEIL_DIV(max_to_package, n_packaging_streams);
packaged_this_round = 0;
@@ -2580,7 +2587,7 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
for (conn=first_conn; conn; conn=conn->next_stream) {
if (conn->base_.marked_for_close || conn->package_window <= 0)
continue;
- if (!layer_hint || conn->cpath_layer == layer_hint) {
+ if (edge_uses_cpath(conn, layer_hint)) {
int n = cells_per_conn, r;
/* handle whatever might still be on the inbuf */
r = connection_edge_package_raw_inbuf(conn, 1, &n);
@@ -2638,7 +2645,7 @@ circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
log_debug(domain,"considering circ->package_window %d",
circ->package_window);
- if (congestion_control_get_package_window(circ, layer_hint) <= 0) {
+ if (circuit_get_package_window(circ, layer_hint) <= 0) {
log_debug(domain,"yes, not-at-origin. stopped.");
for (conn = or_circ->n_streams; conn; conn=conn->next_stream)
connection_stop_reading(TO_CONN(conn));
@@ -2649,11 +2656,11 @@ circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
/* else, layer hint is defined, use it */
log_debug(domain,"considering layer_hint->package_window %d",
layer_hint->package_window);
- if (congestion_control_get_package_window(circ, layer_hint) <= 0) {
+ if (circuit_get_package_window(circ, layer_hint) <= 0) {
log_debug(domain,"yes, at-origin. stopped.");
for (conn = TO_ORIGIN_CIRCUIT(circ)->p_streams; conn;
conn=conn->next_stream) {
- if (conn->cpath_layer == layer_hint)
+ if (edge_uses_cpath(conn, layer_hint))
connection_stop_reading(TO_CONN(conn));
}
return 1;
@@ -3029,7 +3036,7 @@ set_circuit_blocked_on_chan(circuit_t *circ, channel_t *chan, int block)
edge = TO_OR_CIRCUIT(circ)->n_streams;
}
- set_block_state_for_streams(edge, block, 0);
+ set_block_state_for_streams(circ, edge, block, 0);
}
/**
@@ -3039,15 +3046,29 @@ set_circuit_blocked_on_chan(circuit_t *circ, channel_t *chan, int block)
* in the stream list. If it is non-zero, only apply to that specific stream.
*/
static void
-set_block_state_for_streams(edge_connection_t *stream_list, int block,
- streamid_t stream_id)
+set_block_state_for_streams(circuit_t *circ, edge_connection_t *stream_list,
+ int block, streamid_t stream_id)
{
+ /* If we have a conflux object, we need to examine its status before
+ * blocking and unblocking streams. */
+ if (circ->conflux) {
+ bool can_send = conflux_can_send(circ->conflux);
+
+ if (block && can_send) {
+ /* Don't actually block streams, since conflux can send*/
+ return;
+ } else if (!block && !can_send) {
+ /* Don't actually unblock streams, since conflux still can't send */
+ return;
+ }
+ }
+
for (edge_connection_t *edge = stream_list; edge; edge = edge->next_stream) {
connection_t *conn = TO_CONN(edge);
if (stream_id && edge->stream_id != stream_id)
continue;
- if (!conn->read_event) {
+ if (!conn->read_event || edge->xoff_received) {
/* This connection is a placeholder for something; probably a DNS
* request. It can't actually stop or start reading.*/
continue;
@@ -3412,8 +3433,8 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
if (circ_blocked && fromstream) {
/* This edge connection is apparently not blocked; this can happen for
* new streams on a blocked circuit, for their CONNECTED response.
- * block it now. */
- set_block_state_for_streams(stream_list, 1, fromstream);
+ * block it now, unless we have conflux. */
+ set_block_state_for_streams(circ, stream_list, 1, fromstream);
}
update_circuit_on_cmux(circ, direction);