aboutsummaryrefslogtreecommitdiff
path: root/src/or/scheduler.c
diff options
context:
space:
mode:
authorAndrea Shepard <andrea@torproject.org>2013-12-12 04:22:53 -0800
committerAndrea Shepard <andrea@torproject.org>2014-09-30 22:49:56 -0700
commited1927d6bf8b9d60d40f6fbc20f9e1575a35e59d (patch)
tree55c8909471a659eff05fddcbfab76c50632206d2 /src/or/scheduler.c
parent3530825c53b7a28cf894b64e6d97aa90d0acccae (diff)
downloadtor-ed1927d6bf8b9d60d40f6fbc20f9e1575a35e59d.tar.gz
tor-ed1927d6bf8b9d60d40f6fbc20f9e1575a35e59d.zip
Use a non-stupid data structure in the scheduler
Diffstat (limited to 'src/or/scheduler.c')
-rw-r--r--src/or/scheduler.c190
1 files changed, 134 insertions, 56 deletions
diff --git a/src/or/scheduler.c b/src/or/scheduler.c
index c1b64dfbb6..4f12696c68 100644
--- a/src/or/scheduler.c
+++ b/src/or/scheduler.c
@@ -24,6 +24,14 @@
#define SCHED_Q_HIGH_WATER (2 * SCHED_Q_LOW_WATER)
/*
+ * Maximum cells to flush in a single call to channel_flush_some_cells();
+ * setting this low means more calls, but too high and we could overshoot
+ * SCHED_Q_HIGH_WATER.
+ */
+
+#define SCHED_MAX_FLUSH_CELLS 16
+
+/*
* Write scheduling works by keeping track of which channels can
* accept cells, and have cells to write. From the scheduler's perspective,
* a channel can be in four possible states:
@@ -100,7 +108,7 @@
* is reserved for our use.
*/
-/* List of channels that can write and have cells (pending work) */
+/* Pqueue of channels that can write and have cells (pending work) */
static smartlist_t *channels_pending = NULL;
/*
@@ -125,7 +133,7 @@ static time_t queue_heuristic_timestamp = 0;
/* Scheduler static function declarations */
-static int scheduler_compare_channels(const void **c1_v, const void **c2_v);
+static int scheduler_compare_channels(const void *c1_v, const void *c2_v);
static void scheduler_evt_callback(evutil_socket_t fd,
short events, void *arg);
static int scheduler_more_work(void);
@@ -162,7 +170,7 @@ scheduler_free_all(void)
*/
static int
-scheduler_compare_channels(const void **c1_v, const void **c2_v)
+scheduler_compare_channels(const void *c1_v, const void *c2_v)
{
channel_t *c1 = NULL, *c2 = NULL;
/* These are a workaround for -Wbad-function-cast throwing a fit */
@@ -172,8 +180,8 @@ scheduler_compare_channels(const void **c1_v, const void **c2_v)
tor_assert(c1_v);
tor_assert(c2_v);
- c1 = (channel_t *)(*c1_v);
- c2 = (channel_t *)(*c2_v);
+ c1 = (channel_t *)(c1_v);
+ c2 = (channel_t *)(c2_v);
tor_assert(c1);
tor_assert(c2);
@@ -241,7 +249,10 @@ scheduler_channel_doesnt_want_writes(channel_t *chan)
* the other lists. It can't write any more, so it goes to
* channels_waiting_to_write.
*/
- smartlist_remove(channels_pending, chan);
+ smartlist_pqueue_remove(channels_pending,
+ scheduler_compare_channels,
+ STRUCT_OFFSET(channel_t, sched_heap_idx),
+ chan);
chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
log_debug(LD_SCHED,
"Channel " U64_FORMAT " at %p went from pending "
@@ -280,7 +291,10 @@ scheduler_channel_has_waiting_cells(channel_t *chan)
* channels_pending.
*/
chan->scheduler_state = SCHED_CHAN_PENDING;
- smartlist_add(channels_pending, chan);
+ smartlist_pqueue_add(channels_pending,
+ scheduler_compare_channels,
+ STRUCT_OFFSET(channel_t, sched_heap_idx),
+ chan);
log_debug(LD_SCHED,
"Channel " U64_FORMAT " at %p went from waiting_for_cells "
"to pending",
@@ -353,7 +367,10 @@ scheduler_release_channel(channel_t *chan)
tor_assert(channels_pending);
if (chan->scheduler_state == SCHED_CHAN_PENDING) {
- smartlist_remove(channels_pending, chan);
+ smartlist_pqueue_remove(channels_pending,
+ scheduler_compare_channels,
+ STRUCT_OFFSET(channel_t, sched_heap_idx),
+ chan);
}
chan->scheduler_state = SCHED_CHAN_IDLE;
@@ -364,10 +381,11 @@ scheduler_release_channel(channel_t *chan)
void
scheduler_run(void)
{
- smartlist_t *tmp = NULL;
int n_cells, n_chans_before, n_chans_after;
uint64_t q_len_before, q_heur_before, q_len_after, q_heur_after;
ssize_t flushed, flushed_this_time;
+ smartlist_t *to_readd = NULL;
+ channel_t *chan = NULL;
log_debug(LD_SCHED, "We have a chance to run the scheduler");
@@ -375,61 +393,118 @@ scheduler_run(void)
n_chans_before = smartlist_len(channels_pending);
q_len_before = channel_get_global_queue_estimate();
q_heur_before = scheduler_get_queue_heuristic();
- tmp = channels_pending;
- channels_pending = smartlist_new();
-
- /*
- * UGLY HACK: sort the list on each invocation
- *
- * TODO smarter data structures
- */
- smartlist_sort(tmp, scheduler_compare_channels);
-
- SMARTLIST_FOREACH_BEGIN(tmp, channel_t *, chan) {
- if (scheduler_get_queue_heuristic() <= SCHED_Q_HIGH_WATER) {
- n_cells = channel_num_cells_writeable(chan);
- if (n_cells > 0) {
- log_debug(LD_SCHED,
- "Scheduler saw pending channel " U64_FORMAT " at %p with "
- "%d cells writeable",
- U64_PRINTF_ARG(chan->global_identifier), chan, n_cells);
-
- flushed = 0;
- while (flushed < n_cells) {
- flushed_this_time =
- channel_flush_some_cells(chan, n_cells - flushed);
- if (flushed_this_time <= 0) break;
- flushed += flushed_this_time;
- }
- if (flushed < n_cells) {
- /* We ran out of cells to flush */
- chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
- } else {
- /* TODO get this right */
- }
+ while (scheduler_get_queue_heuristic() <= SCHED_Q_HIGH_WATER &&
+ smartlist_len(channels_pending) > 0) {
+ /* Pop off a channel */
+ chan = smartlist_pqueue_pop(channels_pending,
+ scheduler_compare_channels,
+ STRUCT_OFFSET(channel_t, sched_heap_idx));
+ tor_assert(chan);
+
+ /* Figure out how many cells we can write */
+ n_cells = channel_num_cells_writeable(chan);
+ if (n_cells > 0) {
+ log_debug(LD_SCHED,
+ "Scheduler saw pending channel " U64_FORMAT " at %p with "
+ "%d cells writeable",
+ U64_PRINTF_ARG(chan->global_identifier), chan, n_cells);
+
+ flushed = 0;
+ while (flushed < n_cells &&
+ scheduler_get_queue_heuristic() <= SCHED_Q_HIGH_WATER) {
+ flushed_this_time =
+ channel_flush_some_cells(chan,
+ MIN(SCHED_MAX_FLUSH_CELLS,
+ n_cells - flushed));
+ if (flushed_this_time <= 0) break;
+ flushed += flushed_this_time;
+ }
+ if (flushed < n_cells) {
+ /* We ran out of cells to flush */
+ chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
log_debug(LD_SCHED,
- "Scheduler flushed %d cells onto pending channel "
- U64_FORMAT " at %p",
- (int)flushed, U64_PRINTF_ARG(chan->global_identifier),
+ "Channel " U64_FORMAT " at %p "
+ "entered waiting_for_cells from pending",
+ U64_PRINTF_ARG(chan->global_identifier),
chan);
} else {
- log_info(LD_SCHED,
- "Scheduler saw pending channel " U64_FORMAT " at %p with "
- "no cells writeable",
- U64_PRINTF_ARG(chan->global_identifier), chan);
- /* Put it back to WAITING_TO_WRITE */
- chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
+ /* The channel may still have some cells */
+ if (channel_more_to_flush(chan)) {
+ /* The channel goes to either pending or waiting_to_write */
+ if (channel_num_cells_writeable(chan) > 0) {
+ /* Add it back to pending later */
+ if (!to_readd) to_readd = smartlist_new();
+ smartlist_add(to_readd, chan);
+ log_debug(LD_SCHED,
+ "Channel " U64_FORMAT " at %p "
+ "is still pending",
+ U64_PRINTF_ARG(chan->global_identifier),
+ chan);
+ } else {
+ /* It's waiting to be able to write more */
+ chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
+ log_debug(LD_SCHED,
+ "Channel " U64_FORMAT " at %p "
+ "entered waiting_to_write from pending",
+ U64_PRINTF_ARG(chan->global_identifier),
+ chan);
+ }
+ } else {
+ /* No cells left; it can go to idle or waiting_for_cells */
+ if (channel_num_cells_writeable(chan) > 0) {
+ /*
+ * It can still accept writes, so it goes to
+ * waiting_for_cells
+ */
+ chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
+ log_debug(LD_SCHED,
+ "Channel " U64_FORMAT " at %p "
+ "entered waiting_for_cells from pending",
+ U64_PRINTF_ARG(chan->global_identifier),
+ chan);
+ } else {
+ /*
+ * We exactly filled up the output queue with all available
+ * cells; go to idle.
+ */
+ chan->scheduler_state = SCHED_CHAN_IDLE;
+ log_debug(LD_SCHED,
+ "Channel " U64_FORMAT " at %p "
+ "become idle from pending",
+ U64_PRINTF_ARG(chan->global_identifier),
+ chan);
+ }
+ }
}
+
+ log_debug(LD_SCHED,
+ "Scheduler flushed %d cells onto pending channel "
+ U64_FORMAT " at %p",
+ (int)flushed, U64_PRINTF_ARG(chan->global_identifier),
+ chan);
} else {
- /* Not getting it this round; put it back on the list */
- smartlist_add(channels_pending, chan);
- /* It states in SCHED_CHAN_PENDING */
+ log_info(LD_SCHED,
+ "Scheduler saw pending channel " U64_FORMAT " at %p with "
+ "no cells writeable",
+ U64_PRINTF_ARG(chan->global_identifier), chan);
+ /* Put it back to WAITING_TO_WRITE */
+ chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
}
- } SMARTLIST_FOREACH_END(chan);
+ }
- smartlist_free(tmp);
+ /* Readd any channels we need to */
+ if (to_readd) {
+ SMARTLIST_FOREACH_BEGIN(to_readd, channel_t *, chan) {
+ chan->scheduler_state = SCHED_CHAN_PENDING;
+ smartlist_pqueue_add(channels_pending,
+ scheduler_compare_channels,
+ STRUCT_OFFSET(channel_t, sched_heap_idx),
+ chan);
+ } SMARTLIST_FOREACH_END(chan);
+ smartlist_free(to_readd);
+ }
n_chans_after = smartlist_len(channels_pending);
q_len_after = channel_get_global_queue_estimate();
@@ -473,7 +548,10 @@ scheduler_channel_wants_writes(channel_t *chan)
/*
* It can write now, so it goes to channels_pending.
*/
- smartlist_add(channels_pending, chan);
+ smartlist_pqueue_add(channels_pending,
+ scheduler_compare_channels,
+ STRUCT_OFFSET(channel_t, sched_heap_idx),
+ chan);
chan->scheduler_state = SCHED_CHAN_PENDING;
log_debug(LD_SCHED,
"Channel " U64_FORMAT " at %p went from waiting_to_write "