summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoger Dingledine <arma@torproject.org>2004-03-14 16:00:52 +0000
committerRoger Dingledine <arma@torproject.org>2004-03-14 16:00:52 +0000
commit703b2d3cf8d003084cdd11be22683d712db8e5b0 (patch)
tree961ca155f3ecbfb3e9e6b31f20bef962aedebddf
parent3ccd545c9a2916f9b03fe2f5e44afe49f424f6a0 (diff)
downloadtor-703b2d3cf8d003084cdd11be22683d712db8e5b0.tar.gz
tor-703b2d3cf8d003084cdd11be22683d712db8e5b0.zip
refactor bandwidth-control token buckets
this is a checkpoint commit; there still remain some bugs, er, somewhere. svn:r1269
-rw-r--r--src/or/connection.c167
-rw-r--r--src/or/main.c36
-rw-r--r--src/or/or.h8
3 files changed, 129 insertions, 82 deletions
diff --git a/src/or/connection.c b/src/or/connection.c
index b11d82dbcd..56643fd972 100644
--- a/src/or/connection.c
+++ b/src/or/connection.c
@@ -8,8 +8,6 @@
extern or_options_t options; /* command-line and config-file options */
-extern int global_read_bucket;
-
char *conn_type_to_string[] = {
"", /* 0 */
"OP listener", /* 1 */
@@ -72,6 +70,7 @@ char *conn_state_to_string[][_CONN_TYPE_MAX+1] = {
static int connection_init_accepted_conn(connection_t *conn);
static int connection_handle_listener_read(connection_t *conn, int new_type);
+static int connection_receiver_bucket_should_increase(connection_t *conn);
/**************************************************************/
@@ -444,6 +443,121 @@ int retry_all_connections(void) {
return 0;
}
+extern int global_read_bucket;
+
+/* how many bytes at most can we read onto this connection? */
+int connection_bucket_read_limit(connection_t *conn) {
+ int at_most;
+
+ if(options.LinkPadding) {
+ at_most = global_read_bucket;
+ } else {
+ /* do a rudimentary round-robin so one circuit can't hog a connection */
+ if(connection_speaks_cells(conn)) {
+ at_most = 32*(CELL_NETWORK_SIZE);
+ } else {
+ at_most = 32*(RELAY_PAYLOAD_SIZE);
+ }
+
+ if(at_most > global_read_bucket)
+ at_most = global_read_bucket;
+ }
+
+ if(connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN)
+ if(at_most > conn->receiver_bucket)
+ at_most = conn->receiver_bucket;
+
+ return at_most;
+}
+
+/* we just read num_read onto conn. Decrement buckets appropriately. */
+void connection_bucket_decrement(connection_t *conn, int num_read) {
+ global_read_bucket -= num_read; assert(global_read_bucket >= 0);
+ if(connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
+ conn->receiver_bucket -= num_read; assert(conn->receiver_bucket >= 0);
+ }
+ if(global_read_bucket == 0) {
+ log_fn(LOG_DEBUG,"global bucket exhausted. Pausing.");
+ conn->wants_to_read = 1;
+ connection_stop_reading(conn);
+ return;
+ }
+ if(connection_speaks_cells(conn) &&
+ conn->state == OR_CONN_STATE_OPEN &&
+ conn->receiver_bucket == 0) {
+ log_fn(LOG_DEBUG,"receiver bucket exhausted. Pausing.");
+ conn->wants_to_read = 1;
+ connection_stop_reading(conn);
+ }
+}
+
+/* keep a timeval to know when time has passed enough to refill buckets */
+static struct timeval current_time;
+
+void connection_bucket_init() {
+ tor_gettimeofday(&current_time);
+ global_read_bucket = options.BandwidthBurst; /* start it at max traffic */
+}
+
+/* some time has passed; increment buckets appropriately. */
+void connection_bucket_refill(struct timeval *now) {
+ int i, n;
+ connection_t *conn;
+ connection_t **carray;
+
+ if(now->tv_sec <= current_time.tv_sec)
+ return; /* wait until the second has rolled over */
+
+ current_time.tv_sec = now->tv_sec; /* update current_time */
+ /* (ignore usecs for now) */
+
+ /* refill the global bucket */
+ if(global_read_bucket < options.BandwidthBurst) {
+ global_read_bucket += options.BandwidthRate;
+ log_fn(LOG_DEBUG,"global_read_bucket now %d.", global_read_bucket);
+ }
+
+ /* refill the per-connection buckets */
+ get_connection_array(&carray,&n);
+ for(i=0;i<n;i++) {
+ conn = carray[i];
+
+ if(connection_receiver_bucket_should_increase(conn)) {
+ conn->receiver_bucket += conn->bandwidth;
+ //log_fn(LOG_DEBUG,"Receiver bucket %d now %d.", i, conn->receiver_bucket);
+ }
+
+ if(conn->wants_to_read == 1 /* it's marked to turn reading back on now */
+ && global_read_bucket > 0 /* and we're allowed to read */
+ && (!connection_speaks_cells(conn) ||
+ conn->state != OR_CONN_STATE_OPEN ||
+ conn->receiver_bucket > 0)) {
+ /* and either a non-cell conn or a cell conn with non-empty bucket */
+ conn->wants_to_read = 0;
+ connection_start_reading(conn);
+ if(conn->wants_to_write == 1) {
+ conn->wants_to_write = 0;
+ connection_start_writing(conn);
+ }
+ }
+ }
+}
+
+static int connection_receiver_bucket_should_increase(connection_t *conn) {
+ assert(conn);
+
+ if(!connection_speaks_cells(conn))
+ return 0; /* edge connections don't use receiver_buckets */
+ if(conn->state != OR_CONN_STATE_OPEN)
+ return 0; /* only open connections play the rate limiting game */
+
+ assert(conn->bandwidth > 0);
+ if(conn->receiver_bucket > 9*conn->bandwidth)
+ return 0;
+
+ return 1;
+}
+
int connection_handle_read(connection_t *conn) {
conn->timestamp_lastread = time(NULL);
@@ -482,27 +596,14 @@ int connection_read_to_buf(connection_t *conn) {
int result;
int at_most;
- if(options.LinkPadding) {
- at_most = global_read_bucket;
- } else {
- /* do a rudimentary round-robin so one connection can't hog a thickpipe */
- if(connection_speaks_cells(conn)) {
- at_most = 32*(CELL_NETWORK_SIZE);
- } else {
- at_most = 32*(RELAY_PAYLOAD_SIZE);
- }
-
- if(at_most > global_read_bucket)
- at_most = global_read_bucket;
- }
+ /* how many bytes are we allowed to read? */
+ at_most = connection_bucket_read_limit(conn);
if(connection_speaks_cells(conn) && conn->state != OR_CONN_STATE_CONNECTING) {
if(conn->state == OR_CONN_STATE_HANDSHAKING)
return connection_tls_continue_handshake(conn);
/* else open, or closing */
- if(at_most > conn->receiver_bucket)
- at_most = conn->receiver_bucket;
result = read_to_buf_tls(conn->tls, at_most, conn->inbuf);
switch(result) {
@@ -527,22 +628,7 @@ int connection_read_to_buf(connection_t *conn) {
return -1;
}
- global_read_bucket -= result; assert(global_read_bucket >= 0);
- if(global_read_bucket == 0) {
- log_fn(LOG_DEBUG,"global bucket exhausted. Pausing.");
- conn->wants_to_read = 1;
- connection_stop_reading(conn);
- return 0;
- }
- if(connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
- conn->receiver_bucket -= result; assert(conn->receiver_bucket >= 0);
- if(conn->receiver_bucket == 0) {
- log_fn(LOG_DEBUG,"receiver bucket exhausted. Pausing.");
- conn->wants_to_read = 1;
- connection_stop_reading(conn);
- return 0;
- }
- }
+ connection_bucket_decrement(conn, result);
return 0;
}
@@ -754,21 +840,6 @@ connection_t *connection_get_by_type_state_lastwritten(int type, int state) {
return best;
}
-int connection_receiver_bucket_should_increase(connection_t *conn) {
- assert(conn);
-
- if(!connection_speaks_cells(conn))
- return 0; /* edge connections don't use receiver_buckets */
- if(conn->state != OR_CONN_STATE_OPEN)
- return 0; /* only open connections play the rate limiting game */
-
- assert(conn->bandwidth > 0);
- if(conn->receiver_bucket > 9*conn->bandwidth)
- return 0;
-
- return 1;
-}
-
int connection_is_listener(connection_t *conn) {
if(conn->type == CONN_TYPE_OR_LISTENER ||
conn->type == CONN_TYPE_AP_LISTENER ||
diff --git a/src/or/main.c b/src/or/main.c
index 52c51a2f17..e3c5a5a8c6 100644
--- a/src/or/main.c
+++ b/src/or/main.c
@@ -11,7 +11,6 @@ static int init_from_config(int argc, char **argv);
/********* START VARIABLES **********/
-extern char *conn_type_to_string[];
extern char *conn_state_to_string[][_CONN_TYPE_MAX+1];
or_options_t options; /* command-line and config-file options */
@@ -281,23 +280,6 @@ static void run_connection_housekeeping(int i, time_t now) {
cell_t cell;
connection_t *conn = connection_array[i];
- if(connection_receiver_bucket_should_increase(conn)) {
- conn->receiver_bucket += conn->bandwidth;
- // log_fn(LOG_DEBUG,"Receiver bucket %d now %d.", i, conn->receiver_bucket);
- }
-
- if(conn->wants_to_read == 1 /* it's marked to turn reading back on now */
- && global_read_bucket > 0 /* and we're allowed to read */
- && (!connection_speaks_cells(conn) || conn->receiver_bucket > 0)) {
- /* and either a non-cell conn or a cell conn with non-empty bucket */
- conn->wants_to_read = 0;
- connection_start_reading(conn);
- if(conn->wants_to_write == 1) {
- conn->wants_to_write = 0;
- connection_start_writing(conn);
- }
- }
-
/* check connections to see whether we should send a keepalive, expire, or wait */
if(!connection_speaks_cells(conn))
return;
@@ -397,16 +379,6 @@ static void run_scheduled_events(time_t now) {
}
}
- /* 4. Every second, we check how much bandwidth we've consumed and
- * increment global_read_bucket.
- */
- stats_n_bytes_read += stats_prev_global_read_bucket-global_read_bucket;
- if(global_read_bucket < options.BandwidthBurst) {
- global_read_bucket += options.BandwidthRate;
- log_fn(LOG_DEBUG,"global_read_bucket now %d.", global_read_bucket);
- }
- stats_prev_global_read_bucket = global_read_bucket;
-
/* 5. We do housekeeping for each connection... */
for(i=0;i<nfds;i++) {
run_connection_housekeeping(i, now);
@@ -433,6 +405,12 @@ static int prepare_for_poll(void) {
tor_gettimeofday(&now);
+ /* Check how much bandwidth we've consumed,
+ * and increment the token buckets. */
+ stats_n_bytes_read += stats_prev_global_read_bucket-global_read_bucket;
+ connection_bucket_refill(&now);
+ stats_prev_global_read_bucket = global_read_bucket;
+
if(now.tv_sec > current_second) { /* the second has rolled over. check more stuff. */
++stats_n_seconds_reading;
@@ -486,7 +464,7 @@ static int init_from_config(int argc, char **argv) {
log_fn(LOG_DEBUG, "Successfully opened DebugLogFile '%s'.", options.DebugLogFile);
}
- global_read_bucket = options.BandwidthBurst; /* start it at max traffic */
+ connection_bucket_init();
stats_prev_global_read_bucket = global_read_bucket;
if(options.RunAsDaemon) {
diff --git a/src/or/or.h b/src/or/or.h
index 5fd5b9222f..e25a0692c3 100644
--- a/src/or/or.h
+++ b/src/or/or.h
@@ -664,8 +664,8 @@ int getconfig(int argc, char **argv, or_options_t *options);
/********************************* connection.c ***************************/
-#define CONN_TYPE_TO_STRING(t) (((t) < _CONN_TYPE_MIN || (t) > _CONN_TYPE_MAX) ? "Unknown" : \
- conn_type_to_string[(t)])
+#define CONN_TYPE_TO_STRING(t) (((t) < _CONN_TYPE_MIN || (t) > _CONN_TYPE_MAX) ? \
+ "Unknown" : conn_type_to_string[(t)])
extern char *conn_type_to_string[];
@@ -711,13 +711,11 @@ connection_t *connection_get_by_type(int type);
connection_t *connection_get_by_type_state(int type, int state);
connection_t *connection_get_by_type_state_lastwritten(int type, int state);
-int connection_receiver_bucket_should_increase(connection_t *conn);
-
#define connection_speaks_cells(conn) ((conn)->type == CONN_TYPE_OR)
#define connection_has_pending_tls_data(conn) \
((conn)->type == CONN_TYPE_OR && \
(conn)->state == OR_CONN_STATE_OPEN && \
- tor_tls_get_pending_bytes(conn->tls))
+ tor_tls_get_pending_bytes((conn)->tls))
int connection_is_listener(connection_t *conn);
int connection_state_is_open(connection_t *conn);