diff options
Diffstat (limited to 'src/core/mainloop')
-rw-r--r-- | src/core/mainloop/connection.c | 72 | ||||
-rw-r--r-- | src/core/mainloop/connection.h | 4 | ||||
-rw-r--r-- | src/core/mainloop/mainloop.c | 27 |
3 files changed, 90 insertions, 13 deletions
diff --git a/src/core/mainloop/connection.c b/src/core/mainloop/connection.c index b17d7bf2bd..9271a70914 100644 --- a/src/core/mainloop/connection.c +++ b/src/core/mainloop/connection.c @@ -117,6 +117,7 @@ #include "lib/cc/ctassert.h" #include "lib/sandbox/sandbox.h" #include "lib/net/buffers_net.h" +#include "lib/net/address.h" #include "lib/tls/tortls.h" #include "lib/evloop/compat_libevent.h" #include "lib/compress/compress.h" @@ -146,6 +147,8 @@ #include "feature/nodelist/routerinfo_st.h" #include "core/or/socks_request_st.h" +#include "core/or/congestion_control_flow.h" + /** * On Windows and Linux we cannot reliably bind() a socket to an * address and port if: 1) There's already a socket bound to wildcard @@ -250,13 +253,13 @@ CONST_TO_LISTENER_CONN(const connection_t *c) } size_t -connection_get_inbuf_len(connection_t *conn) +connection_get_inbuf_len(const connection_t *conn) { return conn->inbuf ? buf_datalen(conn->inbuf) : 0; } size_t -connection_get_outbuf_len(connection_t *conn) +connection_get_outbuf_len(const connection_t *conn) { return conn->outbuf ? buf_datalen(conn->outbuf) : 0; } @@ -612,6 +615,11 @@ entry_connection_new(int type, int socket_family) entry_conn->entry_cfg.ipv4_traffic = 1; else if (socket_family == AF_INET6) entry_conn->entry_cfg.ipv6_traffic = 1; + + /* Initialize the read token bucket to the maximum value which is the same as + * no rate limiting. */ + token_bucket_rw_init(&ENTRY_TO_EDGE_CONN(entry_conn)->bucket, INT32_MAX, + INT32_MAX, monotime_coarse_get_stamp()); return entry_conn; } @@ -623,6 +631,10 @@ edge_connection_new(int type, int socket_family) edge_connection_t *edge_conn = tor_malloc_zero(sizeof(edge_connection_t)); tor_assert(type == CONN_TYPE_EXIT); connection_init(time(NULL), TO_CONN(edge_conn), type, socket_family); + /* Initialize the read token bucket to the maximum value which is the same as + * no rate limiting. */ + token_bucket_rw_init(&edge_conn->bucket, INT32_MAX, INT32_MAX, + monotime_coarse_get_stamp()); return edge_conn; } @@ -1261,7 +1273,7 @@ socket_failed_from_resource_exhaustion(void) */ if (get_max_sockets() > 65535) { /* TCP port exhaustion */ - rep_hist_note_overload(OVERLOAD_GENERAL); + rep_hist_note_tcp_exhaustion(); } else { /* File descriptor exhaustion */ rep_hist_note_overload(OVERLOAD_FD_EXHAUSTED); @@ -3457,6 +3469,19 @@ connection_bucket_read_limit(connection_t *conn, time_t now) base = get_cell_network_size(or_conn->wide_circ_ids); } + /* Edge connection have their own read bucket due to flow control being able + * to set a rate limit for them. However, for exit connections, we still need + * to honor the global bucket as well. */ + if (CONN_IS_EDGE(conn)) { + const edge_connection_t *edge_conn = CONST_TO_EDGE_CONN(conn); + conn_bucket = token_bucket_rw_get_read(&edge_conn->bucket); + if (conn->type == CONN_TYPE_EXIT) { + /* Decide between our limit and the global one. */ + goto end; + } + return conn_bucket; + } + if (!connection_is_rate_limited(conn)) { /* be willing to read on local conns even if our buckets are empty */ return conn_bucket>=0 ? conn_bucket : 1<<14; @@ -3467,6 +3492,7 @@ connection_bucket_read_limit(connection_t *conn, time_t now) global_bucket_val = MIN(global_bucket_val, relayed); } + end: return connection_bucket_get_share(base, priority, global_bucket_val, conn_bucket); } @@ -3644,6 +3670,13 @@ connection_buckets_decrement(connection_t *conn, time_t now, record_num_bytes_transferred_impl(conn, now, num_read, num_written); + /* Edge connection need to decrement the read side of the bucket used by our + * congestion control. */ + if (CONN_IS_EDGE(conn) && num_read > 0) { + edge_connection_t *edge_conn = TO_EDGE_CONN(conn); + token_bucket_rw_dec(&edge_conn->bucket, num_read, 0); + } + if (!connection_is_rate_limited(conn)) return; /* local IPs are free */ @@ -3697,14 +3730,16 @@ connection_write_bw_exhausted(connection_t *conn, bool is_global_bw) void connection_consider_empty_read_buckets(connection_t *conn) { + int is_global = 1; const char *reason; - if (!connection_is_rate_limited(conn)) + if (CONN_IS_EDGE(conn) && + token_bucket_rw_get_read(&TO_EDGE_CONN(conn)->bucket) <= 0) { + reason = "edge connection read bucket exhausted. Pausing."; + is_global = false; + } else if (!connection_is_rate_limited(conn)) { return; /* Always okay. */ - - int is_global = 1; - - if (token_bucket_rw_get_read(&global_bucket) <= 0) { + } else if (token_bucket_rw_get_read(&global_bucket) <= 0) { reason = "global read bucket exhausted. Pausing."; } else if (connection_counts_as_relayed_traffic(conn, approx_time()) && token_bucket_rw_get_read(&global_relayed_bucket) <= 0) { @@ -3714,8 +3749,9 @@ connection_consider_empty_read_buckets(connection_t *conn) token_bucket_rw_get_read(&TO_OR_CONN(conn)->bucket) <= 0) { reason = "connection read bucket exhausted. Pausing."; is_global = false; - } else + } else { return; /* all good, no need to stop it */ + } LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", reason)); connection_read_bw_exhausted(conn, is_global); @@ -3819,6 +3855,10 @@ connection_bucket_refill_single(connection_t *conn, uint32_t now_ts) or_connection_t *or_conn = TO_OR_CONN(conn); token_bucket_rw_refill(&or_conn->bucket, now_ts); } + + if (CONN_IS_EDGE(conn)) { + token_bucket_rw_refill(&TO_EDGE_CONN(conn)->bucket, now_ts); + } } /** @@ -4556,9 +4596,9 @@ connection_handle_write_impl(connection_t *conn, int force) !dont_stop_writing) { /* it's done flushing */ if (connection_finished_flushing(conn) < 0) { /* already marked */ - return -1; + goto err; } - return 0; + goto done; } /* Call even if result is 0, since the global write bucket may @@ -4568,7 +4608,17 @@ connection_handle_write_impl(connection_t *conn, int force) if (n_read > 0 && connection_is_reading(conn)) connection_consider_empty_read_buckets(conn); + done: + /* If this is an edge connection with congestion control, check to see + * if it is time to send an xon */ + if (conn_uses_flow_control(conn)) { + flow_control_decide_xon(TO_EDGE_CONN(conn), n_written); + } + return 0; + + err: + return -1; } /* DOCDOC connection_handle_write */ diff --git a/src/core/mainloop/connection.h b/src/core/mainloop/connection.h index 36c94d6570..8b378b15a4 100644 --- a/src/core/mainloop/connection.h +++ b/src/core/mainloop/connection.h @@ -274,8 +274,8 @@ void connection_buf_add_compress(const char *string, size_t len, struct dir_connection_t *conn, int done); void connection_buf_add_buf(struct connection_t *conn, struct buf_t *buf); -size_t connection_get_inbuf_len(struct connection_t *conn); -size_t connection_get_outbuf_len(struct connection_t *conn); +size_t connection_get_inbuf_len(const struct connection_t *conn); +size_t connection_get_outbuf_len(const struct connection_t *conn); struct connection_t *connection_get_by_global_id(uint64_t id); struct connection_t *connection_get_by_type(int type); diff --git a/src/core/mainloop/mainloop.c b/src/core/mainloop/mainloop.c index 69606c0d53..cd57dea3d4 100644 --- a/src/core/mainloop/mainloop.c +++ b/src/core/mainloop/mainloop.c @@ -641,6 +641,13 @@ connection_start_reading,(connection_t *conn)) if (connection_should_read_from_linked_conn(conn)) connection_start_reading_from_linked_conn(conn); } else { + if (CONN_IS_EDGE(conn) && TO_EDGE_CONN(conn)->xoff_received) { + /* We should not get called here if we're waiting for an XON, but + * belt-and-suspenders */ + log_notice(LD_NET, + "Request to start reading on an edgeconn blocked with XOFF"); + return; + } if (event_add(conn->read_event, NULL)) log_warn(LD_NET, "Error from libevent setting read event state for %d " "to watched: %s", @@ -1293,6 +1300,7 @@ signewnym_impl(time_t now) circuit_mark_all_dirty_circs_as_unusable(); addressmap_clear_transient(); hs_client_purge_state(); + purge_vanguards_lite(); time_of_last_signewnym = now; signewnym_is_pending = 0; @@ -1370,6 +1378,7 @@ CALLBACK(save_state); CALLBACK(write_stats_file); CALLBACK(control_per_second_events); CALLBACK(second_elapsed); +CALLBACK(manage_vglite); #undef CALLBACK @@ -1392,6 +1401,9 @@ STATIC periodic_event_item_t mainloop_periodic_events[] = { CALLBACK(second_elapsed, NET_PARTICIPANT, FL(RUN_ON_DISABLE)), + /* Update vanguards-lite once per hour, if we have networking */ + CALLBACK(manage_vglite, NET_PARTICIPANT, FL(NEED_NET)), + /* XXXX Do we have a reason to do this on a callback? Does it do any good at * all? For now, if we're dormant, we can let our listeners decay. */ CALLBACK(retry_listeners, NET_PARTICIPANT, FL(NEED_NET)), @@ -1662,6 +1674,21 @@ mainloop_schedule_shutdown(int delay_sec) mainloop_event_schedule(scheduled_shutdown_ev, &delay_tv); } +/** + * Update vanguards-lite layer2 nodes, once every 15 minutes + */ +static int +manage_vglite_callback(time_t now, const or_options_t *options) +{ + (void)now; + (void)options; +#define VANGUARDS_LITE_INTERVAL (15*60) + + maintain_layer2_guards(); + + return VANGUARDS_LITE_INTERVAL; +} + /** Perform regular maintenance tasks. This function gets run once per * second. */ |