aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAndrea Shepard <andrea@torproject.org>2013-11-14 04:45:47 -0800
committerAndrea Shepard <andrea@torproject.org>2014-09-30 22:49:35 -0700
commit1275002a46dfb131f6db5c0fe28bc1828db327e2 (patch)
treeee0377d0be78e6ac2cb8c4d7bd445c1856b02618 /src
parent4f567c8cc8cd68a8ca9bb93fc57d518d7eb55cf0 (diff)
downloadtor-1275002a46dfb131f6db5c0fe28bc1828db327e2.tar.gz
tor-1275002a46dfb131f6db5c0fe28bc1828db327e2.zip
Schedule according to a queue size heuristic
Diffstat (limited to 'src')
-rw-r--r--src/or/channel.c4
-rw-r--r--src/or/scheduler.c192
-rw-r--r--src/or/scheduler.h3
3 files changed, 173 insertions, 26 deletions
diff --git a/src/or/channel.c b/src/or/channel.c
index dddd7ab1f1..7ed38945ba 100644
--- a/src/or/channel.c
+++ b/src/or/channel.c
@@ -4563,6 +4563,8 @@ channel_update_xmit_queue_size(channel_t *chan)
U64_FORMAT ", new size is " U64_FORMAT,
U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
U64_PRINTF_ARG(estimated_total_queue_size));
+ /* Tell the scheduler we're increasing the queue size */
+ scheduler_adjust_queue_size(chan, 1, adj);
}
} else if (queued < chan->bytes_queued_for_xmit) {
adj = chan->bytes_queued_for_xmit - queued;
@@ -4585,6 +4587,8 @@ channel_update_xmit_queue_size(channel_t *chan)
U64_FORMAT ", new size is " U64_FORMAT,
U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
U64_PRINTF_ARG(estimated_total_queue_size));
+ /* Tell the scheduler we're decreasing the queue size */
+ scheduler_adjust_queue_size(chan, -1, adj);
}
}
}
diff --git a/src/or/scheduler.c b/src/or/scheduler.c
index 1c6a2fdecb..1baf3750d6 100644
--- a/src/or/scheduler.c
+++ b/src/or/scheduler.c
@@ -7,7 +7,10 @@
**/
#include "or.h"
+
+#define TOR_CHANNEL_INTERNAL_ /* For channel_flush_some_cells() */
#include "channel.h"
+
#include "compat_libevent.h"
#include "scheduler.h"
@@ -17,6 +20,9 @@
#include <event.h>
#endif
+#define SCHED_Q_LOW_WATER 16384
+#define SCHED_Q_HIGH_WATER (2 * SCHED_Q_LOW_WATER)
+
/*
* Write scheduling works by keeping track of lists of channels that can
* accept cells, and have cells to write. From the scheduler's perspective,
@@ -118,6 +124,19 @@ static smartlist_t *channels_pending = NULL;
static struct event *run_sched_ev = NULL;
+/*
+ * Queue heuristic; this is not the queue size, but an 'effective queuesize'
+ * that ages out contributions from stalled channels.
+ */
+
+static uint64_t queue_heuristic = 0;
+
+/*
+ * Timestamp for last queue heuristic update
+ */
+
+static time_t queue_heuristic_timestamp = 0;
+
/* Scheduler static function declarations */
static void scheduler_evt_callback(evutil_socket_t fd,
@@ -127,6 +146,8 @@ static void scheduler_retrigger(void);
#if 0
static void scheduler_trigger(void);
#endif
+static uint64_t scheduler_get_queue_heuristic(void);
+static void scheduler_update_queue_heuristic(time_t now);
/* Scheduler function implementations */
@@ -281,6 +302,8 @@ scheduler_init(void)
channels_waiting_for_cells = smartlist_new();
channels_waiting_to_write = smartlist_new();
channels_pending = smartlist_new();
+ queue_heuristic = 0;
+ queue_heuristic_timestamp = approx_time();
}
/** Check if there's more scheduling work */
@@ -290,7 +313,8 @@ scheduler_more_work(void)
{
tor_assert(channels_pending);
- return (smartlist_len(channels_pending) > 0) ? 1 : 0;
+ return ((scheduler_get_queue_heuristic() < SCHED_Q_LOW_WATER) &&
+ ((smartlist_len(channels_pending) > 0))) ? 1 : 0;
}
/** Retrigger the scheduler in a way safe to use from the callback */
@@ -324,39 +348,70 @@ void
scheduler_run(void)
{
smartlist_t *tmp = NULL;
- int n_cells;
+ int n_cells, n_chans_before, n_chans_after;
+ uint64_t q_len_before, q_heur_before, q_len_after, q_heur_after;
ssize_t flushed, flushed_this_time;
log_debug(LD_SCHED, "We have a chance to run the scheduler");
- tmp = channels_pending;
- channels_pending = smartlist_new();
+ if (scheduler_get_queue_heuristic() < SCHED_Q_LOW_WATER) {
+ n_chans_before = smartlist_len(channels_pending);
+ q_len_before = channel_get_global_queue_estimate();
+ q_heur_before = scheduler_get_queue_heuristic();
+ tmp = channels_pending;
+ channels_pending = smartlist_new();
- /* For now, just run the old scheduler on all the chans in the list */
+ /*
+ * For now, just run the old scheduler on all the chans in the list, until
+ * we hit the high-water mark. TODO real channel priority API
+ */
- SMARTLIST_FOREACH_BEGIN(tmp, channel_t *, chan) {
- n_cells = channel_num_cells_writeable(chan);
- if (n_cells > 0) {
- log_debug(LD_SCHED,
- "Scheduler saw pending channel " U64_FORMAT " at %p with "
- "%d cells writeable",
- U64_PRINTF_ARG(chan->global_identifier), chan, n_cells);
-
- flushed = 0;
- while (flushed < n_cells) {
- flushed_this_time = channel_flush_some_cells(chan, n_cells - flushed);
- if (flushed_this_time <= 0) break;
- flushed += flushed_this_time;
+ SMARTLIST_FOREACH_BEGIN(tmp, channel_t *, chan) {
+ if (scheduler_get_queue_heuristic() <= SCHED_Q_HIGH_WATER) {
+ n_cells = channel_num_cells_writeable(chan);
+ if (n_cells > 0) {
+ log_debug(LD_SCHED,
+ "Scheduler saw pending channel " U64_FORMAT " at %p with "
+ "%d cells writeable",
+ U64_PRINTF_ARG(chan->global_identifier), chan, n_cells);
+
+ flushed = 0;
+ while (flushed < n_cells) {
+ flushed_this_time =
+ channel_flush_some_cells(chan, n_cells - flushed);
+ if (flushed_this_time <= 0) break;
+ flushed += flushed_this_time;
+ }
+
+ log_debug(LD_SCHED,
+ "Scheduler flushed %d cells onto pending channel "
+ U64_FORMAT " at %p",
+ flushed, U64_PRINTF_ARG(chan->global_identifier), chan);
+ } else {
+ log_info(LD_SCHED,
+ "Scheduler saw pending channel " U64_FORMAT " at %p with "
+ "no cells writeable",
+ U64_PRINTF_ARG(chan->global_identifier), chan);
+ }
+ } else {
+ /* Not getting it this round; put it back on the list */
+ smartlist_add(channels_pending, chan);
}
- } else {
- log_info(LD_SCHED,
- "Scheduler saw pending channel " U64_FORMAT " at %p with "
- "no cells writeable",
- U64_PRINTF_ARG(chan->global_identifier), chan);
- }
- } SMARTLIST_FOREACH_END(chan);
+ } SMARTLIST_FOREACH_END(chan);
- smartlist_free(tmp);
+ smartlist_free(tmp);
+
+ n_chans_after = smartlist_len(channels_pending);
+ q_len_after = channel_get_global_queue_estimate();
+ q_heur_after = scheduler_get_queue_heuristic();
+ log_debug(LD_SCHED,
+ "Scheduler handled %d of %d pending channels, queue size from "
+ U64_FORMAT " to " U64_FORMAT ", queue heuristic from "
+ U64_FORMAT " to " U64_FORMAT,
+ n_chans_before - n_chans_after, n_chans_before,
+ U64_PRINTF_ARG(q_len_before), U64_PRINTF_ARG(q_len_after),
+ U64_PRINTF_ARG(q_heur_before), U64_PRINTF_ARG(q_heur_after));
+ }
}
/** Trigger the scheduling event so we run the scheduler later */
@@ -420,3 +475,88 @@ scheduler_channel_wants_writes(channel_t *chan)
if (became_pending) scheduler_retrigger();
}
+/**
+ * Notify the scheduler of a queue size adjustment, to recalculate the
+ * queue heuristic.
+ */
+
+void
+scheduler_adjust_queue_size(channel_t *chan, char dir, uint64_t adj)
+{
+ time_t now = approx_time();
+
+ log_debug(LD_SCHED,
+ "Queue size adjustment by %s" U64_FORMAT " for channel "
+ U64_FORMAT,
+ (dir >= 0) ? "+" : "-",
+ U64_PRINTF_ARG(adj),
+ U64_PRINTF_ARG(chan->global_identifier));
+
+ /* Get the queue heuristic up to date */
+ scheduler_update_queue_heuristic(now);
+
+ /* Adjust as appropriate */
+ if (dir >= 0) {
+ /* Increasing it */
+ queue_heuristic += adj;
+ } else {
+ /* Decreasing it */
+ if (queue_heuristic > adj) queue_heuristic -= adj;
+ else queue_heuristic = 0;
+ }
+
+ log_debug(LD_SCHED,
+ "Queue heuristic is now " U64_FORMAT,
+ U64_PRINTF_ARG(queue_heuristic));
+}
+
+/**
+ * Query the current value of the queue heuristic
+ */
+
+static uint64_t
+scheduler_get_queue_heuristic(void)
+{
+ time_t now = approx_time();
+
+ scheduler_update_queue_heuristic(now);
+
+ return queue_heuristic;
+}
+
+/**
+ * Adjust the queue heuristic value to the present time
+ */
+
+static void
+scheduler_update_queue_heuristic(time_t now)
+{
+ time_t diff;
+
+ if (queue_heuristic_timestamp == 0) {
+ /*
+ * Nothing we can sensibly do; must not have been initted properly.
+ * Oh well.
+ */
+ queue_heuristic_timestamp = now;
+ } else if (queue_heuristic_timestamp < now) {
+ diff = now - queue_heuristic_timestamp;
+ /*
+ * This is a simple exponential age-out; the other proposed alternative
+ * was a linear age-out using the bandwidth history in rephist.c; I'm
+ * going with this out of concern that if an adversary can jam the
+ * scheduler long enough, it would cause the bandwidth to drop to
+ * zero and render the aging mechanism ineffective thereafter.
+ */
+ if (0 <= diff && diff < 64) queue_heuristic >>= diff;
+ else queue_heuristic = 0;
+
+ queue_heuristic_timestamp = now;
+
+ log_debug(LD_SCHED,
+ "Queue heuristic is now " U64_FORMAT,
+ U64_PRINTF_ARG(queue_heuristic));
+ }
+ /* else no update needed, or time went backward */
+}
+
diff --git a/src/or/scheduler.h b/src/or/scheduler.h
index b25e36e902..8fe59cb0b3 100644
--- a/src/or/scheduler.h
+++ b/src/or/scheduler.h
@@ -27,5 +27,8 @@ void scheduler_channel_wants_writes(channel_t *chan);
/* Notify the scheduler of a channel being closed */
void scheduler_release_channel(channel_t *chan);
+/* Notify scheduler of queue size adjustments */
+void scheduler_adjust_queue_size(channel_t *chan, char dir, uint64_t adj);
+
#endif /* !defined(TOR_SCHEDULER_H) */