aboutsummaryrefslogtreecommitdiff
path: root/src/core/or/relay.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/or/relay.c')
-rw-r--r--src/core/or/relay.c107
1 files changed, 84 insertions, 23 deletions
diff --git a/src/core/or/relay.c b/src/core/or/relay.c
index f5a9e73856..68fddd1ae7 100644
--- a/src/core/or/relay.c
+++ b/src/core/or/relay.c
@@ -97,6 +97,8 @@
#include "feature/nodelist/routerinfo_st.h"
#include "core/or/socks_request_st.h"
#include "core/or/sendme.h"
+#include "core/or/congestion_control_common.h"
+#include "core/or/congestion_control_flow.h"
static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell,
cell_direction_t cell_direction,
@@ -115,13 +117,6 @@ static void adjust_exit_policy_from_exitpolicy_failure(origin_circuit_t *circ,
node_t *node,
const tor_addr_t *addr);
-/** Stop reading on edge connections when we have this many cells
- * waiting on the appropriate queue. */
-#define CELL_QUEUE_HIGHWATER_SIZE 256
-/** Start reading from edge connections again when we get down to this many
- * cells. */
-#define CELL_QUEUE_LOWWATER_SIZE 64
-
/** Stats: how many relay cells have originated at this hop, or have
* been relayed onward (not recognized at this hop)?
*/
@@ -338,8 +333,17 @@ circuit_receive_relay_cell(cell_t *cell, circuit_t *circ,
}
return 0;
}
- log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
- "Didn't recognize cell, but circ stops here! Closing circ.");
+ if (BUG(CIRCUIT_IS_ORIGIN(circ))) {
+ /* Should be impossible at this point. */
+ return -END_CIRC_REASON_TORPROTOCOL;
+ }
+ or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
+ if (++or_circ->n_cells_discarded_at_end == 1) {
+ time_t seconds_open = approx_time() - circ->timestamp_created.tv_sec;
+ log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
+ "Didn't recognize a cell, but circ stops here! Closing circuit. "
+ "It was created %ld seconds ago.", (long)seconds_open);
+ }
return -END_CIRC_REASON_TORPROTOCOL;
}
@@ -1574,6 +1578,7 @@ process_sendme_cell(const relay_header_t *rh, const cell_t *cell,
}
/* Stream level SENDME cell. */
+ // TODO: Turn this off for cc_alg=1,2,3; use XON/XOFF instead
ret = sendme_process_stream_level(conn, circ, rh->length);
if (ret < 0) {
/* Means we need to close the circuit with reason ret. */
@@ -1738,6 +1743,44 @@ handle_relay_cell_command(cell_t *cell, circuit_t *circ,
}
return 0;
+ case RELAY_COMMAND_XOFF:
+ if (!conn) {
+ if (CIRCUIT_IS_ORIGIN(circ)) {
+ origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ);
+ if (relay_crypt_from_last_hop(ocirc, layer_hint) &&
+ connection_half_edge_is_valid_data(ocirc->half_streams,
+ rh->stream_id)) {
+ circuit_read_valid_data(ocirc, rh->length);
+ }
+ }
+ return 0;
+ }
+
+ if (circuit_process_stream_xoff(conn, layer_hint, cell)) {
+ if (CIRCUIT_IS_ORIGIN(circ)) {
+ circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh->length);
+ }
+ }
+ return 0;
+ case RELAY_COMMAND_XON:
+ if (!conn) {
+ if (CIRCUIT_IS_ORIGIN(circ)) {
+ origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ);
+ if (relay_crypt_from_last_hop(ocirc, layer_hint) &&
+ connection_half_edge_is_valid_data(ocirc->half_streams,
+ rh->stream_id)) {
+ circuit_read_valid_data(ocirc, rh->length);
+ }
+ }
+ return 0;
+ }
+
+ if (circuit_process_stream_xon(conn, layer_hint, cell)) {
+ if (CIRCUIT_IS_ORIGIN(circ)) {
+ circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh->length);
+ }
+ }
+ return 0;
case RELAY_COMMAND_END:
reason = rh->length > 0 ?
get_uint8(cell->payload+RELAY_HEADER_SIZE) : END_STREAM_REASON_MISC;
@@ -2091,6 +2134,7 @@ void
circuit_reset_sendme_randomness(circuit_t *circ)
{
circ->have_sent_sufficiently_random_cell = 0;
+ // XXX: do we need to change this check for congestion control?
circ->send_randomness_after_n_cells = CIRCWINDOW_INCREMENT / 2 +
crypto_fast_rng_get_uint(get_thread_fast_rng(), CIRCWINDOW_INCREMENT / 2);
}
@@ -2284,7 +2328,7 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial,
}
/* Handle the stream-level SENDME package window. */
- if (sendme_note_stream_data_packaged(conn) < 0) {
+ if (sendme_note_stream_data_packaged(conn, length) < 0) {
connection_stop_reading(TO_CONN(conn));
log_debug(domain,"conn->package_window reached 0.");
circuit_consider_stop_edge_reading(circ, cpath_layer);
@@ -2350,15 +2394,16 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
/* How many cells do we have space for? It will be the minimum of
* the number needed to exhaust the package window, and the minimum
* needed to fill the cell queue. */
- max_to_package = circ->package_window;
+
+ max_to_package = congestion_control_get_package_window(circ, layer_hint);
if (CIRCUIT_IS_ORIGIN(circ)) {
cells_on_queue = circ->n_chan_cells.n;
} else {
or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
cells_on_queue = or_circ->p_chan_cells.n;
}
- if (CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue < max_to_package)
- max_to_package = CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue;
+ if (cell_queue_highwatermark() - cells_on_queue < max_to_package)
+ max_to_package = cell_queue_highwatermark() - cells_on_queue;
/* Once we used to start listening on the streams in the order they
* appeared in the linked list. That leads to starvation on the
@@ -2398,7 +2443,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
/* Activate reading starting from the chosen stream */
for (conn=chosen_stream; conn; conn = conn->next_stream) {
/* Start reading for the streams starting from here */
- if (conn->base_.marked_for_close || conn->package_window <= 0)
+ if (conn->base_.marked_for_close || conn->package_window <= 0 ||
+ conn->xoff_received)
continue;
if (!layer_hint || conn->cpath_layer == layer_hint) {
connection_start_reading(TO_CONN(conn));
@@ -2409,7 +2455,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
}
/* Go back and do the ones we skipped, circular-style */
for (conn = first_conn; conn != chosen_stream; conn = conn->next_stream) {
- if (conn->base_.marked_for_close || conn->package_window <= 0)
+ if (conn->base_.marked_for_close || conn->package_window <= 0 ||
+ conn->xoff_received)
continue;
if (!layer_hint || conn->cpath_layer == layer_hint) {
connection_start_reading(TO_CONN(conn));
@@ -2495,7 +2542,7 @@ circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
log_debug(domain,"considering circ->package_window %d",
circ->package_window);
- if (circ->package_window <= 0) {
+ if (congestion_control_get_package_window(circ, layer_hint) <= 0) {
log_debug(domain,"yes, not-at-origin. stopped.");
for (conn = or_circ->n_streams; conn; conn=conn->next_stream)
connection_stop_reading(TO_CONN(conn));
@@ -2506,7 +2553,7 @@ circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
/* else, layer hint is defined, use it */
log_debug(domain,"considering layer_hint->package_window %d",
layer_hint->package_window);
- if (layer_hint->package_window <= 0) {
+ if (congestion_control_get_package_window(circ, layer_hint) <= 0) {
log_debug(domain,"yes, at-origin. stopped.");
for (conn = TO_ORIGIN_CIRCUIT(circ)->p_streams; conn;
conn=conn->next_stream) {
@@ -2722,11 +2769,18 @@ cell_queues_get_total_allocation(void)
/** The time at which we were last low on memory. */
static time_t last_time_under_memory_pressure = 0;
+/** Statistics on how many bytes were removed by the OOM per type. */
+uint64_t oom_stats_n_bytes_removed_dns = 0;
+uint64_t oom_stats_n_bytes_removed_cell = 0;
+uint64_t oom_stats_n_bytes_removed_geoip = 0;
+uint64_t oom_stats_n_bytes_removed_hsdir = 0;
+
/** Check whether we've got too much space used for cells. If so,
* call the OOM handler and return 1. Otherwise, return 0. */
STATIC int
cell_queues_check_size(void)
{
+ size_t removed = 0;
time_t now = time(NULL);
size_t alloc = cell_queues_get_total_allocation();
alloc += half_streams_get_total_allocation();
@@ -2751,20 +2805,27 @@ cell_queues_check_size(void)
if (hs_cache_total > get_options()->MaxMemInQueues / 5) {
const size_t bytes_to_remove =
hs_cache_total - (size_t)(get_options()->MaxMemInQueues / 10);
- alloc -= hs_cache_handle_oom(now, bytes_to_remove);
+ removed = hs_cache_handle_oom(now, bytes_to_remove);
+ oom_stats_n_bytes_removed_hsdir += removed;
+ alloc -= removed;
}
if (geoip_client_cache_total > get_options()->MaxMemInQueues / 5) {
const size_t bytes_to_remove =
geoip_client_cache_total -
(size_t)(get_options()->MaxMemInQueues / 10);
- alloc -= geoip_client_cache_handle_oom(now, bytes_to_remove);
+ removed = geoip_client_cache_handle_oom(now, bytes_to_remove);
+ oom_stats_n_bytes_removed_geoip += removed;
+ alloc -= removed;
}
if (dns_cache_total > get_options()->MaxMemInQueues / 5) {
const size_t bytes_to_remove =
dns_cache_total - (size_t)(get_options()->MaxMemInQueues / 10);
- alloc -= dns_cache_handle_oom(now, bytes_to_remove);
+ removed = dns_cache_handle_oom(now, bytes_to_remove);
+ oom_stats_n_bytes_removed_dns += removed;
+ alloc -= removed;
}
- circuits_handle_oom(alloc);
+ removed = circuits_handle_oom(alloc);
+ oom_stats_n_bytes_removed_cell += removed;
return 1;
}
}
@@ -3062,7 +3123,7 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max))
/* Is the cell queue low enough to unblock all the streams that are waiting
* to write to this circuit? */
- if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE)
+ if (streams_blocked && queue->n <= cell_queue_lowwatermark())
set_streams_blocked_on_circ(circ, chan, 0, 0); /* unblock streams */
/* If n_flushed < max still, loop around and pick another circuit */
@@ -3180,7 +3241,7 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
/* If we have too many cells on the circuit, we should stop reading from
* the edge streams for a while. */
- if (!streams_blocked && queue->n >= CELL_QUEUE_HIGHWATER_SIZE)
+ if (!streams_blocked && queue->n >= cell_queue_highwatermark())
set_streams_blocked_on_circ(circ, chan, 1, 0); /* block streams */
if (streams_blocked && fromstream) {