diff options
author | Nick Mathewson <nickm@torproject.org> | 2018-04-13 10:47:24 -0400 |
---|---|---|
committer | Nick Mathewson <nickm@torproject.org> | 2018-04-13 10:47:24 -0400 |
commit | b152d62cee7480ee7b9b68dd9b619db65b6cd112 (patch) | |
tree | ff89d131f60906f9d1b8e06af77883b3d1155769 /src | |
parent | ad57b1279ab68e204243ddf3cf841fd85606c331 (diff) | |
parent | 787bafc0f916c143ac244a217accf755817512df (diff) | |
download | tor-b152d62cee7480ee7b9b68dd9b619db65b6cd112.tar.gz tor-b152d62cee7480ee7b9b68dd9b619db65b6cd112.zip |
Merge branch 'token_bucket_refactor_squashed'
Diffstat (limited to 'src')
-rw-r--r-- | src/common/compat_time.c | 13 | ||||
-rw-r--r-- | src/common/compat_time.h | 1 | ||||
-rw-r--r-- | src/common/include.am | 2 | ||||
-rw-r--r-- | src/common/token_bucket.c | 199 | ||||
-rw-r--r-- | src/common/token_bucket.h | 75 | ||||
-rw-r--r-- | src/or/config.c | 15 | ||||
-rw-r--r-- | src/or/connection.c | 353 | ||||
-rw-r--r-- | src/or/connection.h | 11 | ||||
-rw-r--r-- | src/or/connection_or.c | 14 | ||||
-rw-r--r-- | src/or/control.c | 23 | ||||
-rw-r--r-- | src/or/control.h | 5 | ||||
-rw-r--r-- | src/or/main.c | 46 | ||||
-rw-r--r-- | src/or/main.h | 6 | ||||
-rw-r--r-- | src/or/or.h | 20 | ||||
-rw-r--r-- | src/test/include.am | 1 | ||||
-rw-r--r-- | src/test/test.c | 1 | ||||
-rw-r--r-- | src/test/test.h | 1 | ||||
-rw-r--r-- | src/test/test_bwmgt.c | 205 | ||||
-rw-r--r-- | src/test/test_controller_events.c | 75 | ||||
-rw-r--r-- | src/test/test_options.c | 10 | ||||
-rw-r--r-- | src/test/test_util.c | 7 |
21 files changed, 627 insertions, 456 deletions
diff --git a/src/common/compat_time.c b/src/common/compat_time.c index 183a60a480..b940447b67 100644 --- a/src/common/compat_time.c +++ b/src/common/compat_time.c @@ -830,11 +830,24 @@ monotime_coarse_stamp_units_to_approx_msec(uint64_t units) return (abstime_diff * mach_time_info.numer) / (mach_time_info.denom * ONE_MILLION); } +uint64_t +monotime_msec_to_approx_coarse_stamp_units(uint64_t msec) +{ + uint64_t abstime_val = + (((uint64_t)msec) * ONE_MILLION * mach_time_info.denom) / + mach_time_info.numer; + return abstime_val >> monotime_shift; +} #else uint64_t monotime_coarse_stamp_units_to_approx_msec(uint64_t units) { return (units * 1000) / STAMP_TICKS_PER_SECOND; } +uint64_t +monotime_msec_to_approx_coarse_stamp_units(uint64_t msec) +{ + return (msec * STAMP_TICKS_PER_SECOND) / 1000; +} #endif diff --git a/src/common/compat_time.h b/src/common/compat_time.h index 6ddd11883d..75b57f6f24 100644 --- a/src/common/compat_time.h +++ b/src/common/compat_time.h @@ -150,6 +150,7 @@ uint32_t monotime_coarse_to_stamp(const monotime_coarse_t *t); * into an approximate number of milliseconds. */ uint64_t monotime_coarse_stamp_units_to_approx_msec(uint64_t units); +uint64_t monotime_msec_to_approx_coarse_stamp_units(uint64_t msec); uint32_t monotime_coarse_get_stamp(void); #if defined(MONOTIME_COARSE_TYPE_IS_DIFFERENT) diff --git a/src/common/include.am b/src/common/include.am index 73c51ff0b2..87ab9d79e9 100644 --- a/src/common/include.am +++ b/src/common/include.am @@ -97,6 +97,7 @@ LIBOR_A_SRC = \ src/common/util_process.c \ src/common/sandbox.c \ src/common/storagedir.c \ + src/common/token_bucket.c \ src/common/workqueue.c \ $(libor_extra_source) \ $(threads_impl_source) \ @@ -184,6 +185,7 @@ COMMONHEADERS = \ src/common/storagedir.h \ src/common/testsupport.h \ src/common/timers.h \ + src/common/token_bucket.h \ src/common/torint.h \ src/common/torlog.h \ src/common/tortls.h \ diff --git a/src/common/token_bucket.c b/src/common/token_bucket.c new file mode 100644 index 0000000000..6af2982147 --- /dev/null +++ b/src/common/token_bucket.c @@ -0,0 +1,199 @@ +/* Copyright (c) 2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file token_bucket.c + * \brief Functions to use and manipulate token buckets, used for + * rate-limiting on connections and globally. + * + * Tor uses these token buckets to keep track of bandwidth usage, and + * sometimes other things too. + * + * The time units we use internally are based on "timestamp" units -- see + * monotime_coarse_to_stamp() for a rationale. + * + * Token buckets may become negative. + **/ + +#define TOKEN_BUCKET_PRIVATE + +#include "token_bucket.h" +#include "util_bug.h" + +/** Convert a rate in bytes per second to a rate in bytes per step */ +static uint32_t +rate_per_sec_to_rate_per_step(uint32_t rate) +{ + /* + The precise calculation we'd want to do is + + (rate / 1000) * to_approximate_msec(TICKS_PER_STEP). But to minimize + rounding error, we do it this way instead, and divide last. + */ + return (uint32_t) + monotime_coarse_stamp_units_to_approx_msec(rate*TICKS_PER_STEP)/1000; +} + +/** + * Initialize a token bucket in *<b>bucket</b>, set up to allow <b>rate</b> + * bytes per second, with a maximum burst of <b>burst</b> bytes. The bucket + * is created such that <b>now_ts</b> is the current timestamp. The bucket + * starts out full. + */ +void +token_bucket_init(token_bucket_t *bucket, + uint32_t rate, + uint32_t burst, + uint32_t now_ts) +{ + memset(bucket, 0, sizeof(token_bucket_t)); + token_bucket_adjust(bucket, rate, burst); + token_bucket_reset(bucket, now_ts); +} + +/** + * Change the configured rate (in bytes per second) and burst (in bytes) + * for the token bucket in *<b>bucket</b>. + */ +void +token_bucket_adjust(token_bucket_t *bucket, + uint32_t rate, + uint32_t burst) +{ + tor_assert_nonfatal(rate > 0); + tor_assert_nonfatal(burst > 0); + if (burst > TOKEN_BUCKET_MAX_BURST) + burst = TOKEN_BUCKET_MAX_BURST; + + bucket->rate = rate_per_sec_to_rate_per_step(rate); + bucket->burst = burst; + bucket->read_bucket = MIN(bucket->read_bucket, (int32_t)burst); + bucket->write_bucket = MIN(bucket->write_bucket, (int32_t)burst); +} + +/** + * Reset <b>bucket</b> to be full, as of timestamp <b>now_ts</b>. + */ +void +token_bucket_reset(token_bucket_t *bucket, + uint32_t now_ts) +{ + bucket->read_bucket = bucket->burst; + bucket->write_bucket = bucket->burst; + bucket->last_refilled_at_ts = now_ts; +} + +/* Helper: see token_bucket_refill */ +static int +refill_single_bucket(int32_t *bucketptr, + const uint32_t rate, + const int32_t burst, + const uint32_t elapsed_steps) +{ + const int was_empty = (*bucketptr <= 0); + /* The casts here prevent an underflow. + * + * Note that even if the bucket value is negative, subtracting it from + * "burst" will still produce a correct result. If this result is + * ridiculously high, then the "elapsed_steps > gap / rate" check below + * should catch it. */ + const size_t gap = ((size_t)burst) - ((size_t)*bucketptr); + + if (elapsed_steps > gap / rate) { + *bucketptr = burst; + } else { + *bucketptr += rate * elapsed_steps; + } + + return was_empty && *bucketptr > 0; +} + +/** + * Refill <b>bucket</b> as appropriate, given that the current timestamp + * is <b>now_ts</b>. + * + * Return a bitmask containing TB_READ iff read bucket was empty and became + * nonempty, and TB_WRITE iff the write bucket was empty and became nonempty. + */ +int +token_bucket_refill(token_bucket_t *bucket, + uint32_t now_ts) +{ + const uint32_t elapsed_ticks = (now_ts - bucket->last_refilled_at_ts); + if (elapsed_ticks > UINT32_MAX-(300*1000)) { + /* Either about 48 days have passed since the last refill, or the + * monotonic clock has somehow moved backwards. (We're looking at you, + * Windows.). We accept up to a 5 minute jump backwards as + * "unremarkable". + */ + return 0; + } + const uint32_t elapsed_steps = elapsed_ticks / TICKS_PER_STEP; + + if (!elapsed_steps) { + /* Note that if less than one whole step elapsed, we don't advance the + * time in last_refilled_at_ts. That's intentional: we want to make sure + * that we add some bytes to it eventually. */ + return 0; + } + + int flags = 0; + if (refill_single_bucket(&bucket->read_bucket, + bucket->rate, bucket->burst, elapsed_steps)) + flags |= TB_READ; + if (refill_single_bucket(&bucket->write_bucket, + bucket->rate, bucket->burst, elapsed_steps)) + flags |= TB_WRITE; + + bucket->last_refilled_at_ts = now_ts; + return flags; +} + +static int +decrement_single_bucket(int32_t *bucketptr, + ssize_t n) +{ + if (BUG(n < 0)) + return 0; + const int becomes_empty = *bucketptr > 0 && n >= *bucketptr; + *bucketptr -= n; + return becomes_empty; +} + +/** + * Decrement the read token bucket in <b>bucket</b> by <b>n</b> bytes. + * + * Return true if the bucket was nonempty and became empty; return false + * otherwise. + */ +int +token_bucket_dec_read(token_bucket_t *bucket, + ssize_t n) +{ + return decrement_single_bucket(&bucket->read_bucket, n); +} + +/** + * Decrement the write token bucket in <b>bucket</b> by <b>n</b> bytes. + * + * Return true if the bucket was nonempty and became empty; return false + * otherwise. + */ +int +token_bucket_dec_write(token_bucket_t *bucket, + ssize_t n) +{ + return decrement_single_bucket(&bucket->write_bucket, n); +} + +/** + * As token_bucket_dec_read and token_bucket_dec_write, in a single operation. + */ +void +token_bucket_dec(token_bucket_t *bucket, + ssize_t n_read, ssize_t n_written) +{ + token_bucket_dec_read(bucket, n_read); + token_bucket_dec_read(bucket, n_written); +} + diff --git a/src/common/token_bucket.h b/src/common/token_bucket.h new file mode 100644 index 0000000000..2d1ccd5cf3 --- /dev/null +++ b/src/common/token_bucket.h @@ -0,0 +1,75 @@ +/* Copyright (c) 2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file token_bucket.h + * \brief Headers for token_bucket.c + **/ + +#ifndef TOR_TOKEN_BUCKET_H +#define TOR_TOKEN_BUCKET_H + +#include "torint.h" + +typedef struct token_bucket_t { + uint32_t rate; + int32_t burst; + int32_t read_bucket; + int32_t write_bucket; + uint32_t last_refilled_at_ts; +} token_bucket_t; + +#define TOKEN_BUCKET_MAX_BURST INT32_MAX + +void token_bucket_init(token_bucket_t *bucket, + uint32_t rate, + uint32_t burst, + uint32_t now_ts); + +void token_bucket_adjust(token_bucket_t *bucket, + uint32_t rate, uint32_t burst); + +void token_bucket_reset(token_bucket_t *bucket, + uint32_t now_ts); + +#define TB_READ 1 +#define TB_WRITE 2 + +int token_bucket_refill(token_bucket_t *bucket, + uint32_t now_ts); + +int token_bucket_dec_read(token_bucket_t *bucket, + ssize_t n); +int token_bucket_dec_write(token_bucket_t *bucket, + ssize_t n); + +void token_bucket_dec(token_bucket_t *bucket, + ssize_t n_read, ssize_t n_written); + +static inline size_t token_bucket_get_read(const token_bucket_t *bucket); +static inline size_t +token_bucket_get_read(const token_bucket_t *bucket) +{ + const ssize_t b = bucket->read_bucket; + return b >= 0 ? b : 0; +} + +static inline size_t token_bucket_get_write(const token_bucket_t *bucket); +static inline size_t +token_bucket_get_write(const token_bucket_t *bucket) +{ + const ssize_t b = bucket->write_bucket; + return b >= 0 ? b : 0; +} + +#ifdef TOKEN_BUCKET_PRIVATE + +/* To avoid making the rates too small, we consider units of "steps", + * where a "step" is defined as this many timestamp ticks. Keep this + * a power of two if you can. */ +#define TICKS_PER_STEP 16 + +#endif + +#endif /* TOR_TOKEN_BUCKET_H */ + diff --git a/src/or/config.c b/src/or/config.c index 4064cf64ab..206274cd38 100644 --- a/src/or/config.c +++ b/src/or/config.c @@ -337,7 +337,7 @@ static config_var_t option_vars_[] = { V(DownloadExtraInfo, BOOL, "0"), V(TestingEnableConnBwEvent, BOOL, "0"), V(TestingEnableCellStatsEvent, BOOL, "0"), - V(TestingEnableTbEmptyEvent, BOOL, "0"), + OBSOLETE("TestingEnableTbEmptyEvent"), V(EnforceDistinctSubnets, BOOL, "1"), V(EntryNodes, ROUTERSET, NULL), V(EntryStatistics, BOOL, "0"), @@ -707,7 +707,6 @@ static const config_var_t testing_tor_network_defaults[] = { V(TestingDirConnectionMaxStall, INTERVAL, "30 seconds"), V(TestingEnableConnBwEvent, BOOL, "1"), V(TestingEnableCellStatsEvent, BOOL, "1"), - V(TestingEnableTbEmptyEvent, BOOL, "1"), VAR("___UsingTestNetworkDefaults", BOOL, UsingTestNetworkDefaults_, "1"), V(RendPostPeriod, INTERVAL, "2 minutes"), @@ -2189,6 +2188,12 @@ options_act(const or_options_t *old_options) options->PerConnBWBurst != old_options->PerConnBWBurst) connection_or_update_token_buckets(get_connection_array(), options); + if (options->BandwidthRate != old_options->BandwidthRate || + options->BandwidthBurst != old_options->BandwidthBurst || + options->BandwidthRate != old_options->BandwidthRate || + options->RelayBandwidthBurst != old_options->RelayBandwidthBurst) + connection_bucket_adjust(options); + if (options->MainloopStats != old_options->MainloopStats) { reset_main_loop_counters(); } @@ -4459,12 +4464,6 @@ options_validate(or_options_t *old_options, or_options_t *options, "Tor networks!"); } - if (options->TestingEnableTbEmptyEvent && - !options->TestingTorNetwork && !options->UsingTestNetworkDefaults_) { - REJECT("TestingEnableTbEmptyEvent may only be changed in testing " - "Tor networks!"); - } - if (options->TestingTorNetwork) { log_warn(LD_CONFIG, "TestingTorNetwork is set. This will make your node " "almost unusable in the public Tor network, and is " diff --git a/src/or/connection.c b/src/or/connection.c index 5532551cfe..1aad68678e 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -119,8 +119,6 @@ static connection_t *connection_listener_new( static void connection_init(time_t now, connection_t *conn, int type, int socket_family); static int connection_handle_listener_read(connection_t *conn, int new_type); -static int connection_bucket_should_increase(int bucket, - or_connection_t *conn); static int connection_finished_flushing(connection_t *conn); static int connection_flushed_some(connection_t *conn); static int connection_finished_connecting(connection_t *conn); @@ -2848,7 +2846,7 @@ connection_counts_as_relayed_traffic(connection_t *conn, time_t now) * non-negative) provides an upper limit for our answer. */ static ssize_t connection_bucket_round_robin(int base, int priority, - ssize_t global_bucket, ssize_t conn_bucket) + ssize_t global_bucket_val, ssize_t conn_bucket) { ssize_t at_most; ssize_t num_bytes_high = (priority ? 32 : 16) * base; @@ -2857,15 +2855,15 @@ connection_bucket_round_robin(int base, int priority, /* Do a rudimentary round-robin so one circuit can't hog a connection. * Pick at most 32 cells, at least 4 cells if possible, and if we're in * the middle pick 1/8 of the available bandwidth. */ - at_most = global_bucket / 8; + at_most = global_bucket_val / 8; at_most -= (at_most % base); /* round down */ if (at_most > num_bytes_high) /* 16 KB, or 8 KB for low-priority */ at_most = num_bytes_high; else if (at_most < num_bytes_low) /* 2 KB, or 1 KB for low-priority */ at_most = num_bytes_low; - if (at_most > global_bucket) - at_most = global_bucket; + if (at_most > global_bucket_val) + at_most = global_bucket_val; if (conn_bucket >= 0 && at_most > conn_bucket) at_most = conn_bucket; @@ -2881,13 +2879,13 @@ connection_bucket_read_limit(connection_t *conn, time_t now) { int base = RELAY_PAYLOAD_SIZE; int priority = conn->type != CONN_TYPE_DIR; - int conn_bucket = -1; - int global_bucket = global_read_bucket; + ssize_t conn_bucket = -1; + size_t global_bucket_val = token_bucket_get_read(&global_bucket); if (connection_speaks_cells(conn)) { or_connection_t *or_conn = TO_OR_CONN(conn); if (conn->state == OR_CONN_STATE_OPEN) - conn_bucket = or_conn->read_bucket; + conn_bucket = token_bucket_get_read(&or_conn->bucket); base = get_cell_network_size(or_conn->wide_circ_ids); } @@ -2896,12 +2894,13 @@ connection_bucket_read_limit(connection_t *conn, time_t now) return conn_bucket>=0 ? conn_bucket : 1<<14; } - if (connection_counts_as_relayed_traffic(conn, now) && - global_relayed_read_bucket <= global_read_bucket) - global_bucket = global_relayed_read_bucket; + if (connection_counts_as_relayed_traffic(conn, now)) { + size_t relayed = token_bucket_get_read(&global_relayed_bucket); + global_bucket_val = MIN(global_bucket_val, relayed); + } return connection_bucket_round_robin(base, priority, - global_bucket, conn_bucket); + global_bucket_val, conn_bucket); } /** How many bytes at most can we write onto this connection? */ @@ -2910,8 +2909,8 @@ connection_bucket_write_limit(connection_t *conn, time_t now) { int base = RELAY_PAYLOAD_SIZE; int priority = conn->type != CONN_TYPE_DIR; - int conn_bucket = (int)conn->outbuf_flushlen; - int global_bucket = global_write_bucket; + size_t conn_bucket = conn->outbuf_flushlen; + size_t global_bucket_val = token_bucket_get_write(&global_bucket); if (!connection_is_rate_limited(conn)) { /* be willing to write to local conns even if our buckets are empty */ @@ -2919,22 +2918,20 @@ connection_bucket_write_limit(connection_t *conn, time_t now) } if (connection_speaks_cells(conn)) { - /* use the per-conn write limit if it's lower, but if it's less - * than zero just use zero */ + /* use the per-conn write limit if it's lower */ or_connection_t *or_conn = TO_OR_CONN(conn); if (conn->state == OR_CONN_STATE_OPEN) - if (or_conn->write_bucket < conn_bucket) - conn_bucket = or_conn->write_bucket >= 0 ? - or_conn->write_bucket : 0; + conn_bucket = MIN(conn_bucket, token_bucket_get_write(&or_conn->bucket)); base = get_cell_network_size(or_conn->wide_circ_ids); } - if (connection_counts_as_relayed_traffic(conn, now) && - global_relayed_write_bucket <= global_write_bucket) - global_bucket = global_relayed_write_bucket; + if (connection_counts_as_relayed_traffic(conn, now)) { + size_t relayed = token_bucket_get_write(&global_relayed_bucket); + global_bucket_val = MIN(global_bucket_val, relayed); + } return connection_bucket_round_robin(base, priority, - global_bucket, conn_bucket); + global_bucket_val, conn_bucket); } /** Return 1 if the global write buckets are low enough that we @@ -2959,15 +2956,15 @@ connection_bucket_write_limit(connection_t *conn, time_t now) int global_write_bucket_low(connection_t *conn, size_t attempt, int priority) { - int smaller_bucket = global_write_bucket < global_relayed_write_bucket ? - global_write_bucket : global_relayed_write_bucket; + size_t smaller_bucket = MIN(token_bucket_get_write(&global_bucket), + token_bucket_get_write(&global_relayed_bucket)); if (authdir_mode(get_options()) && priority>1) return 0; /* there's always room to answer v2 if we're an auth dir */ if (!connection_is_rate_limited(conn)) return 0; /* local conns don't get limited */ - if (smaller_bucket < (int)attempt) + if (smaller_bucket < attempt) return 1; /* not enough space no matter the priority */ if (write_buckets_empty_last_second) @@ -2976,10 +2973,10 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority) if (priority == 1) { /* old-style v1 query */ /* Could we handle *two* of these requests within the next two seconds? */ const or_options_t *options = get_options(); - int64_t can_write = (int64_t)smaller_bucket + size_t can_write = smaller_bucket + 2*(options->RelayBandwidthRate ? options->RelayBandwidthRate : options->BandwidthRate); - if (can_write < 2*(int64_t)attempt) + if (can_write < 2*attempt) return 1; } else { /* v2 query */ /* no further constraints yet */ @@ -3019,57 +3016,6 @@ record_num_bytes_transferred_impl(connection_t *conn, rep_hist_note_exit_bytes(conn->port, num_written, num_read); } -/** Helper: convert given <b>tvnow</b> time value to milliseconds since - * midnight. */ -static uint32_t -msec_since_midnight(const struct timeval *tvnow) -{ - return (uint32_t)(((tvnow->tv_sec % 86400L) * 1000L) + - ((uint32_t)tvnow->tv_usec / (uint32_t)1000L)); -} - -/** Helper: return the time in milliseconds since <b>last_empty_time</b> - * when a bucket ran empty that previously had <b>tokens_before</b> tokens - * now has <b>tokens_after</b> tokens after refilling at timestamp - * <b>tvnow</b>, capped at <b>milliseconds_elapsed</b> milliseconds since - * last refilling that bucket. Return 0 if the bucket has not been empty - * since the last refill or has not been refilled. */ -uint32_t -bucket_millis_empty(int tokens_before, uint32_t last_empty_time, - int tokens_after, int milliseconds_elapsed, - const struct timeval *tvnow) -{ - uint32_t result = 0, refilled; - if (tokens_before <= 0 && tokens_after > tokens_before) { - refilled = msec_since_midnight(tvnow); - result = (uint32_t)((refilled + 86400L * 1000L - last_empty_time) % - (86400L * 1000L)); - if (result > (uint32_t)milliseconds_elapsed) - result = (uint32_t)milliseconds_elapsed; - } - return result; -} - -/** Check if a bucket which had <b>tokens_before</b> tokens and which got - * <b>tokens_removed</b> tokens removed at timestamp <b>tvnow</b> has run - * out of tokens, and if so, note the milliseconds since midnight in - * <b>timestamp_var</b> for the next TB_EMPTY event. */ -void -connection_buckets_note_empty_ts(uint32_t *timestamp_var, - int tokens_before, size_t tokens_removed, - const struct timeval *tvnow) -{ - if (tokens_before > 0 && (uint32_t)tokens_before <= tokens_removed) - *timestamp_var = msec_since_midnight(tvnow); -} - -/** Last time at which the global or relay buckets were emptied in msec - * since midnight. */ -static uint32_t global_relayed_read_emptied = 0, - global_relayed_write_emptied = 0, - global_read_emptied = 0, - global_write_emptied = 0; - /** We just read <b>num_read</b> and wrote <b>num_written</b> bytes * onto <b>conn</b>. Decrement buckets appropriately. */ static void @@ -3094,39 +3040,13 @@ connection_buckets_decrement(connection_t *conn, time_t now, if (!connection_is_rate_limited(conn)) return; /* local IPs are free */ - /* If one or more of our token buckets ran dry just now, note the - * timestamp for TB_EMPTY events. */ - if (get_options()->TestingEnableTbEmptyEvent) { - struct timeval tvnow; - tor_gettimeofday_cached(&tvnow); - if (connection_counts_as_relayed_traffic(conn, now)) { - connection_buckets_note_empty_ts(&global_relayed_read_emptied, - global_relayed_read_bucket, num_read, &tvnow); - connection_buckets_note_empty_ts(&global_relayed_write_emptied, - global_relayed_write_bucket, num_written, &tvnow); - } - connection_buckets_note_empty_ts(&global_read_emptied, - global_read_bucket, num_read, &tvnow); - connection_buckets_note_empty_ts(&global_write_emptied, - global_write_bucket, num_written, &tvnow); - if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) { - or_connection_t *or_conn = TO_OR_CONN(conn); - connection_buckets_note_empty_ts(&or_conn->read_emptied_time, - or_conn->read_bucket, num_read, &tvnow); - connection_buckets_note_empty_ts(&or_conn->write_emptied_time, - or_conn->write_bucket, num_written, &tvnow); - } - } - if (connection_counts_as_relayed_traffic(conn, now)) { - global_relayed_read_bucket -= (int)num_read; - global_relayed_write_bucket -= (int)num_written; + token_bucket_dec(&global_relayed_bucket, num_read, num_written); } - global_read_bucket -= (int)num_read; - global_write_bucket -= (int)num_written; + token_bucket_dec(&global_bucket, num_read, num_written); if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) { - TO_OR_CONN(conn)->read_bucket -= (int)num_read; - TO_OR_CONN(conn)->write_bucket -= (int)num_written; + or_connection_t *or_conn = TO_OR_CONN(conn); + token_bucket_dec(&or_conn->bucket, num_read, num_written); } } @@ -3140,14 +3060,14 @@ connection_consider_empty_read_buckets(connection_t *conn) if (!connection_is_rate_limited(conn)) return; /* Always okay. */ - if (global_read_bucket <= 0) { + if (token_bucket_get_read(&global_bucket) <= 0) { reason = "global read bucket exhausted. Pausing."; } else if (connection_counts_as_relayed_traffic(conn, approx_time()) && - global_relayed_read_bucket <= 0) { + token_bucket_get_read(&global_relayed_bucket) <= 0) { reason = "global relayed read bucket exhausted. Pausing."; } else if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN && - TO_OR_CONN(conn)->read_bucket <= 0) { + token_bucket_get_read(&TO_OR_CONN(conn)->bucket) <= 0) { reason = "connection read bucket exhausted. Pausing."; } else return; /* all good, no need to stop it */ @@ -3167,14 +3087,14 @@ connection_consider_empty_write_buckets(connection_t *conn) if (!connection_is_rate_limited(conn)) return; /* Always okay. */ - if (global_write_bucket <= 0) { + if (token_bucket_get_write(&global_bucket) <= 0) { reason = "global write bucket exhausted. Pausing."; } else if (connection_counts_as_relayed_traffic(conn, approx_time()) && - global_relayed_write_bucket <= 0) { + token_bucket_get_write(&global_relayed_bucket) <= 0) { reason = "global relayed write bucket exhausted. Pausing."; } else if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN && - TO_OR_CONN(conn)->write_bucket <= 0) { + token_bucket_get_write(&TO_OR_CONN(conn)->bucket) <= 0) { reason = "connection write bucket exhausted. Pausing."; } else return; /* all good, no need to stop it */ @@ -3184,180 +3104,79 @@ connection_consider_empty_write_buckets(connection_t *conn) connection_stop_writing(conn); } -/** Initialize the global read bucket to options-\>BandwidthBurst. */ +/** Initialize the global buckets to the values configured in the + * options */ void connection_bucket_init(void) { const or_options_t *options = get_options(); - /* start it at max traffic */ - global_read_bucket = (int)options->BandwidthBurst; - global_write_bucket = (int)options->BandwidthBurst; + const uint32_t now_ts = monotime_coarse_get_stamp(); + token_bucket_init(&global_bucket, + (int32_t)options->BandwidthRate, + (int32_t)options->BandwidthBurst, + now_ts); if (options->RelayBandwidthRate) { - global_relayed_read_bucket = (int)options->RelayBandwidthBurst; - global_relayed_write_bucket = (int)options->RelayBandwidthBurst; + token_bucket_init(&global_relayed_bucket, + (int32_t)options->RelayBandwidthRate, + (int32_t)options->RelayBandwidthBurst, + now_ts); } else { - global_relayed_read_bucket = (int)options->BandwidthBurst; - global_relayed_write_bucket = (int)options->BandwidthBurst; + token_bucket_init(&global_relayed_bucket, + (int32_t)options->BandwidthRate, + (int32_t)options->BandwidthBurst, + now_ts); } } -/** Refill a single <b>bucket</b> called <b>name</b> with bandwidth rate per - * second <b>rate</b> and bandwidth burst <b>burst</b>, assuming that - * <b>milliseconds_elapsed</b> milliseconds have passed since the last - * call. */ -static void -connection_bucket_refill_helper(int *bucket, int rate, int burst, - int milliseconds_elapsed, - const char *name) +/** Update the global connection bucket settings to a new value. */ +void +connection_bucket_adjust(const or_options_t *options) { - int starting_bucket = *bucket; - if (starting_bucket < burst && milliseconds_elapsed > 0) { - int64_t incr = (((int64_t)rate) * milliseconds_elapsed) / 1000; - if ((burst - starting_bucket) < incr) { - *bucket = burst; /* We would overflow the bucket; just set it to - * the maximum. */ - } else { - *bucket += (int)incr; - if (*bucket > burst || *bucket < starting_bucket) { - /* If we overflow the burst, or underflow our starting bucket, - * cap the bucket value to burst. */ - /* XXXX this might be redundant now, but it doesn't show up - * in profiles. Remove it after analysis. */ - *bucket = burst; - } - } - log_debug(LD_NET,"%s now %d.", name, *bucket); + token_bucket_adjust(&global_bucket, + (int32_t)options->BandwidthRate, + (int32_t)options->BandwidthBurst); + if (options->RelayBandwidthRate) { + token_bucket_adjust(&global_relayed_bucket, + (int32_t)options->RelayBandwidthRate, + (int32_t)options->RelayBandwidthBurst); + } else { + token_bucket_adjust(&global_relayed_bucket, + (int32_t)options->BandwidthRate, + (int32_t)options->BandwidthBurst); } } /** Time has passed; increment buckets appropriately. */ void -connection_bucket_refill(int milliseconds_elapsed, time_t now) +connection_bucket_refill(time_t now, uint32_t now_ts) { - const or_options_t *options = get_options(); smartlist_t *conns = get_connection_array(); - int bandwidthrate, bandwidthburst, relayrate, relayburst; - - int prev_global_read = global_read_bucket; - int prev_global_write = global_write_bucket; - int prev_relay_read = global_relayed_read_bucket; - int prev_relay_write = global_relayed_write_bucket; - struct timeval tvnow; /*< Only used if TB_EMPTY events are enabled. */ - - bandwidthrate = (int)options->BandwidthRate; - bandwidthburst = (int)options->BandwidthBurst; - - if (options->RelayBandwidthRate) { - relayrate = (int)options->RelayBandwidthRate; - relayburst = (int)options->RelayBandwidthBurst; - } else { - relayrate = bandwidthrate; - relayburst = bandwidthburst; - } - - tor_assert(milliseconds_elapsed >= 0); write_buckets_empty_last_second = - global_relayed_write_bucket <= 0 || global_write_bucket <= 0; + token_bucket_get_write(&global_bucket) <= 0 || + token_bucket_get_write(&global_relayed_bucket) <= 0; /* refill the global buckets */ - connection_bucket_refill_helper(&global_read_bucket, - bandwidthrate, bandwidthburst, - milliseconds_elapsed, - "global_read_bucket"); - connection_bucket_refill_helper(&global_write_bucket, - bandwidthrate, bandwidthburst, - milliseconds_elapsed, - "global_write_bucket"); - connection_bucket_refill_helper(&global_relayed_read_bucket, - relayrate, relayburst, - milliseconds_elapsed, - "global_relayed_read_bucket"); - connection_bucket_refill_helper(&global_relayed_write_bucket, - relayrate, relayburst, - milliseconds_elapsed, - "global_relayed_write_bucket"); - - /* If buckets were empty before and have now been refilled, tell any - * interested controllers. */ - if (get_options()->TestingEnableTbEmptyEvent) { - uint32_t global_read_empty_time, global_write_empty_time, - relay_read_empty_time, relay_write_empty_time; - tor_gettimeofday_cached(&tvnow); - global_read_empty_time = bucket_millis_empty(prev_global_read, - global_read_emptied, global_read_bucket, - milliseconds_elapsed, &tvnow); - global_write_empty_time = bucket_millis_empty(prev_global_write, - global_write_emptied, global_write_bucket, - milliseconds_elapsed, &tvnow); - control_event_tb_empty("GLOBAL", global_read_empty_time, - global_write_empty_time, milliseconds_elapsed); - relay_read_empty_time = bucket_millis_empty(prev_relay_read, - global_relayed_read_emptied, - global_relayed_read_bucket, - milliseconds_elapsed, &tvnow); - relay_write_empty_time = bucket_millis_empty(prev_relay_write, - global_relayed_write_emptied, - global_relayed_write_bucket, - milliseconds_elapsed, &tvnow); - control_event_tb_empty("RELAY", relay_read_empty_time, - relay_write_empty_time, milliseconds_elapsed); - } + token_bucket_refill(&global_bucket, now_ts); + token_bucket_refill(&global_relayed_bucket, now_ts); /* refill the per-connection buckets */ SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) { if (connection_speaks_cells(conn)) { or_connection_t *or_conn = TO_OR_CONN(conn); - int orbandwidthrate = or_conn->bandwidthrate; - int orbandwidthburst = or_conn->bandwidthburst; - - int prev_conn_read = or_conn->read_bucket; - int prev_conn_write = or_conn->write_bucket; - - if (connection_bucket_should_increase(or_conn->read_bucket, or_conn)) { - connection_bucket_refill_helper(&or_conn->read_bucket, - orbandwidthrate, - orbandwidthburst, - milliseconds_elapsed, - "or_conn->read_bucket"); - } - if (connection_bucket_should_increase(or_conn->write_bucket, or_conn)) { - connection_bucket_refill_helper(&or_conn->write_bucket, - orbandwidthrate, - orbandwidthburst, - milliseconds_elapsed, - "or_conn->write_bucket"); - } - /* If buckets were empty before and have now been refilled, tell any - * interested controllers. */ - if (get_options()->TestingEnableTbEmptyEvent) { - char *bucket; - uint32_t conn_read_empty_time, conn_write_empty_time; - tor_asprintf(&bucket, "ORCONN ID="U64_FORMAT, - U64_PRINTF_ARG(or_conn->base_.global_identifier)); - conn_read_empty_time = bucket_millis_empty(prev_conn_read, - or_conn->read_emptied_time, - or_conn->read_bucket, - milliseconds_elapsed, &tvnow); - conn_write_empty_time = bucket_millis_empty(prev_conn_write, - or_conn->write_emptied_time, - or_conn->write_bucket, - milliseconds_elapsed, &tvnow); - control_event_tb_empty(bucket, conn_read_empty_time, - conn_write_empty_time, - milliseconds_elapsed); - tor_free(bucket); + if (conn->state == OR_CONN_STATE_OPEN) { + token_bucket_refill(&or_conn->bucket, now_ts); } } if (conn->read_blocked_on_bw == 1 /* marked to turn reading back on now */ - && global_read_bucket > 0 /* and we're allowed to read */ + && token_bucket_get_read(&global_bucket) > 0 /* and we can read */ && (!connection_counts_as_relayed_traffic(conn, now) || - global_relayed_read_bucket > 0) /* even if we're relayed traffic */ + token_bucket_get_read(&global_relayed_bucket) > 0) && (!connection_speaks_cells(conn) || conn->state != OR_CONN_STATE_OPEN || - TO_OR_CONN(conn)->read_bucket > 0)) { + token_bucket_get_read(&TO_OR_CONN(conn)->bucket) > 0)) { /* and either a non-cell conn or a cell conn with non-empty bucket */ LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET, "waking up conn (fd %d) for read", (int)conn->s)); @@ -3366,12 +3185,12 @@ connection_bucket_refill(int milliseconds_elapsed, time_t now) } if (conn->write_blocked_on_bw == 1 - && global_write_bucket > 0 /* and we're allowed to write */ + && token_bucket_get_write(&global_bucket) > 0 /* and we can write */ && (!connection_counts_as_relayed_traffic(conn, now) || - global_relayed_write_bucket > 0) /* even if it's relayed traffic */ + token_bucket_get_write(&global_relayed_bucket) > 0) && (!connection_speaks_cells(conn) || conn->state != OR_CONN_STATE_OPEN || - TO_OR_CONN(conn)->write_bucket > 0)) { + token_bucket_get_write(&TO_OR_CONN(conn)->bucket) > 0)) { LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET, "waking up conn (fd %d) for write", (int)conn->s)); conn->write_blocked_on_bw = 0; @@ -3380,22 +3199,6 @@ connection_bucket_refill(int milliseconds_elapsed, time_t now) } SMARTLIST_FOREACH_END(conn); } -/** Is the <b>bucket</b> for connection <b>conn</b> low enough that we - * should add another pile of tokens to it? - */ -static int -connection_bucket_should_increase(int bucket, or_connection_t *conn) -{ - tor_assert(conn); - - if (conn->base_.state != OR_CONN_STATE_OPEN) - return 0; /* only open connections play the rate limiting game */ - if (bucket >= conn->bandwidthburst) - return 0; - - return 1; -} - /** Read bytes from conn-\>s and process them. * * It calls connection_buf_read_from_socket() to bring in any new bytes, diff --git a/src/or/connection.h b/src/or/connection.h index 6bc5a7cfd0..cfe31c3727 100644 --- a/src/or/connection.h +++ b/src/or/connection.h @@ -122,7 +122,9 @@ void connection_mark_all_noncontrol_connections(void); ssize_t connection_bucket_write_limit(connection_t *conn, time_t now); int global_write_bucket_low(connection_t *conn, size_t attempt, int priority); void connection_bucket_init(void); -void connection_bucket_refill(int seconds_elapsed, time_t now); +void connection_bucket_adjust(const or_options_t *options); +void connection_bucket_refill(time_t now, + uint32_t now_ts); int connection_handle_read(connection_t *conn); @@ -272,13 +274,6 @@ void connection_check_oos(int n_socks, int failed); STATIC void connection_free_minimal(connection_t *conn); /* Used only by connection.c and test*.c */ -uint32_t bucket_millis_empty(int tokens_before, uint32_t last_empty_time, - int tokens_after, int milliseconds_elapsed, - const struct timeval *tvnow); -void connection_buckets_note_empty_ts(uint32_t *timestamp_var, - int tokens_before, - size_t tokens_removed, - const struct timeval *tvnow); MOCK_DECL(STATIC int,connection_connect_sockaddr, (connection_t *conn, const struct sockaddr *sa, diff --git a/src/or/connection_or.c b/src/or/connection_or.c index 267463312c..3afdfa6b5a 100644 --- a/src/or/connection_or.c +++ b/src/or/connection_or.c @@ -793,18 +793,10 @@ connection_or_update_token_buckets_helper(or_connection_t *conn, int reset, (int)options->BandwidthBurst, 1, INT32_MAX); } - conn->bandwidthrate = rate; - conn->bandwidthburst = burst; - if (reset) { /* set up the token buckets to be full */ - conn->read_bucket = conn->write_bucket = burst; - return; + token_bucket_adjust(&conn->bucket, rate, burst); + if (reset) { + token_bucket_reset(&conn->bucket, monotime_coarse_get_stamp()); } - /* If the new token bucket is smaller, take out the extra tokens. - * (If it's larger, don't -- the buckets can grow to reach the cap.) */ - if (conn->read_bucket > burst) - conn->read_bucket = burst; - if (conn->write_bucket > burst) - conn->write_bucket = burst; } /** Either our set of relays or our per-conn rate limits have changed. diff --git a/src/or/control.c b/src/or/control.c index 5a2fae64e7..0539ddaca3 100644 --- a/src/or/control.c +++ b/src/or/control.c @@ -1214,7 +1214,6 @@ static const struct control_event_t control_event_table[] = { { EVENT_CONF_CHANGED, "CONF_CHANGED"}, { EVENT_CONN_BW, "CONN_BW" }, { EVENT_CELL_STATS, "CELL_STATS" }, - { EVENT_TB_EMPTY, "TB_EMPTY" }, { EVENT_CIRC_BANDWIDTH_USED, "CIRC_BW" }, { EVENT_TRANSPORT_LAUNCHED, "TRANSPORT_LAUNCHED" }, { EVENT_HS_DESC, "HS_DESC" }, @@ -6077,28 +6076,6 @@ control_event_circuit_cell_stats(void) return 0; } -/** Tokens in <b>bucket</b> have been refilled: the read bucket was empty - * for <b>read_empty_time</b> millis, the write bucket was empty for - * <b>write_empty_time</b> millis, and buckets were last refilled - * <b>milliseconds_elapsed</b> millis ago. Only emit TB_EMPTY event if - * either read or write bucket have been empty before. */ -int -control_event_tb_empty(const char *bucket, uint32_t read_empty_time, - uint32_t write_empty_time, - int milliseconds_elapsed) -{ - if (get_options()->TestingEnableTbEmptyEvent && - EVENT_IS_INTERESTING(EVENT_TB_EMPTY) && - (read_empty_time > 0 || write_empty_time > 0)) { - send_control_event(EVENT_TB_EMPTY, - "650 TB_EMPTY %s READ=%d WRITTEN=%d " - "LAST=%d\r\n", - bucket, read_empty_time, write_empty_time, - milliseconds_elapsed); - } - return 0; -} - /* about 5 minutes worth. */ #define N_BW_EVENTS_TO_CACHE 300 /* Index into cached_bw_events to next write. */ diff --git a/src/or/control.h b/src/or/control.h index 28ffeaed86..2fd3c553f3 100644 --- a/src/or/control.h +++ b/src/or/control.h @@ -59,9 +59,6 @@ int control_event_circ_bandwidth_used(void); int control_event_conn_bandwidth(connection_t *conn); int control_event_conn_bandwidth_used(void); int control_event_circuit_cell_stats(void); -int control_event_tb_empty(const char *bucket, uint32_t read_empty_time, - uint32_t write_empty_time, - int milliseconds_elapsed); void control_event_logmsg(int severity, uint32_t domain, const char *msg); int control_event_descriptors_changed(smartlist_t *routers); int control_event_address_mapped(const char *from, const char *to, @@ -194,7 +191,7 @@ void control_free_all(void); #define EVENT_CONF_CHANGED 0x0019 #define EVENT_CONN_BW 0x001A #define EVENT_CELL_STATS 0x001B -#define EVENT_TB_EMPTY 0x001C +/* UNUSED : 0x001C */ #define EVENT_CIRC_BANDWIDTH_USED 0x001D #define EVENT_TRANSPORT_LAUNCHED 0x0020 #define EVENT_HS_DESC 0x0021 diff --git a/src/or/main.c b/src/or/main.c index 4cc51de07b..0e67ea6f68 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -152,19 +152,19 @@ static void shutdown_did_not_work_callback(evutil_socket_t fd, short event, void *arg) ATTR_NORETURN; /********* START VARIABLES **********/ -int global_read_bucket; /**< Max number of bytes I can read this second. */ -int global_write_bucket; /**< Max number of bytes I can write this second. */ - -/** Max number of relayed (bandwidth class 1) bytes I can read this second. */ -int global_relayed_read_bucket; -/** Max number of relayed (bandwidth class 1) bytes I can write this second. */ -int global_relayed_write_bucket; -/** What was the read bucket before the last second_elapsed_callback() call? - * (used to determine how many bytes we've read). */ -static int stats_prev_global_read_bucket; + +/* Token bucket for all traffic. */ +token_bucket_t global_bucket; + +/* Token bucket for relayed traffic. */ +token_bucket_t global_relayed_bucket; + +/** What was the read/write bucket before the last second_elapsed_callback() + * call? (used to determine how many bytes we've read). */ +static size_t stats_prev_global_read_bucket; /** What was the write bucket before the last second_elapsed_callback() call? * (used to determine how many bytes we've written). */ -static int stats_prev_global_write_bucket; +static size_t stats_prev_global_write_bucket; /* DOCDOC stats_prev_n_read */ static uint64_t stats_prev_n_read = 0; @@ -2389,19 +2389,23 @@ refill_callback(periodic_timer_t *timer, void *arg) refill_timer_current_millisecond.tv_sec); } - bytes_written = stats_prev_global_write_bucket - global_write_bucket; - bytes_read = stats_prev_global_read_bucket - global_read_bucket; + bytes_written = stats_prev_global_write_bucket - + token_bucket_get_write(&global_bucket); + bytes_read = stats_prev_global_read_bucket - + token_bucket_get_read(&global_bucket); stats_n_bytes_read += bytes_read; stats_n_bytes_written += bytes_written; if (accounting_is_enabled(options) && milliseconds_elapsed >= 0) accounting_add_bytes(bytes_read, bytes_written, seconds_rolled_over); - if (milliseconds_elapsed > 0) - connection_bucket_refill(milliseconds_elapsed, (time_t)now.tv_sec); + if (milliseconds_elapsed > 0) { + connection_bucket_refill((time_t)now.tv_sec, + monotime_coarse_get_stamp()); + } - stats_prev_global_read_bucket = global_read_bucket; - stats_prev_global_write_bucket = global_write_bucket; + stats_prev_global_read_bucket = token_bucket_get_read(&global_bucket); + stats_prev_global_write_bucket = token_bucket_get_write(&global_bucket); /* remember what time it is, for next time */ refill_timer_current_millisecond = now; @@ -2605,8 +2609,8 @@ do_main_loop(void) /* Set up our buckets */ connection_bucket_init(); - stats_prev_global_read_bucket = global_read_bucket; - stats_prev_global_write_bucket = global_write_bucket; + stats_prev_global_read_bucket = token_bucket_get_read(&global_bucket); + stats_prev_global_write_bucket = token_bucket_get_write(&global_bucket); /* initialize the bootstrap status events to know we're starting up */ control_event_bootstrap(BOOTSTRAP_STATUS_STARTING, 0); @@ -3501,8 +3505,8 @@ tor_free_all(int postfork) periodic_timer_free(systemd_watchdog_timer); #endif - global_read_bucket = global_write_bucket = 0; - global_relayed_read_bucket = global_relayed_write_bucket = 0; + memset(&global_bucket, 0, sizeof(global_bucket)); + memset(&global_relayed_bucket, 0, sizeof(global_relayed_bucket)); stats_prev_global_read_bucket = stats_prev_global_write_bucket = 0; stats_prev_n_read = stats_prev_n_written = 0; stats_n_bytes_read = stats_n_bytes_written = 0; diff --git a/src/or/main.h b/src/or/main.h index f01506fcea..9ef5b9472f 100644 --- a/src/or/main.h +++ b/src/or/main.h @@ -89,10 +89,8 @@ uint64_t get_main_loop_idle_count(void); extern time_t time_of_process_start; extern int quiet_level; -extern int global_read_bucket; -extern int global_write_bucket; -extern int global_relayed_read_bucket; -extern int global_relayed_write_bucket; +extern token_bucket_t global_bucket; +extern token_bucket_t global_relayed_bucket; #ifdef MAIN_PRIVATE STATIC void init_connection_lists(void); diff --git a/src/or/or.h b/src/or/or.h index b845443947..829b6755d2 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -80,6 +80,7 @@ #include "crypto_curve25519.h" #include "crypto_ed25519.h" #include "tor_queue.h" +#include "token_bucket.h" #include "util_format.h" #include "hs_circuitmap.h" @@ -1660,20 +1661,8 @@ typedef struct or_connection_t { time_t timestamp_lastempty; /**< When was the outbuf last completely empty?*/ - /* bandwidth* and *_bucket only used by ORs in OPEN state: */ - int bandwidthrate; /**< Bytes/s added to the bucket. (OPEN ORs only.) */ - int bandwidthburst; /**< Max bucket size for this conn. (OPEN ORs only.) */ - int read_bucket; /**< When this hits 0, stop receiving. Every second we - * add 'bandwidthrate' to this, capping it at - * bandwidthburst. (OPEN ORs only) */ - int write_bucket; /**< When this hits 0, stop writing. Like read_bucket. */ - - /** Last emptied read token bucket in msec since midnight; only used if - * TB_EMPTY events are enabled. */ - uint32_t read_emptied_time; - /** Last emptied write token bucket in msec since midnight; only used if - * TB_EMPTY events are enabled. */ - uint32_t write_emptied_time; + token_bucket_t bucket; /**< Used for rate limiting when the connection is + * in state CONN_OPEN. */ /* * Count the number of bytes flushed out on this orconn, and the number of @@ -4431,9 +4420,6 @@ typedef struct { /** Enable CELL_STATS events. Only altered on testing networks. */ int TestingEnableCellStatsEvent; - /** Enable TB_EMPTY events. Only altered on testing networks. */ - int TestingEnableTbEmptyEvent; - /** If true, and we have GeoIP data, and we're a bridge, keep a per-country * count of how many client addresses have contacted us so that we can help * the bridge authority guess which countries have blocked access to us. */ diff --git a/src/test/include.am b/src/test/include.am index a663fa5524..474da3f880 100644 --- a/src/test/include.am +++ b/src/test/include.am @@ -90,6 +90,7 @@ src_test_test_SOURCES = \ src/test/test_address_set.c \ src/test/test_bridges.c \ src/test/test_buffers.c \ + src/test/test_bwmgt.c \ src/test/test_cell_formats.c \ src/test/test_cell_queue.c \ src/test/test_channel.c \ diff --git a/src/test/test.c b/src/test/test.c index f90669b5dd..422e181b94 100644 --- a/src/test/test.c +++ b/src/test/test.c @@ -813,6 +813,7 @@ struct testgroup_t testgroups[] = { { "address_set/", address_set_tests }, { "bridges/", bridges_tests }, { "buffer/", buffer_tests }, + { "bwmgt/", bwmgt_tests }, { "cellfmt/", cell_format_tests }, { "cellqueue/", cell_queue_tests }, { "channel/", channel_tests }, diff --git a/src/test/test.h b/src/test/test.h index 34c6e46427..1728831ed0 100644 --- a/src/test/test.h +++ b/src/test/test.h @@ -187,6 +187,7 @@ extern struct testcase_t addr_tests[]; extern struct testcase_t address_tests[]; extern struct testcase_t address_set_tests[]; extern struct testcase_t bridges_tests[]; +extern struct testcase_t bwmgt_tests[]; extern struct testcase_t buffer_tests[]; extern struct testcase_t cell_format_tests[]; extern struct testcase_t cell_queue_tests[]; diff --git a/src/test/test_bwmgt.c b/src/test/test_bwmgt.c new file mode 100644 index 0000000000..1a54f44fc4 --- /dev/null +++ b/src/test/test_bwmgt.c @@ -0,0 +1,205 @@ +/* Copyright (c) 2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file test_bwmgt.c + * \brief tests for bandwidth management / token bucket functions + */ + +#define TOKEN_BUCKET_PRIVATE + +#include "or.h" +#include "test.h" + +#include "token_bucket.h" + +// an imaginary time, in timestamp units. Chosen so it will roll over. +static const uint32_t START_TS = UINT32_MAX-10; +static const int32_t KB = 1024; + +static void +test_bwmgt_token_buf_init(void *arg) +{ + (void)arg; + token_bucket_t b; + + token_bucket_init(&b, 16*KB, 64*KB, START_TS); + // Burst is correct + tt_uint_op(b.burst, OP_EQ, 64*KB); + // Rate is correct, within 1 percent. + { + uint32_t ticks_per_sec = + (uint32_t) monotime_msec_to_approx_coarse_stamp_units(1000); + uint32_t rate_per_sec = (b.rate * ticks_per_sec / TICKS_PER_STEP); + + tt_uint_op(rate_per_sec, OP_GT, 16*KB-160); + tt_uint_op(rate_per_sec, OP_LT, 16*KB+160); + } + // Bucket starts out full: + tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS); + tt_int_op(b.read_bucket, OP_EQ, 64*KB); + + done: + ; +} + +static void +test_bwmgt_token_buf_adjust(void *arg) +{ + (void)arg; + token_bucket_t b; + + token_bucket_init(&b, 16*KB, 64*KB, START_TS); + + uint32_t rate_orig = b.rate; + // Increasing burst + token_bucket_adjust(&b, 16*KB, 128*KB); + tt_uint_op(b.rate, OP_EQ, rate_orig); + tt_uint_op(b.read_bucket, OP_EQ, 64*KB); + tt_uint_op(b.burst, OP_EQ, 128*KB); + + // Decreasing burst but staying above bucket + token_bucket_adjust(&b, 16*KB, 96*KB); + tt_uint_op(b.rate, OP_EQ, rate_orig); + tt_uint_op(b.read_bucket, OP_EQ, 64*KB); + tt_uint_op(b.burst, OP_EQ, 96*KB); + + // Decreasing burst below bucket, + token_bucket_adjust(&b, 16*KB, 48*KB); + tt_uint_op(b.rate, OP_EQ, rate_orig); + tt_uint_op(b.read_bucket, OP_EQ, 48*KB); + tt_uint_op(b.burst, OP_EQ, 48*KB); + + // Changing rate. + token_bucket_adjust(&b, 32*KB, 48*KB); + tt_uint_op(b.rate, OP_GE, rate_orig*2 - 10); + tt_uint_op(b.rate, OP_LE, rate_orig*2 + 10); + tt_uint_op(b.read_bucket, OP_EQ, 48*KB); + tt_uint_op(b.burst, OP_EQ, 48*KB); + + done: + ; +} + +static void +test_bwmgt_token_buf_dec(void *arg) +{ + (void)arg; + token_bucket_t b; + token_bucket_init(&b, 16*KB, 64*KB, START_TS); + + // full-to-not-full. + tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, KB)); + tt_int_op(b.read_bucket, OP_EQ, 63*KB); + + // Full to almost-not-full + tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, 63*KB - 1)); + tt_int_op(b.read_bucket, OP_EQ, 1); + + // almost-not-full to empty. + tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 1)); + tt_int_op(b.read_bucket, OP_EQ, 0); + + // reset bucket, try full-to-empty + token_bucket_init(&b, 16*KB, 64*KB, START_TS); + tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 64*KB)); + tt_int_op(b.read_bucket, OP_EQ, 0); + + // reset bucket, try underflow. + token_bucket_init(&b, 16*KB, 64*KB, START_TS); + tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 64*KB + 1)); + tt_int_op(b.read_bucket, OP_EQ, -1); + + // A second underflow does not make the bucket empty. + tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, 1000)); + tt_int_op(b.read_bucket, OP_EQ, -1001); + + done: + ; +} + +static void +test_bwmgt_token_buf_refill(void *arg) +{ + (void)arg; + token_bucket_t b; + const uint32_t SEC = + (uint32_t)monotime_msec_to_approx_coarse_stamp_units(1000); + printf("%d\n", (int)SEC); + token_bucket_init(&b, 16*KB, 64*KB, START_TS); + + /* Make the buffer much emptier, then let one second elapse. */ + token_bucket_dec_read(&b, 48*KB); + tt_int_op(b.read_bucket, OP_EQ, 16*KB); + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC)); + tt_int_op(b.read_bucket, OP_GT, 32*KB - 300); + tt_int_op(b.read_bucket, OP_LT, 32*KB + 300); + + /* Another half second. */ + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2)); + tt_int_op(b.read_bucket, OP_GT, 40*KB - 400); + tt_int_op(b.read_bucket, OP_LT, 40*KB + 400); + tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2); + + /* No time: nothing happens. */ + { + const uint32_t bucket_orig = b.read_bucket; + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2)); + tt_int_op(b.read_bucket, OP_EQ, bucket_orig); + } + + /* Another 30 seconds: fill the bucket. */ + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*30)); + tt_int_op(b.read_bucket, OP_EQ, b.burst); + tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2 + SEC*30); + + /* Another 30 seconds: nothing happens. */ + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*60)); + tt_int_op(b.read_bucket, OP_EQ, b.burst); + tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2 + SEC*60); + + /* Empty the bucket, let two seconds pass, and make sure that a refill is + * noticed. */ + tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, b.burst)); + tt_int_op(0, OP_EQ, b.read_bucket); + tt_int_op(1, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*61)); + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*62)); + tt_int_op(b.read_bucket, OP_GT, 32*KB-400); + tt_int_op(b.read_bucket, OP_LT, 32*KB+400); + + /* Underflow the bucket, make sure we detect when it has tokens again. */ + tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, b.read_bucket+16*KB)); + tt_int_op(-16*KB, OP_EQ, b.read_bucket); + // half a second passes... + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*64)); + tt_int_op(b.read_bucket, OP_GT, -8*KB-300); + tt_int_op(b.read_bucket, OP_LT, -8*KB+300); + // a second passes + tt_int_op(1, OP_EQ, token_bucket_refill(&b, START_TS + SEC*65)); + tt_int_op(b.read_bucket, OP_GT, 8*KB-400); + tt_int_op(b.read_bucket, OP_LT, 8*KB+400); + + // We step a second backwards, and nothing happens. + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*64)); + tt_int_op(b.read_bucket, OP_GT, 8*KB-400); + tt_int_op(b.read_bucket, OP_LT, 8*KB+400); + + // A ridiculous amount of time passes. + tt_int_op(0, OP_EQ, token_bucket_refill(&b, INT32_MAX)); + tt_int_op(b.read_bucket, OP_EQ, b.burst); + + done: + ; +} + +#define BWMGT(name) \ + { #name, test_bwmgt_ ## name , 0, NULL, NULL } + +struct testcase_t bwmgt_tests[] = { + BWMGT(token_buf_init), + BWMGT(token_buf_adjust), + BWMGT(token_buf_dec), + BWMGT(token_buf_refill), + END_OF_TESTCASES +}; + diff --git a/src/test/test_controller_events.c b/src/test/test_controller_events.c index 901ad7ab3d..e81aea8d66 100644 --- a/src/test/test_controller_events.c +++ b/src/test/test_controller_events.c @@ -12,79 +12,6 @@ #include "test.h" static void -help_test_bucket_note_empty(uint32_t expected_msec_since_midnight, - int tokens_before, size_t tokens_removed, - uint32_t msec_since_epoch) -{ - uint32_t timestamp_var = 0; - struct timeval tvnow; - tvnow.tv_sec = msec_since_epoch / 1000; - tvnow.tv_usec = (msec_since_epoch % 1000) * 1000; - connection_buckets_note_empty_ts(×tamp_var, tokens_before, - tokens_removed, &tvnow); - tt_int_op(expected_msec_since_midnight, OP_EQ, timestamp_var); - - done: - ; -} - -static void -test_cntev_bucket_note_empty(void *arg) -{ - (void)arg; - - /* Two cases with nothing to note, because bucket was empty before; - * 86442200 == 1970-01-02 00:00:42.200000 */ - help_test_bucket_note_empty(0, 0, 0, 86442200); - help_test_bucket_note_empty(0, -100, 100, 86442200); - - /* Nothing to note, because bucket has not been emptied. */ - help_test_bucket_note_empty(0, 101, 100, 86442200); - - /* Bucket was emptied, note 42200 msec since midnight. */ - help_test_bucket_note_empty(42200, 101, 101, 86442200); - help_test_bucket_note_empty(42200, 101, 102, 86442200); -} - -static void -test_cntev_bucket_millis_empty(void *arg) -{ - struct timeval tvnow; - (void)arg; - - /* 1970-01-02 00:00:42.200000 */ - tvnow.tv_sec = 86400 + 42; - tvnow.tv_usec = 200000; - - /* Bucket has not been refilled. */ - tt_int_op(0, OP_EQ, bucket_millis_empty(0, 42120, 0, 100, &tvnow)); - tt_int_op(0, OP_EQ, bucket_millis_empty(-10, 42120, -10, 100, &tvnow)); - - /* Bucket was not empty. */ - tt_int_op(0, OP_EQ, bucket_millis_empty(10, 42120, 20, 100, &tvnow)); - - /* Bucket has been emptied 80 msec ago and has just been refilled. */ - tt_int_op(80, OP_EQ, bucket_millis_empty(-20, 42120, -10, 100, &tvnow)); - tt_int_op(80, OP_EQ, bucket_millis_empty(-10, 42120, 0, 100, &tvnow)); - tt_int_op(80, OP_EQ, bucket_millis_empty(0, 42120, 10, 100, &tvnow)); - - /* Bucket has been emptied 180 msec ago, last refill was 100 msec ago - * which was insufficient to make it positive, so cap msec at 100. */ - tt_int_op(100, OP_EQ, bucket_millis_empty(0, 42020, 1, 100, &tvnow)); - - /* 1970-01-02 00:00:00:050000 */ - tvnow.tv_sec = 86400; - tvnow.tv_usec = 50000; - - /* Last emptied 30 msec before midnight, tvnow is 50 msec after - * midnight, that's 80 msec in total. */ - tt_int_op(80, OP_EQ, bucket_millis_empty(0, 86400000 - 30, 1, 100, &tvnow)); - - done: - ; -} - -static void add_testing_cell_stats_entry(circuit_t *circ, uint8_t command, unsigned int waiting_time, unsigned int removed, unsigned int exitward) @@ -395,8 +322,6 @@ test_cntev_event_mask(void *arg) { #name, test_cntev_ ## name, flags, 0, NULL } struct testcase_t controller_event_tests[] = { - TEST(bucket_note_empty, TT_FORK), - TEST(bucket_millis_empty, TT_FORK), TEST(sum_up_cell_stats, TT_FORK), TEST(append_cell_stats, TT_FORK), TEST(format_cell_stats, TT_FORK), diff --git a/src/test/test_options.c b/src/test/test_options.c index af349ed015..9974ed2575 100644 --- a/src/test/test_options.c +++ b/src/test/test_options.c @@ -4104,16 +4104,6 @@ test_options_validate__testing_options(void *ignored) free_options_test_data(tdata); tdata = get_options_test_data(TEST_OPTIONS_DEFAULT_VALUES "TestingEnableTbEmptyEvent 1\n" - ); - ret = options_validate(tdata->old_opt, tdata->opt, tdata->def_opt, 0, &msg); - tt_int_op(ret, OP_EQ, -1); - tt_str_op(msg, OP_EQ, "TestingEnableTbEmptyEvent may only be changed " - "in testing Tor networks!"); - tor_free(msg); - - free_options_test_data(tdata); - tdata = get_options_test_data(TEST_OPTIONS_DEFAULT_VALUES - "TestingEnableTbEmptyEvent 1\n" VALID_DIR_AUTH "TestingTorNetwork 1\n" "___UsingTestNetworkDefaults 0\n" diff --git a/src/test/test_util.c b/src/test/test_util.c index ce8567d9af..3dd2b51a3a 100644 --- a/src/test/test_util.c +++ b/src/test/test_util.c @@ -5907,6 +5907,13 @@ test_util_monotonic_time(void *arg) tt_u64_op(coarse_stamp_diff, OP_GE, 120); tt_u64_op(coarse_stamp_diff, OP_LE, 1200); + { + uint64_t units = monotime_msec_to_approx_coarse_stamp_units(5000); + uint64_t ms = monotime_coarse_stamp_units_to_approx_msec(units); + tt_int_op(ms, OP_GE, 4950); + tt_int_op(ms, OP_LT, 5050); + } + done: ; } |