diff options
-rw-r--r-- | src/common/token_bucket.c | 206 | ||||
-rw-r--r-- | src/common/token_bucket.h | 86 | ||||
-rw-r--r-- | src/or/connection.c | 70 | ||||
-rw-r--r-- | src/or/connection_or.c | 4 | ||||
-rw-r--r-- | src/or/main.c | 16 | ||||
-rw-r--r-- | src/or/main.h | 4 | ||||
-rw-r--r-- | src/or/or.h | 2 | ||||
-rw-r--r-- | src/test/test_bwmgt.c | 164 |
8 files changed, 322 insertions, 230 deletions
diff --git a/src/common/token_bucket.c b/src/common/token_bucket.c index 6af2982147..7c81264e25 100644 --- a/src/common/token_bucket.c +++ b/src/common/token_bucket.c @@ -9,8 +9,9 @@ * Tor uses these token buckets to keep track of bandwidth usage, and * sometimes other things too. * - * The time units we use internally are based on "timestamp" units -- see - * monotime_coarse_to_stamp() for a rationale. + * There are two layers of abstraction here: "raw" token buckets, in which all + * the pieces are decoupled, and "read-write" token buckets, which combine all + * the moving parts into one. * * Token buckets may become negative. **/ @@ -20,6 +21,92 @@ #include "token_bucket.h" #include "util_bug.h" +/** + * Set the <b>rate</b> and <b>burst</b> value in a token_bucket_cfg. + * + * Note that the <b>rate</b> value is in arbitrary units, but those units will + * determine the units of token_bucket_raw_dec(), token_bucket_raw_refill, and + * so on. + */ +void +token_bucket_cfg_init(token_bucket_cfg_t *cfg, + uint32_t rate, + uint32_t burst) +{ + tor_assert_nonfatal(rate > 0); + tor_assert_nonfatal(burst > 0); + if (burst > TOKEN_BUCKET_MAX_BURST) + burst = TOKEN_BUCKET_MAX_BURST; + + cfg->rate = rate; + cfg->burst = burst; +} + +/** + * Initialize a raw token bucket and its associated timestamp to the "full" + * state, according to <b>cfg</b>. + */ +void +token_bucket_raw_reset(token_bucket_raw_t *bucket, + const token_bucket_cfg_t *cfg) +{ + bucket->bucket = cfg->burst; +} + +/** + * Adust a preexisting token bucket to respect the new configuration + * <b>cfg</b>, by decreasing its current level if needed. */ +void +token_bucket_raw_adjust(token_bucket_raw_t *bucket, + const token_bucket_cfg_t *cfg) +{ + bucket->bucket = MIN(bucket->bucket, cfg->burst); +} + +/** + * Given an amount of <b>elapsed</b> time units, and a bucket configuration + * <b>cfg</b>, refill the level of <b>bucket</b> accordingly. Note that the + * units of time in <b>elapsed</b> must correspond to those used to set the + * rate in <b>cfg</b>, or the result will be illogical. + */ +int +token_bucket_raw_refill_steps(token_bucket_raw_t *bucket, + const token_bucket_cfg_t *cfg, + const uint32_t elapsed) +{ + const int was_empty = (bucket->bucket <= 0); + /* The casts here prevent an underflow. + * + * Note that even if the bucket value is negative, subtracting it from + * "burst" will still produce a correct result. If this result is + * ridiculously high, then the "elapsed > gap / rate" check below + * should catch it. */ + const size_t gap = ((size_t)cfg->burst) - ((size_t)bucket->bucket); + + if (elapsed > gap / cfg->rate) { + bucket->bucket = cfg->burst; + } else { + bucket->bucket += cfg->rate * elapsed; + } + + return was_empty && bucket->bucket > 0; +} + +/** + * Decrement a provided bucket by <b>n</b> units. Note that <b>n</b> + * must be nonnegative. + */ +int +token_bucket_raw_dec(token_bucket_raw_t *bucket, + ssize_t n) +{ + if (BUG(n < 0)) + return 0; + const int becomes_empty = bucket->bucket > 0 && n >= bucket->bucket; + bucket->bucket -= n; + return becomes_empty; +} + /** Convert a rate in bytes per second to a rate in bytes per step */ static uint32_t rate_per_sec_to_rate_per_step(uint32_t rate) @@ -30,8 +117,9 @@ rate_per_sec_to_rate_per_step(uint32_t rate) (rate / 1000) * to_approximate_msec(TICKS_PER_STEP). But to minimize rounding error, we do it this way instead, and divide last. */ - return (uint32_t) + uint32_t val = (uint32_t) monotime_coarse_stamp_units_to_approx_msec(rate*TICKS_PER_STEP)/1000; + return val ? val : 1; } /** @@ -41,14 +129,14 @@ rate_per_sec_to_rate_per_step(uint32_t rate) * starts out full. */ void -token_bucket_init(token_bucket_t *bucket, +token_bucket_rw_init(token_bucket_rw_t *bucket, uint32_t rate, uint32_t burst, uint32_t now_ts) { - memset(bucket, 0, sizeof(token_bucket_t)); - token_bucket_adjust(bucket, rate, burst); - token_bucket_reset(bucket, now_ts); + memset(bucket, 0, sizeof(token_bucket_rw_t)); + token_bucket_rw_adjust(bucket, rate, burst); + token_bucket_rw_reset(bucket, now_ts); } /** @@ -56,56 +144,27 @@ token_bucket_init(token_bucket_t *bucket, * for the token bucket in *<b>bucket</b>. */ void -token_bucket_adjust(token_bucket_t *bucket, - uint32_t rate, - uint32_t burst) +token_bucket_rw_adjust(token_bucket_rw_t *bucket, + uint32_t rate, + uint32_t burst) { - tor_assert_nonfatal(rate > 0); - tor_assert_nonfatal(burst > 0); - if (burst > TOKEN_BUCKET_MAX_BURST) - burst = TOKEN_BUCKET_MAX_BURST; - - bucket->rate = rate_per_sec_to_rate_per_step(rate); - bucket->burst = burst; - bucket->read_bucket = MIN(bucket->read_bucket, (int32_t)burst); - bucket->write_bucket = MIN(bucket->write_bucket, (int32_t)burst); + token_bucket_cfg_init(&bucket->cfg, + rate_per_sec_to_rate_per_step(rate), + burst); + token_bucket_raw_adjust(&bucket->read_bucket, &bucket->cfg); + token_bucket_raw_adjust(&bucket->write_bucket, &bucket->cfg); } /** * Reset <b>bucket</b> to be full, as of timestamp <b>now_ts</b>. */ void -token_bucket_reset(token_bucket_t *bucket, - uint32_t now_ts) +token_bucket_rw_reset(token_bucket_rw_t *bucket, + uint32_t now_ts) { - bucket->read_bucket = bucket->burst; - bucket->write_bucket = bucket->burst; - bucket->last_refilled_at_ts = now_ts; -} - -/* Helper: see token_bucket_refill */ -static int -refill_single_bucket(int32_t *bucketptr, - const uint32_t rate, - const int32_t burst, - const uint32_t elapsed_steps) -{ - const int was_empty = (*bucketptr <= 0); - /* The casts here prevent an underflow. - * - * Note that even if the bucket value is negative, subtracting it from - * "burst" will still produce a correct result. If this result is - * ridiculously high, then the "elapsed_steps > gap / rate" check below - * should catch it. */ - const size_t gap = ((size_t)burst) - ((size_t)*bucketptr); - - if (elapsed_steps > gap / rate) { - *bucketptr = burst; - } else { - *bucketptr += rate * elapsed_steps; - } - - return was_empty && *bucketptr > 0; + token_bucket_raw_reset(&bucket->read_bucket, &bucket->cfg); + token_bucket_raw_reset(&bucket->write_bucket, &bucket->cfg); + bucket->last_refilled_at_timestamp = now_ts; } /** @@ -116,10 +175,11 @@ refill_single_bucket(int32_t *bucketptr, * nonempty, and TB_WRITE iff the write bucket was empty and became nonempty. */ int -token_bucket_refill(token_bucket_t *bucket, - uint32_t now_ts) +token_bucket_rw_refill(token_bucket_rw_t *bucket, + uint32_t now_ts) { - const uint32_t elapsed_ticks = (now_ts - bucket->last_refilled_at_ts); + const uint32_t elapsed_ticks = + (now_ts - bucket->last_refilled_at_timestamp); if (elapsed_ticks > UINT32_MAX-(300*1000)) { /* Either about 48 days have passed since the last refill, or the * monotonic clock has somehow moved backwards. (We're looking at you, @@ -132,34 +192,23 @@ token_bucket_refill(token_bucket_t *bucket, if (!elapsed_steps) { /* Note that if less than one whole step elapsed, we don't advance the - * time in last_refilled_at_ts. That's intentional: we want to make sure + * time in last_refilled_at. That's intentional: we want to make sure * that we add some bytes to it eventually. */ return 0; } int flags = 0; - if (refill_single_bucket(&bucket->read_bucket, - bucket->rate, bucket->burst, elapsed_steps)) + if (token_bucket_raw_refill_steps(&bucket->read_bucket, + &bucket->cfg, elapsed_steps)) flags |= TB_READ; - if (refill_single_bucket(&bucket->write_bucket, - bucket->rate, bucket->burst, elapsed_steps)) + if (token_bucket_raw_refill_steps(&bucket->write_bucket, + &bucket->cfg, elapsed_steps)) flags |= TB_WRITE; - bucket->last_refilled_at_ts = now_ts; + bucket->last_refilled_at_timestamp = now_ts; return flags; } -static int -decrement_single_bucket(int32_t *bucketptr, - ssize_t n) -{ - if (BUG(n < 0)) - return 0; - const int becomes_empty = *bucketptr > 0 && n >= *bucketptr; - *bucketptr -= n; - return becomes_empty; -} - /** * Decrement the read token bucket in <b>bucket</b> by <b>n</b> bytes. * @@ -167,10 +216,10 @@ decrement_single_bucket(int32_t *bucketptr, * otherwise. */ int -token_bucket_dec_read(token_bucket_t *bucket, +token_bucket_rw_dec_read(token_bucket_rw_t *bucket, ssize_t n) { - return decrement_single_bucket(&bucket->read_bucket, n); + return token_bucket_raw_dec(&bucket->read_bucket, n); } /** @@ -180,20 +229,21 @@ token_bucket_dec_read(token_bucket_t *bucket, * otherwise. */ int -token_bucket_dec_write(token_bucket_t *bucket, +token_bucket_rw_dec_write(token_bucket_rw_t *bucket, ssize_t n) { - return decrement_single_bucket(&bucket->write_bucket, n); + return token_bucket_raw_dec(&bucket->write_bucket, n); } /** - * As token_bucket_dec_read and token_bucket_dec_write, in a single operation. + * As token_bucket_rw_dec_read and token_bucket_rw_dec_write, in a single + * operation. */ void -token_bucket_dec(token_bucket_t *bucket, - ssize_t n_read, ssize_t n_written) +token_bucket_rw_dec(token_bucket_rw_t *bucket, + ssize_t n_read, ssize_t n_written) { - token_bucket_dec_read(bucket, n_read); - token_bucket_dec_read(bucket, n_written); + token_bucket_rw_dec_read(bucket, n_read); + token_bucket_rw_dec_write(bucket, n_written); } diff --git a/src/common/token_bucket.h b/src/common/token_bucket.h index 2d1ccd5cf3..44315fb6bd 100644 --- a/src/common/token_bucket.h +++ b/src/common/token_bucket.h @@ -2,8 +2,8 @@ /* See LICENSE for licensing information */ /** - * \file token_bucket.h - * \brief Headers for token_bucket.c + * \file token_bucket_rw.h + * \brief Headers for token_bucket_rw.c **/ #ifndef TOR_TOKEN_BUCKET_H @@ -11,55 +11,95 @@ #include "torint.h" -typedef struct token_bucket_t { +/** Largest allowable burst value for a token buffer. */ +#define TOKEN_BUCKET_MAX_BURST INT32_MAX + +/** A generic token buffer configuration: determines the number of tokens + * added to the bucket in each time unit (the "rate"), and the maximum number + * of tokens in the bucket (the "burst") */ +typedef struct token_bucket_cfg_t { uint32_t rate; int32_t burst; - int32_t read_bucket; - int32_t write_bucket; - uint32_t last_refilled_at_ts; -} token_bucket_t; +} token_bucket_cfg_t; -#define TOKEN_BUCKET_MAX_BURST INT32_MAX +/** A raw token bucket, decoupled from its configuration and timestamp. */ +typedef struct token_bucket_raw_t { + int32_t bucket; +} token_bucket_raw_t; + +void token_bucket_cfg_init(token_bucket_cfg_t *cfg, + uint32_t rate, + uint32_t burst); + +void token_bucket_raw_adjust(token_bucket_raw_t *bucket, + const token_bucket_cfg_t *cfg); + +void token_bucket_raw_reset(token_bucket_raw_t *bucket, + const token_bucket_cfg_t *cfg); + +int token_bucket_raw_dec(token_bucket_raw_t *bucket, + ssize_t n); + +int token_bucket_raw_refill_steps(token_bucket_raw_t *bucket, + const token_bucket_cfg_t *cfg, + const uint32_t elapsed_steps); + +static inline size_t token_bucket_raw_get(const token_bucket_raw_t *bucket); +/** Return the current number of bytes set in a token bucket. */ +static inline size_t +token_bucket_raw_get(const token_bucket_raw_t *bucket) +{ + return bucket->bucket >= 0 ? bucket->bucket : 0; +} -void token_bucket_init(token_bucket_t *bucket, +/** A convenience type containing all the pieces needed for a coupled + * read-bucket and write-bucket that have the same rate limit, and which use + * "timestamp units" (see compat_time.h) for their time. */ +typedef struct token_bucket_rw_t { + token_bucket_cfg_t cfg; + token_bucket_raw_t read_bucket; + token_bucket_raw_t write_bucket; + uint32_t last_refilled_at_timestamp; +} token_bucket_rw_t; + +void token_bucket_rw_init(token_bucket_rw_t *bucket, uint32_t rate, uint32_t burst, uint32_t now_ts); -void token_bucket_adjust(token_bucket_t *bucket, +void token_bucket_rw_adjust(token_bucket_rw_t *bucket, uint32_t rate, uint32_t burst); -void token_bucket_reset(token_bucket_t *bucket, +void token_bucket_rw_reset(token_bucket_rw_t *bucket, uint32_t now_ts); #define TB_READ 1 #define TB_WRITE 2 -int token_bucket_refill(token_bucket_t *bucket, +int token_bucket_rw_refill(token_bucket_rw_t *bucket, uint32_t now_ts); -int token_bucket_dec_read(token_bucket_t *bucket, +int token_bucket_rw_dec_read(token_bucket_rw_t *bucket, ssize_t n); -int token_bucket_dec_write(token_bucket_t *bucket, +int token_bucket_rw_dec_write(token_bucket_rw_t *bucket, ssize_t n); -void token_bucket_dec(token_bucket_t *bucket, +void token_bucket_rw_dec(token_bucket_rw_t *bucket, ssize_t n_read, ssize_t n_written); -static inline size_t token_bucket_get_read(const token_bucket_t *bucket); +static inline size_t token_bucket_rw_get_read(const token_bucket_rw_t *bucket); static inline size_t -token_bucket_get_read(const token_bucket_t *bucket) +token_bucket_rw_get_read(const token_bucket_rw_t *bucket) { - const ssize_t b = bucket->read_bucket; - return b >= 0 ? b : 0; + return token_bucket_raw_get(&bucket->read_bucket); } -static inline size_t token_bucket_get_write(const token_bucket_t *bucket); +static inline size_t token_bucket_rw_get_write( + const token_bucket_rw_t *bucket); static inline size_t -token_bucket_get_write(const token_bucket_t *bucket) +token_bucket_rw_get_write(const token_bucket_rw_t *bucket) { - const ssize_t b = bucket->write_bucket; - return b >= 0 ? b : 0; + return token_bucket_raw_get(&bucket->write_bucket); } #ifdef TOKEN_BUCKET_PRIVATE diff --git a/src/or/connection.c b/src/or/connection.c index 1aad68678e..20c758f57d 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -2880,12 +2880,12 @@ connection_bucket_read_limit(connection_t *conn, time_t now) int base = RELAY_PAYLOAD_SIZE; int priority = conn->type != CONN_TYPE_DIR; ssize_t conn_bucket = -1; - size_t global_bucket_val = token_bucket_get_read(&global_bucket); + size_t global_bucket_val = token_bucket_rw_get_read(&global_bucket); if (connection_speaks_cells(conn)) { or_connection_t *or_conn = TO_OR_CONN(conn); if (conn->state == OR_CONN_STATE_OPEN) - conn_bucket = token_bucket_get_read(&or_conn->bucket); + conn_bucket = token_bucket_rw_get_read(&or_conn->bucket); base = get_cell_network_size(or_conn->wide_circ_ids); } @@ -2895,7 +2895,7 @@ connection_bucket_read_limit(connection_t *conn, time_t now) } if (connection_counts_as_relayed_traffic(conn, now)) { - size_t relayed = token_bucket_get_read(&global_relayed_bucket); + size_t relayed = token_bucket_rw_get_read(&global_relayed_bucket); global_bucket_val = MIN(global_bucket_val, relayed); } @@ -2910,7 +2910,7 @@ connection_bucket_write_limit(connection_t *conn, time_t now) int base = RELAY_PAYLOAD_SIZE; int priority = conn->type != CONN_TYPE_DIR; size_t conn_bucket = conn->outbuf_flushlen; - size_t global_bucket_val = token_bucket_get_write(&global_bucket); + size_t global_bucket_val = token_bucket_rw_get_write(&global_bucket); if (!connection_is_rate_limited(conn)) { /* be willing to write to local conns even if our buckets are empty */ @@ -2921,12 +2921,13 @@ connection_bucket_write_limit(connection_t *conn, time_t now) /* use the per-conn write limit if it's lower */ or_connection_t *or_conn = TO_OR_CONN(conn); if (conn->state == OR_CONN_STATE_OPEN) - conn_bucket = MIN(conn_bucket, token_bucket_get_write(&or_conn->bucket)); + conn_bucket = MIN(conn_bucket, + token_bucket_rw_get_write(&or_conn->bucket)); base = get_cell_network_size(or_conn->wide_circ_ids); } if (connection_counts_as_relayed_traffic(conn, now)) { - size_t relayed = token_bucket_get_write(&global_relayed_bucket); + size_t relayed = token_bucket_rw_get_write(&global_relayed_bucket); global_bucket_val = MIN(global_bucket_val, relayed); } @@ -2956,8 +2957,9 @@ connection_bucket_write_limit(connection_t *conn, time_t now) int global_write_bucket_low(connection_t *conn, size_t attempt, int priority) { - size_t smaller_bucket = MIN(token_bucket_get_write(&global_bucket), - token_bucket_get_write(&global_relayed_bucket)); + size_t smaller_bucket = + MIN(token_bucket_rw_get_write(&global_bucket), + token_bucket_rw_get_write(&global_relayed_bucket)); if (authdir_mode(get_options()) && priority>1) return 0; /* there's always room to answer v2 if we're an auth dir */ @@ -3041,12 +3043,12 @@ connection_buckets_decrement(connection_t *conn, time_t now, return; /* local IPs are free */ if (connection_counts_as_relayed_traffic(conn, now)) { - token_bucket_dec(&global_relayed_bucket, num_read, num_written); + token_bucket_rw_dec(&global_relayed_bucket, num_read, num_written); } - token_bucket_dec(&global_bucket, num_read, num_written); + token_bucket_rw_dec(&global_bucket, num_read, num_written); if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) { or_connection_t *or_conn = TO_OR_CONN(conn); - token_bucket_dec(&or_conn->bucket, num_read, num_written); + token_bucket_rw_dec(&or_conn->bucket, num_read, num_written); } } @@ -3060,14 +3062,14 @@ connection_consider_empty_read_buckets(connection_t *conn) if (!connection_is_rate_limited(conn)) return; /* Always okay. */ - if (token_bucket_get_read(&global_bucket) <= 0) { + if (token_bucket_rw_get_read(&global_bucket) <= 0) { reason = "global read bucket exhausted. Pausing."; } else if (connection_counts_as_relayed_traffic(conn, approx_time()) && - token_bucket_get_read(&global_relayed_bucket) <= 0) { + token_bucket_rw_get_read(&global_relayed_bucket) <= 0) { reason = "global relayed read bucket exhausted. Pausing."; } else if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN && - token_bucket_get_read(&TO_OR_CONN(conn)->bucket) <= 0) { + token_bucket_rw_get_read(&TO_OR_CONN(conn)->bucket) <= 0) { reason = "connection read bucket exhausted. Pausing."; } else return; /* all good, no need to stop it */ @@ -3087,14 +3089,14 @@ connection_consider_empty_write_buckets(connection_t *conn) if (!connection_is_rate_limited(conn)) return; /* Always okay. */ - if (token_bucket_get_write(&global_bucket) <= 0) { + if (token_bucket_rw_get_write(&global_bucket) <= 0) { reason = "global write bucket exhausted. Pausing."; } else if (connection_counts_as_relayed_traffic(conn, approx_time()) && - token_bucket_get_write(&global_relayed_bucket) <= 0) { + token_bucket_rw_get_write(&global_relayed_bucket) <= 0) { reason = "global relayed write bucket exhausted. Pausing."; } else if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN && - token_bucket_get_write(&TO_OR_CONN(conn)->bucket) <= 0) { + token_bucket_rw_get_write(&TO_OR_CONN(conn)->bucket) <= 0) { reason = "connection write bucket exhausted. Pausing."; } else return; /* all good, no need to stop it */ @@ -3111,17 +3113,17 @@ connection_bucket_init(void) { const or_options_t *options = get_options(); const uint32_t now_ts = monotime_coarse_get_stamp(); - token_bucket_init(&global_bucket, + token_bucket_rw_init(&global_bucket, (int32_t)options->BandwidthRate, (int32_t)options->BandwidthBurst, now_ts); if (options->RelayBandwidthRate) { - token_bucket_init(&global_relayed_bucket, + token_bucket_rw_init(&global_relayed_bucket, (int32_t)options->RelayBandwidthRate, (int32_t)options->RelayBandwidthBurst, now_ts); } else { - token_bucket_init(&global_relayed_bucket, + token_bucket_rw_init(&global_relayed_bucket, (int32_t)options->BandwidthRate, (int32_t)options->BandwidthBurst, now_ts); @@ -3132,15 +3134,15 @@ connection_bucket_init(void) void connection_bucket_adjust(const or_options_t *options) { - token_bucket_adjust(&global_bucket, + token_bucket_rw_adjust(&global_bucket, (int32_t)options->BandwidthRate, (int32_t)options->BandwidthBurst); if (options->RelayBandwidthRate) { - token_bucket_adjust(&global_relayed_bucket, + token_bucket_rw_adjust(&global_relayed_bucket, (int32_t)options->RelayBandwidthRate, (int32_t)options->RelayBandwidthBurst); } else { - token_bucket_adjust(&global_relayed_bucket, + token_bucket_rw_adjust(&global_relayed_bucket, (int32_t)options->BandwidthRate, (int32_t)options->BandwidthBurst); } @@ -3153,12 +3155,12 @@ connection_bucket_refill(time_t now, uint32_t now_ts) smartlist_t *conns = get_connection_array(); write_buckets_empty_last_second = - token_bucket_get_write(&global_bucket) <= 0 || - token_bucket_get_write(&global_relayed_bucket) <= 0; + token_bucket_rw_get_write(&global_bucket) <= 0 || + token_bucket_rw_get_write(&global_relayed_bucket) <= 0; /* refill the global buckets */ - token_bucket_refill(&global_bucket, now_ts); - token_bucket_refill(&global_relayed_bucket, now_ts); + token_bucket_rw_refill(&global_bucket, now_ts); + token_bucket_rw_refill(&global_relayed_bucket, now_ts); /* refill the per-connection buckets */ SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) { @@ -3166,17 +3168,17 @@ connection_bucket_refill(time_t now, uint32_t now_ts) or_connection_t *or_conn = TO_OR_CONN(conn); if (conn->state == OR_CONN_STATE_OPEN) { - token_bucket_refill(&or_conn->bucket, now_ts); + token_bucket_rw_refill(&or_conn->bucket, now_ts); } } if (conn->read_blocked_on_bw == 1 /* marked to turn reading back on now */ - && token_bucket_get_read(&global_bucket) > 0 /* and we can read */ + && token_bucket_rw_get_read(&global_bucket) > 0 /* and we can read */ && (!connection_counts_as_relayed_traffic(conn, now) || - token_bucket_get_read(&global_relayed_bucket) > 0) + token_bucket_rw_get_read(&global_relayed_bucket) > 0) && (!connection_speaks_cells(conn) || conn->state != OR_CONN_STATE_OPEN || - token_bucket_get_read(&TO_OR_CONN(conn)->bucket) > 0)) { + token_bucket_rw_get_read(&TO_OR_CONN(conn)->bucket) > 0)) { /* and either a non-cell conn or a cell conn with non-empty bucket */ LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET, "waking up conn (fd %d) for read", (int)conn->s)); @@ -3185,12 +3187,12 @@ connection_bucket_refill(time_t now, uint32_t now_ts) } if (conn->write_blocked_on_bw == 1 - && token_bucket_get_write(&global_bucket) > 0 /* and we can write */ + && token_bucket_rw_get_write(&global_bucket) > 0 /* and we can write */ && (!connection_counts_as_relayed_traffic(conn, now) || - token_bucket_get_write(&global_relayed_bucket) > 0) + token_bucket_rw_get_write(&global_relayed_bucket) > 0) && (!connection_speaks_cells(conn) || conn->state != OR_CONN_STATE_OPEN || - token_bucket_get_write(&TO_OR_CONN(conn)->bucket) > 0)) { + token_bucket_rw_get_write(&TO_OR_CONN(conn)->bucket) > 0)) { LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET, "waking up conn (fd %d) for write", (int)conn->s)); conn->write_blocked_on_bw = 0; diff --git a/src/or/connection_or.c b/src/or/connection_or.c index 3afdfa6b5a..7723d9d2bd 100644 --- a/src/or/connection_or.c +++ b/src/or/connection_or.c @@ -793,9 +793,9 @@ connection_or_update_token_buckets_helper(or_connection_t *conn, int reset, (int)options->BandwidthBurst, 1, INT32_MAX); } - token_bucket_adjust(&conn->bucket, rate, burst); + token_bucket_rw_adjust(&conn->bucket, rate, burst); if (reset) { - token_bucket_reset(&conn->bucket, monotime_coarse_get_stamp()); + token_bucket_rw_reset(&conn->bucket, monotime_coarse_get_stamp()); } } diff --git a/src/or/main.c b/src/or/main.c index 30f00ae342..a852d3273d 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -154,10 +154,10 @@ static void shutdown_did_not_work_callback(evutil_socket_t fd, short event, /********* START VARIABLES **********/ /* Token bucket for all traffic. */ -token_bucket_t global_bucket; +token_bucket_rw_t global_bucket; /* Token bucket for relayed traffic. */ -token_bucket_t global_relayed_bucket; +token_bucket_rw_t global_relayed_bucket; /** What was the read/write bucket before the last second_elapsed_callback() * call? (used to determine how many bytes we've read). */ @@ -2394,9 +2394,9 @@ refill_callback(periodic_timer_t *timer, void *arg) } bytes_written = stats_prev_global_write_bucket - - token_bucket_get_write(&global_bucket); + token_bucket_rw_get_write(&global_bucket); bytes_read = stats_prev_global_read_bucket - - token_bucket_get_read(&global_bucket); + token_bucket_rw_get_read(&global_bucket); stats_n_bytes_read += bytes_read; stats_n_bytes_written += bytes_written; @@ -2408,8 +2408,8 @@ refill_callback(periodic_timer_t *timer, void *arg) monotime_coarse_get_stamp()); } - stats_prev_global_read_bucket = token_bucket_get_read(&global_bucket); - stats_prev_global_write_bucket = token_bucket_get_write(&global_bucket); + stats_prev_global_read_bucket = token_bucket_rw_get_read(&global_bucket); + stats_prev_global_write_bucket = token_bucket_rw_get_write(&global_bucket); /* remember what time it is, for next time */ refill_timer_current_millisecond = now; @@ -2618,8 +2618,8 @@ do_main_loop(void) /* Set up our buckets */ connection_bucket_init(); - stats_prev_global_read_bucket = token_bucket_get_read(&global_bucket); - stats_prev_global_write_bucket = token_bucket_get_write(&global_bucket); + stats_prev_global_read_bucket = token_bucket_rw_get_read(&global_bucket); + stats_prev_global_write_bucket = token_bucket_rw_get_write(&global_bucket); /* initialize the bootstrap status events to know we're starting up */ control_event_bootstrap(BOOTSTRAP_STATUS_STARTING, 0); diff --git a/src/or/main.h b/src/or/main.h index 0143973b26..e50d14d4d9 100644 --- a/src/or/main.h +++ b/src/or/main.h @@ -88,8 +88,8 @@ uint64_t get_main_loop_idle_count(void); extern time_t time_of_process_start; extern int quiet_level; -extern token_bucket_t global_bucket; -extern token_bucket_t global_relayed_bucket; +extern token_bucket_rw_t global_bucket; +extern token_bucket_rw_t global_relayed_bucket; #ifdef MAIN_PRIVATE STATIC void init_connection_lists(void); diff --git a/src/or/or.h b/src/or/or.h index 829b6755d2..c0e1ffff48 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -1661,7 +1661,7 @@ typedef struct or_connection_t { time_t timestamp_lastempty; /**< When was the outbuf last completely empty?*/ - token_bucket_t bucket; /**< Used for rate limiting when the connection is + token_bucket_rw_t bucket; /**< Used for rate limiting when the connection is * in state CONN_OPEN. */ /* diff --git a/src/test/test_bwmgt.c b/src/test/test_bwmgt.c index 1a54f44fc4..0579b4a419 100644 --- a/src/test/test_bwmgt.c +++ b/src/test/test_bwmgt.c @@ -21,23 +21,23 @@ static void test_bwmgt_token_buf_init(void *arg) { (void)arg; - token_bucket_t b; + token_bucket_rw_t b; - token_bucket_init(&b, 16*KB, 64*KB, START_TS); + token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS); // Burst is correct - tt_uint_op(b.burst, OP_EQ, 64*KB); + tt_uint_op(b.cfg.burst, OP_EQ, 64*KB); // Rate is correct, within 1 percent. { uint32_t ticks_per_sec = (uint32_t) monotime_msec_to_approx_coarse_stamp_units(1000); - uint32_t rate_per_sec = (b.rate * ticks_per_sec / TICKS_PER_STEP); + uint32_t rate_per_sec = (b.cfg.rate * ticks_per_sec / TICKS_PER_STEP); tt_uint_op(rate_per_sec, OP_GT, 16*KB-160); tt_uint_op(rate_per_sec, OP_LT, 16*KB+160); } // Bucket starts out full: - tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS); - tt_int_op(b.read_bucket, OP_EQ, 64*KB); + tt_uint_op(b.last_refilled_at_timestamp, OP_EQ, START_TS); + tt_int_op(b.read_bucket.bucket, OP_EQ, 64*KB); done: ; @@ -47,35 +47,35 @@ static void test_bwmgt_token_buf_adjust(void *arg) { (void)arg; - token_bucket_t b; + token_bucket_rw_t b; - token_bucket_init(&b, 16*KB, 64*KB, START_TS); + token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS); - uint32_t rate_orig = b.rate; + uint32_t rate_orig = b.cfg.rate; // Increasing burst - token_bucket_adjust(&b, 16*KB, 128*KB); - tt_uint_op(b.rate, OP_EQ, rate_orig); - tt_uint_op(b.read_bucket, OP_EQ, 64*KB); - tt_uint_op(b.burst, OP_EQ, 128*KB); + token_bucket_rw_adjust(&b, 16*KB, 128*KB); + tt_uint_op(b.cfg.rate, OP_EQ, rate_orig); + tt_uint_op(b.read_bucket.bucket, OP_EQ, 64*KB); + tt_uint_op(b.cfg.burst, OP_EQ, 128*KB); // Decreasing burst but staying above bucket - token_bucket_adjust(&b, 16*KB, 96*KB); - tt_uint_op(b.rate, OP_EQ, rate_orig); - tt_uint_op(b.read_bucket, OP_EQ, 64*KB); - tt_uint_op(b.burst, OP_EQ, 96*KB); + token_bucket_rw_adjust(&b, 16*KB, 96*KB); + tt_uint_op(b.cfg.rate, OP_EQ, rate_orig); + tt_uint_op(b.read_bucket.bucket, OP_EQ, 64*KB); + tt_uint_op(b.cfg.burst, OP_EQ, 96*KB); // Decreasing burst below bucket, - token_bucket_adjust(&b, 16*KB, 48*KB); - tt_uint_op(b.rate, OP_EQ, rate_orig); - tt_uint_op(b.read_bucket, OP_EQ, 48*KB); - tt_uint_op(b.burst, OP_EQ, 48*KB); + token_bucket_rw_adjust(&b, 16*KB, 48*KB); + tt_uint_op(b.cfg.rate, OP_EQ, rate_orig); + tt_uint_op(b.read_bucket.bucket, OP_EQ, 48*KB); + tt_uint_op(b.cfg.burst, OP_EQ, 48*KB); // Changing rate. - token_bucket_adjust(&b, 32*KB, 48*KB); - tt_uint_op(b.rate, OP_GE, rate_orig*2 - 10); - tt_uint_op(b.rate, OP_LE, rate_orig*2 + 10); - tt_uint_op(b.read_bucket, OP_EQ, 48*KB); - tt_uint_op(b.burst, OP_EQ, 48*KB); + token_bucket_rw_adjust(&b, 32*KB, 48*KB); + tt_uint_op(b.cfg.rate, OP_GE, rate_orig*2 - 10); + tt_uint_op(b.cfg.rate, OP_LE, rate_orig*2 + 10); + tt_uint_op(b.read_bucket.bucket, OP_EQ, 48*KB); + tt_uint_op(b.cfg.burst, OP_EQ, 48*KB); done: ; @@ -85,34 +85,34 @@ static void test_bwmgt_token_buf_dec(void *arg) { (void)arg; - token_bucket_t b; - token_bucket_init(&b, 16*KB, 64*KB, START_TS); + token_bucket_rw_t b; + token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS); // full-to-not-full. - tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, KB)); - tt_int_op(b.read_bucket, OP_EQ, 63*KB); + tt_int_op(0, OP_EQ, token_bucket_rw_dec_read(&b, KB)); + tt_int_op(b.read_bucket.bucket, OP_EQ, 63*KB); // Full to almost-not-full - tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, 63*KB - 1)); - tt_int_op(b.read_bucket, OP_EQ, 1); + tt_int_op(0, OP_EQ, token_bucket_rw_dec_read(&b, 63*KB - 1)); + tt_int_op(b.read_bucket.bucket, OP_EQ, 1); // almost-not-full to empty. - tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 1)); - tt_int_op(b.read_bucket, OP_EQ, 0); + tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, 1)); + tt_int_op(b.read_bucket.bucket, OP_EQ, 0); // reset bucket, try full-to-empty - token_bucket_init(&b, 16*KB, 64*KB, START_TS); - tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 64*KB)); - tt_int_op(b.read_bucket, OP_EQ, 0); + token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS); + tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, 64*KB)); + tt_int_op(b.read_bucket.bucket, OP_EQ, 0); // reset bucket, try underflow. - token_bucket_init(&b, 16*KB, 64*KB, START_TS); - tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 64*KB + 1)); - tt_int_op(b.read_bucket, OP_EQ, -1); + token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS); + tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, 64*KB + 1)); + tt_int_op(b.read_bucket.bucket, OP_EQ, -1); // A second underflow does not make the bucket empty. - tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, 1000)); - tt_int_op(b.read_bucket, OP_EQ, -1001); + tt_int_op(0, OP_EQ, token_bucket_rw_dec_read(&b, 1000)); + tt_int_op(b.read_bucket.bucket, OP_EQ, -1001); done: ; @@ -122,71 +122,71 @@ static void test_bwmgt_token_buf_refill(void *arg) { (void)arg; - token_bucket_t b; + token_bucket_rw_t b; const uint32_t SEC = (uint32_t)monotime_msec_to_approx_coarse_stamp_units(1000); - printf("%d\n", (int)SEC); - token_bucket_init(&b, 16*KB, 64*KB, START_TS); + token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS); /* Make the buffer much emptier, then let one second elapse. */ - token_bucket_dec_read(&b, 48*KB); - tt_int_op(b.read_bucket, OP_EQ, 16*KB); - tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC)); - tt_int_op(b.read_bucket, OP_GT, 32*KB - 300); - tt_int_op(b.read_bucket, OP_LT, 32*KB + 300); + token_bucket_rw_dec_read(&b, 48*KB); + tt_int_op(b.read_bucket.bucket, OP_EQ, 16*KB); + tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC)); + tt_int_op(b.read_bucket.bucket, OP_GT, 32*KB - 300); + tt_int_op(b.read_bucket.bucket, OP_LT, 32*KB + 300); /* Another half second. */ - tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2)); - tt_int_op(b.read_bucket, OP_GT, 40*KB - 400); - tt_int_op(b.read_bucket, OP_LT, 40*KB + 400); - tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2); + tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2)); + tt_int_op(b.read_bucket.bucket, OP_GT, 40*KB - 400); + tt_int_op(b.read_bucket.bucket, OP_LT, 40*KB + 400); + tt_uint_op(b.last_refilled_at_timestamp, OP_EQ, START_TS + SEC*3/2); /* No time: nothing happens. */ { - const uint32_t bucket_orig = b.read_bucket; - tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2)); - tt_int_op(b.read_bucket, OP_EQ, bucket_orig); + const uint32_t bucket_orig = b.read_bucket.bucket; + tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2)); + tt_int_op(b.read_bucket.bucket, OP_EQ, bucket_orig); } /* Another 30 seconds: fill the bucket. */ - tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*30)); - tt_int_op(b.read_bucket, OP_EQ, b.burst); - tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2 + SEC*30); + tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2 + SEC*30)); + tt_int_op(b.read_bucket.bucket, OP_EQ, b.cfg.burst); + tt_uint_op(b.last_refilled_at_timestamp, OP_EQ, START_TS + SEC*3/2 + SEC*30); /* Another 30 seconds: nothing happens. */ - tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*60)); - tt_int_op(b.read_bucket, OP_EQ, b.burst); - tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2 + SEC*60); + tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2 + SEC*60)); + tt_int_op(b.read_bucket.bucket, OP_EQ, b.cfg.burst); + tt_uint_op(b.last_refilled_at_timestamp, OP_EQ, START_TS + SEC*3/2 + SEC*60); /* Empty the bucket, let two seconds pass, and make sure that a refill is * noticed. */ - tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, b.burst)); - tt_int_op(0, OP_EQ, b.read_bucket); - tt_int_op(1, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*61)); - tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*62)); - tt_int_op(b.read_bucket, OP_GT, 32*KB-400); - tt_int_op(b.read_bucket, OP_LT, 32*KB+400); + tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, b.cfg.burst)); + tt_int_op(0, OP_EQ, b.read_bucket.bucket); + tt_int_op(1, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2 + SEC*61)); + tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2 + SEC*62)); + tt_int_op(b.read_bucket.bucket, OP_GT, 32*KB-400); + tt_int_op(b.read_bucket.bucket, OP_LT, 32*KB+400); /* Underflow the bucket, make sure we detect when it has tokens again. */ - tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, b.read_bucket+16*KB)); - tt_int_op(-16*KB, OP_EQ, b.read_bucket); + tt_int_op(1, OP_EQ, + token_bucket_rw_dec_read(&b, b.read_bucket.bucket+16*KB)); + tt_int_op(-16*KB, OP_EQ, b.read_bucket.bucket); // half a second passes... - tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*64)); - tt_int_op(b.read_bucket, OP_GT, -8*KB-300); - tt_int_op(b.read_bucket, OP_LT, -8*KB+300); + tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*64)); + tt_int_op(b.read_bucket.bucket, OP_GT, -8*KB-300); + tt_int_op(b.read_bucket.bucket, OP_LT, -8*KB+300); // a second passes - tt_int_op(1, OP_EQ, token_bucket_refill(&b, START_TS + SEC*65)); - tt_int_op(b.read_bucket, OP_GT, 8*KB-400); - tt_int_op(b.read_bucket, OP_LT, 8*KB+400); + tt_int_op(1, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*65)); + tt_int_op(b.read_bucket.bucket, OP_GT, 8*KB-400); + tt_int_op(b.read_bucket.bucket, OP_LT, 8*KB+400); // We step a second backwards, and nothing happens. - tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*64)); - tt_int_op(b.read_bucket, OP_GT, 8*KB-400); - tt_int_op(b.read_bucket, OP_LT, 8*KB+400); + tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*64)); + tt_int_op(b.read_bucket.bucket, OP_GT, 8*KB-400); + tt_int_op(b.read_bucket.bucket, OP_LT, 8*KB+400); // A ridiculous amount of time passes. - tt_int_op(0, OP_EQ, token_bucket_refill(&b, INT32_MAX)); - tt_int_op(b.read_bucket, OP_EQ, b.burst); + tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, INT32_MAX)); + tt_int_op(b.read_bucket.bucket, OP_EQ, b.cfg.burst); done: ; |