diff options
author | Andrea Shepard <andrea@torproject.org> | 2013-11-14 04:45:47 -0800 |
---|---|---|
committer | Andrea Shepard <andrea@torproject.org> | 2014-09-30 22:49:35 -0700 |
commit | 1275002a46dfb131f6db5c0fe28bc1828db327e2 (patch) | |
tree | ee0377d0be78e6ac2cb8c4d7bd445c1856b02618 /src/or/scheduler.c | |
parent | 4f567c8cc8cd68a8ca9bb93fc57d518d7eb55cf0 (diff) | |
download | tor-1275002a46dfb131f6db5c0fe28bc1828db327e2.tar.gz tor-1275002a46dfb131f6db5c0fe28bc1828db327e2.zip |
Schedule according to a queue size heuristic
Diffstat (limited to 'src/or/scheduler.c')
-rw-r--r-- | src/or/scheduler.c | 192 |
1 files changed, 166 insertions, 26 deletions
diff --git a/src/or/scheduler.c b/src/or/scheduler.c index 1c6a2fdecb..1baf3750d6 100644 --- a/src/or/scheduler.c +++ b/src/or/scheduler.c @@ -7,7 +7,10 @@ **/ #include "or.h" + +#define TOR_CHANNEL_INTERNAL_ /* For channel_flush_some_cells() */ #include "channel.h" + #include "compat_libevent.h" #include "scheduler.h" @@ -17,6 +20,9 @@ #include <event.h> #endif +#define SCHED_Q_LOW_WATER 16384 +#define SCHED_Q_HIGH_WATER (2 * SCHED_Q_LOW_WATER) + /* * Write scheduling works by keeping track of lists of channels that can * accept cells, and have cells to write. From the scheduler's perspective, @@ -118,6 +124,19 @@ static smartlist_t *channels_pending = NULL; static struct event *run_sched_ev = NULL; +/* + * Queue heuristic; this is not the queue size, but an 'effective queuesize' + * that ages out contributions from stalled channels. + */ + +static uint64_t queue_heuristic = 0; + +/* + * Timestamp for last queue heuristic update + */ + +static time_t queue_heuristic_timestamp = 0; + /* Scheduler static function declarations */ static void scheduler_evt_callback(evutil_socket_t fd, @@ -127,6 +146,8 @@ static void scheduler_retrigger(void); #if 0 static void scheduler_trigger(void); #endif +static uint64_t scheduler_get_queue_heuristic(void); +static void scheduler_update_queue_heuristic(time_t now); /* Scheduler function implementations */ @@ -281,6 +302,8 @@ scheduler_init(void) channels_waiting_for_cells = smartlist_new(); channels_waiting_to_write = smartlist_new(); channels_pending = smartlist_new(); + queue_heuristic = 0; + queue_heuristic_timestamp = approx_time(); } /** Check if there's more scheduling work */ @@ -290,7 +313,8 @@ scheduler_more_work(void) { tor_assert(channels_pending); - return (smartlist_len(channels_pending) > 0) ? 1 : 0; + return ((scheduler_get_queue_heuristic() < SCHED_Q_LOW_WATER) && + ((smartlist_len(channels_pending) > 0))) ? 1 : 0; } /** Retrigger the scheduler in a way safe to use from the callback */ @@ -324,39 +348,70 @@ void scheduler_run(void) { smartlist_t *tmp = NULL; - int n_cells; + int n_cells, n_chans_before, n_chans_after; + uint64_t q_len_before, q_heur_before, q_len_after, q_heur_after; ssize_t flushed, flushed_this_time; log_debug(LD_SCHED, "We have a chance to run the scheduler"); - tmp = channels_pending; - channels_pending = smartlist_new(); + if (scheduler_get_queue_heuristic() < SCHED_Q_LOW_WATER) { + n_chans_before = smartlist_len(channels_pending); + q_len_before = channel_get_global_queue_estimate(); + q_heur_before = scheduler_get_queue_heuristic(); + tmp = channels_pending; + channels_pending = smartlist_new(); - /* For now, just run the old scheduler on all the chans in the list */ + /* + * For now, just run the old scheduler on all the chans in the list, until + * we hit the high-water mark. TODO real channel priority API + */ - SMARTLIST_FOREACH_BEGIN(tmp, channel_t *, chan) { - n_cells = channel_num_cells_writeable(chan); - if (n_cells > 0) { - log_debug(LD_SCHED, - "Scheduler saw pending channel " U64_FORMAT " at %p with " - "%d cells writeable", - U64_PRINTF_ARG(chan->global_identifier), chan, n_cells); - - flushed = 0; - while (flushed < n_cells) { - flushed_this_time = channel_flush_some_cells(chan, n_cells - flushed); - if (flushed_this_time <= 0) break; - flushed += flushed_this_time; + SMARTLIST_FOREACH_BEGIN(tmp, channel_t *, chan) { + if (scheduler_get_queue_heuristic() <= SCHED_Q_HIGH_WATER) { + n_cells = channel_num_cells_writeable(chan); + if (n_cells > 0) { + log_debug(LD_SCHED, + "Scheduler saw pending channel " U64_FORMAT " at %p with " + "%d cells writeable", + U64_PRINTF_ARG(chan->global_identifier), chan, n_cells); + + flushed = 0; + while (flushed < n_cells) { + flushed_this_time = + channel_flush_some_cells(chan, n_cells - flushed); + if (flushed_this_time <= 0) break; + flushed += flushed_this_time; + } + + log_debug(LD_SCHED, + "Scheduler flushed %d cells onto pending channel " + U64_FORMAT " at %p", + flushed, U64_PRINTF_ARG(chan->global_identifier), chan); + } else { + log_info(LD_SCHED, + "Scheduler saw pending channel " U64_FORMAT " at %p with " + "no cells writeable", + U64_PRINTF_ARG(chan->global_identifier), chan); + } + } else { + /* Not getting it this round; put it back on the list */ + smartlist_add(channels_pending, chan); } - } else { - log_info(LD_SCHED, - "Scheduler saw pending channel " U64_FORMAT " at %p with " - "no cells writeable", - U64_PRINTF_ARG(chan->global_identifier), chan); - } - } SMARTLIST_FOREACH_END(chan); + } SMARTLIST_FOREACH_END(chan); - smartlist_free(tmp); + smartlist_free(tmp); + + n_chans_after = smartlist_len(channels_pending); + q_len_after = channel_get_global_queue_estimate(); + q_heur_after = scheduler_get_queue_heuristic(); + log_debug(LD_SCHED, + "Scheduler handled %d of %d pending channels, queue size from " + U64_FORMAT " to " U64_FORMAT ", queue heuristic from " + U64_FORMAT " to " U64_FORMAT, + n_chans_before - n_chans_after, n_chans_before, + U64_PRINTF_ARG(q_len_before), U64_PRINTF_ARG(q_len_after), + U64_PRINTF_ARG(q_heur_before), U64_PRINTF_ARG(q_heur_after)); + } } /** Trigger the scheduling event so we run the scheduler later */ @@ -420,3 +475,88 @@ scheduler_channel_wants_writes(channel_t *chan) if (became_pending) scheduler_retrigger(); } +/** + * Notify the scheduler of a queue size adjustment, to recalculate the + * queue heuristic. + */ + +void +scheduler_adjust_queue_size(channel_t *chan, char dir, uint64_t adj) +{ + time_t now = approx_time(); + + log_debug(LD_SCHED, + "Queue size adjustment by %s" U64_FORMAT " for channel " + U64_FORMAT, + (dir >= 0) ? "+" : "-", + U64_PRINTF_ARG(adj), + U64_PRINTF_ARG(chan->global_identifier)); + + /* Get the queue heuristic up to date */ + scheduler_update_queue_heuristic(now); + + /* Adjust as appropriate */ + if (dir >= 0) { + /* Increasing it */ + queue_heuristic += adj; + } else { + /* Decreasing it */ + if (queue_heuristic > adj) queue_heuristic -= adj; + else queue_heuristic = 0; + } + + log_debug(LD_SCHED, + "Queue heuristic is now " U64_FORMAT, + U64_PRINTF_ARG(queue_heuristic)); +} + +/** + * Query the current value of the queue heuristic + */ + +static uint64_t +scheduler_get_queue_heuristic(void) +{ + time_t now = approx_time(); + + scheduler_update_queue_heuristic(now); + + return queue_heuristic; +} + +/** + * Adjust the queue heuristic value to the present time + */ + +static void +scheduler_update_queue_heuristic(time_t now) +{ + time_t diff; + + if (queue_heuristic_timestamp == 0) { + /* + * Nothing we can sensibly do; must not have been initted properly. + * Oh well. + */ + queue_heuristic_timestamp = now; + } else if (queue_heuristic_timestamp < now) { + diff = now - queue_heuristic_timestamp; + /* + * This is a simple exponential age-out; the other proposed alternative + * was a linear age-out using the bandwidth history in rephist.c; I'm + * going with this out of concern that if an adversary can jam the + * scheduler long enough, it would cause the bandwidth to drop to + * zero and render the aging mechanism ineffective thereafter. + */ + if (0 <= diff && diff < 64) queue_heuristic >>= diff; + else queue_heuristic = 0; + + queue_heuristic_timestamp = now; + + log_debug(LD_SCHED, + "Queue heuristic is now " U64_FORMAT, + U64_PRINTF_ARG(queue_heuristic)); + } + /* else no update needed, or time went backward */ +} + |