summaryrefslogtreecommitdiff
path: root/src/or/connection.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/or/connection.c')
-rw-r--r--src/or/connection.c625
1 files changed, 262 insertions, 363 deletions
diff --git a/src/or/connection.c b/src/or/connection.c
index 2a6b10763e..0a2a635096 100644
--- a/src/or/connection.c
+++ b/src/or/connection.c
@@ -1,4 +1,4 @@
- /* Copyright (c) 2001 Matej Pfajfar.
+/* Copyright (c) 2001 Matej Pfajfar.
* Copyright (c) 2001-2004, Roger Dingledine.
* Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
* Copyright (c) 2007-2017, The Tor Project, Inc. */
@@ -76,6 +76,7 @@
#include "connection_edge.h"
#include "connection_or.h"
#include "control.h"
+#include "crypto_util.h"
#include "directory.h"
#include "dirserv.h"
#include "dns.h"
@@ -85,6 +86,7 @@
#include "ext_orport.h"
#include "geoip.h"
#include "main.h"
+#include "hibernate.h"
#include "hs_common.h"
#include "hs_ident.h"
#include "nodelist.h"
@@ -101,7 +103,6 @@
#include "transports.h"
#include "routerparse.h"
#include "sandbox.h"
-#include "transports.h"
#ifdef HAVE_PWD_H
#include <pwd.h>
@@ -120,8 +121,6 @@ static connection_t *connection_listener_new(
static void connection_init(time_t now, connection_t *conn, int type,
int socket_family);
static int connection_handle_listener_read(connection_t *conn, int new_type);
-static int connection_bucket_should_increase(int bucket,
- or_connection_t *conn);
static int connection_finished_flushing(connection_t *conn);
static int connection_flushed_some(connection_t *conn);
static int connection_finished_connecting(connection_t *conn);
@@ -140,6 +139,8 @@ static const char *proxy_type_to_string(int proxy_type);
static int get_proxy_type(void);
const tor_addr_t *conn_get_outbound_address(sa_family_t family,
const or_options_t *options, unsigned int conn_type);
+static void reenable_blocked_connection_init(const or_options_t *options);
+static void reenable_blocked_connection_schedule(void);
/** The last addresses that our network interface seemed to have been
* binding to. We use this as one way to detect when our IP changes.
@@ -460,8 +461,8 @@ connection_init(time_t now, connection_t *conn, int type, int socket_family)
}
conn->timestamp_created = now;
- conn->timestamp_lastread = now;
- conn->timestamp_lastwritten = now;
+ conn->timestamp_last_read_allowed = now;
+ conn->timestamp_last_write_allowed = now;
}
/** Create a link between <b>conn_a</b> and <b>conn_b</b>. */
@@ -775,8 +776,8 @@ connection_close_immediate(connection_t *conn)
connection_unregister_events(conn);
/* Prevent the event from getting unblocked. */
- conn->read_blocked_on_bw =
- conn->write_blocked_on_bw = 0;
+ conn->read_blocked_on_bw = 0;
+ conn->write_blocked_on_bw = 0;
if (SOCKET_OK(conn->s))
tor_close_socket(conn->s);
@@ -859,7 +860,7 @@ connection_mark_for_close_internal_, (connection_t *conn,
/* in case we're going to be held-open-til-flushed, reset
* the number of seconds since last successful write, so
* we get our whole 15 seconds */
- conn->timestamp_lastwritten = time(NULL);
+ conn->timestamp_last_write_allowed = time(NULL);
}
/** Find each connection that has hold_open_until_flushed set to
@@ -881,7 +882,7 @@ connection_expire_held_open(void)
*/
if (conn->hold_open_until_flushed) {
tor_assert(conn->marked_for_close);
- if (now - conn->timestamp_lastwritten >= 15) {
+ if (now - conn->timestamp_last_write_allowed >= 15) {
int severity;
if (conn->type == CONN_TYPE_EXIT ||
(conn->type == CONN_TYPE_DIR &&
@@ -1764,13 +1765,13 @@ connection_connect_sockaddr,(connection_t *conn,
tor_assert(sa);
tor_assert(socket_error);
- if (get_options()->DisableNetwork) {
- /* We should never even try to connect anyplace if DisableNetwork is set.
- * Warn if we do, and refuse to make the connection.
+ if (net_is_completely_disabled()) {
+ /* We should never even try to connect anyplace if the network is
+ * completely shut off.
*
- * We only check DisableNetwork here, not we_are_hibernating(), since
- * we'll still try to fulfill client requests sometimes in the latter case
- * (if it is soft hibernation) */
+ * (We don't check net_is_disabled() here, since we still sometimes
+ * want to open connections when we're in soft hibernation.)
+ */
static ratelim_t disablenet_violated = RATELIM_INIT(30*60);
*socket_error = SOCK_ERRNO(ENETUNREACH);
log_fn_ratelim(&disablenet_violated, LOG_WARN, LD_BUG,
@@ -2820,10 +2821,10 @@ connection_is_rate_limited(connection_t *conn)
return 1;
}
-/** Did either global write bucket run dry last second? If so,
- * we are likely to run dry again this second, so be stingy with the
- * tokens we just put in. */
-static int write_buckets_empty_last_second = 0;
+/** When was either global write bucket last empty? If this was recent, then
+ * we're probably low on bandwidth, and we should be stingy with our bandwidth
+ * usage. */
+static time_t write_buckets_last_empty_at = -100;
/** How many seconds of no active local circuits will make the
* connection revert to the "relayed" bandwidth class? */
@@ -2851,25 +2852,25 @@ connection_counts_as_relayed_traffic(connection_t *conn, time_t now)
* write many of them or just a few; and <b>conn_bucket</b> (if
* non-negative) provides an upper limit for our answer. */
static ssize_t
-connection_bucket_round_robin(int base, int priority,
- ssize_t global_bucket, ssize_t conn_bucket)
+connection_bucket_get_share(int base, int priority,
+ ssize_t global_bucket_val, ssize_t conn_bucket)
{
ssize_t at_most;
ssize_t num_bytes_high = (priority ? 32 : 16) * base;
ssize_t num_bytes_low = (priority ? 4 : 2) * base;
- /* Do a rudimentary round-robin so one circuit can't hog a connection.
+ /* Do a rudimentary limiting so one circuit can't hog a connection.
* Pick at most 32 cells, at least 4 cells if possible, and if we're in
* the middle pick 1/8 of the available bandwidth. */
- at_most = global_bucket / 8;
+ at_most = global_bucket_val / 8;
at_most -= (at_most % base); /* round down */
if (at_most > num_bytes_high) /* 16 KB, or 8 KB for low-priority */
at_most = num_bytes_high;
else if (at_most < num_bytes_low) /* 2 KB, or 1 KB for low-priority */
at_most = num_bytes_low;
- if (at_most > global_bucket)
- at_most = global_bucket;
+ if (at_most > global_bucket_val)
+ at_most = global_bucket_val;
if (conn_bucket >= 0 && at_most > conn_bucket)
at_most = conn_bucket;
@@ -2885,13 +2886,13 @@ connection_bucket_read_limit(connection_t *conn, time_t now)
{
int base = RELAY_PAYLOAD_SIZE;
int priority = conn->type != CONN_TYPE_DIR;
- int conn_bucket = -1;
- int global_bucket = global_read_bucket;
+ ssize_t conn_bucket = -1;
+ size_t global_bucket_val = token_bucket_rw_get_read(&global_bucket);
if (connection_speaks_cells(conn)) {
or_connection_t *or_conn = TO_OR_CONN(conn);
if (conn->state == OR_CONN_STATE_OPEN)
- conn_bucket = or_conn->read_bucket;
+ conn_bucket = token_bucket_rw_get_read(&or_conn->bucket);
base = get_cell_network_size(or_conn->wide_circ_ids);
}
@@ -2900,12 +2901,13 @@ connection_bucket_read_limit(connection_t *conn, time_t now)
return conn_bucket>=0 ? conn_bucket : 1<<14;
}
- if (connection_counts_as_relayed_traffic(conn, now) &&
- global_relayed_read_bucket <= global_read_bucket)
- global_bucket = global_relayed_read_bucket;
+ if (connection_counts_as_relayed_traffic(conn, now)) {
+ size_t relayed = token_bucket_rw_get_read(&global_relayed_bucket);
+ global_bucket_val = MIN(global_bucket_val, relayed);
+ }
- return connection_bucket_round_robin(base, priority,
- global_bucket, conn_bucket);
+ return connection_bucket_get_share(base, priority,
+ global_bucket_val, conn_bucket);
}
/** How many bytes at most can we write onto this connection? */
@@ -2914,8 +2916,8 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
{
int base = RELAY_PAYLOAD_SIZE;
int priority = conn->type != CONN_TYPE_DIR;
- int conn_bucket = (int)conn->outbuf_flushlen;
- int global_bucket = global_write_bucket;
+ size_t conn_bucket = conn->outbuf_flushlen;
+ size_t global_bucket_val = token_bucket_rw_get_write(&global_bucket);
if (!connection_is_rate_limited(conn)) {
/* be willing to write to local conns even if our buckets are empty */
@@ -2923,22 +2925,21 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
}
if (connection_speaks_cells(conn)) {
- /* use the per-conn write limit if it's lower, but if it's less
- * than zero just use zero */
+ /* use the per-conn write limit if it's lower */
or_connection_t *or_conn = TO_OR_CONN(conn);
if (conn->state == OR_CONN_STATE_OPEN)
- if (or_conn->write_bucket < conn_bucket)
- conn_bucket = or_conn->write_bucket >= 0 ?
- or_conn->write_bucket : 0;
+ conn_bucket = MIN(conn_bucket,
+ token_bucket_rw_get_write(&or_conn->bucket));
base = get_cell_network_size(or_conn->wide_circ_ids);
}
- if (connection_counts_as_relayed_traffic(conn, now) &&
- global_relayed_write_bucket <= global_write_bucket)
- global_bucket = global_relayed_write_bucket;
+ if (connection_counts_as_relayed_traffic(conn, now)) {
+ size_t relayed = token_bucket_rw_get_write(&global_relayed_bucket);
+ global_bucket_val = MIN(global_bucket_val, relayed);
+ }
- return connection_bucket_round_robin(base, priority,
- global_bucket, conn_bucket);
+ return connection_bucket_get_share(base, priority,
+ global_bucket_val, conn_bucket);
}
/** Return 1 if the global write buckets are low enough that we
@@ -2963,27 +2964,31 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
int
global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
{
- int smaller_bucket = global_write_bucket < global_relayed_write_bucket ?
- global_write_bucket : global_relayed_write_bucket;
+ size_t smaller_bucket =
+ MIN(token_bucket_rw_get_write(&global_bucket),
+ token_bucket_rw_get_write(&global_relayed_bucket));
if (authdir_mode(get_options()) && priority>1)
return 0; /* there's always room to answer v2 if we're an auth dir */
if (!connection_is_rate_limited(conn))
return 0; /* local conns don't get limited */
- if (smaller_bucket < (int)attempt)
+ if (smaller_bucket < attempt)
return 1; /* not enough space no matter the priority */
- if (write_buckets_empty_last_second)
- return 1; /* we're already hitting our limits, no more please */
+ {
+ const time_t diff = approx_time() - write_buckets_last_empty_at;
+ if (diff <= 1)
+ return 1; /* we're already hitting our limits, no more please */
+ }
if (priority == 1) { /* old-style v1 query */
/* Could we handle *two* of these requests within the next two seconds? */
const or_options_t *options = get_options();
- int64_t can_write = (int64_t)smaller_bucket
+ size_t can_write = (size_t) (smaller_bucket
+ 2*(options->RelayBandwidthRate ? options->RelayBandwidthRate :
- options->BandwidthRate);
- if (can_write < 2*(int64_t)attempt)
+ options->BandwidthRate));
+ if (can_write < 2*attempt)
return 1;
} else { /* v2 query */
/* no further constraints yet */
@@ -2991,6 +2996,10 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
return 0;
}
+/** When did we last tell the accounting subsystem about transmitted
+ * bandwidth? */
+static time_t last_recorded_accounting_at = 0;
+
/** Helper: adjusts our bandwidth history and informs the controller as
* appropriate, given that we have just read <b>num_read</b> bytes and written
* <b>num_written</b> bytes on <b>conn</b>. */
@@ -3021,59 +3030,22 @@ record_num_bytes_transferred_impl(connection_t *conn,
}
if (conn->type == CONN_TYPE_EXIT)
rep_hist_note_exit_bytes(conn->port, num_written, num_read);
-}
-/** Helper: convert given <b>tvnow</b> time value to milliseconds since
- * midnight. */
-static uint32_t
-msec_since_midnight(const struct timeval *tvnow)
-{
- return (uint32_t)(((tvnow->tv_sec % 86400L) * 1000L) +
- ((uint32_t)tvnow->tv_usec / (uint32_t)1000L));
-}
+ /* Remember these bytes towards statistics. */
+ stats_increment_bytes_read_and_written(num_read, num_written);
-/** Helper: return the time in milliseconds since <b>last_empty_time</b>
- * when a bucket ran empty that previously had <b>tokens_before</b> tokens
- * now has <b>tokens_after</b> tokens after refilling at timestamp
- * <b>tvnow</b>, capped at <b>milliseconds_elapsed</b> milliseconds since
- * last refilling that bucket. Return 0 if the bucket has not been empty
- * since the last refill or has not been refilled. */
-uint32_t
-bucket_millis_empty(int tokens_before, uint32_t last_empty_time,
- int tokens_after, int milliseconds_elapsed,
- const struct timeval *tvnow)
-{
- uint32_t result = 0, refilled;
- if (tokens_before <= 0 && tokens_after > tokens_before) {
- refilled = msec_since_midnight(tvnow);
- result = (uint32_t)((refilled + 86400L * 1000L - last_empty_time) %
- (86400L * 1000L));
- if (result > (uint32_t)milliseconds_elapsed)
- result = (uint32_t)milliseconds_elapsed;
- }
- return result;
-}
-
-/** Check if a bucket which had <b>tokens_before</b> tokens and which got
- * <b>tokens_removed</b> tokens removed at timestamp <b>tvnow</b> has run
- * out of tokens, and if so, note the milliseconds since midnight in
- * <b>timestamp_var</b> for the next TB_EMPTY event. */
-void
-connection_buckets_note_empty_ts(uint32_t *timestamp_var,
- int tokens_before, size_t tokens_removed,
- const struct timeval *tvnow)
-{
- if (tokens_before > 0 && (uint32_t)tokens_before <= tokens_removed)
- *timestamp_var = msec_since_midnight(tvnow);
+ /* Remember these bytes towards accounting. */
+ if (accounting_is_enabled(get_options())) {
+ if (now > last_recorded_accounting_at && last_recorded_accounting_at) {
+ accounting_add_bytes(num_read, num_written,
+ (int)(now - last_recorded_accounting_at));
+ } else {
+ accounting_add_bytes(num_read, num_written, 0);
+ }
+ last_recorded_accounting_at = now;
+ }
}
-/** Last time at which the global or relay buckets were emptied in msec
- * since midnight. */
-static uint32_t global_relayed_read_emptied = 0,
- global_relayed_write_emptied = 0,
- global_read_emptied = 0,
- global_write_emptied = 0;
-
/** We just read <b>num_read</b> and wrote <b>num_written</b> bytes
* onto <b>conn</b>. Decrement buckets appropriately. */
static void
@@ -3098,45 +3070,54 @@ connection_buckets_decrement(connection_t *conn, time_t now,
if (!connection_is_rate_limited(conn))
return; /* local IPs are free */
- /* If one or more of our token buckets ran dry just now, note the
- * timestamp for TB_EMPTY events. */
- if (get_options()->TestingEnableTbEmptyEvent) {
- struct timeval tvnow;
- tor_gettimeofday_cached(&tvnow);
- if (connection_counts_as_relayed_traffic(conn, now)) {
- connection_buckets_note_empty_ts(&global_relayed_read_emptied,
- global_relayed_read_bucket, num_read, &tvnow);
- connection_buckets_note_empty_ts(&global_relayed_write_emptied,
- global_relayed_write_bucket, num_written, &tvnow);
- }
- connection_buckets_note_empty_ts(&global_read_emptied,
- global_read_bucket, num_read, &tvnow);
- connection_buckets_note_empty_ts(&global_write_emptied,
- global_write_bucket, num_written, &tvnow);
- if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
- or_connection_t *or_conn = TO_OR_CONN(conn);
- connection_buckets_note_empty_ts(&or_conn->read_emptied_time,
- or_conn->read_bucket, num_read, &tvnow);
- connection_buckets_note_empty_ts(&or_conn->write_emptied_time,
- or_conn->write_bucket, num_written, &tvnow);
- }
+ unsigned flags = 0;
+ if (connection_counts_as_relayed_traffic(conn, now)) {
+ flags = token_bucket_rw_dec(&global_relayed_bucket, num_read, num_written);
}
+ flags |= token_bucket_rw_dec(&global_bucket, num_read, num_written);
- if (connection_counts_as_relayed_traffic(conn, now)) {
- global_relayed_read_bucket -= (int)num_read;
- global_relayed_write_bucket -= (int)num_written;
+ if (flags & TB_WRITE) {
+ write_buckets_last_empty_at = now;
}
- global_read_bucket -= (int)num_read;
- global_write_bucket -= (int)num_written;
if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
- TO_OR_CONN(conn)->read_bucket -= (int)num_read;
- TO_OR_CONN(conn)->write_bucket -= (int)num_written;
+ or_connection_t *or_conn = TO_OR_CONN(conn);
+ token_bucket_rw_dec(&or_conn->bucket, num_read, num_written);
}
}
+/**
+ * Mark <b>conn</b> as needing to stop reading because bandwidth has been
+ * exhausted. If <b>is_global_bw</b>, it is closing because global bandwidth
+ * limit has been exhausted. Otherwise, it is closing because its own
+ * bandwidth limit has been exhausted.
+ */
+void
+connection_read_bw_exhausted(connection_t *conn, bool is_global_bw)
+{
+ (void)is_global_bw;
+ conn->read_blocked_on_bw = 1;
+ connection_stop_reading(conn);
+ reenable_blocked_connection_schedule();
+}
+
+/**
+ * Mark <b>conn</b> as needing to stop reading because write bandwidth has
+ * been exhausted. If <b>is_global_bw</b>, it is closing because global
+ * bandwidth limit has been exhausted. Otherwise, it is closing because its
+ * own bandwidth limit has been exhausted.
+*/
+void
+connection_write_bw_exhausted(connection_t *conn, bool is_global_bw)
+{
+ (void)is_global_bw;
+ conn->write_blocked_on_bw = 1;
+ connection_stop_writing(conn);
+ reenable_blocked_connection_schedule();
+}
+
/** If we have exhausted our global buckets, or the buckets for conn,
* stop reading. */
-static void
+void
connection_consider_empty_read_buckets(connection_t *conn)
{
const char *reason;
@@ -3144,26 +3125,28 @@ connection_consider_empty_read_buckets(connection_t *conn)
if (!connection_is_rate_limited(conn))
return; /* Always okay. */
- if (global_read_bucket <= 0) {
+ int is_global = 1;
+
+ if (token_bucket_rw_get_read(&global_bucket) <= 0) {
reason = "global read bucket exhausted. Pausing.";
} else if (connection_counts_as_relayed_traffic(conn, approx_time()) &&
- global_relayed_read_bucket <= 0) {
+ token_bucket_rw_get_read(&global_relayed_bucket) <= 0) {
reason = "global relayed read bucket exhausted. Pausing.";
} else if (connection_speaks_cells(conn) &&
conn->state == OR_CONN_STATE_OPEN &&
- TO_OR_CONN(conn)->read_bucket <= 0) {
+ token_bucket_rw_get_read(&TO_OR_CONN(conn)->bucket) <= 0) {
reason = "connection read bucket exhausted. Pausing.";
+ is_global = false;
} else
return; /* all good, no need to stop it */
LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", reason));
- conn->read_blocked_on_bw = 1;
- connection_stop_reading(conn);
+ connection_read_bw_exhausted(conn, is_global);
}
/** If we have exhausted our global buckets, or the buckets for conn,
* stop writing. */
-static void
+void
connection_consider_empty_write_buckets(connection_t *conn)
{
const char *reason;
@@ -3171,233 +3154,166 @@ connection_consider_empty_write_buckets(connection_t *conn)
if (!connection_is_rate_limited(conn))
return; /* Always okay. */
- if (global_write_bucket <= 0) {
+ bool is_global = true;
+ if (token_bucket_rw_get_write(&global_bucket) <= 0) {
reason = "global write bucket exhausted. Pausing.";
} else if (connection_counts_as_relayed_traffic(conn, approx_time()) &&
- global_relayed_write_bucket <= 0) {
+ token_bucket_rw_get_write(&global_relayed_bucket) <= 0) {
reason = "global relayed write bucket exhausted. Pausing.";
} else if (connection_speaks_cells(conn) &&
conn->state == OR_CONN_STATE_OPEN &&
- TO_OR_CONN(conn)->write_bucket <= 0) {
+ token_bucket_rw_get_write(&TO_OR_CONN(conn)->bucket) <= 0) {
reason = "connection write bucket exhausted. Pausing.";
+ is_global = false;
} else
return; /* all good, no need to stop it */
LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", reason));
- conn->write_blocked_on_bw = 1;
- connection_stop_writing(conn);
+ connection_write_bw_exhausted(conn, is_global);
}
-/** Initialize the global read bucket to options-\>BandwidthBurst. */
+/** Initialize the global buckets to the values configured in the
+ * options */
void
connection_bucket_init(void)
{
const or_options_t *options = get_options();
- /* start it at max traffic */
- global_read_bucket = (int)options->BandwidthBurst;
- global_write_bucket = (int)options->BandwidthBurst;
+ const uint32_t now_ts = monotime_coarse_get_stamp();
+ token_bucket_rw_init(&global_bucket,
+ (int32_t)options->BandwidthRate,
+ (int32_t)options->BandwidthBurst,
+ now_ts);
if (options->RelayBandwidthRate) {
- global_relayed_read_bucket = (int)options->RelayBandwidthBurst;
- global_relayed_write_bucket = (int)options->RelayBandwidthBurst;
+ token_bucket_rw_init(&global_relayed_bucket,
+ (int32_t)options->RelayBandwidthRate,
+ (int32_t)options->RelayBandwidthBurst,
+ now_ts);
} else {
- global_relayed_read_bucket = (int)options->BandwidthBurst;
- global_relayed_write_bucket = (int)options->BandwidthBurst;
+ token_bucket_rw_init(&global_relayed_bucket,
+ (int32_t)options->BandwidthRate,
+ (int32_t)options->BandwidthBurst,
+ now_ts);
}
+
+ reenable_blocked_connection_init(options);
}
-/** Refill a single <b>bucket</b> called <b>name</b> with bandwidth rate per
- * second <b>rate</b> and bandwidth burst <b>burst</b>, assuming that
- * <b>milliseconds_elapsed</b> milliseconds have passed since the last
- * call. */
-static void
-connection_bucket_refill_helper(int *bucket, int rate, int burst,
- int milliseconds_elapsed,
- const char *name)
+/** Update the global connection bucket settings to a new value. */
+void
+connection_bucket_adjust(const or_options_t *options)
{
- int starting_bucket = *bucket;
- if (starting_bucket < burst && milliseconds_elapsed > 0) {
- int64_t incr = (((int64_t)rate) * milliseconds_elapsed) / 1000;
- if ((burst - starting_bucket) < incr) {
- *bucket = burst; /* We would overflow the bucket; just set it to
- * the maximum. */
- } else {
- *bucket += (int)incr;
- if (*bucket > burst || *bucket < starting_bucket) {
- /* If we overflow the burst, or underflow our starting bucket,
- * cap the bucket value to burst. */
- /* XXXX this might be redundant now, but it doesn't show up
- * in profiles. Remove it after analysis. */
- *bucket = burst;
- }
- }
- log_debug(LD_NET,"%s now %d.", name, *bucket);
+ token_bucket_rw_adjust(&global_bucket,
+ (int32_t)options->BandwidthRate,
+ (int32_t)options->BandwidthBurst);
+ if (options->RelayBandwidthRate) {
+ token_bucket_rw_adjust(&global_relayed_bucket,
+ (int32_t)options->RelayBandwidthRate,
+ (int32_t)options->RelayBandwidthBurst);
+ } else {
+ token_bucket_rw_adjust(&global_relayed_bucket,
+ (int32_t)options->BandwidthRate,
+ (int32_t)options->BandwidthBurst);
}
}
-/** Time has passed; increment buckets appropriately. */
-void
-connection_bucket_refill(int milliseconds_elapsed, time_t now)
+/**
+ * Cached value of the last coarse-timestamp when we refilled the
+ * global buckets.
+ */
+static uint32_t last_refilled_global_buckets_ts=0;
+/**
+ * Refill the token buckets for a single connection <b>conn</b>, and the
+ * global token buckets as appropriate. Requires that <b>now_ts</b> is
+ * the time in coarse timestamp units.
+ */
+static void
+connection_bucket_refill_single(connection_t *conn, uint32_t now_ts)
{
- const or_options_t *options = get_options();
- smartlist_t *conns = get_connection_array();
- int bandwidthrate, bandwidthburst, relayrate, relayburst;
+ /* Note that we only check for equality here: the underlying
+ * token bucket functions can handle moving backwards in time if they
+ * need to. */
+ if (now_ts != last_refilled_global_buckets_ts) {
+ token_bucket_rw_refill(&global_bucket, now_ts);
+ token_bucket_rw_refill(&global_relayed_bucket, now_ts);
+ last_refilled_global_buckets_ts = now_ts;
+ }
- int prev_global_read = global_read_bucket;
- int prev_global_write = global_write_bucket;
- int prev_relay_read = global_relayed_read_bucket;
- int prev_relay_write = global_relayed_write_bucket;
- struct timeval tvnow; /*< Only used if TB_EMPTY events are enabled. */
+ if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
+ or_connection_t *or_conn = TO_OR_CONN(conn);
+ token_bucket_rw_refill(&or_conn->bucket, now_ts);
+ }
+}
- bandwidthrate = (int)options->BandwidthRate;
- bandwidthburst = (int)options->BandwidthBurst;
+/**
+ * Event to re-enable all connections that were previously blocked on read or
+ * write.
+ */
+static mainloop_event_t *reenable_blocked_connections_ev = NULL;
- if (options->RelayBandwidthRate) {
- relayrate = (int)options->RelayBandwidthRate;
- relayburst = (int)options->RelayBandwidthBurst;
- } else {
- relayrate = bandwidthrate;
- relayburst = bandwidthburst;
- }
-
- tor_assert(milliseconds_elapsed >= 0);
-
- write_buckets_empty_last_second =
- global_relayed_write_bucket <= 0 || global_write_bucket <= 0;
-
- /* refill the global buckets */
- connection_bucket_refill_helper(&global_read_bucket,
- bandwidthrate, bandwidthburst,
- milliseconds_elapsed,
- "global_read_bucket");
- connection_bucket_refill_helper(&global_write_bucket,
- bandwidthrate, bandwidthburst,
- milliseconds_elapsed,
- "global_write_bucket");
- connection_bucket_refill_helper(&global_relayed_read_bucket,
- relayrate, relayburst,
- milliseconds_elapsed,
- "global_relayed_read_bucket");
- connection_bucket_refill_helper(&global_relayed_write_bucket,
- relayrate, relayburst,
- milliseconds_elapsed,
- "global_relayed_write_bucket");
-
- /* If buckets were empty before and have now been refilled, tell any
- * interested controllers. */
- if (get_options()->TestingEnableTbEmptyEvent) {
- uint32_t global_read_empty_time, global_write_empty_time,
- relay_read_empty_time, relay_write_empty_time;
- tor_gettimeofday_cached(&tvnow);
- global_read_empty_time = bucket_millis_empty(prev_global_read,
- global_read_emptied, global_read_bucket,
- milliseconds_elapsed, &tvnow);
- global_write_empty_time = bucket_millis_empty(prev_global_write,
- global_write_emptied, global_write_bucket,
- milliseconds_elapsed, &tvnow);
- control_event_tb_empty("GLOBAL", global_read_empty_time,
- global_write_empty_time, milliseconds_elapsed);
- relay_read_empty_time = bucket_millis_empty(prev_relay_read,
- global_relayed_read_emptied,
- global_relayed_read_bucket,
- milliseconds_elapsed, &tvnow);
- relay_write_empty_time = bucket_millis_empty(prev_relay_write,
- global_relayed_write_emptied,
- global_relayed_write_bucket,
- milliseconds_elapsed, &tvnow);
- control_event_tb_empty("RELAY", relay_read_empty_time,
- relay_write_empty_time, milliseconds_elapsed);
- }
-
- /* refill the per-connection buckets */
- SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) {
- if (connection_speaks_cells(conn)) {
- or_connection_t *or_conn = TO_OR_CONN(conn);
- int orbandwidthrate = or_conn->bandwidthrate;
- int orbandwidthburst = or_conn->bandwidthburst;
-
- int prev_conn_read = or_conn->read_bucket;
- int prev_conn_write = or_conn->write_bucket;
-
- if (connection_bucket_should_increase(or_conn->read_bucket, or_conn)) {
- connection_bucket_refill_helper(&or_conn->read_bucket,
- orbandwidthrate,
- orbandwidthburst,
- milliseconds_elapsed,
- "or_conn->read_bucket");
- }
- if (connection_bucket_should_increase(or_conn->write_bucket, or_conn)) {
- connection_bucket_refill_helper(&or_conn->write_bucket,
- orbandwidthrate,
- orbandwidthburst,
- milliseconds_elapsed,
- "or_conn->write_bucket");
- }
+/** True iff reenable_blocked_connections_ev is currently scheduled. */
+static int reenable_blocked_connections_is_scheduled = 0;
- /* If buckets were empty before and have now been refilled, tell any
- * interested controllers. */
- if (get_options()->TestingEnableTbEmptyEvent) {
- char *bucket;
- uint32_t conn_read_empty_time, conn_write_empty_time;
- tor_asprintf(&bucket, "ORCONN ID="U64_FORMAT,
- U64_PRINTF_ARG(or_conn->base_.global_identifier));
- conn_read_empty_time = bucket_millis_empty(prev_conn_read,
- or_conn->read_emptied_time,
- or_conn->read_bucket,
- milliseconds_elapsed, &tvnow);
- conn_write_empty_time = bucket_millis_empty(prev_conn_write,
- or_conn->write_emptied_time,
- or_conn->write_bucket,
- milliseconds_elapsed, &tvnow);
- control_event_tb_empty(bucket, conn_read_empty_time,
- conn_write_empty_time,
- milliseconds_elapsed);
- tor_free(bucket);
- }
- }
+/** Delay after which to run reenable_blocked_connections_ev. */
+static struct timeval reenable_blocked_connections_delay;
- if (conn->read_blocked_on_bw == 1 /* marked to turn reading back on now */
- && global_read_bucket > 0 /* and we're allowed to read */
- && (!connection_counts_as_relayed_traffic(conn, now) ||
- global_relayed_read_bucket > 0) /* even if we're relayed traffic */
- && (!connection_speaks_cells(conn) ||
- conn->state != OR_CONN_STATE_OPEN ||
- TO_OR_CONN(conn)->read_bucket > 0)) {
- /* and either a non-cell conn or a cell conn with non-empty bucket */
- LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
- "waking up conn (fd %d) for read", (int)conn->s));
- conn->read_blocked_on_bw = 0;
+/**
+ * Re-enable all connections that were previously blocked on read or write.
+ * This event is scheduled after enough time has elapsed to be sure
+ * that the buckets will refill when the connections have something to do.
+ */
+static void
+reenable_blocked_connections_cb(mainloop_event_t *ev, void *arg)
+{
+ (void)ev;
+ (void)arg;
+ SMARTLIST_FOREACH_BEGIN(get_connection_array(), connection_t *, conn) {
+ if (conn->read_blocked_on_bw == 1) {
connection_start_reading(conn);
+ conn->read_blocked_on_bw = 0;
}
-
- if (conn->write_blocked_on_bw == 1
- && global_write_bucket > 0 /* and we're allowed to write */
- && (!connection_counts_as_relayed_traffic(conn, now) ||
- global_relayed_write_bucket > 0) /* even if it's relayed traffic */
- && (!connection_speaks_cells(conn) ||
- conn->state != OR_CONN_STATE_OPEN ||
- TO_OR_CONN(conn)->write_bucket > 0)) {
- LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
- "waking up conn (fd %d) for write", (int)conn->s));
- conn->write_blocked_on_bw = 0;
+ if (conn->write_blocked_on_bw == 1) {
connection_start_writing(conn);
+ conn->write_blocked_on_bw = 0;
}
} SMARTLIST_FOREACH_END(conn);
+
+ reenable_blocked_connections_is_scheduled = 0;
}
-/** Is the <b>bucket</b> for connection <b>conn</b> low enough that we
- * should add another pile of tokens to it?
+/**
+ * Initialize the mainloop event that we use to wake up connections that
+ * find themselves blocked on bandwidth.
*/
-static int
-connection_bucket_should_increase(int bucket, or_connection_t *conn)
+static void
+reenable_blocked_connection_init(const or_options_t *options)
{
- tor_assert(conn);
-
- if (conn->base_.state != OR_CONN_STATE_OPEN)
- return 0; /* only open connections play the rate limiting game */
- if (bucket >= conn->bandwidthburst)
- return 0;
+ if (! reenable_blocked_connections_ev) {
+ reenable_blocked_connections_ev =
+ mainloop_event_new(reenable_blocked_connections_cb, NULL);
+ reenable_blocked_connections_is_scheduled = 0;
+ }
+ time_t sec = options->TokenBucketRefillInterval / 1000;
+ int msec = (options->TokenBucketRefillInterval % 1000);
+ reenable_blocked_connections_delay.tv_sec = sec;
+ reenable_blocked_connections_delay.tv_usec = msec * 1000;
+}
- return 1;
+/**
+ * Called when we have blocked a connection for being low on bandwidth:
+ * schedule an event to reenable such connections, if it is not already
+ * scheduled.
+ */
+static void
+reenable_blocked_connection_schedule(void)
+{
+ if (reenable_blocked_connections_is_scheduled)
+ return;
+ if (BUG(reenable_blocked_connections_ev == NULL)) {
+ reenable_blocked_connection_init(get_options());
+ }
+ mainloop_event_schedule(reenable_blocked_connections_ev,
+ &reenable_blocked_connections_delay);
+ reenable_blocked_connections_is_scheduled = 1;
}
/** Read bytes from conn-\>s and process them.
@@ -3418,7 +3334,9 @@ connection_handle_read_impl(connection_t *conn)
if (conn->marked_for_close)
return 0; /* do nothing */
- conn->timestamp_lastread = approx_time();
+ conn->timestamp_last_read_allowed = approx_time();
+
+ connection_bucket_refill_single(conn, monotime_coarse_get_stamp());
switch (conn->type) {
case CONN_TYPE_OR_LISTENER:
@@ -3525,8 +3443,7 @@ int
connection_handle_read(connection_t *conn)
{
int res;
-
- tor_gettimeofday_cache_clear();
+ update_current_time(time(NULL));
res = connection_handle_read_impl(conn);
return res;
}
@@ -3678,25 +3595,15 @@ connection_buf_read_from_socket(connection_t *conn, ssize_t *max_to_read,
/* change *max_to_read */
*max_to_read = at_most - n_read;
- /* Update edge_conn->n_read and ocirc->n_read_circ_bw */
+ /* Update edge_conn->n_read */
if (conn->type == CONN_TYPE_AP) {
edge_connection_t *edge_conn = TO_EDGE_CONN(conn);
- circuit_t *circ = circuit_get_by_edge_conn(edge_conn);
- origin_circuit_t *ocirc;
/* Check for overflow: */
if (PREDICT_LIKELY(UINT32_MAX - edge_conn->n_read > n_read))
edge_conn->n_read += (int)n_read;
else
edge_conn->n_read = UINT32_MAX;
-
- if (circ && CIRCUIT_IS_ORIGIN(circ)) {
- ocirc = TO_ORIGIN_CIRCUIT(circ);
- if (PREDICT_LIKELY(UINT32_MAX - ocirc->n_read_circ_bw > n_read))
- ocirc->n_read_circ_bw += (int)n_read;
- else
- ocirc->n_read_circ_bw = UINT32_MAX;
- }
}
/* If CONN_BW events are enabled, update conn->n_read_conn_bw for
@@ -3782,7 +3689,6 @@ connection_outbuf_too_full(connection_t *conn)
* This should help fix slow upload rates.
*/
static void
-
update_send_buffer_size(tor_socket_t sock)
{
#ifdef _WIN32
@@ -3819,7 +3725,7 @@ update_send_buffer_size(tor_socket_t sock)
* when libevent tells us that conn wants to write, or below
* from connection_buf_add() when an entire TLS record is ready.
*
- * Update <b>conn</b>-\>timestamp_lastwritten to now, and call flush_buf
+ * Update <b>conn</b>-\>timestamp_last_write_allowed to now, and call flush_buf
* or flush_buf_tls appropriately. If it succeeds and there are no more
* more bytes on <b>conn</b>-\>outbuf, then call connection_finished_flushing
* on it too.
@@ -3852,7 +3758,9 @@ connection_handle_write_impl(connection_t *conn, int force)
return 0;
}
- conn->timestamp_lastwritten = now;
+ conn->timestamp_last_write_allowed = now;
+
+ connection_bucket_refill_single(conn, monotime_coarse_get_stamp());
/* Sometimes, "writable" means "connected". */
if (connection_state_is_connecting(conn)) {
@@ -3967,8 +3875,7 @@ connection_handle_write_impl(connection_t *conn, int force)
/* Make sure to avoid a loop if the receive buckets are empty. */
log_debug(LD_NET,"wanted read.");
if (!connection_is_reading(conn)) {
- connection_stop_writing(conn);
- conn->write_blocked_on_bw = 1;
+ connection_write_bw_exhausted(conn, true);
/* we'll start reading again when we get more tokens in our
* read bucket; then we'll start writing again too.
*/
@@ -4014,22 +3921,12 @@ connection_handle_write_impl(connection_t *conn, int force)
if (n_written && conn->type == CONN_TYPE_AP) {
edge_connection_t *edge_conn = TO_EDGE_CONN(conn);
- circuit_t *circ = circuit_get_by_edge_conn(edge_conn);
- origin_circuit_t *ocirc;
/* Check for overflow: */
if (PREDICT_LIKELY(UINT32_MAX - edge_conn->n_written > n_written))
edge_conn->n_written += (int)n_written;
else
edge_conn->n_written = UINT32_MAX;
-
- if (circ && CIRCUIT_IS_ORIGIN(circ)) {
- ocirc = TO_ORIGIN_CIRCUIT(circ);
- if (PREDICT_LIKELY(UINT32_MAX - ocirc->n_written_circ_bw > n_written))
- ocirc->n_written_circ_bw += (int)n_written;
- else
- ocirc->n_written_circ_bw = UINT32_MAX;
- }
}
/* If CONN_BW events are enabled, update conn->n_written_conn_bw for
@@ -4089,7 +3986,7 @@ int
connection_handle_write(connection_t *conn, int force)
{
int res;
- tor_gettimeofday_cache_clear();
+ update_current_time(time(NULL));
conn->in_connection_handle_write = 1;
res = connection_handle_write_impl(conn, force);
conn->in_connection_handle_write = 0;
@@ -4528,8 +4425,6 @@ alloc_http_authenticator(const char *authenticator)
static void
client_check_address_changed(tor_socket_t sock)
{
- struct sockaddr_storage out_sockaddr;
- socklen_t out_addr_len = (socklen_t) sizeof(out_sockaddr);
tor_addr_t out_addr, iface_addr;
tor_addr_t **last_interface_ip_ptr;
sa_family_t family;
@@ -4537,13 +4432,12 @@ client_check_address_changed(tor_socket_t sock)
if (!outgoing_addrs)
outgoing_addrs = smartlist_new();
- if (getsockname(sock, (struct sockaddr*)&out_sockaddr, &out_addr_len)<0) {
+ if (tor_addr_from_getsockname(&out_addr, sock) < 0) {
int e = tor_socket_errno(sock);
log_warn(LD_NET, "getsockname() to check for address change failed: %s",
tor_socket_strerror(e));
return;
}
- tor_addr_from_sockaddr(&out_addr, (struct sockaddr*)&out_sockaddr, NULL);
family = tor_addr_family(&out_addr);
if (family == AF_INET)
@@ -5331,6 +5225,11 @@ connection_free_all(void)
tor_free(last_interface_ipv4);
tor_free(last_interface_ipv6);
+ last_recorded_accounting_at = 0;
+
+ mainloop_event_free(reenable_blocked_connections_ev);
+ reenable_blocked_connections_is_scheduled = 0;
+ memset(&reenable_blocked_connections_delay, 0, sizeof(struct timeval));
}
/** Log a warning, and possibly emit a control event, that <b>received</b> came
@@ -5338,10 +5237,10 @@ connection_free_all(void)
* that we had more faith in and therefore the warning level should have higher
* severity.
*/
-void
-clock_skew_warning(const connection_t *conn, long apparent_skew, int trusted,
- log_domain_mask_t domain, const char *received,
- const char *source)
+MOCK_IMPL(void,
+clock_skew_warning, (const connection_t *conn, long apparent_skew, int trusted,
+ log_domain_mask_t domain, const char *received,
+ const char *source))
{
char dbuf[64];
char *ext_source = NULL, *warn = NULL;