diff options
-rw-r--r-- | changes/bug25373 | 7 | ||||
-rw-r--r-- | changes/bug25828 | 7 | ||||
-rw-r--r-- | doc/tor.1.txt | 8 | ||||
-rw-r--r-- | src/common/token_bucket.c | 13 | ||||
-rw-r--r-- | src/common/token_bucket.h | 4 | ||||
-rw-r--r-- | src/or/connection.c | 251 | ||||
-rw-r--r-- | src/or/connection.h | 8 | ||||
-rw-r--r-- | src/or/hibernate.c | 2 | ||||
-rw-r--r-- | src/or/main.c | 120 | ||||
-rw-r--r-- | src/or/main.h | 1 | ||||
-rw-r--r-- | src/or/or.h | 1 |
11 files changed, 252 insertions, 170 deletions
diff --git a/changes/bug25373 b/changes/bug25373 new file mode 100644 index 0000000000..03e870e692 --- /dev/null +++ b/changes/bug25373 @@ -0,0 +1,7 @@ + o Major features (main loop, CPU wakeup): + - The bandwidth-limitation logic has been refactored so that + bandwidth calculations are performed on-demand, rather than + every TokenBucketRefillInterval milliseconds. + This change should improve the granularity of our bandwidth + calculations, and limit the number of times that the Tor process needs + to wake up when it is idle. Closes ticket 25373. diff --git a/changes/bug25828 b/changes/bug25828 new file mode 100644 index 0000000000..45cd1f4ae8 --- /dev/null +++ b/changes/bug25828 @@ -0,0 +1,7 @@ + o Minor bugfixes (bandwidth management): + - Consider ourselves "low on write bandwidth" if we have exhausted our + write bandwidth some time in the last second. This was the + documented behavior before, but the actual behavior was to change + this value every TokenBucketRefillInterval. Fixes bug 25828; bugfix on + 0.2.3.5-alpha. + diff --git a/doc/tor.1.txt b/doc/tor.1.txt index 594dedf397..cf666e9142 100644 --- a/doc/tor.1.txt +++ b/doc/tor.1.txt @@ -1286,9 +1286,11 @@ The following options are useful only for clients (that is, if 2 minutes) [[TokenBucketRefillInterval]] **TokenBucketRefillInterval** __NUM__ [**msec**|**second**]:: - Set the refill interval of Tor's token bucket to NUM milliseconds. - NUM must be between 1 and 1000, inclusive. Note that the configured - bandwidth limits are still expressed in bytes per second: this + Set the refill delay interval of Tor's token bucket to NUM milliseconds. + NUM must be between 1 and 1000, inclusive. When Tor is out of bandwidth, + on a connection or globally, it will wait up to this long before it tries + to use that connection again. + Note that bandwidth limits are still expressed in bytes per second: this option only affects the frequency with which Tor checks to see whether previously exhausted connections may read again. Can not be changed while tor is running. (Default: 100 msec) diff --git a/src/common/token_bucket.c b/src/common/token_bucket.c index 747189e751..f2396ec58a 100644 --- a/src/common/token_bucket.c +++ b/src/common/token_bucket.c @@ -238,13 +238,18 @@ token_bucket_rw_dec_write(token_bucket_rw_t *bucket, /** * As token_bucket_rw_dec_read and token_bucket_rw_dec_write, in a single - * operation. + * operation. Return a bitmask of TB_READ and TB_WRITE to indicate + * which buckets became empty. */ -void +int token_bucket_rw_dec(token_bucket_rw_t *bucket, ssize_t n_read, ssize_t n_written) { - token_bucket_rw_dec_read(bucket, n_read); - token_bucket_rw_dec_write(bucket, n_written); + int flags = 0; + if (token_bucket_rw_dec_read(bucket, n_read)) + flags |= TB_READ; + if (token_bucket_rw_dec_write(bucket, n_written)) + flags |= TB_WRITE; + return flags; } diff --git a/src/common/token_bucket.h b/src/common/token_bucket.h index fb5d9fc60a..0e7832e838 100644 --- a/src/common/token_bucket.h +++ b/src/common/token_bucket.h @@ -85,8 +85,8 @@ int token_bucket_rw_dec_read(token_bucket_rw_t *bucket, int token_bucket_rw_dec_write(token_bucket_rw_t *bucket, ssize_t n); -void token_bucket_rw_dec(token_bucket_rw_t *bucket, - ssize_t n_read, ssize_t n_written); +int token_bucket_rw_dec(token_bucket_rw_t *bucket, + ssize_t n_read, ssize_t n_written); static inline size_t token_bucket_rw_get_read(const token_bucket_rw_t *bucket); static inline size_t diff --git a/src/or/connection.c b/src/or/connection.c index e8ecf0db9f..389199f18e 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -1,4 +1,4 @@ - /* Copyright (c) 2001 Matej Pfajfar. +/* Copyright (c) 2001 Matej Pfajfar. * Copyright (c) 2001-2004, Roger Dingledine. * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. * Copyright (c) 2007-2017, The Tor Project, Inc. */ @@ -85,6 +85,7 @@ #include "ext_orport.h" #include "geoip.h" #include "main.h" +#include "hibernate.h" #include "hs_common.h" #include "hs_ident.h" #include "nodelist.h" @@ -137,6 +138,8 @@ static const char *proxy_type_to_string(int proxy_type); static int get_proxy_type(void); const tor_addr_t *conn_get_outbound_address(sa_family_t family, const or_options_t *options, unsigned int conn_type); +static void reenable_blocked_connection_init(const or_options_t *options); +static void reenable_blocked_connection_schedule(void); /** The last addresses that our network interface seemed to have been * binding to. We use this as one way to detect when our IP changes. @@ -772,8 +775,8 @@ connection_close_immediate(connection_t *conn) connection_unregister_events(conn); /* Prevent the event from getting unblocked. */ - conn->read_blocked_on_bw = - conn->write_blocked_on_bw = 0; + conn->read_blocked_on_bw = 0; + conn->write_blocked_on_bw = 0; if (SOCKET_OK(conn->s)) tor_close_socket(conn->s); @@ -2814,10 +2817,10 @@ connection_is_rate_limited(connection_t *conn) return 1; } -/** Did either global write bucket run dry last second? If so, - * we are likely to run dry again this second, so be stingy with the - * tokens we just put in. */ -static int write_buckets_empty_last_second = 0; +/** When was either global write bucket last empty? If this was recent, then + * we're probably low on bandwidth, and we should be stingy with our bandwidth + * usage. */ +static time_t write_buckets_last_empty_at = -100; /** How many seconds of no active local circuits will make the * connection revert to the "relayed" bandwidth class? */ @@ -2845,14 +2848,14 @@ connection_counts_as_relayed_traffic(connection_t *conn, time_t now) * write many of them or just a few; and <b>conn_bucket</b> (if * non-negative) provides an upper limit for our answer. */ static ssize_t -connection_bucket_round_robin(int base, int priority, - ssize_t global_bucket_val, ssize_t conn_bucket) +connection_bucket_get_share(int base, int priority, + ssize_t global_bucket_val, ssize_t conn_bucket) { ssize_t at_most; ssize_t num_bytes_high = (priority ? 32 : 16) * base; ssize_t num_bytes_low = (priority ? 4 : 2) * base; - /* Do a rudimentary round-robin so one circuit can't hog a connection. + /* Do a rudimentary limiting so one circuit can't hog a connection. * Pick at most 32 cells, at least 4 cells if possible, and if we're in * the middle pick 1/8 of the available bandwidth. */ at_most = global_bucket_val / 8; @@ -2899,8 +2902,8 @@ connection_bucket_read_limit(connection_t *conn, time_t now) global_bucket_val = MIN(global_bucket_val, relayed); } - return connection_bucket_round_robin(base, priority, - global_bucket_val, conn_bucket); + return connection_bucket_get_share(base, priority, + global_bucket_val, conn_bucket); } /** How many bytes at most can we write onto this connection? */ @@ -2931,8 +2934,8 @@ connection_bucket_write_limit(connection_t *conn, time_t now) global_bucket_val = MIN(global_bucket_val, relayed); } - return connection_bucket_round_robin(base, priority, - global_bucket_val, conn_bucket); + return connection_bucket_get_share(base, priority, + global_bucket_val, conn_bucket); } /** Return 1 if the global write buckets are low enough that we @@ -2969,8 +2972,11 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority) if (smaller_bucket < attempt) return 1; /* not enough space no matter the priority */ - if (write_buckets_empty_last_second) - return 1; /* we're already hitting our limits, no more please */ + { + const time_t diff = approx_time() - write_buckets_last_empty_at; + if (diff <= 1) + return 1; /* we're already hitting our limits, no more please */ + } if (priority == 1) { /* old-style v1 query */ /* Could we handle *two* of these requests within the next two seconds? */ @@ -2986,6 +2992,10 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority) return 0; } +/** When did we last tell the accounting subsystem about transmitted + * bandwidth? */ +static time_t last_recorded_accounting_at = 0; + /** Helper: adjusts our bandwidth history and informs the controller as * appropriate, given that we have just read <b>num_read</b> bytes and written * <b>num_written</b> bytes on <b>conn</b>. */ @@ -3016,6 +3026,20 @@ record_num_bytes_transferred_impl(connection_t *conn, } if (conn->type == CONN_TYPE_EXIT) rep_hist_note_exit_bytes(conn->port, num_written, num_read); + + /* Remember these bytes towards statistics. */ + stats_increment_bytes_read_and_written(num_read, num_written); + + /* Remember these bytes towards accounting. */ + if (accounting_is_enabled(get_options())) { + if (now > last_recorded_accounting_at && last_recorded_accounting_at) { + accounting_add_bytes(num_read, num_written, + (int)(now - last_recorded_accounting_at)); + } else { + accounting_add_bytes(num_read, num_written, 0); + } + last_recorded_accounting_at = now; + } } /** We just read <b>num_read</b> and wrote <b>num_written</b> bytes @@ -3042,19 +3066,54 @@ connection_buckets_decrement(connection_t *conn, time_t now, if (!connection_is_rate_limited(conn)) return; /* local IPs are free */ + unsigned flags = 0; if (connection_counts_as_relayed_traffic(conn, now)) { - token_bucket_rw_dec(&global_relayed_bucket, num_read, num_written); + flags = token_bucket_rw_dec(&global_relayed_bucket, num_read, num_written); + } + flags |= token_bucket_rw_dec(&global_bucket, num_read, num_written); + + if (flags & TB_WRITE) { + write_buckets_last_empty_at = now; } - token_bucket_rw_dec(&global_bucket, num_read, num_written); if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) { or_connection_t *or_conn = TO_OR_CONN(conn); token_bucket_rw_dec(&or_conn->bucket, num_read, num_written); } } +/** + * Mark <b>conn</b> as needing to stop reading because bandwidth has been + * exhausted. If <b>is_global_bw</b>, it is closing because global bandwidth + * limit has been exhausted. Otherwise, it is closing because its own + * bandwidth limit has been exhausted. + */ +void +connection_read_bw_exhausted(connection_t *conn, bool is_global_bw) +{ + (void)is_global_bw; + conn->read_blocked_on_bw = 1; + connection_stop_reading(conn); + reenable_blocked_connection_schedule(); +} + +/** + * Mark <b>conn</b> as needing to stop reading because write bandwidth has + * been exhausted. If <b>is_global_bw</b>, it is closing because global + * bandwidth limit has been exhausted. Otherwise, it is closing because its + * own bandwidth limit has been exhausted. +*/ +void +connection_write_bw_exhausted(connection_t *conn, bool is_global_bw) +{ + (void)is_global_bw; + conn->write_blocked_on_bw = 1; + connection_stop_reading(conn); + reenable_blocked_connection_schedule(); +} + /** If we have exhausted our global buckets, or the buckets for conn, * stop reading. */ -static void +void connection_consider_empty_read_buckets(connection_t *conn) { const char *reason; @@ -3062,6 +3121,8 @@ connection_consider_empty_read_buckets(connection_t *conn) if (!connection_is_rate_limited(conn)) return; /* Always okay. */ + int is_global = 1; + if (token_bucket_rw_get_read(&global_bucket) <= 0) { reason = "global read bucket exhausted. Pausing."; } else if (connection_counts_as_relayed_traffic(conn, approx_time()) && @@ -3071,17 +3132,17 @@ connection_consider_empty_read_buckets(connection_t *conn) conn->state == OR_CONN_STATE_OPEN && token_bucket_rw_get_read(&TO_OR_CONN(conn)->bucket) <= 0) { reason = "connection read bucket exhausted. Pausing."; + is_global = false; } else return; /* all good, no need to stop it */ LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", reason)); - conn->read_blocked_on_bw = 1; - connection_stop_reading(conn); + connection_read_bw_exhausted(conn, is_global); } /** If we have exhausted our global buckets, or the buckets for conn, * stop writing. */ -static void +void connection_consider_empty_write_buckets(connection_t *conn) { const char *reason; @@ -3089,6 +3150,7 @@ connection_consider_empty_write_buckets(connection_t *conn) if (!connection_is_rate_limited(conn)) return; /* Always okay. */ + bool is_global = true; if (token_bucket_rw_get_write(&global_bucket) <= 0) { reason = "global write bucket exhausted. Pausing."; } else if (connection_counts_as_relayed_traffic(conn, approx_time()) && @@ -3098,12 +3160,12 @@ connection_consider_empty_write_buckets(connection_t *conn) conn->state == OR_CONN_STATE_OPEN && token_bucket_rw_get_write(&TO_OR_CONN(conn)->bucket) <= 0) { reason = "connection write bucket exhausted. Pausing."; + is_global = false; } else return; /* all good, no need to stop it */ LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", reason)); - conn->write_blocked_on_bw = 1; - connection_stop_writing(conn); + connection_write_bw_exhausted(conn, is_global); } /** Initialize the global buckets to the values configured in the @@ -3128,6 +3190,8 @@ connection_bucket_init(void) (int32_t)options->BandwidthBurst, now_ts); } + + reenable_blocked_connection_init(options); } /** Update the global connection bucket settings to a new value. */ @@ -3148,57 +3212,104 @@ connection_bucket_adjust(const or_options_t *options) } } -/** Time has passed; increment buckets appropriately. */ -void -connection_bucket_refill(time_t now, uint32_t now_ts) +/** + * Cached value of the last coarse-timestamp when we refilled the + * global buckets. + */ +static uint32_t last_refilled_global_buckets_ts=0; +/** + * Refill the token buckets for a single connection <b>conn</b>, and the + * global token buckets as appropriate. Requires that <b>now_ts</b> is + * the time in coarse timestamp units. + */ +static void +connection_bucket_refill_single(connection_t *conn, uint32_t now_ts) { - smartlist_t *conns = get_connection_array(); + /* Note that we only check for equality here: the underlying + * token bucket functions can handle moving backwards in time if they + * need to. */ + if (now_ts != last_refilled_global_buckets_ts) { + token_bucket_rw_refill(&global_bucket, now_ts); + token_bucket_rw_refill(&global_relayed_bucket, now_ts); + last_refilled_global_buckets_ts = now_ts; + } - write_buckets_empty_last_second = - token_bucket_rw_get_write(&global_bucket) <= 0 || - token_bucket_rw_get_write(&global_relayed_bucket) <= 0; + if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) { + or_connection_t *or_conn = TO_OR_CONN(conn); + token_bucket_rw_refill(&or_conn->bucket, now_ts); + } +} - /* refill the global buckets */ - token_bucket_rw_refill(&global_bucket, now_ts); - token_bucket_rw_refill(&global_relayed_bucket, now_ts); +/** + * Event to re-enable all connections that were previously blocked on read or + * write. + */ +static mainloop_event_t *reenable_blocked_connections_ev = NULL; - /* refill the per-connection buckets */ - SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) { - if (connection_speaks_cells(conn)) { - or_connection_t *or_conn = TO_OR_CONN(conn); +/** True iff reenable_blocked_connections_ev is currently scheduled. */ +static int reenable_blocked_connections_is_scheduled = 0; - if (conn->state == OR_CONN_STATE_OPEN) { - token_bucket_rw_refill(&or_conn->bucket, now_ts); - } - } +/** Delay after which to run reenable_blocked_connections_ev. */ +static struct timeval reenable_blocked_connections_delay; - if (conn->read_blocked_on_bw == 1 /* marked to turn reading back on now */ - && token_bucket_rw_get_read(&global_bucket) > 0 /* and we can read */ - && (!connection_counts_as_relayed_traffic(conn, now) || - token_bucket_rw_get_read(&global_relayed_bucket) > 0) - && (!connection_speaks_cells(conn) || - conn->state != OR_CONN_STATE_OPEN || - token_bucket_rw_get_read(&TO_OR_CONN(conn)->bucket) > 0)) { - /* and either a non-cell conn or a cell conn with non-empty bucket */ - LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET, - "waking up conn (fd %d) for read", (int)conn->s)); - conn->read_blocked_on_bw = 0; +/** + * Re-enable all connections that were previously blocked on read or write. + * This event is scheduled after enough time has elapsed to be sure + * that the buckets will refill when the connections have something to do. + */ +static void +reenable_blocked_connections_cb(mainloop_event_t *ev, void *arg) +{ + (void)ev; + (void)arg; + SMARTLIST_FOREACH_BEGIN(get_connection_array(), connection_t *, conn) { + if (conn->read_blocked_on_bw == 1) { connection_start_reading(conn); + conn->read_blocked_on_bw = 0; } - - if (conn->write_blocked_on_bw == 1 - && token_bucket_rw_get_write(&global_bucket) > 0 /* and we can write */ - && (!connection_counts_as_relayed_traffic(conn, now) || - token_bucket_rw_get_write(&global_relayed_bucket) > 0) - && (!connection_speaks_cells(conn) || - conn->state != OR_CONN_STATE_OPEN || - token_bucket_rw_get_write(&TO_OR_CONN(conn)->bucket) > 0)) { - LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET, - "waking up conn (fd %d) for write", (int)conn->s)); - conn->write_blocked_on_bw = 0; + if (conn->write_blocked_on_bw == 1) { connection_start_writing(conn); + conn->write_blocked_on_bw = 0; } } SMARTLIST_FOREACH_END(conn); + + reenable_blocked_connections_is_scheduled = 0; +} + +/** + * Initialize the mainloop event that we use to wake up connections that + * find themselves blocked on bandwidth. + */ +static void +reenable_blocked_connection_init(const or_options_t *options) +{ + if (! reenable_blocked_connections_ev) { + reenable_blocked_connections_ev = + mainloop_event_new(reenable_blocked_connections_cb, NULL); + reenable_blocked_connections_is_scheduled = 0; + } + time_t sec = options->TokenBucketRefillInterval / 1000; + int msec = (options->TokenBucketRefillInterval % 1000); + reenable_blocked_connections_delay.tv_sec = sec; + reenable_blocked_connections_delay.tv_usec = msec * 1000; +} + +/** + * Called when we have blocked a connection for being low on bandwidth: + * schedule an event to reenable such connections, if it is not already + * scheduled. + */ +static void +reenable_blocked_connection_schedule(void) +{ + if (reenable_blocked_connections_is_scheduled) + return; + if (BUG(reenable_blocked_connections_ev == NULL)) { + reenable_blocked_connection_init(get_options()); + } + mainloop_event_schedule(reenable_blocked_connections_ev, + &reenable_blocked_connections_delay); + reenable_blocked_connections_is_scheduled = 1; } /** Read bytes from conn-\>s and process them. @@ -3221,6 +3332,8 @@ connection_handle_read_impl(connection_t *conn) conn->timestamp_last_read_allowed = approx_time(); + connection_bucket_refill_single(conn, monotime_coarse_get_stamp()); + switch (conn->type) { case CONN_TYPE_OR_LISTENER: return connection_handle_listener_read(conn, CONN_TYPE_OR); @@ -3645,6 +3758,8 @@ connection_handle_write_impl(connection_t *conn, int force) conn->timestamp_last_write_allowed = now; + connection_bucket_refill_single(conn, monotime_coarse_get_stamp()); + /* Sometimes, "writable" means "connected". */ if (connection_state_is_connecting(conn)) { if (getsockopt(conn->s, SOL_SOCKET, SO_ERROR, (void*)&e, &len) < 0) { @@ -3758,8 +3873,7 @@ connection_handle_write_impl(connection_t *conn, int force) /* Make sure to avoid a loop if the receive buckets are empty. */ log_debug(LD_NET,"wanted read."); if (!connection_is_reading(conn)) { - connection_stop_writing(conn); - conn->write_blocked_on_bw = 1; + connection_write_bw_exhausted(conn, true); /* we'll start reading again when we get more tokens in our * read bucket; then we'll start writing again too. */ @@ -5109,6 +5223,11 @@ connection_free_all(void) tor_free(last_interface_ipv4); tor_free(last_interface_ipv6); + last_recorded_accounting_at = 0; + + mainloop_event_free(reenable_blocked_connections_ev); + reenable_blocked_connections_is_scheduled = 0; + memset(&reenable_blocked_connections_delay, 0, sizeof(struct timeval)); } /** Log a warning, and possibly emit a control event, that <b>received</b> came diff --git a/src/or/connection.h b/src/or/connection.h index cfe31c3727..a2dce2435f 100644 --- a/src/or/connection.h +++ b/src/or/connection.h @@ -123,8 +123,12 @@ ssize_t connection_bucket_write_limit(connection_t *conn, time_t now); int global_write_bucket_low(connection_t *conn, size_t attempt, int priority); void connection_bucket_init(void); void connection_bucket_adjust(const or_options_t *options); -void connection_bucket_refill(time_t now, - uint32_t now_ts); +void connection_bucket_refill_all(time_t now, + uint32_t now_ts); +void connection_read_bw_exhausted(connection_t *conn, bool is_global_bw); +void connection_write_bw_exhausted(connection_t *conn, bool is_global_bw); +void connection_consider_empty_read_buckets(connection_t *conn); +void connection_consider_empty_write_buckets(connection_t *conn); int connection_handle_read(connection_t *conn); diff --git a/src/or/hibernate.c b/src/or/hibernate.c index 7261cf8002..9fed338555 100644 --- a/src/or/hibernate.c +++ b/src/or/hibernate.c @@ -297,7 +297,7 @@ accounting_get_end_time,(void)) return interval_end_time; } -/** Called from main.c to tell us that <b>seconds</b> seconds have +/** Called from connection.c to tell us that <b>seconds</b> seconds have * passed, <b>n_read</b> bytes have been read, and <b>n_written</b> * bytes have been written. */ void diff --git a/src/or/main.c b/src/or/main.c index 595246bfcd..d64c40240d 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -160,13 +160,6 @@ token_bucket_rw_t global_bucket; /* Token bucket for relayed traffic. */ token_bucket_rw_t global_relayed_bucket; -/** What was the read/write bucket before the last second_elapsed_callback() - * call? (used to determine how many bytes we've read). */ -static size_t stats_prev_global_read_bucket; -/** What was the write bucket before the last second_elapsed_callback() call? - * (used to determine how many bytes we've written). */ -static size_t stats_prev_global_write_bucket; - /* DOCDOC stats_prev_n_read */ static uint64_t stats_prev_n_read = 0; /* DOCDOC stats_prev_n_written */ @@ -480,21 +473,37 @@ get_connection_array, (void)) return connection_array; } -/** Provides the traffic read and written over the life of the process. */ - +/** + * Return the amount of network traffic read, in bytes, over the life of this + * process. + */ MOCK_IMPL(uint64_t, get_bytes_read,(void)) { return stats_n_bytes_read; } -/* DOCDOC get_bytes_written */ +/** + * Return the amount of network traffic read, in bytes, over the life of this + * process. + */ MOCK_IMPL(uint64_t, get_bytes_written,(void)) { return stats_n_bytes_written; } +/** + * Increment the amount of network traffic read and written, over the life of + * this process. + */ +void +stats_increment_bytes_read_and_written(uint64_t r, uint64_t w) +{ + stats_n_bytes_read += r; + stats_n_bytes_written += w; +} + /** Set the event mask on <b>conn</b> to <b>events</b>. (The event * mask is a bitmask whose bits are READ_EVENT and WRITE_EVENT) */ @@ -1026,19 +1035,22 @@ conn_close_if_marked(int i) * busy Libevent loops where we keep ending up here and returning * 0 until we are no longer blocked on bandwidth. */ - if (connection_is_writing(conn)) { - conn->write_blocked_on_bw = 1; - connection_stop_writing(conn); + connection_consider_empty_read_buckets(conn); + connection_consider_empty_write_buckets(conn); + + /* Make sure that consider_empty_buckets really disabled the + * connection: */ + if (BUG(connection_is_writing(conn))) { + connection_write_bw_exhausted(conn, true); } - if (connection_is_reading(conn)) { + if (BUG(connection_is_reading(conn))) { /* XXXX+ We should make this code unreachable; if a connection is * marked for close and flushing, there is no point in reading to it * at all. Further, checking at this point is a bit of a hack: it * would make much more sense to react in * connection_handle_read_impl, or to just stop reading in * mark_and_flush */ - conn->read_blocked_on_bw = 1; - connection_stop_reading(conn); + connection_read_bw_exhausted(conn, true/* kludge. */); } } return 0; @@ -2440,63 +2452,6 @@ systemd_watchdog_callback(periodic_timer_t *timer, void *arg) } #endif /* defined(HAVE_SYSTEMD_209) */ -/** Timer: used to invoke refill_callback(). */ -static periodic_timer_t *refill_timer = NULL; - -/** Millisecond when refall_callback was last invoked. */ -static struct timeval refill_timer_current_millisecond; - -/** Libevent callback: invoked periodically to refill token buckets - * and count r/w bytes. */ -static void -refill_callback(periodic_timer_t *timer, void *arg) -{ - struct timeval now; - - size_t bytes_written; - size_t bytes_read; - int milliseconds_elapsed = 0; - int seconds_rolled_over = 0; - - const or_options_t *options = get_options(); - - (void)timer; - (void)arg; - - tor_gettimeofday(&now); - - /* If this is our first time, no time has passed. */ - if (refill_timer_current_millisecond.tv_sec) { - long mdiff = tv_mdiff(&refill_timer_current_millisecond, &now); - if (mdiff > INT_MAX) - mdiff = INT_MAX; - milliseconds_elapsed = (int)mdiff; - seconds_rolled_over = (int)(now.tv_sec - - refill_timer_current_millisecond.tv_sec); - } - - bytes_written = stats_prev_global_write_bucket - - token_bucket_rw_get_write(&global_bucket); - bytes_read = stats_prev_global_read_bucket - - token_bucket_rw_get_read(&global_bucket); - - stats_n_bytes_read += bytes_read; - stats_n_bytes_written += bytes_written; - if (accounting_is_enabled(options) && milliseconds_elapsed >= 0) - accounting_add_bytes(bytes_read, bytes_written, seconds_rolled_over); - - if (milliseconds_elapsed > 0) { - connection_bucket_refill((time_t)now.tv_sec, - monotime_coarse_get_stamp()); - } - - stats_prev_global_read_bucket = token_bucket_rw_get_read(&global_bucket); - stats_prev_global_write_bucket = token_bucket_rw_get_write(&global_bucket); - - /* remember what time it is, for next time */ - refill_timer_current_millisecond = now; -} - #ifndef _WIN32 /** Called when a possibly ignorable libevent error occurs; ensures that we * don't get into an infinite loop by ignoring too many errors from @@ -2700,8 +2655,6 @@ do_main_loop(void) /* Set up our buckets */ connection_bucket_init(); - stats_prev_global_read_bucket = token_bucket_rw_get_read(&global_bucket); - stats_prev_global_write_bucket = token_bucket_rw_get_write(&global_bucket); /* initialize the bootstrap status events to know we're starting up */ control_event_bootstrap(BOOTSTRAP_STATUS_STARTING, 0); @@ -2799,20 +2752,6 @@ do_main_loop(void) } #endif /* defined(HAVE_SYSTEMD_209) */ - if (!refill_timer) { - struct timeval refill_interval; - int msecs = get_options()->TokenBucketRefillInterval; - - refill_interval.tv_sec = msecs/1000; - refill_interval.tv_usec = (msecs%1000)*1000; - - refill_timer = periodic_timer_new(tor_libevent_get_base(), - &refill_interval, - refill_callback, - NULL); - tor_assert(refill_timer); - } - #ifdef HAVE_SYSTEMD { const int r = sd_notify(0, "READY=1"); @@ -3569,7 +3508,6 @@ tor_free_all(int postfork) smartlist_free(active_linked_connection_lst); periodic_timer_free(second_timer); teardown_periodic_events(); - periodic_timer_free(refill_timer); tor_event_free(shutdown_did_not_work_event); tor_event_free(initialize_periodic_events_event); mainloop_event_free(directory_all_unreachable_cb_event); @@ -3581,7 +3519,6 @@ tor_free_all(int postfork) memset(&global_bucket, 0, sizeof(global_bucket)); memset(&global_relayed_bucket, 0, sizeof(global_relayed_bucket)); - stats_prev_global_read_bucket = stats_prev_global_write_bucket = 0; stats_prev_n_read = stats_prev_n_written = 0; stats_n_bytes_read = stats_n_bytes_written = 0; time_of_process_start = 0; @@ -3598,7 +3535,6 @@ tor_free_all(int postfork) heartbeat_callback_first_time = 1; n_libevent_errors = 0; current_second = 0; - memset(&refill_timer_current_millisecond, 0, sizeof(struct timeval)); if (!postfork) { release_lockfile(); diff --git a/src/or/main.h b/src/or/main.h index 57ac8573a6..0e3de7d4be 100644 --- a/src/or/main.h +++ b/src/or/main.h @@ -28,6 +28,7 @@ int connection_is_on_closeable_list(connection_t *conn); MOCK_DECL(smartlist_t *, get_connection_array, (void)); MOCK_DECL(uint64_t,get_bytes_read,(void)); MOCK_DECL(uint64_t,get_bytes_written,(void)); +void stats_increment_bytes_read_and_written(uint64_t r, uint64_t w); /** Bitmask for events that we can turn on and off with * connection_watch_events. */ diff --git a/src/or/or.h b/src/or/or.h index c5a039e939..e27f25197b 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -57,6 +57,7 @@ #ifdef HAVE_TIME_H #include <time.h> #endif +#include <stdbool.h> #ifdef _WIN32 #include <winsock2.h> |