aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorNick Mathewson <nickm@torproject.org>2010-02-22 13:59:34 -0500
committerNick Mathewson <nickm@torproject.org>2010-09-27 14:22:18 -0400
commitffd5070b04b3db4409d8e3dc933ffc7d12b5219d (patch)
treef1fcfa7d71f88f9b005637aa6df804d61f1f3085 /src
parent98ec959c9c1d2b0a872926a3cafe42742520397a (diff)
downloadtor-ffd5070b04b3db4409d8e3dc933ffc7d12b5219d.tar.gz
tor-ffd5070b04b3db4409d8e3dc933ffc7d12b5219d.zip
Convert bufferevents to use rate-limiting.
This requires the latest Git version of Libevent as of 24 March 2010. In the future, we'll just say it requires Libevent 2.0.5-alpha or later. Since Libevent doesn't yet support hierarchical rate limit groups, there isn't yet support for tracking relayed-bytes separately when using the bufferevent system. If a future version does add support for hierarchical buckets, we can add that back in.
Diffstat (limited to 'src')
-rw-r--r--src/common/compat_libevent.c24
-rw-r--r--src/common/compat_libevent.h5
-rw-r--r--src/common/tortls.c3
-rw-r--r--src/or/config.c11
-rw-r--r--src/or/connection.c118
-rw-r--r--src/or/connection.h3
-rw-r--r--src/or/connection_or.c19
-rw-r--r--src/or/main.c25
-rw-r--r--src/or/or.h6
9 files changed, 210 insertions, 4 deletions
diff --git a/src/common/compat_libevent.c b/src/common/compat_libevent.c
index 250fa2bdb7..2ae280e669 100644
--- a/src/common/compat_libevent.c
+++ b/src/common/compat_libevent.c
@@ -551,3 +551,27 @@ periodic_timer_free(periodic_timer_t *timer)
tor_free(timer);
}
+#ifdef USE_BUFFEREVENTS
+static const struct timeval *one_tick = NULL;
+/**
+ DOCDOC
+*/
+const struct timeval *tor_libevent_get_one_tick_timeout(void)
+{
+
+ if (PREDICT_UNLIKELY(one_tick == NULL)) {
+ struct event_base *base = tor_libevent_get_base();
+ struct timeval tv;
+ if (TOR_LIBEVENT_TICKS_PER_SECOND == 1) {
+ tv.tv_sec = 1;
+ tv.tv_usec = 0;
+ } else {
+ tv.tv_sec = 0;
+ tv.tv_usec = 1000000 / TOR_LIBEVENT_TICKS_PER_SECOND;
+ }
+ one_tick = event_base_init_common_timeout(base, &tv);
+ }
+ return one_tick;
+}
+#endif
+
diff --git a/src/common/compat_libevent.h b/src/common/compat_libevent.h
index a4011e37af..f483d6ee6d 100644
--- a/src/common/compat_libevent.h
+++ b/src/common/compat_libevent.h
@@ -64,5 +64,10 @@ void tor_check_libevent_version(const char *m, int server,
void tor_check_libevent_header_compatibility(void);
const char *tor_libevent_get_version_str(void);
+#ifdef USE_BUFFEREVENTS
+#define TOR_LIBEVENT_TICKS_PER_SECOND 3
+const struct timeval *tor_libevent_get_one_tick_timeout(void);
+#endif
+
#endif
diff --git a/src/common/tortls.c b/src/common/tortls.c
index 06533ca43b..3ae3ef8835 100644
--- a/src/common/tortls.c
+++ b/src/common/tortls.c
@@ -1699,7 +1699,6 @@ tor_tls_init_bufferevent(tor_tls_t *tls, struct bufferevent *bufev_in,
state,
BEV_OPT_DEFER_CALLBACKS);
#else
- /* Disabled: just use filter for now. */
if (bufev_in) {
evutil_socket_t s = bufferevent_getfd(bufev_in);
tor_assert(s == -1 || s == socket);
@@ -1715,7 +1714,7 @@ tor_tls_init_bufferevent(tor_tls_t *tls, struct bufferevent *bufev_in,
tls->ssl,
state,
0);
- //BEV_OPT_DEFER_CALLBACKS);
+ //BEV_OPT_DEFER_CALLBACKS);
#endif
return out;
}
diff --git a/src/or/config.c b/src/or/config.c
index 8febe7a56b..fa2eb73beb 100644
--- a/src/or/config.c
+++ b/src/or/config.c
@@ -1231,6 +1231,17 @@ options_act(or_options_t *old_options)
if (accounting_is_enabled(options))
configure_accounting(time(NULL));
+#ifdef USE_BUFFEREVENTS
+ /* If we're using the bufferevents implementation and our rate limits
+ * changed, we need to tell the rate-limiting system about it. */
+ if (!old_options ||
+ old_options->BandwidthRate != options->BandwidthRate ||
+ old_options->BandwidthBurst != options->BandwidthBurst ||
+ old_options->RelayBandwidthRate != options->RelayBandwidthRate ||
+ old_options->RelayBandwidthBurst != options->RelayBandwidthBurst)
+ connection_bucket_init();
+#endif
+
/* Change the cell EWMA settings */
cell_ewma_set_scale_factor(options, networkstatus_get_latest_consensus());
diff --git a/src/or/connection.c b/src/or/connection.c
index 2944a0d4bb..b6f0d5d69d 100644
--- a/src/or/connection.c
+++ b/src/or/connection.c
@@ -49,8 +49,10 @@ static void connection_init(time_t now, connection_t *conn, int type,
static int connection_init_accepted_conn(connection_t *conn,
uint8_t listener_type);
static int connection_handle_listener_read(connection_t *conn, int new_type);
+#ifndef USE_BUFFEREVENTS
static int connection_bucket_should_increase(int bucket,
or_connection_t *conn);
+#endif
static int connection_finished_flushing(connection_t *conn);
static int connection_flushed_some(connection_t *conn);
static int connection_finished_connecting(connection_t *conn);
@@ -199,6 +201,7 @@ connection_type_uses_bufferevent(connection_t *conn)
case CONN_TYPE_DIR:
case CONN_TYPE_CONTROL:
case CONN_TYPE_OR:
+ case CONN_TYPE_CPUWORKER:
return 1;
default:
return 0;
@@ -452,6 +455,8 @@ _connection_free(connection_t *conn)
tor_free(conn->read_event); /* Probably already freed by connection_free. */
tor_free(conn->write_event); /* Probably already freed by connection_free. */
IF_HAS_BUFFEREVENT(conn, {
+ /* XXXX this is a workaround. */
+ bufferevent_setcb(conn->bufev, NULL, NULL, NULL, NULL);
bufferevent_free(conn->bufev);
conn->bufev = NULL;
});
@@ -481,6 +486,11 @@ _connection_free(connection_t *conn)
log_warn(LD_BUG, "called on OR conn with non-zeroed identity_digest");
connection_or_remove_from_identity_map(TO_OR_CONN(conn));
}
+#ifdef USE_BUFFEREVENTS
+ if (conn->type == CONN_TYPE_OR && TO_OR_CONN(conn)->bucket_cfg) {
+ ev_token_bucket_cfg_free(TO_OR_CONN(conn)->bucket_cfg);
+ }
+#endif
memset(mem, 0xCC, memlen); /* poison memory */
tor_free(mem);
@@ -1945,6 +1955,9 @@ connection_is_rate_limited(connection_t *conn)
return 1;
}
+#ifdef USE_BUFFEREVENTS
+static struct bufferevent_rate_limit_group *global_rate_limit = NULL;
+#else
extern int global_read_bucket, global_write_bucket;
extern int global_relayed_read_bucket, global_relayed_write_bucket;
@@ -1952,11 +1965,13 @@ extern int global_relayed_read_bucket, global_relayed_write_bucket;
* we are likely to run dry again this second, so be stingy with the
* tokens we just put in. */
static int write_buckets_empty_last_second = 0;
+#endif
/** How many seconds of no active local circuits will make the
* connection revert to the "relayed" bandwidth class? */
#define CLIENT_IDLE_TIME_FOR_PRIORITY 30
+#ifndef USE_BUFFEREVENTS
/** Return 1 if <b>conn</b> should use tokens from the "relayed"
* bandwidth rates, else 0. Currently, only OR conns with bandwidth
* class 1, and directory conns that are serving data out, count.
@@ -2067,6 +2082,20 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
return connection_bucket_round_robin(base, priority,
global_bucket, conn_bucket);
}
+#else
+static ssize_t
+connection_bucket_read_limit(connection_t *conn, time_t now)
+{
+ (void) now;
+ return bufferevent_get_max_to_read(conn->bufev);
+}
+ssize_t
+connection_bucket_write_limit(connection_t *conn, time_t now)
+{
+ (void) now;
+ return bufferevent_get_max_to_write(conn->bufev);
+}
+#endif
/** Return 1 if the global write buckets are low enough that we
* shouldn't send <b>attempt</b> bytes of low-priority directory stuff
@@ -2091,8 +2120,12 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
int
global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
{
+#ifdef USE_BUFFEREVENTS
+ ssize_t smaller_bucket = bufferevent_get_max_to_write(conn->bufev);
+#else
int smaller_bucket = global_write_bucket < global_relayed_write_bucket ?
global_write_bucket : global_relayed_write_bucket;
+#endif
if (authdir_mode(get_options()) && priority>1)
return 0; /* there's always room to answer v2 if we're an auth dir */
@@ -2102,8 +2135,10 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
if (smaller_bucket < (int)attempt)
return 1; /* not enough space no matter the priority */
+#ifndef USE_BUFFEREVENTS
if (write_buckets_empty_last_second)
return 1; /* we're already hitting our limits, no more please */
+#endif
if (priority == 1) { /* old-style v1 query */
/* Could we handle *two* of these requests within the next two seconds? */
@@ -2119,6 +2154,7 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
return 0;
}
+#ifndef USE_BUFFEREVENTS
/** We just read <b>num_read</b> and wrote <b>num_written</b> bytes
* onto <b>conn</b>. Decrement buckets appropriately. */
static void
@@ -2362,6 +2398,88 @@ connection_bucket_should_increase(int bucket, or_connection_t *conn)
return 1;
}
+#else
+
+static void
+connection_buckets_decrement(connection_t *conn, time_t now,
+ size_t num_read, size_t num_written)
+{
+ (void) conn;
+ (void) now;
+ (void) num_read;
+ (void) num_written;
+ /* Libevent does this for us. */
+}
+void
+connection_bucket_refill(int seconds_elapsed, time_t now)
+{
+ (void) seconds_elapsed;
+ (void) now;
+ /* Libevent does this for us. */
+}
+void
+connection_bucket_init(void)
+{
+ or_options_t *options = get_options();
+ const struct timeval *tick = tor_libevent_get_one_tick_timeout();
+ struct ev_token_bucket_cfg *bucket_cfg;
+
+ uint64_t rate, burst;
+ if (options->RelayBandwidthRate) {
+ rate = options->RelayBandwidthRate;
+ burst = options->RelayBandwidthBurst;
+ } else {
+ rate = options->BandwidthRate;
+ burst = options->BandwidthBurst;
+ }
+
+ rate /= TOR_LIBEVENT_TICKS_PER_SECOND;
+ bucket_cfg = ev_token_bucket_cfg_new((uint32_t)rate, (uint32_t)burst,
+ (uint32_t)rate, (uint32_t)burst,
+ tick);
+
+ if (!global_rate_limit) {
+ global_rate_limit =
+ bufferevent_rate_limit_group_new(tor_libevent_get_base(), bucket_cfg);
+ } else {
+ bufferevent_rate_limit_group_set_cfg(global_rate_limit, bucket_cfg);
+ }
+ ev_token_bucket_cfg_free(bucket_cfg);
+}
+
+void
+connection_get_rate_limit_totals(uint64_t *read_out, uint64_t *written_out)
+{
+ if (global_rate_limit == NULL) {
+ *read_out = *written_out = 0;
+ } else {
+ bufferevent_rate_limit_group_get_totals(
+ global_rate_limit, read_out, written_out);
+ }
+}
+
+/** DOCDOC */
+void
+connection_enable_rate_limiting(connection_t *conn)
+{
+ if (conn->bufev) {
+ if (!global_rate_limit)
+ connection_bucket_init();
+ bufferevent_add_to_rate_limit_group(conn->bufev, global_rate_limit);
+ }
+}
+
+static void
+connection_consider_empty_write_buckets(connection_t *conn)
+{
+ (void) conn;
+}
+static void
+connection_consider_empty_read_buckets(connection_t *conn)
+{
+ (void) conn;
+}
+#endif
/** Read bytes from conn-\>s and process them.
*
diff --git a/src/or/connection.h b/src/or/connection.h
index 83aec0ef00..a40b1a5fb5 100644
--- a/src/or/connection.h
+++ b/src/or/connection.h
@@ -144,6 +144,9 @@ void connection_handle_read_cb(struct bufferevent *bufev, void *arg);
void connection_handle_write_cb(struct bufferevent *bufev, void *arg);
void connection_handle_event_cb(struct bufferevent *bufev, short event,
void *arg);
+void connection_get_rate_limit_totals(uint64_t *read_out,
+ uint64_t *written_out);
+void connection_enable_rate_limiting(connection_t *conn);
#else
#define connection_type_uses_bufferevent(c) (0)
#endif
diff --git a/src/or/connection_or.c b/src/or/connection_or.c
index 2fbc230bcd..044197d83a 100644
--- a/src/or/connection_or.c
+++ b/src/or/connection_or.c
@@ -388,6 +388,21 @@ connection_or_update_token_buckets_helper(or_connection_t *conn, int reset,
conn->bandwidthrate = rate;
conn->bandwidthburst = burst;
+#ifdef USE_BUFFEREVENTS
+ {
+ const struct timeval *tick = tor_libevent_get_one_tick_timeout();
+ struct ev_token_bucket_cfg *cfg, *old_cfg;
+ int rate_per_tick = rate / TOR_LIBEVENT_TICKS_PER_SECOND;
+ cfg = ev_token_bucket_cfg_new(rate_per_tick, burst, rate_per_tick,
+ burst, tick);
+ old_cfg = conn->bucket_cfg;
+ if (conn->_base.bufev)
+ bufferevent_set_rate_limit(conn->_base.bufev, cfg);
+ if (old_cfg)
+ ev_token_bucket_cfg_free(old_cfg);
+ conn->bucket_cfg = cfg;
+ }
+#else
if (reset) { /* set up the token buckets to be full */
conn->read_bucket = conn->write_bucket = burst;
return;
@@ -398,6 +413,7 @@ connection_or_update_token_buckets_helper(or_connection_t *conn, int reset,
conn->read_bucket = burst;
if (conn->write_bucket > burst)
conn->write_bucket = burst;
+#endif
}
/** Either our set of relays or our per-conn rate limits have changed.
@@ -879,6 +895,9 @@ connection_tls_start_handshake(or_connection_t *conn, int receiving)
return -1;
}
conn->_base.bufev = b;
+ if (conn->bucket_cfg)
+ bufferevent_set_rate_limit(conn->_base.bufev, conn->bucket_cfg);
+ connection_enable_rate_limiting(TO_CONN(conn));
bufferevent_setcb(b, connection_handle_read_cb,
connection_handle_write_cb,
connection_or_handle_event_cb,
diff --git a/src/or/main.c b/src/or/main.c
index f6f26b05f9..1979529f72 100644
--- a/src/or/main.c
+++ b/src/or/main.c
@@ -76,6 +76,7 @@ static int connection_should_read_from_linked_conn(connection_t *conn);
/********* START VARIABLES **********/
+#ifndef USE_BUFFEREVENTS
int global_read_bucket; /**< Max number of bytes I can read this second. */
int global_write_bucket; /**< Max number of bytes I can write this second. */
@@ -83,13 +84,17 @@ int global_write_bucket; /**< Max number of bytes I can write this second. */
int global_relayed_read_bucket;
/** Max number of relayed (bandwidth class 1) bytes I can write this second. */
int global_relayed_write_bucket;
-
/** What was the read bucket before the last second_elapsed_callback() call?
* (used to determine how many bytes we've read). */
static int stats_prev_global_read_bucket;
/** What was the write bucket before the last second_elapsed_callback() call?
* (used to determine how many bytes we've written). */
static int stats_prev_global_write_bucket;
+#else
+static uint64_t stats_prev_n_read = 0;
+static uint64_t stats_prev_n_written = 0;
+#endif
+
/* XXX we might want to keep stats about global_relayed_*_bucket too. Or not.*/
/** How many bytes have we read since we started the process? */
static uint64_t stats_n_bytes_read = 0;
@@ -1395,6 +1400,9 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg)
size_t bytes_written;
size_t bytes_read;
int seconds_elapsed;
+#ifdef USE_BUFFEREVENTS
+ uint64_t cur_read,cur_written;
+#endif
or_options_t *options = get_options();
(void)timer;
(void)arg;
@@ -1406,9 +1414,15 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg)
update_approx_time(now);
/* the second has rolled over. check more stuff. */
+ seconds_elapsed = current_second ? (int)(now - current_second) : 0;
+#ifdef USE_BUFFEREVENTS
+ connection_get_rate_limit_totals(&cur_read, &cur_written);
+ bytes_written = ((size_t)cur_written) - stats_prev_n_written;
+ bytes_read = ((size_t)cur_read) - stats_prev_n_read;
+#else
bytes_written = stats_prev_global_write_bucket - global_write_bucket;
bytes_read = stats_prev_global_read_bucket - global_read_bucket;
- seconds_elapsed = current_second ? (int)(now - current_second) : 0;
+#endif
stats_n_bytes_read += bytes_read;
stats_n_bytes_written += bytes_written;
if (accounting_is_enabled(options) && seconds_elapsed >= 0)
@@ -1418,8 +1432,13 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg)
if (seconds_elapsed > 0)
connection_bucket_refill(seconds_elapsed, now);
+#ifdef USE_BUFFEREVENTS
+ stats_prev_n_written = cur_written;
+ stats_prev_n_read = cur_read;
+#else
stats_prev_global_read_bucket = global_read_bucket;
stats_prev_global_write_bucket = global_write_bucket;
+#endif
if (server_mode(options) &&
!we_are_hibernating() &&
@@ -1620,8 +1639,10 @@ do_main_loop(void)
/* Set up our buckets */
connection_bucket_init();
+#ifndef USE_BUFFEREVENTS
stats_prev_global_read_bucket = global_read_bucket;
stats_prev_global_write_bucket = global_write_bucket;
+#endif
/* initialize the bootstrap status events to know we're starting up */
control_event_bootstrap(BOOTSTRAP_STATUS_STARTING, 0);
diff --git a/src/or/or.h b/src/or/or.h
index 8f875731f6..f4f511ad00 100644
--- a/src/or/or.h
+++ b/src/or/or.h
@@ -1078,10 +1078,16 @@ typedef struct or_connection_t {
/* bandwidth* and *_bucket only used by ORs in OPEN state: */
int bandwidthrate; /**< Bytes/s added to the bucket. (OPEN ORs only.) */
int bandwidthburst; /**< Max bucket size for this conn. (OPEN ORs only.) */
+#ifndef USE_BUFFEREVENTS
int read_bucket; /**< When this hits 0, stop receiving. Every second we
* add 'bandwidthrate' to this, capping it at
* bandwidthburst. (OPEN ORs only) */
int write_bucket; /**< When this hits 0, stop writing. Like read_bucket. */
+#else
+ /** DOCDOC */
+ /* XXXX we could share this among all connections. */
+ struct ev_token_bucket_cfg *bucket_cfg;
+#endif
int n_circuits; /**< How many circuits use this connection as p_conn or
* n_conn ? */