diff options
author | Andrea Shepard <andrea@torproject.org> | 2013-12-12 04:22:53 -0800 |
---|---|---|
committer | Andrea Shepard <andrea@torproject.org> | 2014-09-30 22:49:56 -0700 |
commit | ed1927d6bf8b9d60d40f6fbc20f9e1575a35e59d (patch) | |
tree | 55c8909471a659eff05fddcbfab76c50632206d2 | |
parent | 3530825c53b7a28cf894b64e6d97aa90d0acccae (diff) | |
download | tor-ed1927d6bf8b9d60d40f6fbc20f9e1575a35e59d.tar.gz tor-ed1927d6bf8b9d60d40f6fbc20f9e1575a35e59d.zip |
Use a non-stupid data structure in the scheduler
-rw-r--r-- | src/or/channel.h | 3 | ||||
-rw-r--r-- | src/or/scheduler.c | 190 |
2 files changed, 137 insertions, 56 deletions
diff --git a/src/or/channel.h b/src/or/channel.h index ced717a531..023c39d0dd 100644 --- a/src/or/channel.h +++ b/src/or/channel.h @@ -80,6 +80,9 @@ struct channel_s { SCHED_CHAN_PENDING } scheduler_state; + /** Heap index for use by the scheduler */ + int sched_heap_idx; + /** Timestamps for both cell channels and listeners */ time_t timestamp_created; /* Channel created */ time_t timestamp_active; /* Any activity */ diff --git a/src/or/scheduler.c b/src/or/scheduler.c index c1b64dfbb6..4f12696c68 100644 --- a/src/or/scheduler.c +++ b/src/or/scheduler.c @@ -24,6 +24,14 @@ #define SCHED_Q_HIGH_WATER (2 * SCHED_Q_LOW_WATER) /* + * Maximum cells to flush in a single call to channel_flush_some_cells(); + * setting this low means more calls, but too high and we could overshoot + * SCHED_Q_HIGH_WATER. + */ + +#define SCHED_MAX_FLUSH_CELLS 16 + +/* * Write scheduling works by keeping track of which channels can * accept cells, and have cells to write. From the scheduler's perspective, * a channel can be in four possible states: @@ -100,7 +108,7 @@ * is reserved for our use. */ -/* List of channels that can write and have cells (pending work) */ +/* Pqueue of channels that can write and have cells (pending work) */ static smartlist_t *channels_pending = NULL; /* @@ -125,7 +133,7 @@ static time_t queue_heuristic_timestamp = 0; /* Scheduler static function declarations */ -static int scheduler_compare_channels(const void **c1_v, const void **c2_v); +static int scheduler_compare_channels(const void *c1_v, const void *c2_v); static void scheduler_evt_callback(evutil_socket_t fd, short events, void *arg); static int scheduler_more_work(void); @@ -162,7 +170,7 @@ scheduler_free_all(void) */ static int -scheduler_compare_channels(const void **c1_v, const void **c2_v) +scheduler_compare_channels(const void *c1_v, const void *c2_v) { channel_t *c1 = NULL, *c2 = NULL; /* These are a workaround for -Wbad-function-cast throwing a fit */ @@ -172,8 +180,8 @@ scheduler_compare_channels(const void **c1_v, const void **c2_v) tor_assert(c1_v); tor_assert(c2_v); - c1 = (channel_t *)(*c1_v); - c2 = (channel_t *)(*c2_v); + c1 = (channel_t *)(c1_v); + c2 = (channel_t *)(c2_v); tor_assert(c1); tor_assert(c2); @@ -241,7 +249,10 @@ scheduler_channel_doesnt_want_writes(channel_t *chan) * the other lists. It can't write any more, so it goes to * channels_waiting_to_write. */ - smartlist_remove(channels_pending, chan); + smartlist_pqueue_remove(channels_pending, + scheduler_compare_channels, + STRUCT_OFFSET(channel_t, sched_heap_idx), + chan); chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE; log_debug(LD_SCHED, "Channel " U64_FORMAT " at %p went from pending " @@ -280,7 +291,10 @@ scheduler_channel_has_waiting_cells(channel_t *chan) * channels_pending. */ chan->scheduler_state = SCHED_CHAN_PENDING; - smartlist_add(channels_pending, chan); + smartlist_pqueue_add(channels_pending, + scheduler_compare_channels, + STRUCT_OFFSET(channel_t, sched_heap_idx), + chan); log_debug(LD_SCHED, "Channel " U64_FORMAT " at %p went from waiting_for_cells " "to pending", @@ -353,7 +367,10 @@ scheduler_release_channel(channel_t *chan) tor_assert(channels_pending); if (chan->scheduler_state == SCHED_CHAN_PENDING) { - smartlist_remove(channels_pending, chan); + smartlist_pqueue_remove(channels_pending, + scheduler_compare_channels, + STRUCT_OFFSET(channel_t, sched_heap_idx), + chan); } chan->scheduler_state = SCHED_CHAN_IDLE; @@ -364,10 +381,11 @@ scheduler_release_channel(channel_t *chan) void scheduler_run(void) { - smartlist_t *tmp = NULL; int n_cells, n_chans_before, n_chans_after; uint64_t q_len_before, q_heur_before, q_len_after, q_heur_after; ssize_t flushed, flushed_this_time; + smartlist_t *to_readd = NULL; + channel_t *chan = NULL; log_debug(LD_SCHED, "We have a chance to run the scheduler"); @@ -375,61 +393,118 @@ scheduler_run(void) n_chans_before = smartlist_len(channels_pending); q_len_before = channel_get_global_queue_estimate(); q_heur_before = scheduler_get_queue_heuristic(); - tmp = channels_pending; - channels_pending = smartlist_new(); - - /* - * UGLY HACK: sort the list on each invocation - * - * TODO smarter data structures - */ - smartlist_sort(tmp, scheduler_compare_channels); - - SMARTLIST_FOREACH_BEGIN(tmp, channel_t *, chan) { - if (scheduler_get_queue_heuristic() <= SCHED_Q_HIGH_WATER) { - n_cells = channel_num_cells_writeable(chan); - if (n_cells > 0) { - log_debug(LD_SCHED, - "Scheduler saw pending channel " U64_FORMAT " at %p with " - "%d cells writeable", - U64_PRINTF_ARG(chan->global_identifier), chan, n_cells); - - flushed = 0; - while (flushed < n_cells) { - flushed_this_time = - channel_flush_some_cells(chan, n_cells - flushed); - if (flushed_this_time <= 0) break; - flushed += flushed_this_time; - } - if (flushed < n_cells) { - /* We ran out of cells to flush */ - chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS; - } else { - /* TODO get this right */ - } + while (scheduler_get_queue_heuristic() <= SCHED_Q_HIGH_WATER && + smartlist_len(channels_pending) > 0) { + /* Pop off a channel */ + chan = smartlist_pqueue_pop(channels_pending, + scheduler_compare_channels, + STRUCT_OFFSET(channel_t, sched_heap_idx)); + tor_assert(chan); + + /* Figure out how many cells we can write */ + n_cells = channel_num_cells_writeable(chan); + if (n_cells > 0) { + log_debug(LD_SCHED, + "Scheduler saw pending channel " U64_FORMAT " at %p with " + "%d cells writeable", + U64_PRINTF_ARG(chan->global_identifier), chan, n_cells); + + flushed = 0; + while (flushed < n_cells && + scheduler_get_queue_heuristic() <= SCHED_Q_HIGH_WATER) { + flushed_this_time = + channel_flush_some_cells(chan, + MIN(SCHED_MAX_FLUSH_CELLS, + n_cells - flushed)); + if (flushed_this_time <= 0) break; + flushed += flushed_this_time; + } + if (flushed < n_cells) { + /* We ran out of cells to flush */ + chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS; log_debug(LD_SCHED, - "Scheduler flushed %d cells onto pending channel " - U64_FORMAT " at %p", - (int)flushed, U64_PRINTF_ARG(chan->global_identifier), + "Channel " U64_FORMAT " at %p " + "entered waiting_for_cells from pending", + U64_PRINTF_ARG(chan->global_identifier), chan); } else { - log_info(LD_SCHED, - "Scheduler saw pending channel " U64_FORMAT " at %p with " - "no cells writeable", - U64_PRINTF_ARG(chan->global_identifier), chan); - /* Put it back to WAITING_TO_WRITE */ - chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE; + /* The channel may still have some cells */ + if (channel_more_to_flush(chan)) { + /* The channel goes to either pending or waiting_to_write */ + if (channel_num_cells_writeable(chan) > 0) { + /* Add it back to pending later */ + if (!to_readd) to_readd = smartlist_new(); + smartlist_add(to_readd, chan); + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p " + "is still pending", + U64_PRINTF_ARG(chan->global_identifier), + chan); + } else { + /* It's waiting to be able to write more */ + chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE; + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p " + "entered waiting_to_write from pending", + U64_PRINTF_ARG(chan->global_identifier), + chan); + } + } else { + /* No cells left; it can go to idle or waiting_for_cells */ + if (channel_num_cells_writeable(chan) > 0) { + /* + * It can still accept writes, so it goes to + * waiting_for_cells + */ + chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS; + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p " + "entered waiting_for_cells from pending", + U64_PRINTF_ARG(chan->global_identifier), + chan); + } else { + /* + * We exactly filled up the output queue with all available + * cells; go to idle. + */ + chan->scheduler_state = SCHED_CHAN_IDLE; + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p " + "become idle from pending", + U64_PRINTF_ARG(chan->global_identifier), + chan); + } + } } + + log_debug(LD_SCHED, + "Scheduler flushed %d cells onto pending channel " + U64_FORMAT " at %p", + (int)flushed, U64_PRINTF_ARG(chan->global_identifier), + chan); } else { - /* Not getting it this round; put it back on the list */ - smartlist_add(channels_pending, chan); - /* It states in SCHED_CHAN_PENDING */ + log_info(LD_SCHED, + "Scheduler saw pending channel " U64_FORMAT " at %p with " + "no cells writeable", + U64_PRINTF_ARG(chan->global_identifier), chan); + /* Put it back to WAITING_TO_WRITE */ + chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE; } - } SMARTLIST_FOREACH_END(chan); + } - smartlist_free(tmp); + /* Readd any channels we need to */ + if (to_readd) { + SMARTLIST_FOREACH_BEGIN(to_readd, channel_t *, chan) { + chan->scheduler_state = SCHED_CHAN_PENDING; + smartlist_pqueue_add(channels_pending, + scheduler_compare_channels, + STRUCT_OFFSET(channel_t, sched_heap_idx), + chan); + } SMARTLIST_FOREACH_END(chan); + smartlist_free(to_readd); + } n_chans_after = smartlist_len(channels_pending); q_len_after = channel_get_global_queue_estimate(); @@ -473,7 +548,10 @@ scheduler_channel_wants_writes(channel_t *chan) /* * It can write now, so it goes to channels_pending. */ - smartlist_add(channels_pending, chan); + smartlist_pqueue_add(channels_pending, + scheduler_compare_channels, + STRUCT_OFFSET(channel_t, sched_heap_idx), + chan); chan->scheduler_state = SCHED_CHAN_PENDING; log_debug(LD_SCHED, "Channel " U64_FORMAT " at %p went from waiting_to_write " |