diff options
Diffstat (limited to 'src/core/or/relay.c')
-rw-r--r-- | src/core/or/relay.c | 107 |
1 files changed, 84 insertions, 23 deletions
diff --git a/src/core/or/relay.c b/src/core/or/relay.c index f5a9e73856..68fddd1ae7 100644 --- a/src/core/or/relay.c +++ b/src/core/or/relay.c @@ -97,6 +97,8 @@ #include "feature/nodelist/routerinfo_st.h" #include "core/or/socks_request_st.h" #include "core/or/sendme.h" +#include "core/or/congestion_control_common.h" +#include "core/or/congestion_control_flow.h" static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell, cell_direction_t cell_direction, @@ -115,13 +117,6 @@ static void adjust_exit_policy_from_exitpolicy_failure(origin_circuit_t *circ, node_t *node, const tor_addr_t *addr); -/** Stop reading on edge connections when we have this many cells - * waiting on the appropriate queue. */ -#define CELL_QUEUE_HIGHWATER_SIZE 256 -/** Start reading from edge connections again when we get down to this many - * cells. */ -#define CELL_QUEUE_LOWWATER_SIZE 64 - /** Stats: how many relay cells have originated at this hop, or have * been relayed onward (not recognized at this hop)? */ @@ -338,8 +333,17 @@ circuit_receive_relay_cell(cell_t *cell, circuit_t *circ, } return 0; } - log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL, - "Didn't recognize cell, but circ stops here! Closing circ."); + if (BUG(CIRCUIT_IS_ORIGIN(circ))) { + /* Should be impossible at this point. */ + return -END_CIRC_REASON_TORPROTOCOL; + } + or_circuit_t *or_circ = TO_OR_CIRCUIT(circ); + if (++or_circ->n_cells_discarded_at_end == 1) { + time_t seconds_open = approx_time() - circ->timestamp_created.tv_sec; + log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL, + "Didn't recognize a cell, but circ stops here! Closing circuit. " + "It was created %ld seconds ago.", (long)seconds_open); + } return -END_CIRC_REASON_TORPROTOCOL; } @@ -1574,6 +1578,7 @@ process_sendme_cell(const relay_header_t *rh, const cell_t *cell, } /* Stream level SENDME cell. */ + // TODO: Turn this off for cc_alg=1,2,3; use XON/XOFF instead ret = sendme_process_stream_level(conn, circ, rh->length); if (ret < 0) { /* Means we need to close the circuit with reason ret. */ @@ -1738,6 +1743,44 @@ handle_relay_cell_command(cell_t *cell, circuit_t *circ, } return 0; + case RELAY_COMMAND_XOFF: + if (!conn) { + if (CIRCUIT_IS_ORIGIN(circ)) { + origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ); + if (relay_crypt_from_last_hop(ocirc, layer_hint) && + connection_half_edge_is_valid_data(ocirc->half_streams, + rh->stream_id)) { + circuit_read_valid_data(ocirc, rh->length); + } + } + return 0; + } + + if (circuit_process_stream_xoff(conn, layer_hint, cell)) { + if (CIRCUIT_IS_ORIGIN(circ)) { + circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh->length); + } + } + return 0; + case RELAY_COMMAND_XON: + if (!conn) { + if (CIRCUIT_IS_ORIGIN(circ)) { + origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ); + if (relay_crypt_from_last_hop(ocirc, layer_hint) && + connection_half_edge_is_valid_data(ocirc->half_streams, + rh->stream_id)) { + circuit_read_valid_data(ocirc, rh->length); + } + } + return 0; + } + + if (circuit_process_stream_xon(conn, layer_hint, cell)) { + if (CIRCUIT_IS_ORIGIN(circ)) { + circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh->length); + } + } + return 0; case RELAY_COMMAND_END: reason = rh->length > 0 ? get_uint8(cell->payload+RELAY_HEADER_SIZE) : END_STREAM_REASON_MISC; @@ -2091,6 +2134,7 @@ void circuit_reset_sendme_randomness(circuit_t *circ) { circ->have_sent_sufficiently_random_cell = 0; + // XXX: do we need to change this check for congestion control? circ->send_randomness_after_n_cells = CIRCWINDOW_INCREMENT / 2 + crypto_fast_rng_get_uint(get_thread_fast_rng(), CIRCWINDOW_INCREMENT / 2); } @@ -2284,7 +2328,7 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial, } /* Handle the stream-level SENDME package window. */ - if (sendme_note_stream_data_packaged(conn) < 0) { + if (sendme_note_stream_data_packaged(conn, length) < 0) { connection_stop_reading(TO_CONN(conn)); log_debug(domain,"conn->package_window reached 0."); circuit_consider_stop_edge_reading(circ, cpath_layer); @@ -2350,15 +2394,16 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, /* How many cells do we have space for? It will be the minimum of * the number needed to exhaust the package window, and the minimum * needed to fill the cell queue. */ - max_to_package = circ->package_window; + + max_to_package = congestion_control_get_package_window(circ, layer_hint); if (CIRCUIT_IS_ORIGIN(circ)) { cells_on_queue = circ->n_chan_cells.n; } else { or_circuit_t *or_circ = TO_OR_CIRCUIT(circ); cells_on_queue = or_circ->p_chan_cells.n; } - if (CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue < max_to_package) - max_to_package = CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue; + if (cell_queue_highwatermark() - cells_on_queue < max_to_package) + max_to_package = cell_queue_highwatermark() - cells_on_queue; /* Once we used to start listening on the streams in the order they * appeared in the linked list. That leads to starvation on the @@ -2398,7 +2443,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, /* Activate reading starting from the chosen stream */ for (conn=chosen_stream; conn; conn = conn->next_stream) { /* Start reading for the streams starting from here */ - if (conn->base_.marked_for_close || conn->package_window <= 0) + if (conn->base_.marked_for_close || conn->package_window <= 0 || + conn->xoff_received) continue; if (!layer_hint || conn->cpath_layer == layer_hint) { connection_start_reading(TO_CONN(conn)); @@ -2409,7 +2455,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, } /* Go back and do the ones we skipped, circular-style */ for (conn = first_conn; conn != chosen_stream; conn = conn->next_stream) { - if (conn->base_.marked_for_close || conn->package_window <= 0) + if (conn->base_.marked_for_close || conn->package_window <= 0 || + conn->xoff_received) continue; if (!layer_hint || conn->cpath_layer == layer_hint) { connection_start_reading(TO_CONN(conn)); @@ -2495,7 +2542,7 @@ circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint) or_circuit_t *or_circ = TO_OR_CIRCUIT(circ); log_debug(domain,"considering circ->package_window %d", circ->package_window); - if (circ->package_window <= 0) { + if (congestion_control_get_package_window(circ, layer_hint) <= 0) { log_debug(domain,"yes, not-at-origin. stopped."); for (conn = or_circ->n_streams; conn; conn=conn->next_stream) connection_stop_reading(TO_CONN(conn)); @@ -2506,7 +2553,7 @@ circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint) /* else, layer hint is defined, use it */ log_debug(domain,"considering layer_hint->package_window %d", layer_hint->package_window); - if (layer_hint->package_window <= 0) { + if (congestion_control_get_package_window(circ, layer_hint) <= 0) { log_debug(domain,"yes, at-origin. stopped."); for (conn = TO_ORIGIN_CIRCUIT(circ)->p_streams; conn; conn=conn->next_stream) { @@ -2722,11 +2769,18 @@ cell_queues_get_total_allocation(void) /** The time at which we were last low on memory. */ static time_t last_time_under_memory_pressure = 0; +/** Statistics on how many bytes were removed by the OOM per type. */ +uint64_t oom_stats_n_bytes_removed_dns = 0; +uint64_t oom_stats_n_bytes_removed_cell = 0; +uint64_t oom_stats_n_bytes_removed_geoip = 0; +uint64_t oom_stats_n_bytes_removed_hsdir = 0; + /** Check whether we've got too much space used for cells. If so, * call the OOM handler and return 1. Otherwise, return 0. */ STATIC int cell_queues_check_size(void) { + size_t removed = 0; time_t now = time(NULL); size_t alloc = cell_queues_get_total_allocation(); alloc += half_streams_get_total_allocation(); @@ -2751,20 +2805,27 @@ cell_queues_check_size(void) if (hs_cache_total > get_options()->MaxMemInQueues / 5) { const size_t bytes_to_remove = hs_cache_total - (size_t)(get_options()->MaxMemInQueues / 10); - alloc -= hs_cache_handle_oom(now, bytes_to_remove); + removed = hs_cache_handle_oom(now, bytes_to_remove); + oom_stats_n_bytes_removed_hsdir += removed; + alloc -= removed; } if (geoip_client_cache_total > get_options()->MaxMemInQueues / 5) { const size_t bytes_to_remove = geoip_client_cache_total - (size_t)(get_options()->MaxMemInQueues / 10); - alloc -= geoip_client_cache_handle_oom(now, bytes_to_remove); + removed = geoip_client_cache_handle_oom(now, bytes_to_remove); + oom_stats_n_bytes_removed_geoip += removed; + alloc -= removed; } if (dns_cache_total > get_options()->MaxMemInQueues / 5) { const size_t bytes_to_remove = dns_cache_total - (size_t)(get_options()->MaxMemInQueues / 10); - alloc -= dns_cache_handle_oom(now, bytes_to_remove); + removed = dns_cache_handle_oom(now, bytes_to_remove); + oom_stats_n_bytes_removed_dns += removed; + alloc -= removed; } - circuits_handle_oom(alloc); + removed = circuits_handle_oom(alloc); + oom_stats_n_bytes_removed_cell += removed; return 1; } } @@ -3062,7 +3123,7 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max)) /* Is the cell queue low enough to unblock all the streams that are waiting * to write to this circuit? */ - if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE) + if (streams_blocked && queue->n <= cell_queue_lowwatermark()) set_streams_blocked_on_circ(circ, chan, 0, 0); /* unblock streams */ /* If n_flushed < max still, loop around and pick another circuit */ @@ -3180,7 +3241,7 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, /* If we have too many cells on the circuit, we should stop reading from * the edge streams for a while. */ - if (!streams_blocked && queue->n >= CELL_QUEUE_HIGHWATER_SIZE) + if (!streams_blocked && queue->n >= cell_queue_highwatermark()) set_streams_blocked_on_circ(circ, chan, 1, 0); /* block streams */ if (streams_blocked && fromstream) { |