diff options
Diffstat (limited to 'src/core/or/relay.c')
-rw-r--r-- | src/core/or/relay.c | 127 |
1 files changed, 74 insertions, 53 deletions
diff --git a/src/core/or/relay.c b/src/core/or/relay.c index 827f0c3e46..58e48df902 100644 --- a/src/core/or/relay.c +++ b/src/core/or/relay.c @@ -122,7 +122,8 @@ static int connection_edge_process_ordered_relay_cell(cell_t *cell, edge_connection_t *conn, crypt_path_t *layer_hint, relay_header_t *rh); -static void set_block_state_for_streams(edge_connection_t *stream_list, +static void set_block_state_for_streams(circuit_t *circ, + edge_connection_t *stream_list, int block, streamid_t stream_id); /** Stats: how many relay cells have originated at this hop, or have @@ -455,7 +456,7 @@ relay_lookup_conn(circuit_t *circ, cell_t *cell, tmpconn=tmpconn->next_stream) { if (rh.stream_id == tmpconn->stream_id && !tmpconn->base_.marked_for_close && - tmpconn->cpath_layer == layer_hint) { + edge_uses_cpath(tmpconn, layer_hint)) { log_debug(LD_APP,"found conn for stream %d.", rh.stream_id); return tmpconn; } @@ -1549,25 +1550,6 @@ connection_edge_process_relay_cell_not_open( // return -1; } -/** - * Return true iff our decryption layer_hint is from the last hop - * in a circuit. - */ -static bool -relay_crypt_from_last_hop(origin_circuit_t *circ, crypt_path_t *layer_hint) -{ - tor_assert(circ); - tor_assert(layer_hint); - tor_assert(circ->cpath); - - if (layer_hint != circ->cpath->prev) { - log_fn(LOG_PROTOCOL_WARN, LD_CIRC, - "Got unexpected relay data from intermediate hop"); - return false; - } - return true; -} - /** Process a SENDME cell that arrived on <b>circ</b>. If it is a stream level * cell, it is destined for the given <b>conn</b>. If it is a circuit level * cell, it is destined for the <b>layer_hint</b>. The <b>domain</b> is the @@ -2454,6 +2436,15 @@ circuit_resume_edge_reading(circuit_t *circ, crypt_path_t *layer_hint) log_debug(layer_hint?LD_APP:LD_EXIT,"Too big queue, no resuming"); return; } + + /* If we have a conflux negotiated, and it still can't send on + * any circuit, then do not resume sending. */ + if (circ->conflux && !conflux_can_send(circ->conflux)) { + log_debug(layer_hint?LD_APP:LD_EXIT, + "Conflux can't send, not resuming edges"); + return; + } + log_debug(layer_hint?LD_APP:LD_EXIT,"resuming"); if (CIRCUIT_IS_ORIGIN(circ)) @@ -2487,20 +2478,6 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, return 0; } - /* How many cells do we have space for? It will be the minimum of - * the number needed to exhaust the package window, and the minimum - * needed to fill the cell queue. */ - - max_to_package = congestion_control_get_package_window(circ, layer_hint); - if (CIRCUIT_IS_ORIGIN(circ)) { - cells_on_queue = circ->n_chan_cells.n; - } else { - or_circuit_t *or_circ = TO_OR_CIRCUIT(circ); - cells_on_queue = or_circ->p_chan_cells.n; - } - if (cell_queue_highwatermark() - cells_on_queue < max_to_package) - max_to_package = cell_queue_highwatermark() - cells_on_queue; - /* Once we used to start listening on the streams in the order they * appeared in the linked list. That leads to starvation on the * streams that appeared later on the list, since the first streams @@ -2539,11 +2516,13 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, /* Activate reading starting from the chosen stream */ for (conn=chosen_stream; conn; conn = conn->next_stream) { /* Start reading for the streams starting from here */ - if (conn->base_.marked_for_close || conn->package_window <= 0 || - conn->xoff_received) + if (conn->base_.marked_for_close || conn->package_window <= 0) continue; - if (!layer_hint || conn->cpath_layer == layer_hint) { - connection_start_reading(TO_CONN(conn)); + + if (edge_uses_cpath(conn, layer_hint)) { + if (!conn->xoff_received) { + connection_start_reading(TO_CONN(conn)); + } if (connection_get_inbuf_len(TO_CONN(conn)) > 0) ++n_packaging_streams; @@ -2551,11 +2530,13 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, } /* Go back and do the ones we skipped, circular-style */ for (conn = first_conn; conn != chosen_stream; conn = conn->next_stream) { - if (conn->base_.marked_for_close || conn->package_window <= 0 || - conn->xoff_received) + if (conn->base_.marked_for_close || conn->package_window <= 0) continue; - if (!layer_hint || conn->cpath_layer == layer_hint) { - connection_start_reading(TO_CONN(conn)); + + if (edge_uses_cpath(conn, layer_hint)) { + if (!conn->xoff_received) { + connection_start_reading(TO_CONN(conn)); + } if (connection_get_inbuf_len(TO_CONN(conn)) > 0) ++n_packaging_streams; @@ -2567,6 +2548,32 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, again: + /* If we're using conflux, the circuit we decide to send on may change + * after we're sending. Get it again, and re-check package windows + * for it */ + if (circ->conflux) { + if (circuit_consider_stop_edge_reading(circ, layer_hint)) + return -1; + + circ = conflux_decide_next_circ(circ->conflux); + + /* Get the destination layer hint for this circuit */ + layer_hint = conflux_get_destination_hop(circ); + } + + /* How many cells do we have space for? It will be the minimum of + * the number needed to exhaust the package window, and the minimum + * needed to fill the cell queue. */ + max_to_package = congestion_control_get_package_window(circ, layer_hint); + if (CIRCUIT_IS_ORIGIN(circ)) { + cells_on_queue = circ->n_chan_cells.n; + } else { + or_circuit_t *or_circ = TO_OR_CIRCUIT(circ); + cells_on_queue = or_circ->p_chan_cells.n; + } + if (cell_queue_highwatermark() - cells_on_queue < max_to_package) + max_to_package = cell_queue_highwatermark() - cells_on_queue; + cells_per_conn = CEIL_DIV(max_to_package, n_packaging_streams); packaged_this_round = 0; @@ -2580,7 +2587,7 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, for (conn=first_conn; conn; conn=conn->next_stream) { if (conn->base_.marked_for_close || conn->package_window <= 0) continue; - if (!layer_hint || conn->cpath_layer == layer_hint) { + if (edge_uses_cpath(conn, layer_hint)) { int n = cells_per_conn, r; /* handle whatever might still be on the inbuf */ r = connection_edge_package_raw_inbuf(conn, 1, &n); @@ -2638,7 +2645,7 @@ circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint) or_circuit_t *or_circ = TO_OR_CIRCUIT(circ); log_debug(domain,"considering circ->package_window %d", circ->package_window); - if (congestion_control_get_package_window(circ, layer_hint) <= 0) { + if (circuit_get_package_window(circ, layer_hint) <= 0) { log_debug(domain,"yes, not-at-origin. stopped."); for (conn = or_circ->n_streams; conn; conn=conn->next_stream) connection_stop_reading(TO_CONN(conn)); @@ -2649,11 +2656,11 @@ circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint) /* else, layer hint is defined, use it */ log_debug(domain,"considering layer_hint->package_window %d", layer_hint->package_window); - if (congestion_control_get_package_window(circ, layer_hint) <= 0) { + if (circuit_get_package_window(circ, layer_hint) <= 0) { log_debug(domain,"yes, at-origin. stopped."); for (conn = TO_ORIGIN_CIRCUIT(circ)->p_streams; conn; conn=conn->next_stream) { - if (conn->cpath_layer == layer_hint) + if (edge_uses_cpath(conn, layer_hint)) connection_stop_reading(TO_CONN(conn)); } return 1; @@ -3029,7 +3036,7 @@ set_circuit_blocked_on_chan(circuit_t *circ, channel_t *chan, int block) edge = TO_OR_CIRCUIT(circ)->n_streams; } - set_block_state_for_streams(edge, block, 0); + set_block_state_for_streams(circ, edge, block, 0); } /** @@ -3039,15 +3046,29 @@ set_circuit_blocked_on_chan(circuit_t *circ, channel_t *chan, int block) * in the stream list. If it is non-zero, only apply to that specific stream. */ static void -set_block_state_for_streams(edge_connection_t *stream_list, int block, - streamid_t stream_id) +set_block_state_for_streams(circuit_t *circ, edge_connection_t *stream_list, + int block, streamid_t stream_id) { + /* If we have a conflux object, we need to examine its status before + * blocking and unblocking streams. */ + if (circ->conflux) { + bool can_send = conflux_can_send(circ->conflux); + + if (block && can_send) { + /* Don't actually block streams, since conflux can send*/ + return; + } else if (!block && !can_send) { + /* Don't actually unblock streams, since conflux still can't send */ + return; + } + } + for (edge_connection_t *edge = stream_list; edge; edge = edge->next_stream) { connection_t *conn = TO_CONN(edge); if (stream_id && edge->stream_id != stream_id) continue; - if (!conn->read_event) { + if (!conn->read_event || edge->xoff_received) { /* This connection is a placeholder for something; probably a DNS * request. It can't actually stop or start reading.*/ continue; @@ -3412,8 +3433,8 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, if (circ_blocked && fromstream) { /* This edge connection is apparently not blocked; this can happen for * new streams on a blocked circuit, for their CONNECTED response. - * block it now. */ - set_block_state_for_streams(stream_list, 1, fromstream); + * block it now, unless we have conflux. */ + set_block_state_for_streams(circ, stream_list, 1, fromstream); } update_circuit_on_cmux(circ, direction); |