Diffstat (limited to 'src/core/mainloop')
-rw-r--r--  src/core/mainloop/connection.c  72
-rw-r--r--  src/core/mainloop/connection.h   4
-rw-r--r--  src/core/mainloop/mainloop.c    27
3 files changed, 90 insertions, 13 deletions
diff --git a/src/core/mainloop/connection.c b/src/core/mainloop/connection.c
index b17d7bf2bd..9271a70914 100644
--- a/src/core/mainloop/connection.c
+++ b/src/core/mainloop/connection.c
@@ -117,6 +117,7 @@
#include "lib/cc/ctassert.h"
#include "lib/sandbox/sandbox.h"
#include "lib/net/buffers_net.h"
+#include "lib/net/address.h"
#include "lib/tls/tortls.h"
#include "lib/evloop/compat_libevent.h"
#include "lib/compress/compress.h"
@@ -146,6 +147,8 @@
#include "feature/nodelist/routerinfo_st.h"
#include "core/or/socks_request_st.h"
+#include "core/or/congestion_control_flow.h"
+
/**
* On Windows and Linux we cannot reliably bind() a socket to an
* address and port if: 1) There's already a socket bound to wildcard
@@ -250,13 +253,13 @@ CONST_TO_LISTENER_CONN(const connection_t *c)
}
size_t
-connection_get_inbuf_len(connection_t *conn)
+connection_get_inbuf_len(const connection_t *conn)
{
return conn->inbuf ? buf_datalen(conn->inbuf) : 0;
}
size_t
-connection_get_outbuf_len(connection_t *conn)
+connection_get_outbuf_len(const connection_t *conn)
{
return conn->outbuf ? buf_datalen(conn->outbuf) : 0;
}
@@ -612,6 +615,11 @@ entry_connection_new(int type, int socket_family)
entry_conn->entry_cfg.ipv4_traffic = 1;
else if (socket_family == AF_INET6)
entry_conn->entry_cfg.ipv6_traffic = 1;
+
+ /* Initialize the read token bucket to the maximum value, which is the same
+ * as no rate limiting. */
+ token_bucket_rw_init(&ENTRY_TO_EDGE_CONN(entry_conn)->bucket, INT32_MAX,
+ INT32_MAX, monotime_coarse_get_stamp());
return entry_conn;
}
@@ -623,6 +631,10 @@ edge_connection_new(int type, int socket_family)
edge_connection_t *edge_conn = tor_malloc_zero(sizeof(edge_connection_t));
tor_assert(type == CONN_TYPE_EXIT);
connection_init(time(NULL), TO_CONN(edge_conn), type, socket_family);
+ /* Initialize the read token bucket to the maximum value, which is the same
+ * as no rate limiting. */
+ token_bucket_rw_init(&edge_conn->bucket, INT32_MAX, INT32_MAX,
+ monotime_coarse_get_stamp());
return edge_conn;
}
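
Both constructors above give each new edge connection a private read/write token bucket. Initializing the rate and burst to INT32_MAX means the bucket can never run dry, so nothing is limited until flow control later lowers the rate. As a minimal sketch (not part of the patch; the header paths are my assumption about the Tor tree layout), the same initialization in isolation looks like this:

#include <stdint.h>                    /* INT32_MAX */
#include "lib/evloop/token_bucket.h"   /* assumed: token_bucket_rw_t, token_bucket_rw_init() */
#include "lib/time/compat_time.h"      /* assumed: monotime_coarse_get_stamp() */

/* Hypothetical helper: set up a per-connection bucket that imposes no limit.
 * An INT32_MAX rate and burst is what the patch comments call "no rate
 * limiting"; flow control can tighten the rate afterwards. */
static void
init_unlimited_read_bucket(token_bucket_rw_t *bucket)
{
  token_bucket_rw_init(bucket, INT32_MAX, INT32_MAX,
                       monotime_coarse_get_stamp());
}
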
@@ -1261,7 +1273,7 @@ socket_failed_from_resource_exhaustion(void)
*/
if (get_max_sockets() > 65535) {
/* TCP port exhaustion */
- rep_hist_note_overload(OVERLOAD_GENERAL);
+ rep_hist_note_tcp_exhaustion();
} else {
/* File descriptor exhaustion */
rep_hist_note_overload(OVERLOAD_FD_EXHAUSTED);
@@ -3457,6 +3469,19 @@ connection_bucket_read_limit(connection_t *conn, time_t now)
base = get_cell_network_size(or_conn->wide_circ_ids);
}
+ /* Edge connections have their own read bucket because flow control can set
+ * a rate limit on them. However, for exit connections, we still need to
+ * honor the global bucket as well. */
+ if (CONN_IS_EDGE(conn)) {
+ const edge_connection_t *edge_conn = CONST_TO_EDGE_CONN(conn);
+ conn_bucket = token_bucket_rw_get_read(&edge_conn->bucket);
+ if (conn->type == CONN_TYPE_EXIT) {
+ /* Decide between our limit and the global one. */
+ goto end;
+ }
+ return conn_bucket;
+ }
+
if (!connection_is_rate_limited(conn)) {
/* be willing to read on local conns even if our buckets are empty */
return conn_bucket>=0 ? conn_bucket : 1<<14;
@@ -3467,6 +3492,7 @@ connection_bucket_read_limit(connection_t *conn, time_t now)
global_bucket_val = MIN(global_bucket_val, relayed);
}
+ end:
return connection_bucket_get_share(base, priority,
global_bucket_val, conn_bucket);
}
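
The goto makes the new control flow in connection_bucket_read_limit() a little hard to follow: client-side edge connections return their own bucket value directly, while exit connections jump to the end: label so that connection_bucket_get_share() can weigh the per-connection bucket against the global one. The self-contained sketch below restates that decision with hypothetical names and simplifies the share computation to a plain minimum; the real code passes base, priority, the global value, and the connection bucket to connection_bucket_get_share().

#include <stdint.h>

/* Hypothetical, simplified restatement of the decision above; NOT Tor's
 * actual implementation. */
static int64_t
read_limit_sketch(int is_edge, int is_exit,
                  int64_t edge_bucket, int64_t global_bucket)
{
  if (is_edge) {
    if (!is_exit)
      return edge_bucket;        /* client-side edge: own bucket only */
    /* Exit edge: honor both the per-connection and the global limit. */
    return edge_bucket < global_bucket ? edge_bucket : global_bucket;
  }
  return global_bucket;          /* everything else: global accounting only */
}
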
@@ -3644,6 +3670,13 @@ connection_buckets_decrement(connection_t *conn, time_t now,
record_num_bytes_transferred_impl(conn, now, num_read, num_written);
+ /* Edge connections need to decrement the read side of the bucket used by
+ * our congestion control. */
+ if (CONN_IS_EDGE(conn) && num_read > 0) {
+ edge_connection_t *edge_conn = TO_EDGE_CONN(conn);
+ token_bucket_rw_dec(&edge_conn->bucket, num_read, 0);
+ }
+
if (!connection_is_rate_limited(conn))
return; /* local IPs are free */
@@ -3697,14 +3730,16 @@ connection_write_bw_exhausted(connection_t *conn, bool is_global_bw)
void
connection_consider_empty_read_buckets(connection_t *conn)
{
+ int is_global = 1;
const char *reason;
- if (!connection_is_rate_limited(conn))
+ if (CONN_IS_EDGE(conn) &&
+ token_bucket_rw_get_read(&TO_EDGE_CONN(conn)->bucket) <= 0) {
+ reason = "edge connection read bucket exhausted. Pausing.";
+ is_global = false;
+ } else if (!connection_is_rate_limited(conn)) {
return; /* Always okay. */
-
- int is_global = 1;
-
- if (token_bucket_rw_get_read(&global_bucket) <= 0) {
+ } else if (token_bucket_rw_get_read(&global_bucket) <= 0) {
reason = "global read bucket exhausted. Pausing.";
} else if (connection_counts_as_relayed_traffic(conn, approx_time()) &&
token_bucket_rw_get_read(&global_relayed_bucket) <= 0) {
@@ -3714,8 +3749,9 @@ connection_consider_empty_read_buckets(connection_t *conn)
token_bucket_rw_get_read(&TO_OR_CONN(conn)->bucket) <= 0) {
reason = "connection read bucket exhausted. Pausing.";
is_global = false;
- } else
+ } else {
return; /* all good, no need to stop it */
+ }
LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", reason));
connection_read_bw_exhausted(conn, is_global);
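
Taken together, the changes above charge the per-edge bucket in connection_buckets_decrement() and then pause reading in connection_consider_empty_read_buckets() once token_bucket_rw_get_read() drops to zero or below, with is_global set to false so that only this one connection stops. A sketch of that per-read sequence, assuming (as with Tor's other token buckets) that a bucket starts full at its burst size and that decrementing reduces the readable count:

#include <stddef.h>
#include "lib/evloop/token_bucket.h"   /* assumed header for token_bucket_rw_t */

/* Hypothetical helper: charge a completed read against the edge bucket and
 * report whether reading should pause.  Only the read side is decremented;
 * the written count is 0, as in the patch. */
static int
edge_read_should_pause(token_bucket_rw_t *bucket, size_t num_read)
{
  if (num_read > 0)
    token_bucket_rw_dec(bucket, num_read, 0);
  /* Same test as connection_consider_empty_read_buckets() uses above. */
  return token_bucket_rw_get_read(bucket) <= 0;
}
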
@@ -3819,6 +3855,10 @@ connection_bucket_refill_single(connection_t *conn, uint32_t now_ts)
or_connection_t *or_conn = TO_OR_CONN(conn);
token_bucket_rw_refill(&or_conn->bucket, now_ts);
}
+
+ if (CONN_IS_EDGE(conn)) {
+ token_bucket_rw_refill(&TO_EDGE_CONN(conn)->bucket, now_ts);
+ }
}
/**
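
connection_bucket_refill_single() is what lets a paused edge connection recover: each time it runs, the per-connection bucket is topped back up according to its configured rate. A short sketch of that step (hypothetical function name; that the caller derives now_ts from monotime_coarse_get_stamp() is my assumption, based on how the buckets are initialized above):

#include "lib/evloop/token_bucket.h"   /* assumed header for token_bucket_rw_t */
#include "lib/time/compat_time.h"      /* assumed header for monotime_coarse_get_stamp() */

/* Hypothetical periodic step: refill the edge bucket from the coarse
 * monotonic clock, mirroring the call added above. */
static void
refill_edge_bucket(token_bucket_rw_t *bucket)
{
  token_bucket_rw_refill(bucket, monotime_coarse_get_stamp());
}
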
@@ -4556,9 +4596,9 @@ connection_handle_write_impl(connection_t *conn, int force)
!dont_stop_writing) { /* it's done flushing */
if (connection_finished_flushing(conn) < 0) {
/* already marked */
- return -1;
+ goto err;
}
- return 0;
+ goto done;
}
/* Call even if result is 0, since the global write bucket may
@@ -4568,7 +4608,17 @@ connection_handle_write_impl(connection_t *conn, int force)
if (n_read > 0 && connection_is_reading(conn))
connection_consider_empty_read_buckets(conn);
+ done:
+ /* If this is an edge connection with congestion control, check to see
+ * if it is time to send an XON. */
+ if (conn_uses_flow_control(conn)) {
+ flow_control_decide_xon(TO_EDGE_CONN(conn), n_written);
+ }
+
return 0;
+
+ err:
+ return -1;
}
/* DOCDOC connection_handle_write */
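
The reworked exit path of connection_handle_write_impl() funnels every successful return through the done: label so that edge connections using flow control get a chance to send an XON once data has been flushed. flow_control_decide_xon() itself is not part of this diff; purely as an illustration (hypothetical names and threshold, not Tor's actual policy), an XON decision of that general shape could look like:

#include <stddef.h>

/* Illustrative only, NOT Tor's flow_control_decide_xon(): if we previously
 * told the peer to stop (XOFF) and our queued data has drained below some
 * threshold after writing n_written bytes, signal it to resume (XON). */
struct edge_flow_state {
  int xoff_sent;        /* have we told the other side to stop sending? */
  size_t queued_len;    /* bytes still queued for this edge connection */
};

static int
should_send_xon(struct edge_flow_state *st, size_t n_written,
                size_t threshold)
{
  st->queued_len = (st->queued_len > n_written) ?
                   st->queued_len - n_written : 0;
  return st->xoff_sent && st->queued_len < threshold;
}
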
diff --git a/src/core/mainloop/connection.h b/src/core/mainloop/connection.h
index 36c94d6570..8b378b15a4 100644
--- a/src/core/mainloop/connection.h
+++ b/src/core/mainloop/connection.h
@@ -274,8 +274,8 @@ void connection_buf_add_compress(const char *string, size_t len,
struct dir_connection_t *conn, int done);
void connection_buf_add_buf(struct connection_t *conn, struct buf_t *buf);
-size_t connection_get_inbuf_len(struct connection_t *conn);
-size_t connection_get_outbuf_len(struct connection_t *conn);
+size_t connection_get_inbuf_len(const struct connection_t *conn);
+size_t connection_get_outbuf_len(const struct connection_t *conn);
struct connection_t *connection_get_by_global_id(uint64_t id);
struct connection_t *connection_get_by_type(int type);
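
The connection.h hunk only adds const qualifiers: the inbuf/outbuf length getters no longer require a mutable connection_t, so code that merely inspects a connection can call them through a const pointer. A trivial hypothetical caller:

#include "core/mainloop/connection.h"   /* assumed include path */

/* Hypothetical read-only helper enabled by the const-qualified getter. */
static int
conn_has_pending_output(const struct connection_t *conn)
{
  return connection_get_outbuf_len(conn) > 0;
}
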
diff --git a/src/core/mainloop/mainloop.c b/src/core/mainloop/mainloop.c
index 69606c0d53..cd57dea3d4 100644
--- a/src/core/mainloop/mainloop.c
+++ b/src/core/mainloop/mainloop.c
@@ -641,6 +641,13 @@ connection_start_reading,(connection_t *conn))
if (connection_should_read_from_linked_conn(conn))
connection_start_reading_from_linked_conn(conn);
} else {
+ if (CONN_IS_EDGE(conn) && TO_EDGE_CONN(conn)->xoff_received) {
+ /* We should not get called here if we're waiting for an XON, but
+ * belt-and-suspenders */
+ log_notice(LD_NET,
+ "Request to start reading on an edgeconn blocked with XOFF");
+ return;
+ }
if (event_add(conn->read_event, NULL))
log_warn(LD_NET, "Error from libevent setting read event state for %d "
"to watched: %s",
@@ -1293,6 +1300,7 @@ signewnym_impl(time_t now)
circuit_mark_all_dirty_circs_as_unusable();
addressmap_clear_transient();
hs_client_purge_state();
+ purge_vanguards_lite();
time_of_last_signewnym = now;
signewnym_is_pending = 0;
@@ -1370,6 +1378,7 @@ CALLBACK(save_state);
CALLBACK(write_stats_file);
CALLBACK(control_per_second_events);
CALLBACK(second_elapsed);
+CALLBACK(manage_vglite);
#undef CALLBACK
@@ -1392,6 +1401,9 @@ STATIC periodic_event_item_t mainloop_periodic_events[] = {
CALLBACK(second_elapsed, NET_PARTICIPANT,
FL(RUN_ON_DISABLE)),
+ /* Update vanguards-lite once every 15 minutes, if we have networking */
+ CALLBACK(manage_vglite, NET_PARTICIPANT, FL(NEED_NET)),
+
/* XXXX Do we have a reason to do this on a callback? Does it do any good at
* all? For now, if we're dormant, we can let our listeners decay. */
CALLBACK(retry_listeners, NET_PARTICIPANT, FL(NEED_NET)),
@@ -1662,6 +1674,21 @@ mainloop_schedule_shutdown(int delay_sec)
mainloop_event_schedule(scheduled_shutdown_ev, &delay_tv);
}
+/**
+ * Update vanguards-lite layer2 nodes, once every 15 minutes
+ */
+static int
+manage_vglite_callback(time_t now, const or_options_t *options)
+{
+ (void)now;
+ (void)options;
+#define VANGUARDS_LITE_INTERVAL (15*60)
+
+ maintain_layer2_guards();
+
+ return VANGUARDS_LITE_INTERVAL;
+}
+
/** Perform regular maintenance tasks. This function gets run once per
* second.
*/
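
manage_vglite_callback() follows the usual mainloop periodic-event pattern: the callback does its work and returns the number of seconds until it should run again, while the CALLBACK(manage_vglite, NET_PARTICIPANT, FL(NEED_NET)) registration above keeps it from firing unless Tor is participating in the network. A callback of the same shape (hypothetical name and body; or_options_t is Tor's parsed-configuration struct, declaration omitted here) looks like:

#include <time.h>

#define EXAMPLE_INTERVAL (15*60)

/* Hypothetical periodic callback mirroring manage_vglite_callback(): do the
 * work, then return the number of seconds until the next invocation. */
static int
example_maintenance_callback(time_t now, const or_options_t *options)
{
  (void)now;
  (void)options;

  /* ... periodic maintenance work goes here ... */

  return EXAMPLE_INTERVAL;
}
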