diff options
Diffstat (limited to 'src/or')
-rw-r--r-- | src/or/channel.c | 4 | ||||
-rw-r--r-- | src/or/config.c | 11 | ||||
-rw-r--r-- | src/or/scheduler.c | 639 | ||||
-rw-r--r-- | src/or/scheduler.h | 185 | ||||
-rw-r--r-- | src/or/scheduler_kist.c | 585 | ||||
-rw-r--r-- | src/or/scheduler_vanilla.c | 186 |
6 files changed, 1156 insertions, 454 deletions
diff --git a/src/or/channel.c b/src/or/channel.c index d64a0347a9..56c54c0c51 100644 --- a/src/or/channel.c +++ b/src/or/channel.c @@ -4826,8 +4826,6 @@ channel_update_xmit_queue_size(channel_t *chan) U64_FORMAT ", new size is " U64_FORMAT, U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier), U64_PRINTF_ARG(estimated_total_queue_size)); - /* Tell the scheduler we're increasing the queue size */ - scheduler_adjust_queue_size(chan, 1, adj); } } else if (queued < chan->bytes_queued_for_xmit) { adj = chan->bytes_queued_for_xmit - queued; @@ -4850,8 +4848,6 @@ channel_update_xmit_queue_size(channel_t *chan) U64_FORMAT ", new size is " U64_FORMAT, U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier), U64_PRINTF_ARG(estimated_total_queue_size)); - /* Tell the scheduler we're decreasing the queue size */ - scheduler_adjust_queue_size(chan, -1, adj); } } } diff --git a/src/or/config.c b/src/or/config.c index a7bc23af59..285d4952f2 100644 --- a/src/or/config.c +++ b/src/or/config.c @@ -1813,14 +1813,9 @@ options_act(const or_options_t *old_options) return -1; } - /* XXXFORTOR remove set_watermarks */ - /* Set up scheduler thresholds */ - scheduler_set_watermarks(100 * 1024*1024 /* 100 MB */, - 101 * 1024*1024 /* 101 MB */, - 100); - - /* XXXFORTOR enable notification to sched that the conf might have changed */ - //scheduler_conf_changed(); + /* Inform the scheduler subsystem that a configuration changed happened. It + * might be a change of scheduler or parameter. */ + scheduler_conf_changed(); /* Set up accounting */ if (accounting_parse_options(options, 0)<0) { diff --git a/src/or/scheduler.c b/src/or/scheduler.c index eb31bc2152..b04bdceb40 100644 --- a/src/or/scheduler.c +++ b/src/or/scheduler.c @@ -2,9 +2,7 @@ /* See LICENSE for licensing information */ #include "or.h" - -#define TOR_CHANNEL_INTERNAL_ /* For channel_flush_some_cells() */ -#include "channel.h" +#include "config.h" #include "compat_libevent.h" #define SCHEDULER_PRIVATE_ @@ -12,35 +10,41 @@ #include <event2/event.h> -/* - * Scheduler high/low watermarks - */ - -static uint32_t sched_q_low_water = 16384; -static uint32_t sched_q_high_water = 32768; - -/* - * Maximum cells to flush in a single call to channel_flush_some_cells(); - * setting this low means more calls, but too high and we could overshoot - * sched_q_high_water. - */ - -static uint32_t sched_max_flush_cells = 16; - /** * \file scheduler.c * \brief Channel scheduling system: decides which channels should send and * receive when. * - * This module implements a scheduler algorithm, to decide - * which channels should send/receive when. + * This module is the global/common parts of the scheduling system. This system + * is what decides what channels get to send cells on their circuits and when. + * + * Terms: + * - "Scheduling system": the collection of scheduler*.{h,c} files and their + * aggregate behavior. + * - "Scheduler implementation": a scheduler_t. The scheduling system has one + * active scheduling implementation at a time. + * + * In this file you will find state that any scheduler implmentation can have + * access to as well as the functions the rest of Tor uses to interact with the + * scheduling system. * * The earliest versions of Tor approximated a kind of round-robin system - * among active connections, but only approximated it. + * among active connections, but only approximated it. It would only consider + * one connection (roughly equal to a channel in today's terms) at a time, and + * thus could only prioritize circuits against others on the same connection. + * + * Then in response to the KIST paper[0], Tor implemented a global + * circuit scheduler. It was supposed to prioritize circuits across man + * channels, but wasn't effective. It is preserved in scheduler_vanilla.c. + * + * [0]: http://www.robgjansen.com/publications/kist-sec2014.pdf * - * Now, write scheduling works by keeping track of which channels can - * accept cells, and have cells to write. From the scheduler's perspective, - * a channel can be in four possible states: + * Then we actually got around to implementing KIST for real. We decided to + * modularize the scheduler so new ones can be implemented. You can find KIST + * in scheduler_kist.c. + * + * Channels have one of four scheduling states based on whether or not they + * have cells to send and whether or not they are able to send. * * <ol> * <li> @@ -125,85 +129,108 @@ static uint32_t sched_max_flush_cells = 16; * </ol> * * Other event-driven parts of the code move channels between these scheduling - * states by calling scheduler functions; the scheduler only runs on open-for- - * writes/has-cells channels and is the only path for those to transition to - * other states. The scheduler_run() function gives us the opportunity to do - * scheduling work, and is called from other scheduler functions whenever a - * state transition occurs, and periodically from the main event loop. + * states by calling scheduler functions. The scheduling system builds up a + * list of channels in the SCHED_CHAN_PENDING state that the scheduler + * implementation should then use when it runs. Scheduling implementations need + * to properly update channel states during their scheduler_t->run() function + * as that is the only opportunity for channels to move from SCHED_CHAN_PENDING + * to any other state. + * + * The remainder of this file is a small amount of state that any scheduler + * implementation should have access to, and the functions the rest of Tor uses + * to interact with the scheduling system. */ -/* Scheduler global data structures */ +/***************************************************************************** + * Scheduling system state + * + * State that can be accessed from any scheduler implementation (but not + * outside the scheduling system) + *****************************************************************************/ + +STATIC scheduler_t *scheduler; /* * We keep a list of channels that are pending - i.e, have cells to write - * and can accept them to send. The enum scheduler_state in channel_t + * and can accept them to send. The enum scheduler_state in channel_t * is reserved for our use. + * + * Priority queue of channels that can write and have cells (pending work) */ - -/* Pqueue of channels that can write and have cells (pending work) */ STATIC smartlist_t *channels_pending = NULL; /* * This event runs the scheduler from its callback, and is manually * activated whenever a channel enters open for writes/cells to send. */ - STATIC struct event *run_sched_ev = NULL; -/* - * Queue heuristic; this is not the queue size, but an 'effective queuesize' - * that ages out contributions from stalled channels. - */ - -STATIC uint64_t queue_heuristic = 0; +/***************************************************************************** + * Scheduling system static function definitions + * + * Functions that can only be accessed from this file. + *****************************************************************************/ /* - * Timestamp for last queue heuristic update + * Scheduler event callback; this should get triggered once per event loop + * if any scheduling work was created during the event loop. */ +static void +scheduler_evt_callback(evutil_socket_t fd, short events, void *arg) +{ + (void) fd; + (void) events; + (void) arg; -STATIC time_t queue_heuristic_timestamp = 0; + log_debug(LD_SCHED, "Scheduler event callback called"); -/* Scheduler static function declarations */ + tor_assert(run_sched_ev); -static void scheduler_evt_callback(evutil_socket_t fd, - short events, void *arg); -static int scheduler_more_work(void); -static void scheduler_retrigger(void); -#if 0 -static void scheduler_trigger(void); -#endif + /* Run the scheduler. This is a mandatory function. */ + tor_assert(scheduler->run); + scheduler->run(); -/* Scheduler function implementations */ + /* Schedule itself back in if it has more work. */ + tor_assert(scheduler->schedule); + scheduler->schedule(); +} -/** Free everything and shut down the scheduling system */ +/***************************************************************************** + * Scheduling system private function definitions + * + * Functions that can only be accessed from scheduler*.c + *****************************************************************************/ -void -scheduler_free_all(void) +/* Return the pending channel list. */ +smartlist_t * +get_channels_pending(void) { - log_debug(LD_SCHED, "Shutting down scheduler"); - - if (run_sched_ev) { - if (event_del(run_sched_ev) < 0) { - log_warn(LD_BUG, "Problem deleting run_sched_ev"); - } - tor_event_free(run_sched_ev); - run_sched_ev = NULL; - } + return channels_pending; +} - if (channels_pending) { - smartlist_free(channels_pending); - channels_pending = NULL; - } +/* Return our libevent scheduler event. */ +struct event * +get_run_sched_ev(void) +{ + return run_sched_ev; } -/** - * Comparison function to use when sorting pending channels - */ +/* Return true iff the scheduler subsystem should use KIST. */ +int +scheduler_should_use_kist(void) +{ + int64_t run_freq = kist_scheduler_run_interval(); + log_info(LD_SCHED, "Determined sched_run_interval should be %" PRId64 ". " + "Will%s use KIST.", + run_freq, (run_freq > 0 ? "" : " not")); + return run_freq > 0; +} -MOCK_IMPL(STATIC int, +/* Comparison function to use when sorting pending channels */ +MOCK_IMPL(int, scheduler_compare_channels, (const void *c1_v, const void *c2_v)) { - channel_t *c1 = NULL, *c2 = NULL; + const channel_t *c1 = NULL, *c2 = NULL; /* These are a workaround for -Wbad-function-cast throwing a fit */ const circuitmux_policy_t *p1, *p2; uintptr_t p1_i, p2_i; @@ -211,8 +238,8 @@ scheduler_compare_channels, (const void *c1_v, const void *c2_v)) tor_assert(c1_v); tor_assert(c2_v); - c1 = (channel_t *)(c1_v); - c2 = (channel_t *)(c2_v); + c1 = (const channel_t *)(c1_v); + c2 = (const channel_t *)(c2_v); tor_assert(c1); tor_assert(c2); @@ -242,26 +269,109 @@ scheduler_compare_channels, (const void *c1_v, const void *c2_v)) } } +/***************************************************************************** + * Scheduling system global functions + * + * Functions that can be accessed from anywhere in Tor. + *****************************************************************************/ + /* - * Scheduler event callback; this should get triggered once per event loop - * if any scheduling work was created during the event loop. + * Little helper function called from a few different places. It changes the + * scheduler implementation, if necessary. And if it did, it then tells the + * old one to free its state and the new one to initialize. */ - static void -scheduler_evt_callback(evutil_socket_t fd, short events, void *arg) +set_scheduler(void) { - (void)fd; - (void)events; - (void)arg; - log_debug(LD_SCHED, "Scheduler event callback called"); + int have_kist = 0; - tor_assert(run_sched_ev); + /* Switch, if needed */ + scheduler_t *old_scheduler = scheduler; + if (scheduler_should_use_kist()) { + scheduler = get_kist_scheduler(); + have_kist = 1; + } else { + scheduler = get_vanilla_scheduler(); + } + tor_assert(scheduler); + + if (old_scheduler != scheduler) { + /* Allow the old scheduler to clean up, if needed. */ + if (old_scheduler && old_scheduler->free_all) { + old_scheduler->free_all(); + } + /* We don't clean up the old one, we keep any type of scheduler we've + * allocated so we can do an easy switch back. */ + + /* Initialize the new scheduler. */ + if (scheduler->init) { + scheduler->init(); + } + log_notice(LD_CONFIG, "Using the %s scheduler.", + have_kist ? "KIST" : "vanilla"); + } +} + +/* + * This is how the scheduling system is notified of Tor's configuration + * changing. For example: a SIGHUP was issued. + */ +void +scheduler_conf_changed(void) +{ + /* Let the scheduler decide what it should do. */ + set_scheduler(); + + /* Then tell the (possibly new) scheduler that we have new options. */ + if (scheduler->on_new_options) { + scheduler->on_new_options(); + } +} + +/* + * Whenever we get a new consensus, this function is called. + */ +void +scheduler_notify_networkstatus_changed(const networkstatus_t *old_c, + const networkstatus_t *new_c) +{ + /* Then tell the (possibly new) scheduler that we have a new consensus */ + if (scheduler->on_new_consensus) { + scheduler->on_new_consensus(old_c, new_c); + } + /* Maybe the consensus param made us change the scheduler. */ + set_scheduler(); +} + +/* + * Free everything scheduling-related from main.c. Note this is only called + * when Tor is shutting down, while scheduler_t->free_all() is called both when + * Tor is shutting down and when we are switching schedulers. + */ +void +scheduler_free_all(void) +{ + log_debug(LD_SCHED, "Shutting down scheduler"); + + if (run_sched_ev) { + if (event_del(run_sched_ev) < 0) { + log_warn(LD_BUG, "Problem deleting run_sched_ev"); + } + tor_event_free(run_sched_ev); + run_sched_ev = NULL; + } - /* Run the scheduler */ - scheduler_run(); + if (channels_pending) { + /* We don't have ownership of the object in this list. */ + smartlist_free(channels_pending); + channels_pending = NULL; + } - /* Do we have more work to do? */ - if (scheduler_more_work()) scheduler_retrigger(); + if (scheduler && scheduler->free_all) { + scheduler->free_all(); + } + tor_free(scheduler); + scheduler = NULL; } /** Mark a channel as no longer ready to accept writes */ @@ -309,8 +419,6 @@ scheduler_channel_doesnt_want_writes,(channel_t *chan)) MOCK_IMPL(void, scheduler_channel_has_waiting_cells,(channel_t *chan)) { - int became_pending = 0; - tor_assert(chan); tor_assert(channels_pending); @@ -330,7 +438,9 @@ scheduler_channel_has_waiting_cells,(channel_t *chan)) "Channel " U64_FORMAT " at %p went from waiting_for_cells " "to pending", U64_PRINTF_ARG(chan->global_identifier), chan); - became_pending = 1; + /* If we made a channel pending, we potentially have scheduling work to + * do. */ + scheduler->schedule(); } else { /* * It's not in waiting_for_cells, so it can't become pending; it's @@ -345,16 +455,13 @@ scheduler_channel_has_waiting_cells,(channel_t *chan)) U64_PRINTF_ARG(chan->global_identifier), chan); } } - - /* - * If we made a channel pending, we potentially have scheduling work - * to do. - */ - if (became_pending) scheduler_retrigger(); } -/** Set up the scheduling system */ - +/* + * Initialize everything scheduling-related from config.c. Note this is only + * called when Tor is starting up, while scheduler_t->init() is called both + * when Tor is starting up and when we are switching schedulers. + */ void scheduler_init(void) { @@ -363,34 +470,17 @@ scheduler_init(void) tor_assert(!run_sched_ev); run_sched_ev = tor_event_new(tor_libevent_get_base(), -1, 0, scheduler_evt_callback, NULL); - channels_pending = smartlist_new(); - queue_heuristic = 0; - queue_heuristic_timestamp = approx_time(); -} -/** Check if there's more scheduling work */ - -static int -scheduler_more_work(void) -{ - tor_assert(channels_pending); - - return ((scheduler_get_queue_heuristic() < sched_q_low_water) && - ((smartlist_len(channels_pending) > 0))) ? 1 : 0; + set_scheduler(); } -/** Retrigger the scheduler in a way safe to use from the callback */ - -static void -scheduler_retrigger(void) -{ - tor_assert(run_sched_ev); - event_active(run_sched_ev, EV_TIMEOUT, 1); -} - -/** Notify the scheduler of a channel being closed */ - +/* + * If a channel is going away, this is how the scheduling system is informed + * so it can do any freeing necessary. This ultimately calls + * scheduler_t->on_channel_free() so the current scheduler can release any + * state specific to this channel. + */ MOCK_IMPL(void, scheduler_release_channel,(channel_t *chan)) { @@ -398,179 +488,29 @@ scheduler_release_channel,(channel_t *chan)) tor_assert(channels_pending); if (chan->scheduler_state == SCHED_CHAN_PENDING) { - smartlist_pqueue_remove(channels_pending, - scheduler_compare_channels, - offsetof(channel_t, sched_heap_idx), - chan); - } - - chan->scheduler_state = SCHED_CHAN_IDLE; -} - -/** Run the scheduling algorithm if necessary */ - -MOCK_IMPL(void, -scheduler_run, (void)) -{ - int n_cells, n_chans_before, n_chans_after; - uint64_t q_len_before, q_heur_before, q_len_after, q_heur_after; - ssize_t flushed, flushed_this_time; - smartlist_t *to_readd = NULL; - channel_t *chan = NULL; - - log_debug(LD_SCHED, "We have a chance to run the scheduler"); - - if (scheduler_get_queue_heuristic() < sched_q_low_water) { - n_chans_before = smartlist_len(channels_pending); - q_len_before = channel_get_global_queue_estimate(); - q_heur_before = scheduler_get_queue_heuristic(); - - while (scheduler_get_queue_heuristic() <= sched_q_high_water && - smartlist_len(channels_pending) > 0) { - /* Pop off a channel */ - chan = smartlist_pqueue_pop(channels_pending, - scheduler_compare_channels, - offsetof(channel_t, sched_heap_idx)); - tor_assert(chan); - - /* Figure out how many cells we can write */ - n_cells = channel_num_cells_writeable(chan); - if (n_cells > 0) { - log_debug(LD_SCHED, - "Scheduler saw pending channel " U64_FORMAT " at %p with " - "%d cells writeable", - U64_PRINTF_ARG(chan->global_identifier), chan, n_cells); - - flushed = 0; - while (flushed < n_cells && - scheduler_get_queue_heuristic() <= sched_q_high_water) { - flushed_this_time = - channel_flush_some_cells(chan, - MIN(sched_max_flush_cells, - (size_t) n_cells - flushed)); - if (flushed_this_time <= 0) break; - flushed += flushed_this_time; - } - - if (flushed < n_cells) { - /* We ran out of cells to flush */ - chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS; - log_debug(LD_SCHED, - "Channel " U64_FORMAT " at %p " - "entered waiting_for_cells from pending", - U64_PRINTF_ARG(chan->global_identifier), - chan); - } else { - /* The channel may still have some cells */ - if (channel_more_to_flush(chan)) { - /* The channel goes to either pending or waiting_to_write */ - if (channel_num_cells_writeable(chan) > 0) { - /* Add it back to pending later */ - if (!to_readd) to_readd = smartlist_new(); - smartlist_add(to_readd, chan); - log_debug(LD_SCHED, - "Channel " U64_FORMAT " at %p " - "is still pending", - U64_PRINTF_ARG(chan->global_identifier), - chan); - } else { - /* It's waiting to be able to write more */ - chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE; - log_debug(LD_SCHED, - "Channel " U64_FORMAT " at %p " - "entered waiting_to_write from pending", - U64_PRINTF_ARG(chan->global_identifier), - chan); - } - } else { - /* No cells left; it can go to idle or waiting_for_cells */ - if (channel_num_cells_writeable(chan) > 0) { - /* - * It can still accept writes, so it goes to - * waiting_for_cells - */ - chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS; - log_debug(LD_SCHED, - "Channel " U64_FORMAT " at %p " - "entered waiting_for_cells from pending", - U64_PRINTF_ARG(chan->global_identifier), - chan); - } else { - /* - * We exactly filled up the output queue with all available - * cells; go to idle. - */ - chan->scheduler_state = SCHED_CHAN_IDLE; - log_debug(LD_SCHED, - "Channel " U64_FORMAT " at %p " - "become idle from pending", - U64_PRINTF_ARG(chan->global_identifier), - chan); - } - } - } - - log_debug(LD_SCHED, - "Scheduler flushed %d cells onto pending channel " - U64_FORMAT " at %p", - (int)flushed, U64_PRINTF_ARG(chan->global_identifier), - chan); - } else { - log_info(LD_SCHED, - "Scheduler saw pending channel " U64_FORMAT " at %p with " - "no cells writeable", - U64_PRINTF_ARG(chan->global_identifier), chan); - /* Put it back to WAITING_TO_WRITE */ - chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE; - } + if (smartlist_pos(channels_pending, chan) == -1) { + log_warn(LD_SCHED, "Scheduler asked to release channel %" PRIu64 " " + "but it wasn't in channels_pending", + chan->global_identifier); + } else { + smartlist_pqueue_remove(channels_pending, + scheduler_compare_channels, + offsetof(channel_t, sched_heap_idx), + chan); } - - /* Readd any channels we need to */ - if (to_readd) { - SMARTLIST_FOREACH_BEGIN(to_readd, channel_t *, readd_chan) { - readd_chan->scheduler_state = SCHED_CHAN_PENDING; - smartlist_pqueue_add(channels_pending, - scheduler_compare_channels, - offsetof(channel_t, sched_heap_idx), - readd_chan); - } SMARTLIST_FOREACH_END(readd_chan); - smartlist_free(to_readd); + if (scheduler->on_channel_free) { + scheduler->on_channel_free(chan); } - - n_chans_after = smartlist_len(channels_pending); - q_len_after = channel_get_global_queue_estimate(); - q_heur_after = scheduler_get_queue_heuristic(); - log_debug(LD_SCHED, - "Scheduler handled %d of %d pending channels, queue size from " - U64_FORMAT " to " U64_FORMAT ", queue heuristic from " - U64_FORMAT " to " U64_FORMAT, - n_chans_before - n_chans_after, n_chans_before, - U64_PRINTF_ARG(q_len_before), U64_PRINTF_ARG(q_len_after), - U64_PRINTF_ARG(q_heur_before), U64_PRINTF_ARG(q_heur_after)); } -} - -/** Trigger the scheduling event so we run the scheduler later */ - -#if 0 -static void -scheduler_trigger(void) -{ - log_debug(LD_SCHED, "Triggering scheduler event"); - - tor_assert(run_sched_ev); - event_add(run_sched_ev, EV_TIMEOUT, 1); + chan->scheduler_state = SCHED_CHAN_IDLE; } -#endif /** Mark a channel as ready to accept writes */ void scheduler_channel_wants_writes(channel_t *chan) { - int became_pending = 0; - tor_assert(chan); tor_assert(channels_pending); @@ -579,6 +519,8 @@ scheduler_channel_wants_writes(channel_t *chan) /* * It can write now, so it goes to channels_pending. */ + log_debug(LD_SCHED, "chan=%" PRIu64 " became pending", + chan->global_identifier); smartlist_pqueue_add(channels_pending, scheduler_compare_channels, offsetof(channel_t, sched_heap_idx), @@ -588,7 +530,8 @@ scheduler_channel_wants_writes(channel_t *chan) "Channel " U64_FORMAT " at %p went from waiting_to_write " "to pending", U64_PRINTF_ARG(chan->global_identifier), chan); - became_pending = 1; + /* We just made a channel pending, we have scheduling work to do. */ + scheduler->schedule(); } else { /* * It's not in SCHED_CHAN_WAITING_TO_WRITE, so it can't become pending; @@ -602,19 +545,13 @@ scheduler_channel_wants_writes(channel_t *chan) U64_PRINTF_ARG(chan->global_identifier), chan); } } - - /* - * If we made a channel pending, we potentially have scheduling work - * to do. - */ - if (became_pending) scheduler_retrigger(); } -/** - * Notify the scheduler that a channel's position in the pqueue may have - * changed - */ +#ifdef TOR_UNIT_TESTS +/* + * Notify scheduler that a channel's queue position may have changed. + */ void scheduler_touch_channel(channel_t *chan) { @@ -634,115 +571,5 @@ scheduler_touch_channel(channel_t *chan) /* else no-op, since it isn't in the queue */ } -/** - * Notify the scheduler of a queue size adjustment, to recalculate the - * queue heuristic. - */ - -void -scheduler_adjust_queue_size(channel_t *chan, int dir, uint64_t adj) -{ - time_t now = approx_time(); - - log_debug(LD_SCHED, - "Queue size adjustment by %s" U64_FORMAT " for channel " - U64_FORMAT, - (dir >= 0) ? "+" : "-", - U64_PRINTF_ARG(adj), - U64_PRINTF_ARG(chan->global_identifier)); - - /* Get the queue heuristic up to date */ - scheduler_update_queue_heuristic(now); - - /* Adjust as appropriate */ - if (dir >= 0) { - /* Increasing it */ - queue_heuristic += adj; - } else { - /* Decreasing it */ - if (queue_heuristic > adj) queue_heuristic -= adj; - else queue_heuristic = 0; - } - - log_debug(LD_SCHED, - "Queue heuristic is now " U64_FORMAT, - U64_PRINTF_ARG(queue_heuristic)); -} - -/** - * Query the current value of the queue heuristic - */ - -STATIC uint64_t -scheduler_get_queue_heuristic(void) -{ - time_t now = approx_time(); - - scheduler_update_queue_heuristic(now); - - return queue_heuristic; -} - -/** - * Adjust the queue heuristic value to the present time - */ - -STATIC void -scheduler_update_queue_heuristic(time_t now) -{ - time_t diff; - - if (queue_heuristic_timestamp == 0) { - /* - * Nothing we can sensibly do; must not have been initted properly. - * Oh well. - */ - queue_heuristic_timestamp = now; - } else if (queue_heuristic_timestamp < now) { - diff = now - queue_heuristic_timestamp; - /* - * This is a simple exponential age-out; the other proposed alternative - * was a linear age-out using the bandwidth history in rephist.c; I'm - * going with this out of concern that if an adversary can jam the - * scheduler long enough, it would cause the bandwidth to drop to - * zero and render the aging mechanism ineffective thereafter. - */ - if (0 <= diff && diff < 64) queue_heuristic >>= diff; - else queue_heuristic = 0; - - queue_heuristic_timestamp = now; - - log_debug(LD_SCHED, - "Queue heuristic is now " U64_FORMAT, - U64_PRINTF_ARG(queue_heuristic)); - } - /* else no update needed, or time went backward */ -} - -/** - * Set scheduler watermarks and flush size - */ - -void -scheduler_set_watermarks(uint32_t lo, uint32_t hi, uint32_t max_flush) -{ - /* Sanity assertions - caller should ensure these are true */ - tor_assert(lo > 0); - tor_assert(hi > lo); - tor_assert(max_flush > 0); - - sched_q_low_water = lo; - sched_q_high_water = hi; - sched_max_flush_cells = max_flush; -} - -/* XXXFORTOR Temp def of this func to get this commit to compile. Replace with - * real func */ -void -scheduler_notify_networkstatus_changed(const networkstatus_t *old_c, - const networkstatus_t *new_c) -{ - (void) old_c; - (void) new_c; -} +#endif /* TOR_UNIT_TESTS */ diff --git a/src/or/scheduler.h b/src/or/scheduler.h index 699ccde7a3..3932e60490 100644 --- a/src/or/scheduler.h +++ b/src/or/scheduler.h @@ -1,9 +1,9 @@ -/* * Copyright (c) 2013-2017, The Tor Project, Inc. */ +/* * Copyright (c) 2017, The Tor Project, Inc. */ /* See LICENSE for licensing information */ /** * \file scheduler.h - * \brief Header file for scheduler.c + * \brief Header file for scheduler*.c **/ #ifndef TOR_SCHEDULER_H @@ -13,50 +13,163 @@ #include "channel.h" #include "testsupport.h" -/* Global-visibility scheduler functions */ +/* + * A scheduler implementation is a collection of function pointers. If you + * would like to add a new scheduler called foo, create scheduler_foo.c, + * implement at least the mandatory ones, and implement get_foo_scheduler() + * that returns a complete scheduler_t for your foo scheduler. See + * scheduler_kist.c for an example. + * + * These function pointers SHOULD NOT be used anywhere outside of the + * scheduling source files. The rest of Tor should communicate with the + * scheduling system through the functions near the bottom of this file, and + * those functions will call into the current scheduler implementation as + * necessary. + * + * If your scheduler doesn't need to implement something (for example: it + * doesn't create any state for itself, thus it has nothing to free when Tor + * is shutting down), then set that function pointer to NULL. + */ +typedef struct scheduler_s { + /* (Optional) To be called when we want to prepare a scheduler for use. + * Perhaps Tor just started and we are the lucky chosen scheduler, or + * perhaps Tor is switching to this scheduler. No matter the case, this is + * where we would prepare any state and initialize parameters. You might + * think of this as the opposite of free_all(). */ + void (*init)(void); + + /* (Optional) To be called when we want to tell the scheduler to delete all + * of its state (if any). Perhaps Tor is shutting down or perhaps we are + * switching schedulers. */ + void (*free_all)(void); + + /* (Mandatory) Libevent controls the main event loop in Tor, and this is + * where we register with libevent the next execution of run_sched_ev [which + * ultimately calls run()]. */ + void (*schedule)(void); + + /* (Mandatory) This is the heart of a scheduler! This is where the + * excitement happens! Here libevent has given us the chance to execute, and + * we should do whatever we need to do in order to move some cells from + * their circuit queues to output buffers in an intelligent manner. We + * should do this quickly. When we are done, we'll try to schedule() ourself + * if more work needs to be done to setup the next scehduling run. */ + void (*run)(void); + + /* + * External event not related to the scheduler but that can influence it. + */ + + /* (Optional) To be called whenever Tor finds out about a new consensus. + * First the scheduling system as a whole will react to the new consensus + * and change the scheduler if needed. After that, whatever is the (possibly + * new) scheduler will call this so it has the chance to react to the new + * consensus too. If there's a consensus parameter that your scheduler wants + * to keep an eye on, this is where you should check for it. */ + void (*on_new_consensus)(const networkstatus_t *old_c, + const networkstatus_t *new_c); + + /* (Optional) To be called when a channel is being freed. Sometimes channels + * go away (for example: the relay on the other end is shutting down). If + * the scheduler keeps any channel-specific state and has memory to free + * when channels go away, implement this and free it here. */ + void (*on_channel_free)(const channel_t *); + + /* (Optional) To be called whenever Tor is reloading configuration options. + * For example: SIGHUP was issued and Tor is rereading its torrc. A + * scheduler should use this as an opportunity to parse and cache torrc + * options so that it doesn't have to call get_options() all the time. */ + void (*on_new_options)(void); +} scheduler_t; + +/***************************************************************************** + * Globally visible scheduler functions + * + * These functions are how the rest of Tor communicates with the scheduling + * system. + *****************************************************************************/ -/* Set up and shut down the scheduler from main.c */ -void scheduler_free_all(void); void scheduler_init(void); -MOCK_DECL(void, scheduler_run, (void)); - -/* Mark channels as having cells or wanting/not wanting writes */ -MOCK_DECL(void,scheduler_channel_doesnt_want_writes,(channel_t *chan)); -MOCK_DECL(void,scheduler_channel_has_waiting_cells,(channel_t *chan)); -void scheduler_channel_wants_writes(channel_t *chan); - -/* Notify the scheduler of a channel being closed */ -MOCK_DECL(void,scheduler_release_channel,(channel_t *chan)); - -/* Notify scheduler of queue size adjustments */ -void scheduler_adjust_queue_size(channel_t *chan, int dir, uint64_t adj); - -/* Notify scheduler that a channel's queue position may have changed */ -void scheduler_touch_channel(channel_t *chan); - -/* Adjust the watermarks from config file*/ -void scheduler_set_watermarks(uint32_t lo, uint32_t hi, uint32_t max_flush); - -/* XXXFORTOR Temp def of this func to get this commit to compile. Replace with - * real func */ +void scheduler_free_all(void); +void scheduler_conf_changed(void); void scheduler_notify_networkstatus_changed(const networkstatus_t *old_c, const networkstatus_t *new_c); +MOCK_DECL(void, scheduler_release_channel, (channel_t *chan)); -/* Things only scheduler.c and its test suite should see */ - +/* + * Ways for a channel to interact with the scheduling system. A channel only + * really knows (i) whether or not it has cells it wants to send, and + * (ii) whether or not it would like to write. + */ +void scheduler_channel_wants_writes(channel_t *chan); +MOCK_DECL(void, scheduler_channel_doesnt_want_writes, (channel_t *chan)); +MOCK_DECL(void, scheduler_channel_has_waiting_cells, (channel_t *chan)); + +/***************************************************************************** + * Private scheduler functions + * + * These functions are only visible to the scheduling system, the current + * scheduler implementation, and tests. + *****************************************************************************/ #ifdef SCHEDULER_PRIVATE_ -MOCK_DECL(STATIC int, scheduler_compare_channels, + +/********************************* + * Defined in scheduler.c + *********************************/ +int scheduler_should_use_kist(void); +smartlist_t *get_channels_pending(void); +struct event *get_run_sched_ev(void); +MOCK_DECL(int, scheduler_compare_channels, (const void *c1_v, const void *c2_v)); -STATIC uint64_t scheduler_get_queue_heuristic(void); -STATIC void scheduler_update_queue_heuristic(time_t now); #ifdef TOR_UNIT_TESTS extern smartlist_t *channels_pending; extern struct event *run_sched_ev; -extern uint64_t queue_heuristic; -extern time_t queue_heuristic_timestamp; -#endif -#endif +extern scheduler_t *scheduler; +void scheduler_touch_channel(channel_t *chan); +#endif /* TOR_UNIT_TESTS */ + +/********************************* + * Defined in scheduler_kist.c + *********************************/ + +/* Socke table entry which holds information of a channel's socket and kernel + * TCP information. Only used by KIST. */ +typedef struct socket_table_ent_s { + HT_ENTRY(socket_table_ent_s) node; + const channel_t *chan; + /* Amount written this scheduling run */ + uint64_t written; + /* Amount that can be written this scheduling run */ + uint64_t limit; + /* TCP info from the kernel */ + uint32_t cwnd; + uint32_t unacked; + uint32_t mss; + uint32_t notsent; +} socket_table_ent_t; + +typedef HT_HEAD(outbuf_table_s, outbuf_table_ent_s) outbuf_table_t; + +MOCK_DECL(int, channel_should_write_to_kernel, + (outbuf_table_t *table, channel_t *chan)); +MOCK_DECL(void, channel_write_to_kernel, (channel_t *chan)); +MOCK_DECL(void, update_socket_info_impl, (socket_table_ent_t *ent)); + +scheduler_t *get_kist_scheduler(void); +int32_t kist_scheduler_run_interval(const networkstatus_t *ns); + +#ifdef TOR_UNIT_TESTS +extern int32_t sched_run_interval; +#endif /* TOR_UNIT_TESTS */ + +/********************************* + * Defined in scheduler_vanilla.c + *********************************/ + +scheduler_t *get_vanilla_scheduler(void); + +#endif /* SCHEDULER_PRIVATE_ */ -#endif /* !defined(TOR_SCHEDULER_H) */ +#endif /* TOR_SCHEDULER_H */ diff --git a/src/or/scheduler_kist.c b/src/or/scheduler_kist.c index c140849538..97722cb254 100644 --- a/src/or/scheduler_kist.c +++ b/src/or/scheduler_kist.c @@ -1,5 +1,590 @@ /* Copyright (c) 2017, The Tor Project, Inc. */ /* See LICENSE for licensing information */ +#include <event2/event.h> +#include <netinet/tcp.h> + +#include "or.h" +#include "buffers.h" +#include "config.h" +#include "connection.h" +#include "networkstatus.h" +#define TOR_CHANNEL_INTERNAL_ +#include "channel.h" +#include "channeltls.h" +#define SCHEDULER_PRIVATE_ #include "scheduler.h" +#define TLS_PER_CELL_OVERHEAD 29 + +/* Kernel interface needed for KIST. */ +#include <linux/sockios.h> + +/***************************************************************************** + * Data structures and supporting functions + *****************************************************************************/ + +/* Socket_table hash table stuff. The socket_table keeps track of per-socket + * limit information imposed by kist and used by kist. */ + +static uint32_t +socket_table_ent_hash(const socket_table_ent_t *ent) +{ + return (uint32_t)ent->chan->global_identifier; +} + +static unsigned +socket_table_ent_eq(const socket_table_ent_t *a, const socket_table_ent_t *b) +{ + return a->chan->global_identifier == b->chan->global_identifier; +} + +typedef HT_HEAD(socket_table_s, socket_table_ent_s) socket_table_t; + +static socket_table_t socket_table = HT_INITIALIZER(); + +HT_PROTOTYPE(socket_table_s, socket_table_ent_s, node, socket_table_ent_hash, + socket_table_ent_eq) +HT_GENERATE2(socket_table_s, socket_table_ent_s, node, socket_table_ent_hash, + socket_table_ent_eq, 0.6, tor_reallocarray, tor_free_) + +/* outbuf_table hash table stuff. The outbuf_table keeps track of which + * channels have data sitting in their outbuf so the kist scheduler can force + * a write from outbuf to kernel periodically during a run and at the end of a + * run. */ + +typedef struct outbuf_table_ent_s { + HT_ENTRY(outbuf_table_ent_s) node; + channel_t *chan; +} outbuf_table_ent_t; + +static uint32_t +outbuf_table_ent_hash(const outbuf_table_ent_t *ent) +{ + return (uint32_t)ent->chan->global_identifier; +} + +static unsigned +outbuf_table_ent_eq(const outbuf_table_ent_t *a, const outbuf_table_ent_t *b) +{ + return a->chan->global_identifier == b->chan->global_identifier; +} + +static outbuf_table_t outbuf_table = HT_INITIALIZER(); + +HT_PROTOTYPE(outbuf_table_s, outbuf_table_ent_s, node, outbuf_table_ent_hash, + outbuf_table_ent_eq) +HT_GENERATE2(outbuf_table_s, outbuf_table_ent_s, node, outbuf_table_ent_hash, + outbuf_table_ent_eq, 0.6, tor_reallocarray, tor_free_) + +/***************************************************************************** + * Other internal data + *****************************************************************************/ + +static struct timeval scheduler_last_run = {0, 0}; +static double sock_buf_size_factor = 1.0; +STATIC int32_t sched_run_interval = 10; +static scheduler_t *kist_scheduler = NULL; + +/***************************************************************************** + * Internally called function implementations + *****************************************************************************/ + +/* Little helper function to get the length of a channel's output buffer */ +static inline size_t +channel_outbuf_length(channel_t *chan) +{ + return buf_datalen(TO_CONN(BASE_CHAN_TO_TLS(chan)->conn)->outbuf); +} + +/* Little helper function for HT_FOREACH_FN. */ +static int +each_channel_write_to_kernel(outbuf_table_ent_t *ent, void *data) +{ + (void) data; /* Make compiler happy. */ + channel_write_to_kernel(ent->chan); + return 0; /* Returning non-zero removes the element from the table. */ +} + +/* Free the given outbuf table entry ent. */ +static int +free_outbuf_info_by_ent(outbuf_table_ent_t *ent, void *data) +{ + (void) data; /* Make compiler happy. */ + log_debug(LD_SCHED, "Freeing outbuf table entry from chan=%" PRIu64, + ent->chan->global_identifier); + tor_free(ent); + return 1; /* So HT_FOREACH_FN will remove the element */ +} + +/* Clean up outbuf_table. Probably because the KIST sched impl is going away */ +static void +free_all_outbuf_info(void) +{ + HT_FOREACH_FN(outbuf_table_s, &outbuf_table, free_outbuf_info_by_ent, NULL); +} + +/* Free the given socket table entry ent. */ +static int +free_socket_info_by_ent(socket_table_ent_t *ent, void *data) +{ + (void) data; /* Make compiler happy. */ + log_debug(LD_SCHED, "Freeing socket table entry from chan=%" PRIu64, + ent->chan->global_identifier); + tor_free(ent); + return 1; /* So HT_FOREACH_FN will remove the element */ +} + +/* Clean up socket_table. Probably because the KIST sched impl is going away */ +static void +free_all_socket_info(void) +{ + HT_FOREACH_FN(socket_table_s, &socket_table, free_socket_info_by_ent, NULL); +} + +static socket_table_ent_t * +socket_table_search(socket_table_t *table, const channel_t *chan) +{ + socket_table_ent_t search, *ent = NULL; + search.chan = chan; + ent = HT_FIND(socket_table_s, table, &search); + return ent; +} + +/* Free a socket entry in table for the given chan. */ +static void +free_socket_info_by_chan(socket_table_t *table, const channel_t *chan) +{ + socket_table_ent_t *ent = NULL; + ent = socket_table_search(table, chan); + if (!ent) + return; + log_debug(LD_SCHED, "scheduler free socket info for chan=%" PRIu64, + chan->global_identifier); + HT_REMOVE(socket_table_s, table, ent); + free_socket_info_by_ent(ent, NULL); +} + +MOCK_IMPL(void, +update_socket_info_impl, (socket_table_ent_t *ent)) +{ + int64_t tcp_space, extra_space; + const tor_socket_t sock = + TO_CONN(BASE_CHAN_TO_TLS((channel_t *) ent->chan)->conn)->s; + struct tcp_info tcp; + socklen_t tcp_info_len = sizeof(tcp); + + /* Gather information */ + getsockopt(sock, SOL_TCP, TCP_INFO, (void *)&(tcp), &tcp_info_len); + ioctl(sock, SIOCOUTQNSD, &(ent->notsent)); + ent->cwnd = tcp.tcpi_snd_cwnd; + ent->unacked = tcp.tcpi_unacked; + ent->mss = tcp.tcpi_snd_mss; + + tcp_space = (ent->cwnd - ent->unacked) * ent->mss; + if (tcp_space < 0) { + tcp_space = 0; + } + extra_space = + clamp_double_to_int64((ent->cwnd * ent->mss) * sock_buf_size_factor) - + ent->notsent; + if (extra_space < 0) { + extra_space = 0; + } + ent->limit = tcp_space + extra_space; + return; +} + +/* Given a socket that isn't in the table, add it. + * Given a socket that is in the table, reinit values that need init-ing + * every scheduling run + */ +static void +init_socket_info(socket_table_t *table, const channel_t *chan) +{ + socket_table_ent_t *ent = NULL; + ent = socket_table_search(table, chan); + if (!ent) { + log_debug(LD_SCHED, "scheduler init socket info for chan=%" PRIu64, + chan->global_identifier); + ent = tor_malloc_zero(sizeof(*ent)); + ent->chan = chan; + HT_INSERT(socket_table_s, table, ent); + } + ent->written = 0; +} + +/* Add chan to the outbuf table if it isn't already in it. If it is, then don't + * do anything */ +static void +outbuf_table_add(outbuf_table_t *table, channel_t *chan) +{ + outbuf_table_ent_t search, *ent; + search.chan = chan; + ent = HT_FIND(outbuf_table_s, table, &search); + if (!ent) { + log_debug(LD_SCHED, "scheduler init outbuf info for chan=%" PRIu64, + chan->global_identifier); + ent = tor_malloc_zero(sizeof(*ent)); + ent->chan = chan; + HT_INSERT(outbuf_table_s, table, ent); + } +} + +static void +outbuf_table_remove(outbuf_table_t *table, channel_t *chan) +{ + outbuf_table_ent_t search, *ent; + search.chan = chan; + ent = HT_FIND(outbuf_table_s, table, &search); + if (ent) { + HT_REMOVE(outbuf_table_s, table, ent); + free_outbuf_info_by_ent(ent, NULL); + } +} + +/* Set the scheduler running interval. */ +static void +set_scheduler_run_interval(const networkstatus_t *ns) +{ + int32_t old_sched_run_interval = sched_run_interval; + sched_run_interval = kist_scheduler_run_interval(ns); + if (old_sched_run_interval != sched_run_interval) { + log_info(LD_SCHED, "Scheduler KIST changing its running interval " + "from %" PRId32 " to %" PRId32, + old_sched_run_interval, sched_run_interval); + } +} + +/* Return true iff the channel associated socket can write to the kernel that + * is hasn't reach the limit. */ +static int +socket_can_write(socket_table_t *table, const channel_t *chan) +{ + socket_table_ent_t *ent = NULL; + ent = socket_table_search(table, chan); + tor_assert(ent); + + int64_t kist_limit_space = + (int64_t) (ent->limit - ent->written) / + (CELL_MAX_NETWORK_SIZE + TLS_PER_CELL_OVERHEAD); + return kist_limit_space > 0; +} + +/* Update the channel's socket kernel information. */ +static void +update_socket_info(socket_table_t *table, const channel_t *chan) +{ + socket_table_ent_t *ent = NULL; + ent = socket_table_search(table, chan); + tor_assert(ent); + update_socket_info_impl(ent); +} + +/* Increament the channel's socket written value by the number of bytes. */ +static void +update_socket_written(socket_table_t *table, channel_t *chan, size_t bytes) +{ + socket_table_ent_t *ent = NULL; + ent = socket_table_search(table, chan); + tor_assert(ent); + + log_debug(LD_SCHED, "chan=%" PRIu64 " wrote %lu bytes, old was %" PRIi64, + chan->global_identifier, bytes, ent->written); + + ent->written += bytes; +} + +/* + * A naive KIST impl would write every single cell all the way to the kernel. + * That would take a lot of system calls. A less bad KIST impl would write a + * channel's outbuf to the kernel only when we are switching to a different + * channel. But if we have two channels with equal priority, we end up writing + * one cell for each and bouncing back and forth. This KIST impl avoids that + * by only writing a channel's outbuf to the kernel if it has 8 cells or more + * in it. + */ +MOCK_IMPL(int, channel_should_write_to_kernel, + (outbuf_table_t *table, channel_t *chan)) +{ + outbuf_table_add(table, chan); + /* CELL_MAX_NETWORK_SIZE * 8 because we only want to write the outbuf to the + * kernel if there's 8 or more cells waiting */ + return channel_outbuf_length(chan) > (CELL_MAX_NETWORK_SIZE * 8); +} + +/* Little helper function to write a channel's outbuf all the way to the + * kernel */ +MOCK_IMPL(void, channel_write_to_kernel, (channel_t *chan)) +{ + log_debug(LD_SCHED, "Writing %lu bytes to kernel for chan %" PRIu64, + channel_outbuf_length(chan), chan->global_identifier); + connection_handle_write(TO_CONN(BASE_CHAN_TO_TLS(chan)->conn), 0); +} + +/* Return true iff the scheduler has work to perform. */ +static int +have_work(void) +{ + smartlist_t *cp = get_channels_pending(); + tor_assert(cp); + return smartlist_len(cp) > 0; +} + +/* Function of the scheduler interface: free_all() */ +static void +kist_free_all(void) +{ + free_all_outbuf_info(); + free_all_socket_info(); +} + +/* Function of the scheduler interface: on_channel_free() */ +static void +kist_on_channel_free(const channel_t *chan) +{ + free_socket_info_by_chan(&socket_table, chan); +} + +/* Function of the scheduler interface: on_new_consensus() */ +static void +kist_scheduler_on_new_consensus(const networkstatus_t *old_c, + const networkstatus_t *new_c) +{ + (void) old_c; + (void) new_c; + + set_scheduler_run_interval(new_c); +} + +/* Function of the scheduler interface: run() */ +static void +kist_scheduler_on_new_options(void) +{ + sock_buf_size_factor = get_options()->KISTSockBufSizeFactor; + + /* Calls kist_scheduler_run_interval which calls get_options(). */ + set_scheduler_run_interval(NULL); +} + +/* Function of the scheduler interface: init() */ +static void +kist_scheduler_init(void) +{ + kist_scheduler_on_new_options(); + tor_assert(sched_run_interval > 0); +} + +/* Function of the scheduler interface: schedule() */ +static void +kist_scheduler_schedule(void) +{ + struct timeval now, next_run; + int32_t diff; + struct event *ev = get_run_sched_ev(); + tor_assert(ev); + if (!have_work()) { + return; + } + tor_gettimeofday(&now); + diff = (int32_t) tv_mdiff(&scheduler_last_run, &now); + if (diff < sched_run_interval) { + next_run.tv_sec = 0; + /* 1000 for ms -> us */ + next_run.tv_usec = (sched_run_interval - diff) * 1000; + /* Readding an event reschedules it. It does not duplicate it. */ + event_add(ev, &next_run); + } else { + event_active(ev, EV_TIMEOUT, 1); + } +} + +/* Function of the scheduler interface: run() */ +static void +kist_scheduler_run(void) +{ + /* Define variables */ + channel_t *chan = NULL; // current working channel + /* The last distinct chan served in a sched loop. */ + channel_t *prev_chan = NULL; + int flush_result; // temporarily store results from flush calls + /* Channels to be readding to pending at the end */ + smartlist_t *to_readd = NULL; + smartlist_t *cp = get_channels_pending(); + + /* For each pending channel, collect new kernel information */ + SMARTLIST_FOREACH_BEGIN(cp, const channel_t *, pchan) { + init_socket_info(&socket_table, pchan); + update_socket_info(&socket_table, pchan); + } SMARTLIST_FOREACH_END(pchan); + + log_debug(LD_SCHED, "Running the scheduler. %d channels pending", + smartlist_len(cp)); + + /* The main scheduling loop. Loop until there are no more pending channels */ + while (smartlist_len(cp) > 0) { + /* get best channel */ + chan = smartlist_pqueue_pop(cp, scheduler_compare_channels, + offsetof(channel_t, sched_heap_idx)); + tor_assert(chan); + outbuf_table_add(&outbuf_table, chan); + + /* if we have switched to a new channel, consider writing the previous + * channel's outbuf to the kernel. */ + if (!prev_chan) prev_chan = chan; + if (prev_chan != chan) { + if (channel_should_write_to_kernel(&outbuf_table, prev_chan)) { + channel_write_to_kernel(prev_chan); + outbuf_table_remove(&outbuf_table, prev_chan); + } + prev_chan = chan; + } + + /* Only flush and write if the per-socket limit hasn't been hit */ + if (socket_can_write(&socket_table, chan)) { + /* flush to channel queue/outbuf */ + flush_result = (int)channel_flush_some_cells(chan, 1); // 1 for num cells + /* flush_result has the # cells flushed */ + if (flush_result > 0) { + update_socket_written(&socket_table, chan, flush_result * + (CELL_MAX_NETWORK_SIZE + TLS_PER_CELL_OVERHEAD)); + } + /* XXX What if we didn't flush? */ + } + + /* Decide what to do with the channel now */ + + if (!channel_more_to_flush(chan) && + !socket_can_write(&socket_table, chan)) { + + /* Case 1: no more cells to send, and cannot write */ + + /* + * You might think we should put the channel in SCHED_CHAN_IDLE. And + * you're probably correct. While implementing KIST, we found that the + * scheduling system would sometimes lose track of channels when we did + * that. We suspect it has to do with the difference between "can't + * write because socket/outbuf is full" and KIST's "can't write because + * we've arbitrarily decided that that's enough for now." Sometimes + * channels run out of cells at the same time they hit their + * kist-imposed write limit and maybe the rest of Tor doesn't put the + * channel back in pending when it is supposed to. + * + * This should be investigated again. It is as simple as changing + * SCHED_CHAN_WAITING_FOR_CELLS to SCHED_CHAN_IDLE and seeing if Tor + * starts having serious throughput issues. Best done in shadow/chutney. + */ + chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS; + log_debug(LD_SCHED, "chan=%" PRIu64 " now waiting_for_cells", + chan->global_identifier); + } else if (!channel_more_to_flush(chan)) { + + /* Case 2: no more cells to send, but still open for writes */ + + chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS; + log_debug(LD_SCHED, "chan=%" PRIu64 " now waiting_for_cells", + chan->global_identifier); + } else if (!socket_can_write(&socket_table, chan)) { + + /* Case 3: cells to send, but cannot write */ + + chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE; + if (!to_readd) + to_readd = smartlist_new(); + smartlist_add(to_readd, chan); + log_debug(LD_SCHED, "chan=%" PRIu64 " now waiting_to_write", + chan->global_identifier); + } else { + + /* Case 4: cells to send, and still open for writes */ + + chan->scheduler_state = SCHED_CHAN_PENDING; + smartlist_pqueue_add(cp, scheduler_compare_channels, + offsetof(channel_t, sched_heap_idx), chan); + } + } /* End of main scheduling loop */ + + /* Write the outbuf of any channels that still have data */ + HT_FOREACH_FN(outbuf_table_s, &outbuf_table, each_channel_write_to_kernel, + NULL); + free_all_outbuf_info(); + HT_CLEAR(outbuf_table_s, &outbuf_table); + + log_debug(LD_SCHED, "len pending=%d, len to_readd=%d", + smartlist_len(cp), + (to_readd ? smartlist_len(to_readd) : -1)); + + /* Readd any channels we need to */ + if (to_readd) { + SMARTLIST_FOREACH_BEGIN(to_readd, channel_t *, readd_chan) { + readd_chan->scheduler_state = SCHED_CHAN_PENDING; + if (!smartlist_contains(cp, readd_chan)) { + smartlist_pqueue_add(cp, scheduler_compare_channels, + offsetof(channel_t, sched_heap_idx), readd_chan); + } + } SMARTLIST_FOREACH_END(readd_chan); + smartlist_free(to_readd); + } + + tor_gettimeofday(&scheduler_last_run); +} + +/***************************************************************************** + * Externally called function implementations not called through scheduler_t + *****************************************************************************/ + +/* Return the KIST scheduler object. If it didn't exists, return a newly + * allocated one but init() is not called. */ +scheduler_t * +get_kist_scheduler(void) +{ + if (!kist_scheduler) { + log_debug(LD_SCHED, "Allocating kist scheduler struct"); + kist_scheduler = tor_malloc_zero(sizeof(*kist_scheduler)); + kist_scheduler->free_all = kist_free_all; + kist_scheduler->on_channel_free = kist_on_channel_free; + kist_scheduler->init = kist_scheduler_init; + kist_scheduler->on_new_consensus = kist_scheduler_on_new_consensus; + kist_scheduler->schedule = kist_scheduler_schedule; + kist_scheduler->run = kist_scheduler_run; + kist_scheduler->on_new_options = kist_scheduler_on_new_options; + } + return kist_scheduler; +} + +/* Default interval that KIST runs (in ms). */ +#define KIST_SCHED_RUN_INTERVAL_DEFAULT 10 +/* Minimum interval that KIST runs. This value disables KIST. */ +#define KIST_SCHED_RUN_INTERVAL_MIN 0 +/* Maximum interval that KIST runs (in ms). */ +#define KIST_SCHED_RUN_INTERVAL_MAX 100 + +/* Check the torrc for the configured KIST scheduler run frequency. + * - If torrc < 0, then return the negative torrc value (shouldn't even be + * using KIST) + * - If torrc > 0, then return the positive torrc value (should use KIST, and + * should use the set value) + * - If torrc == 0, then look in the consensus for what the value should be. + * - If == 0, then return -1 (don't use KIST) + * - If > 0, then return the positive consensus value + * - If consensus doesn't say anything, return 10 milliseconds + */ +int32_t +kist_scheduler_run_interval(const networkstatus_t *ns) +{ + int32_t run_interval = (int32_t)get_options()->KISTSchedRunInterval; + if (run_interval != 0) { + log_debug(LD_SCHED, "Found KISTSchedRunInterval in torrc. Using that."); + return run_interval; + } + + log_debug(LD_SCHED, "Turning to the consensus for KISTSchedRunInterval"); + run_interval = networkstatus_get_param(ns, "KISTSchedRunInterval", + KIST_SCHED_RUN_INTERVAL_DEFAULT, + KIST_SCHED_RUN_INTERVAL_MIN, + KIST_SCHED_RUN_INTERVAL_MAX); + if (run_interval <= 0) + return -1; + return run_interval; +} + diff --git a/src/or/scheduler_vanilla.c b/src/or/scheduler_vanilla.c index c140849538..a5e104e97b 100644 --- a/src/or/scheduler_vanilla.c +++ b/src/or/scheduler_vanilla.c @@ -1,5 +1,191 @@ /* Copyright (c) 2017, The Tor Project, Inc. */ /* See LICENSE for licensing information */ +#include <event2/event.h> + +#include "or.h" +#include "config.h" +#define TOR_CHANNEL_INTERNAL_ +#include "channel.h" +#define SCHEDULER_PRIVATE_ #include "scheduler.h" +/***************************************************************************** + * Other internal data + *****************************************************************************/ + +/* Maximum cells to flush in a single call to channel_flush_some_cells(); */ +#define MAX_FLUSH_CELLS 1000 + +static scheduler_t *vanilla_scheduler = NULL; + +/***************************************************************************** + * Externally called function implementations + *****************************************************************************/ + +/* Return true iff the scheduler has work to perform. */ +static int +have_work(void) +{ + smartlist_t *cp = get_channels_pending(); + tor_assert(cp); + return smartlist_len(cp) > 0; +} + +/** Retrigger the scheduler in a way safe to use from the callback */ + +static void +vanilla_scheduler_schedule(void) +{ + if (!have_work()) { + return; + } + struct event *ev = get_run_sched_ev(); + tor_assert(ev); + event_active(ev, EV_TIMEOUT, 1); +} + +static void +vanilla_scheduler_run(void) +{ + int n_cells, n_chans_before, n_chans_after; + ssize_t flushed, flushed_this_time; + smartlist_t *cp = get_channels_pending(); + smartlist_t *to_readd = NULL; + channel_t *chan = NULL; + + log_debug(LD_SCHED, "We have a chance to run the scheduler"); + + n_chans_before = smartlist_len(cp); + + while (smartlist_len(cp) > 0) { + /* Pop off a channel */ + chan = smartlist_pqueue_pop(cp, + scheduler_compare_channels, + offsetof(channel_t, sched_heap_idx)); + tor_assert(chan); + + /* Figure out how many cells we can write */ + n_cells = channel_num_cells_writeable(chan); + if (n_cells > 0) { + log_debug(LD_SCHED, + "Scheduler saw pending channel " U64_FORMAT " at %p with " + "%d cells writeable", + U64_PRINTF_ARG(chan->global_identifier), chan, n_cells); + + flushed = 0; + while (flushed < n_cells) { + flushed_this_time = + channel_flush_some_cells(chan, + MIN(MAX_FLUSH_CELLS, (size_t) n_cells - flushed)); + if (flushed_this_time <= 0) break; + flushed += flushed_this_time; + } + + if (flushed < n_cells) { + /* We ran out of cells to flush */ + chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS; + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p " + "entered waiting_for_cells from pending", + U64_PRINTF_ARG(chan->global_identifier), + chan); + } else { + /* The channel may still have some cells */ + if (channel_more_to_flush(chan)) { + /* The channel goes to either pending or waiting_to_write */ + if (channel_num_cells_writeable(chan) > 0) { + /* Add it back to pending later */ + if (!to_readd) to_readd = smartlist_new(); + smartlist_add(to_readd, chan); + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p " + "is still pending", + U64_PRINTF_ARG(chan->global_identifier), + chan); + } else { + /* It's waiting to be able to write more */ + chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE; + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p " + "entered waiting_to_write from pending", + U64_PRINTF_ARG(chan->global_identifier), + chan); + } + } else { + /* No cells left; it can go to idle or waiting_for_cells */ + if (channel_num_cells_writeable(chan) > 0) { + /* + * It can still accept writes, so it goes to + * waiting_for_cells + */ + chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS; + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p " + "entered waiting_for_cells from pending", + U64_PRINTF_ARG(chan->global_identifier), + chan); + } else { + /* + * We exactly filled up the output queue with all available + * cells; go to idle. + */ + chan->scheduler_state = SCHED_CHAN_IDLE; + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p " + "become idle from pending", + U64_PRINTF_ARG(chan->global_identifier), + chan); + } + } + } + + log_debug(LD_SCHED, + "Scheduler flushed %d cells onto pending channel " + U64_FORMAT " at %p", + (int)flushed, U64_PRINTF_ARG(chan->global_identifier), + chan); + } else { + log_info(LD_SCHED, + "Scheduler saw pending channel " U64_FORMAT " at %p with " + "no cells writeable", + U64_PRINTF_ARG(chan->global_identifier), chan); + /* Put it back to WAITING_TO_WRITE */ + chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE; + } + } + + /* Readd any channels we need to */ + if (to_readd) { + SMARTLIST_FOREACH_BEGIN(to_readd, channel_t *, readd_chan) { + readd_chan->scheduler_state = SCHED_CHAN_PENDING; + smartlist_pqueue_add(cp, + scheduler_compare_channels, + offsetof(channel_t, sched_heap_idx), + readd_chan); + } SMARTLIST_FOREACH_END(readd_chan); + smartlist_free(to_readd); + } + + n_chans_after = smartlist_len(cp); + log_debug(LD_SCHED, "Scheduler handled %d of %d pending channels", + n_chans_before - n_chans_after, n_chans_before); +} + +scheduler_t * +get_vanilla_scheduler(void) +{ + if (!vanilla_scheduler) { + log_debug(LD_SCHED, "Initializing vanilla scheduler struct"); + vanilla_scheduler = tor_malloc_zero(sizeof(*vanilla_scheduler)); + vanilla_scheduler->free_all = NULL; + vanilla_scheduler->on_channel_free = NULL; + vanilla_scheduler->init = NULL; + vanilla_scheduler->on_new_consensus = NULL; + vanilla_scheduler->schedule = vanilla_scheduler_schedule; + vanilla_scheduler->run = vanilla_scheduler_run; + vanilla_scheduler->on_new_options = NULL; + } + return vanilla_scheduler; +} + |