diff options
-rw-r--r-- | src/common/include.am | 2 | ||||
-rw-r--r-- | src/common/token_bucket.c | 180 | ||||
-rw-r--r-- | src/common/token_bucket.h | 72 | ||||
-rw-r--r-- | src/test/include.am | 1 | ||||
-rw-r--r-- | src/test/test.c | 1 | ||||
-rw-r--r-- | src/test/test.h | 1 | ||||
-rw-r--r-- | src/test/test_bwmgt.c | 199 |
7 files changed, 456 insertions, 0 deletions
diff --git a/src/common/include.am b/src/common/include.am index 73c51ff0b2..87ab9d79e9 100644 --- a/src/common/include.am +++ b/src/common/include.am @@ -97,6 +97,7 @@ LIBOR_A_SRC = \ src/common/util_process.c \ src/common/sandbox.c \ src/common/storagedir.c \ + src/common/token_bucket.c \ src/common/workqueue.c \ $(libor_extra_source) \ $(threads_impl_source) \ @@ -184,6 +185,7 @@ COMMONHEADERS = \ src/common/storagedir.h \ src/common/testsupport.h \ src/common/timers.h \ + src/common/token_bucket.h \ src/common/torint.h \ src/common/torlog.h \ src/common/tortls.h \ diff --git a/src/common/token_bucket.c b/src/common/token_bucket.c new file mode 100644 index 0000000000..f4d2cccffd --- /dev/null +++ b/src/common/token_bucket.c @@ -0,0 +1,180 @@ +/* Copyright (c) 2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file token_bucket.c + * \brief Functions to use and manipulate token buckets, used for + * rate-limiting on connections and globally. + * + * Tor uses these token buckets to keep track of bandwidth usage, and + * sometimes other things too. + * + * The time units we use internally are based on "timestamp" units -- see + * monotime_coarse_to_stamp() for a rationale. + * + * Token buckets may become negative. + **/ + +#define TOKEN_BUCKET_PRIVATE + +#include "token_bucket.h" +#include "util_bug.h" + +/** Convert a rate in bytes per second to a rate in bytes per step */ +static uint32_t +rate_per_sec_to_rate_per_step(uint32_t rate) +{ + /* + The precise calculation we'd want to do is + + (rate / 1000) * to_approximate_msec(TICKS_PER_STEP). But to minimize + rounding error, we do it this way instead, and divide last. + */ + return (uint32_t) + monotime_coarse_stamp_units_to_approx_msec(rate*TICKS_PER_STEP)/1000; +} + +/** + * Initialize a token bucket in *<b>bucket</b>, set up to allow <b>rate</b> + * bytes per second, with a maximum burst of <b>burst</b> bytes. The bucket + * is created such that <b>now_ts</b> is the current timestamp. The bucket + * starts out full. + */ +void +token_bucket_init(token_bucket_t *bucket, + uint32_t rate, + uint32_t burst, + uint32_t now_ts) +{ + memset(bucket, 0, sizeof(token_bucket_t)); + token_bucket_adjust(bucket, rate, burst); + token_bucket_reset(bucket, now_ts); +} + +/** + * Change the configured rate (in bytes per second) and burst (in bytes) + * for the token bucket in *<b>bucket</b>. + */ +void +token_bucket_adjust(token_bucket_t *bucket, + uint32_t rate, + uint32_t burst) +{ + tor_assert_nonfatal(rate > 0); + tor_assert_nonfatal(burst > 0); + if (burst > TOKEN_BUCKET_MAX_BURST) + burst = TOKEN_BUCKET_MAX_BURST; + + bucket->rate = rate_per_sec_to_rate_per_step(rate); + bucket->burst = burst; + bucket->read_bucket = MIN(bucket->read_bucket, (int32_t)burst); + bucket->write_bucket = MIN(bucket->write_bucket, (int32_t)burst); +} + +/** + * Reset <b>bucket</b> to be full, as of timestamp <b>now_ts</b>. + */ +void +token_bucket_reset(token_bucket_t *bucket, + uint32_t now_ts) +{ + bucket->read_bucket = bucket->burst; + bucket->write_bucket = bucket->burst; + bucket->last_refilled_at_ts = now_ts; +} + +/* Helper: see token_bucket_refill */ +static int +refill_single_bucket(int32_t *bucketptr, + const uint32_t rate, + const int32_t burst, + const uint32_t elapsed_steps) +{ + const int was_empty = (*bucketptr <= 0); + /* The casts here prevent an underflow. + * + * Note that even if the bucket value is negative, subtracting it from + * "burst" will still produce a correct result. If this result is + * ridiculously high, then the "elapsed_steps > gap / rate" check below + * should catch it. */ + const size_t gap = ((size_t)burst) - ((size_t)*bucketptr); + + if (elapsed_steps > gap / rate) { + *bucketptr = burst; + } else { + *bucketptr += rate * elapsed_steps; + } + + return was_empty && *bucketptr > 0; +} + +/** + * Refill <b>bucket</b> as appropriate, given that the current timestamp + * is <b>now_ts</b>. + * + * Return a bitmask containing TB_READ iff read bucket was empty and became + * nonempty, and TB_WRITE iff the write bucket was empty and became nonempty. + */ +int +token_bucket_refill(token_bucket_t *bucket, + uint32_t now_ts) +{ + const uint32_t elapsed_ticks = (now_ts - bucket->last_refilled_at_ts); + const uint32_t elapsed_steps = elapsed_ticks / TICKS_PER_STEP; + + if (!elapsed_steps) { + /* Note that if less than one whole step elapsed, we don't advance the + * time in last_refilled_at_ts. That's intentional: we want to make sure + * that we add some bytes to it eventually. */ + return 0; + } + + int flags = 0; + if (refill_single_bucket(&bucket->read_bucket, + bucket->rate, bucket->burst, elapsed_steps)) + flags |= TB_READ; + if (refill_single_bucket(&bucket->write_bucket, + bucket->rate, bucket->burst, elapsed_steps)) + flags |= TB_WRITE; + + bucket->last_refilled_at_ts = now_ts; + return flags; +} + +static int +decrement_single_bucket(int32_t *bucketptr, + ssize_t n) +{ + if (BUG(n < 0)) + return 0; + const int becomes_empty = *bucketptr > 0 && n >= *bucketptr; + *bucketptr -= n; + return becomes_empty; +} + +/** + * Decrement the read token bucket in <b>bucket</b> by <b>n</b> bytes. + * + * Return true if the bucket was nonempty and became empty; return false + * otherwise. + */ +int +token_bucket_dec_read(token_bucket_t *bucket, + ssize_t n) +{ + return decrement_single_bucket(&bucket->read_bucket, n); +} + +/** + * Decrement the write token bucket in <b>bucket</b> by <b>n</b> bytes. + * + * Return true if the bucket was nonempty and became empty; return false + * otherwise. + */ +int +token_bucket_dec_write(token_bucket_t *bucket, + ssize_t n) +{ + return decrement_single_bucket(&bucket->write_bucket, n); +} + diff --git a/src/common/token_bucket.h b/src/common/token_bucket.h new file mode 100644 index 0000000000..ef0735219e --- /dev/null +++ b/src/common/token_bucket.h @@ -0,0 +1,72 @@ +/* Copyright (c) 2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file token_bucket.h + * \brief Headers for token_bucket.c + **/ + +#ifndef TOR_TOKEN_BUCKET_H +#define TOR_TOKEN_BUCKET_H + +#include "torint.h" + +typedef struct token_bucket_t { + uint32_t rate; + int32_t burst; + int32_t read_bucket; + int32_t write_bucket; + uint32_t last_refilled_at_ts; +} token_bucket_t; + +#define TOKEN_BUCKET_MAX_BURST INT32_MAX + +void token_bucket_init(token_bucket_t *bucket, + uint32_t rate, + uint32_t burst, + uint32_t now_ts); + +void token_bucket_adjust(token_bucket_t *bucket, + uint32_t rate, uint32_t burst); + +void token_bucket_reset(token_bucket_t *bucket, + uint32_t now_ts); + +#define TB_READ 1 +#define TB_WRITE 2 + +int token_bucket_refill(token_bucket_t *bucket, + uint32_t now_ts); + +int token_bucket_dec_read(token_bucket_t *bucket, + ssize_t n); +int token_bucket_dec_write(token_bucket_t *bucket, + ssize_t n); + +static inline size_t token_bucket_get_read(const token_bucket_t *bucket); +static inline size_t +token_bucket_get_read(const token_bucket_t *bucket) +{ + const ssize_t b = bucket->read_bucket; + return b >= 0 ? b : 0; +} + +static inline size_t token_bucket_get_write(const token_bucket_t *bucket); +static inline size_t +token_bucket_get_write(const token_bucket_t *bucket) +{ + const ssize_t b = bucket->write_bucket; + return b >= 0 ? b : 0; +} + +#ifdef TOKEN_BUCKET_PRIVATE + +/* To avoid making the rates too small, we consider units of "steps", + * where a "step" is defined as this many timestamp ticks. Keep this + * a power of two if you can. */ +#define TICKS_PER_STEP 16 + +#endif + +#endif /* TOR_TOKEN_BUCKET_H */ + diff --git a/src/test/include.am b/src/test/include.am index e98b056a44..2da50de01d 100644 --- a/src/test/include.am +++ b/src/test/include.am @@ -89,6 +89,7 @@ src_test_test_SOURCES = \ src/test/test_address.c \ src/test/test_address_set.c \ src/test/test_buffers.c \ + src/test/test_bwmgt.c \ src/test/test_cell_formats.c \ src/test/test_cell_queue.c \ src/test/test_channel.c \ diff --git a/src/test/test.c b/src/test/test.c index 4f2fbc693a..7df385bc36 100644 --- a/src/test/test.c +++ b/src/test/test.c @@ -812,6 +812,7 @@ struct testgroup_t testgroups[] = { { "address/", address_tests }, { "address_set/", address_set_tests }, { "buffer/", buffer_tests }, + { "bwmgt/", bwmgt_tests }, { "cellfmt/", cell_format_tests }, { "cellqueue/", cell_queue_tests }, { "channel/", channel_tests }, diff --git a/src/test/test.h b/src/test/test.h index 02ec9bda89..95715da7a9 100644 --- a/src/test/test.h +++ b/src/test/test.h @@ -178,6 +178,7 @@ extern struct testcase_t accounting_tests[]; extern struct testcase_t addr_tests[]; extern struct testcase_t address_tests[]; extern struct testcase_t address_set_tests[]; +extern struct testcase_t bwmgt_tests[]; extern struct testcase_t buffer_tests[]; extern struct testcase_t cell_format_tests[]; extern struct testcase_t cell_queue_tests[]; diff --git a/src/test/test_bwmgt.c b/src/test/test_bwmgt.c new file mode 100644 index 0000000000..7bcfcf7fe9 --- /dev/null +++ b/src/test/test_bwmgt.c @@ -0,0 +1,199 @@ +/* Copyright (c) 2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file test_bwmgt.c + * \brief tests for bandwidth management / token bucket functions + */ + +#define TOKEN_BUCKET_PRIVATE + +#include "or.h" +#include "test.h" + +#include "token_bucket.h" + +// an imaginary time, in timestamp units. Chosen so it will roll over. +static const uint32_t START_TS = UINT32_MAX-10; +static const int32_t KB = 1024; + +static void +test_bwmgt_token_buf_init(void *arg) +{ + (void)arg; + token_bucket_t b; + + token_bucket_init(&b, 16*KB, 64*KB, START_TS); + // Burst is correct + tt_uint_op(b.burst, OP_EQ, 64*KB); + // Rate is correct, within 1 percent. + { + uint32_t ticks_per_sec = + (uint32_t) monotime_msec_to_approx_coarse_stamp_units(1000); + uint32_t rate_per_sec = (b.rate * ticks_per_sec / TICKS_PER_STEP); + + tt_uint_op(rate_per_sec, OP_GT, 16*KB-160); + tt_uint_op(rate_per_sec, OP_LT, 16*KB+160); + } + // Bucket starts out full: + tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS); + tt_int_op(b.read_bucket, OP_EQ, 64*KB); + + done: + ; +} + +static void +test_bwmgt_token_buf_adjust(void *arg) +{ + (void)arg; + token_bucket_t b; + + token_bucket_init(&b, 16*KB, 64*KB, START_TS); + + uint32_t rate_orig = b.rate; + // Increasing burst + token_bucket_adjust(&b, 16*KB, 128*KB); + tt_uint_op(b.rate, OP_EQ, rate_orig); + tt_uint_op(b.read_bucket, OP_EQ, 64*KB); + tt_uint_op(b.burst, OP_EQ, 128*KB); + + // Decreasing burst but staying above bucket + token_bucket_adjust(&b, 16*KB, 96*KB); + tt_uint_op(b.rate, OP_EQ, rate_orig); + tt_uint_op(b.read_bucket, OP_EQ, 64*KB); + tt_uint_op(b.burst, OP_EQ, 96*KB); + + // Decreasing burst below bucket, + token_bucket_adjust(&b, 16*KB, 48*KB); + tt_uint_op(b.rate, OP_EQ, rate_orig); + tt_uint_op(b.read_bucket, OP_EQ, 48*KB); + tt_uint_op(b.burst, OP_EQ, 48*KB); + + // Changing rate. + token_bucket_adjust(&b, 32*KB, 48*KB); + tt_uint_op(b.rate, OP_GE, rate_orig*2 - 10); + tt_uint_op(b.rate, OP_LE, rate_orig*2 + 10); + tt_uint_op(b.read_bucket, OP_EQ, 48*KB); + tt_uint_op(b.burst, OP_EQ, 48*KB); + + done: + ; +} + +static void +test_bwmgt_token_buf_dec(void *arg) +{ + (void)arg; + token_bucket_t b; + token_bucket_init(&b, 16*KB, 64*KB, START_TS); + + // full-to-not-full. + tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, KB)); + tt_int_op(b.read_bucket, OP_EQ, 63*KB); + + // Full to almost-not-full + tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, 63*KB - 1)); + tt_int_op(b.read_bucket, OP_EQ, 1); + + // almost-not-full to empty. + tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 1)); + tt_int_op(b.read_bucket, OP_EQ, 0); + + // reset bucket, try full-to-empty + token_bucket_init(&b, 16*KB, 64*KB, START_TS); + tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 64*KB)); + tt_int_op(b.read_bucket, OP_EQ, 0); + + // reset bucket, try underflow. + token_bucket_init(&b, 16*KB, 64*KB, START_TS); + tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 64*KB + 1)); + tt_int_op(b.read_bucket, OP_EQ, -1); + + // A second underflow does not make the bucket empty. + tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, 1000)); + tt_int_op(b.read_bucket, OP_EQ, -1001); + + done: + ; +} + +static void +test_bwmgt_token_buf_refill(void *arg) +{ + (void)arg; + token_bucket_t b; + const uint32_t SEC = + (uint32_t)monotime_msec_to_approx_coarse_stamp_units(1000); + token_bucket_init(&b, 16*KB, 64*KB, START_TS); + + /* Make the buffer much emptier, then let one second elapse. */ + token_bucket_dec_read(&b, 48*KB); + tt_int_op(b.read_bucket, OP_EQ, 16*KB); + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC)); + tt_int_op(b.read_bucket, OP_GT, 32*KB - 300); + tt_int_op(b.read_bucket, OP_LT, 32*KB + 300); + + /* Another half second. */ + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2)); + tt_int_op(b.read_bucket, OP_GT, 40*KB - 400); + tt_int_op(b.read_bucket, OP_LT, 40*KB + 400); + tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2); + + /* No time: nothing happens. */ + { + const uint32_t bucket_orig = b.read_bucket; + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2)); + tt_int_op(b.read_bucket, OP_EQ, bucket_orig); + } + + /* Another 30 seconds: fill the bucket. */ + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*30)); + tt_int_op(b.read_bucket, OP_EQ, b.burst); + tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2 + SEC*30); + + /* Another 30 seconds: nothing happens. */ + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*60)); + tt_int_op(b.read_bucket, OP_EQ, b.burst); + tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2 + SEC*60); + + /* Empty the bucket, let two seconds pass, and make sure that a refill is + * noticed. */ + tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, b.burst)); + tt_int_op(0, OP_EQ, b.read_bucket); + tt_int_op(1, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*61)); + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*62)); + tt_int_op(b.read_bucket, OP_GT, 32*KB-300); + tt_int_op(b.read_bucket, OP_LT, 32*KB+300); + + /* Underflow the bucket, make sure we detect when it has tokens again. */ + tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, b.read_bucket+16*KB)); + tt_int_op(-16*KB, OP_EQ, b.read_bucket); + // half a second passes... + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*64)); + tt_int_op(b.read_bucket, OP_GT, -8*KB-200); + tt_int_op(b.read_bucket, OP_LT, -8*KB+200); + // a second passes + tt_int_op(1, OP_EQ, token_bucket_refill(&b, START_TS + SEC*65)); + tt_int_op(b.read_bucket, OP_GT, 8*KB-200); + tt_int_op(b.read_bucket, OP_LT, 8*KB+200); + + // a ridiculous amount of time passes + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*64)); + tt_int_op(b.read_bucket, OP_EQ, b.burst); + + done: + ; +} + +#define BWMGT(name) \ + { #name, test_bwmgt_ ## name , 0, NULL, NULL } + +struct testcase_t bwmgt_tests[] = { + BWMGT(token_buf_init), + BWMGT(token_buf_adjust), + BWMGT(token_buf_dec), + BWMGT(token_buf_refill), + END_OF_TESTCASES +}; + |