aboutsummaryrefslogtreecommitdiff
path: root/src/or
diff options
context:
space:
mode:
author Nick Mathewson <nickm@torproject.org> 2017-09-15 12:00:50 -0400
committer Nick Mathewson <nickm@torproject.org> 2017-09-15 12:00:50 -0400
commit 0f4f40b70fe6ea16a43940f86db767e1a16a4f6e (patch)
tree 59ba8fa1332fa127ade5882da3836809bd540fea /src/or
parent 962b0b849bf0c2fcca387e334985b349a255de0a (diff)
parent 06500171434dca543e3daf74ce7033a0aef3d199 (diff)
download tor-0f4f40b70fe6ea16a43940f86db767e1a16a4f6e.tar.gz
tor-0f4f40b70fe6ea16a43940f86db767e1a16a4f6e.zip
Merge remote-tracking branch 'dgoulet/ticket12541_032_02'
Diffstat (limited to 'src/or')
-rw-r--r-- src/or/channel.c 8
-rw-r--r-- src/or/channel.h 2
-rw-r--r-- src/or/config.c 91
-rw-r--r-- src/or/include.am 2
-rw-r--r-- src/or/networkstatus.c 19
-rw-r--r-- src/or/networkstatus.h 7
-rw-r--r-- src/or/or.h 28
-rw-r--r-- src/or/scheduler.c 735
-rw-r--r-- src/or/scheduler.h 203
-rw-r--r-- src/or/scheduler_kist.c 774
-rw-r--r-- src/or/scheduler_vanilla.c 196
11 files changed, 1578 insertions, 487 deletions
diff --git a/src/or/channel.c b/src/or/channel.c
index 56eeccc2a7..44152ff457 100644
--- a/src/or/channel.c
+++ b/src/or/channel.c
@@ -2603,8 +2603,8 @@ channel_flush_cells(channel_t *chan)
* available.
*/
-int
-channel_more_to_flush(channel_t *chan)
+MOCK_IMPL(int,
+channel_more_to_flush, (channel_t *chan))
{
tor_assert(chan);
@@ -4841,8 +4841,6 @@ channel_update_xmit_queue_size(channel_t *chan)
U64_FORMAT ", new size is " U64_FORMAT,
U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
U64_PRINTF_ARG(estimated_total_queue_size));
- /* Tell the scheduler we're increasing the queue size */
- scheduler_adjust_queue_size(chan, 1, adj);
}
} else if (queued < chan->bytes_queued_for_xmit) {
adj = chan->bytes_queued_for_xmit - queued;
@@ -4865,8 +4863,6 @@ channel_update_xmit_queue_size(channel_t *chan)
U64_FORMAT ", new size is " U64_FORMAT,
U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
U64_PRINTF_ARG(estimated_total_queue_size));
- /* Tell the scheduler we're decreasing the queue size */
- scheduler_adjust_queue_size(chan, -1, adj);
}
}
}
diff --git a/src/or/channel.h b/src/or/channel.h
index 2d0ec39924..8c776c53f2 100644
--- a/src/or/channel.h
+++ b/src/or/channel.h
@@ -568,7 +568,7 @@ MOCK_DECL(ssize_t, channel_flush_some_cells,
(channel_t *chan, ssize_t num_cells));
/* Query if data available on this channel */
-int channel_more_to_flush(channel_t *chan);
+MOCK_DECL(int, channel_more_to_flush, (channel_t *chan));
/* Notify flushed outgoing for dirreq handling */
void channel_notify_flushed(channel_t *chan);
diff --git a/src/or/config.c b/src/or/config.c
index a5bda8be00..4a1361f9f4 100644
--- a/src/or/config.c
+++ b/src/or/config.c
@@ -488,9 +488,12 @@ static config_var_t option_vars_[] = {
V(ServerDNSSearchDomains, BOOL, "0"),
V(ServerDNSTestAddresses, CSV,
"www.google.com,www.mit.edu,www.yahoo.com,www.slashdot.org"),
- V(SchedulerLowWaterMark__, MEMUNIT, "100 MB"),
- V(SchedulerHighWaterMark__, MEMUNIT, "101 MB"),
- V(SchedulerMaxFlushCells__, UINT, "1000"),
+ OBSOLETE("SchedulerLowWaterMark__"),
+ OBSOLETE("SchedulerHighWaterMark__"),
+ OBSOLETE("SchedulerMaxFlushCells__"),
+ V(KISTSchedRunInterval, MSEC_INTERVAL, "0 msec"),
+ V(KISTSockBufSizeFactor, DOUBLE, "1.0"),
+ V(Schedulers, CSV, "KIST,KISTLite,Vanilla"),
V(ShutdownWaitLength, INTERVAL, "30 seconds"),
OBSOLETE("SocksListenAddress"),
V(SocksPolicy, LINELIST, NULL),
@@ -918,6 +921,10 @@ or_options_free(or_options_t *options)
rs, routerset_free(rs));
smartlist_free(options->NodeFamilySets);
}
+ if (options->SchedulerTypes_) {
+ SMARTLIST_FOREACH(options->SchedulerTypes_, int *, i, tor_free(i));
+ smartlist_free(options->SchedulerTypes_);
+ }
tor_free(options->BridgePassword_AuthDigest_);
tor_free(options->command_arg);
tor_free(options->master_key_fname);
@@ -1828,11 +1835,9 @@ options_act(const or_options_t *old_options)
return -1;
}
- /* Set up scheduler thresholds */
- scheduler_set_watermarks((uint32_t)options->SchedulerLowWaterMark__,
- (uint32_t)options->SchedulerHighWaterMark__,
- (options->SchedulerMaxFlushCells__ > 0) ?
- options->SchedulerMaxFlushCells__ : 1000);
+ /* Inform the scheduler subsystem that a configuration change happened. It
+ * might be a change of scheduler or parameter. */
+ scheduler_conf_changed();
/* Set up accounting */
if (accounting_parse_options(options, 0)<0) {
@@ -2928,6 +2933,61 @@ warn_about_relative_paths(or_options_t *options)
return n != 0;
}
+/* Validate options related to the scheduler. From the Schedulers list, the
+ * SchedulerTypes_ list is created with int values so once we select the
+ * scheduler, which can happen anytime at runtime, we don't have to parse
+ * strings and thus be quick.
+ *
+ * Return 0 on success else -1 and msg is set with an error message. */
+static int
+options_validate_scheduler(or_options_t *options, char **msg)
+{
+ tor_assert(options);
+ tor_assert(msg);
+
+ if (!options->Schedulers || smartlist_len(options->Schedulers) == 0) {
+ REJECT("Empty Schedulers list. Either remove the option so the defaults "
+ "can be used or set at least one value.");
+ }
+ /* Ok, we do have scheduler types, validate them. */
+ options->SchedulerTypes_ = smartlist_new();
+ SMARTLIST_FOREACH_BEGIN(options->Schedulers, const char *, type) {
+ int *sched_type;
+ if (!strcasecmp("KISTLite", type)) {
+ sched_type = tor_malloc_zero(sizeof(int));
+ *sched_type = SCHEDULER_KIST_LITE;
+ smartlist_add(options->SchedulerTypes_, sched_type);
+ } else if (!strcasecmp("KIST", type)) {
+ sched_type = tor_malloc_zero(sizeof(int));
+ *sched_type = SCHEDULER_KIST;
+ smartlist_add(options->SchedulerTypes_, sched_type);
+ } else if (!strcasecmp("Vanilla", type)) {
+ sched_type = tor_malloc_zero(sizeof(int));
+ *sched_type = SCHEDULER_VANILLA;
+ smartlist_add(options->SchedulerTypes_, sched_type);
+ } else {
+ tor_asprintf(msg, "Unknown type %s in option Schedulers. "
+ "Possible values are KIST, KISTLite and Vanilla.",
+ escaped(type));
+ return -1;
+ }
+ } SMARTLIST_FOREACH_END(type);
+
+ if (options->KISTSockBufSizeFactor < 0) {
+ REJECT("KISTSockBufSizeFactor must be at least 0");
+ }
+
+ /* Don't need to validate that the Interval is less than anything because
+ * zero is valid and all negative values are valid. */
+ if (options->KISTSchedRunInterval > KIST_SCHED_RUN_INTERVAL_MAX) {
+ tor_asprintf(msg, "KISTSchedRunInterval must not be more than %d (ms)",
+ KIST_SCHED_RUN_INTERVAL_MAX);
+ return -1;
+ }
+
+ return 0;
+}
+
/* Validate options related to single onion services.
* Modifies some options that are incompatible with single onion services.
* On failure returns -1, and sets *msg to an error string.
@@ -3156,17 +3216,6 @@ options_validate(or_options_t *old_options, or_options_t *options,
routerset_union(options->ExcludeExitNodesUnion_,options->ExcludeNodes);
}
- if (options->SchedulerLowWaterMark__ == 0 ||
- options->SchedulerLowWaterMark__ > UINT32_MAX) {
- log_warn(LD_GENERAL, "Bad SchedulerLowWaterMark__ option");
- return -1;
- } else if (options->SchedulerHighWaterMark__ <=
- options->SchedulerLowWaterMark__ ||
- options->SchedulerHighWaterMark__ > UINT32_MAX) {
- log_warn(LD_GENERAL, "Bad SchedulerHighWaterMark option");
- return -1;
- }
-
if (options->NodeFamilies) {
options->NodeFamilySets = smartlist_new();
for (cl = options->NodeFamilies; cl; cl = cl->next) {
@@ -4285,6 +4334,10 @@ options_validate(or_options_t *old_options, or_options_t *options,
REJECT("BridgeRelay is 1, ORPort is not set. This is an invalid "
"combination.");
+ if (options_validate_scheduler(options, msg) < 0) {
+ return -1;
+ }
+
return 0;
}
diff --git a/src/or/include.am b/src/or/include.am
index 021f5f9d5d..7216aba9af 100644
--- a/src/or/include.am
+++ b/src/or/include.am
@@ -99,6 +99,8 @@ LIBTOR_A_SOURCES = \
src/or/routerparse.c \
src/or/routerset.c \
src/or/scheduler.c \
+ src/or/scheduler_kist.c \
+ src/or/scheduler_vanilla.c \
src/or/statefile.c \
src/or/status.c \
src/or/torcert.c \
diff --git a/src/or/networkstatus.c b/src/or/networkstatus.c
index 7953af1cc7..00d9222ef1 100644
--- a/src/or/networkstatus.c
+++ b/src/or/networkstatus.c
@@ -61,6 +61,7 @@
#include "router.h"
#include "routerlist.h"
#include "routerparse.h"
+#include "scheduler.h"
#include "shared_random.h"
#include "transports.h"
#include "torcert.h"
@@ -1561,6 +1562,15 @@ notify_control_networkstatus_changed(const networkstatus_t *old_c,
smartlist_free(changed);
}
+/* Called when the consensus has changed from old_c to new_c. */
+static void
+notify_networkstatus_changed(const networkstatus_t *old_c,
+ const networkstatus_t *new_c)
+{
+ notify_control_networkstatus_changed(old_c, new_c);
+ scheduler_notify_networkstatus_changed(old_c, new_c);
+}
+
/** Copy all the ancillary information (like router download status and so on)
* from <b>old_c</b> to <b>new_c</b>. */
static void
@@ -1886,8 +1896,7 @@ networkstatus_set_current_consensus(const char *consensus,
const int is_usable_flavor = flav == usable_consensus_flavor();
if (is_usable_flavor) {
- notify_control_networkstatus_changed(
- networkstatus_get_latest_consensus(), c);
+ notify_networkstatus_changed(networkstatus_get_latest_consensus(), c);
}
if (flav == FLAV_NS) {
if (current_ns_consensus) {
@@ -2314,9 +2323,9 @@ get_net_param_from_list(smartlist_t *net_params, const char *param_name,
* Make sure the value parsed from the consensus is at least
* <b>min_val</b> and at most <b>max_val</b> and raise/cap the parsed value
* if necessary. */
-int32_t
-networkstatus_get_param(const networkstatus_t *ns, const char *param_name,
- int32_t default_val, int32_t min_val, int32_t max_val)
+MOCK_IMPL(int32_t,
+networkstatus_get_param, (const networkstatus_t *ns, const char *param_name,
+ int32_t default_val, int32_t min_val, int32_t max_val))
{
if (!ns) /* if they pass in null, go find it ourselves */
ns = networkstatus_get_latest_consensus();
diff --git a/src/or/networkstatus.h b/src/or/networkstatus.h
index adaa108904..89ff7eaba6 100644
--- a/src/or/networkstatus.h
+++ b/src/or/networkstatus.h
@@ -109,10 +109,9 @@ void signed_descs_update_status_from_consensus_networkstatus(
char *networkstatus_getinfo_helper_single(const routerstatus_t *rs);
char *networkstatus_getinfo_by_purpose(const char *purpose_string, time_t now);
void networkstatus_dump_bridge_status_to_file(time_t now);
-int32_t networkstatus_get_param(const networkstatus_t *ns,
- const char *param_name,
- int32_t default_val, int32_t min_val,
- int32_t max_val);
+MOCK_DECL(int32_t, networkstatus_get_param,
+ (const networkstatus_t *ns, const char *param_name,
+ int32_t default_val, int32_t min_val, int32_t max_val));
int32_t networkstatus_get_overridable_param(const networkstatus_t *ns,
int32_t torrc_value,
const char *param_name,
diff --git a/src/or/or.h b/src/or/or.h
index b3c4ed8293..72fdea17ea 100644
--- a/src/or/or.h
+++ b/src/or/or.h
@@ -4548,19 +4548,6 @@ typedef struct {
/** How long (seconds) do we keep a guard before picking a new one? */
int GuardLifetime;
- /** Low-water mark for global scheduler - start sending when estimated
- * queued size falls below this threshold.
- */
- uint64_t SchedulerLowWaterMark__;
- /** High-water mark for global scheduler - stop sending when estimated
- * queued size exceeds this threshold.
- */
- uint64_t SchedulerHighWaterMark__;
- /** Flush size for global scheduler - flush this many cells at a time
- * when sending.
- */
- int SchedulerMaxFlushCells__;
-
/** Is this an exit node? This is a tristate, where "1" means "yes, and use
* the default exit policy if none is given" and "0" means "no; exit policy
* is 'reject *'" and "auto" (-1) means "same as 1, but warn the user."
@@ -4633,6 +4620,21 @@ typedef struct {
/** Bool (default: 0). Tells Tor to never try to exec another program.
*/
int NoExec;
+
+ /** Have the KIST scheduler run every X milliseconds. If less than zero, do
+ * not use the KIST scheduler but use the old vanilla scheduler instead. If
+ * zero, do what the consensus says and fall back to using KIST as if this is
+ * set to "10 msec" if the consensus doesn't say anything. */
+ int64_t KISTSchedRunInterval;
+
+ /** A multiplier for the KIST per-socket limit calculation. */
+ double KISTSockBufSizeFactor;
+
+ /** The list of scheduler type strings, ordered by priority: the first one
+ * has to be tried first. Default: KIST,KISTLite,Vanilla */
+ smartlist_t *Schedulers;
+ /* An ordered list of scheduler_types mapped from Schedulers. */
+ smartlist_t *SchedulerTypes_;
} or_options_t;
/** Persistent state for an onion router, as saved to disk. */
diff --git a/src/or/scheduler.c b/src/or/scheduler.c
index 0d31c7d58c..4a9f3dcaf6 100644
--- a/src/or/scheduler.c
+++ b/src/or/scheduler.c
@@ -2,45 +2,50 @@
/* See LICENSE for licensing information */
#include "or.h"
-
-#define TOR_CHANNEL_INTERNAL_ /* For channel_flush_some_cells() */
-#include "channel.h"
+#include "config.h"
#include "compat_libevent.h"
#define SCHEDULER_PRIVATE_
+#define SCHEDULER_KIST_PRIVATE
#include "scheduler.h"
#include <event2/event.h>
-/*
- * Scheduler high/low watermarks
- */
-
-static uint32_t sched_q_low_water = 16384;
-static uint32_t sched_q_high_water = 32768;
-
-/*
- * Maximum cells to flush in a single call to channel_flush_some_cells();
- * setting this low means more calls, but too high and we could overshoot
- * sched_q_high_water.
- */
-
-static uint32_t sched_max_flush_cells = 16;
-
/**
* \file scheduler.c
* \brief Channel scheduling system: decides which channels should send and
* receive when.
*
- * This module implements a scheduler algorithm, to decide
- * which channels should send/receive when.
+ * This module is the global/common parts of the scheduling system. This system
+ * is what decides what channels get to send cells on their circuits and when.
+ *
+ * Terms:
+ * - "Scheduling system": the collection of scheduler*.{h,c} files and their
+ * aggregate behavior.
+ * - "Scheduler implementation": a scheduler_t. The scheduling system has one
+ * active scheduling implementation at a time.
+ *
+ * In this file you will find state that any scheduler implementation can have
+ * access to as well as the functions the rest of Tor uses to interact with the
+ * scheduling system.
*
* The earliest versions of Tor approximated a kind of round-robin system
- * among active connections, but only approximated it.
+ * among active connections, but only approximated it. It would only consider
+ * one connection (roughly equal to a channel in today's terms) at a time, and
+ * thus could only prioritize circuits against others on the same connection.
+ *
+ * Then in response to the KIST paper[0], Tor implemented a global
+ * circuit scheduler. It was supposed to prioritize circuits across many
+ * channels, but wasn't effective. It is preserved in scheduler_vanilla.c.
*
- * Now, write scheduling works by keeping track of which channels can
- * accept cells, and have cells to write. From the scheduler's perspective,
- * a channel can be in four possible states:
+ * [0]: http://www.robgjansen.com/publications/kist-sec2014.pdf
+ *
+ * Then we actually got around to implementing KIST for real. We decided to
+ * modularize the scheduler so new ones can be implemented. You can find KIST
+ * in scheduler_kist.c.
+ *
+ * Channels have one of four scheduling states based on whether or not they
+ * have cells to send and whether or not they are able to send.
*
* <ol>
* <li>
@@ -125,85 +130,96 @@ static uint32_t sched_max_flush_cells = 16;
* </ol>
*
* Other event-driven parts of the code move channels between these scheduling
- * states by calling scheduler functions; the scheduler only runs on open-for-
- * writes/has-cells channels and is the only path for those to transition to
- * other states. The scheduler_run() function gives us the opportunity to do
- * scheduling work, and is called from other scheduler functions whenever a
- * state transition occurs, and periodically from the main event loop.
+ * states by calling scheduler functions. The scheduling system builds up a
+ * list of channels in the SCHED_CHAN_PENDING state that the scheduler
+ * implementation should then use when it runs. Scheduling implementations need
+ * to properly update channel states during their scheduler_t->run() function
+ * as that is the only opportunity for channels to move from SCHED_CHAN_PENDING
+ * to any other state.
+ *
+ * The remainder of this file is a small amount of state that any scheduler
+ * implementation should have access to, and the functions the rest of Tor uses
+ * to interact with the scheduling system.
*/
-/* Scheduler global data structures */
+/*****************************************************************************
+ * Scheduling system state
+ *
+ * State that can be accessed from any scheduler implementation (but not
+ * outside the scheduling system)
+ *****************************************************************************/
+
+STATIC const scheduler_t *the_scheduler;
/*
* We keep a list of channels that are pending - i.e, have cells to write
- * and can accept them to send. The enum scheduler_state in channel_t
+ * and can accept them to send. The enum scheduler_state in channel_t
* is reserved for our use.
+ *
+ * Priority queue of channels that can write and have cells (pending work)
*/
-
-/* Pqueue of channels that can write and have cells (pending work) */
STATIC smartlist_t *channels_pending = NULL;
/*
* This event runs the scheduler from its callback, and is manually
* activated whenever a channel enters open for writes/cells to send.
*/
-
STATIC struct event *run_sched_ev = NULL;
-/*
- * Queue heuristic; this is not the queue size, but an 'effective queuesize'
- * that ages out contributions from stalled channels.
- */
-
-STATIC uint64_t queue_heuristic = 0;
+/*****************************************************************************
+ * Scheduling system static function definitions
+ *
+ * Functions that can only be accessed from this file.
+ *****************************************************************************/
/*
- * Timestamp for last queue heuristic update
+ * Scheduler event callback; this should get triggered once per event loop
+ * if any scheduling work was created during the event loop.
*/
+static void
+scheduler_evt_callback(evutil_socket_t fd, short events, void *arg)
+{
+ (void) fd;
+ (void) events;
+ (void) arg;
-STATIC time_t queue_heuristic_timestamp = 0;
-
-/* Scheduler static function declarations */
+ log_debug(LD_SCHED, "Scheduler event callback called");
-static void scheduler_evt_callback(evutil_socket_t fd,
- short events, void *arg);
-static int scheduler_more_work(void);
-static void scheduler_retrigger(void);
-#if 0
-static void scheduler_trigger(void);
-#endif
+ /* Run the scheduler. This is a mandatory function. */
-/* Scheduler function implementations */
+ /* We might as well assert on this. If this function doesn't exist, no cells
+ * are getting scheduled. Things are very broken. scheduler_t says the run()
+ * function is mandatory. */
+ tor_assert(the_scheduler->run);
+ the_scheduler->run();
-/** Free everything and shut down the scheduling system */
+ /* Schedule itself back in if it has more work. */
-void
-scheduler_free_all(void)
-{
- log_debug(LD_SCHED, "Shutting down scheduler");
+ /* Again, might as well assert on this mandatory scheduler_t function. If it
+ * doesn't exist, there's no way to tell libevent to run the scheduler again
+ * in the future. */
+ tor_assert(the_scheduler->schedule);
+ the_scheduler->schedule();
+}
- if (run_sched_ev) {
- if (event_del(run_sched_ev) < 0) {
- log_warn(LD_BUG, "Problem deleting run_sched_ev");
- }
- tor_event_free(run_sched_ev);
- run_sched_ev = NULL;
- }
+/*****************************************************************************
+ * Scheduling system private function definitions
+ *
+ * Functions that can only be accessed from scheduler*.c
+ *****************************************************************************/
- if (channels_pending) {
- smartlist_free(channels_pending);
- channels_pending = NULL;
- }
+/* Return the pending channel list. */
+smartlist_t *
+get_channels_pending(void)
+{
+ return channels_pending;
}
-/**
- * Comparison function to use when sorting pending channels
- */
-
-MOCK_IMPL(STATIC int,
+/* Comparison function to use when sorting pending channels */
+MOCK_IMPL(int,
scheduler_compare_channels, (const void *c1_v, const void *c2_v))
{
- channel_t *c1 = NULL, *c2 = NULL;
+ const channel_t *c1 = NULL, *c2 = NULL;
/* These are a workaround for -Wbad-function-cast throwing a fit */
const circuitmux_policy_t *p1, *p2;
uintptr_t p1_i, p2_i;
@@ -211,11 +227,8 @@ scheduler_compare_channels, (const void *c1_v, const void *c2_v))
tor_assert(c1_v);
tor_assert(c2_v);
- c1 = (channel_t *)(c1_v);
- c2 = (channel_t *)(c2_v);
-
- tor_assert(c1);
- tor_assert(c2);
+ c1 = (const channel_t *)(c1_v);
+ c2 = (const channel_t *)(c2_v);
if (c1 != c2) {
if (circuitmux_get_policy(c1->cmux) ==
@@ -242,26 +255,158 @@ scheduler_compare_channels, (const void *c1_v, const void *c2_v))
}
}
+/*****************************************************************************
+ * Scheduling system global functions
+ *
+ * Functions that can be accessed from anywhere in Tor.
+ *****************************************************************************/
+
+/* Using the global options, select the scheduler we should be using. */
+static void
+select_scheduler(void)
+{
+ const char *chosen_sched_type = NULL;
+
+#ifdef TOR_UNIT_TESTS
+ /* This is hella annoying to set in the options for every test that passes
+ * through the scheduler and there are many so if we don't explicitly have
+ * a list of types set, just put the vanilla one. */
+ if (get_options()->SchedulerTypes_ == NULL) {
+ the_scheduler = get_vanilla_scheduler();
+ return;
+ }
+#endif
+
+ /* This list is ordered: the first entry has the first priority. Thus, as
+ * soon as we find a scheduler type that we can use, we use it and stop. */
+ SMARTLIST_FOREACH_BEGIN(get_options()->SchedulerTypes_, int *, type) {
+ switch (*type) {
+ case SCHEDULER_VANILLA:
+ the_scheduler = get_vanilla_scheduler();
+ chosen_sched_type = "Vanilla";
+ goto end;
+ case SCHEDULER_KIST:
+ if (!scheduler_can_use_kist()) {
+#ifdef HAVE_KIST_SUPPORT
+ if (get_options()->KISTSchedRunInterval == -1) {
+ log_info(LD_SCHED, "Scheduler type KIST can not be used. It is "
+ "disabled because KISTSchedRunInterval=-1");
+ } else {
+ log_notice(LD_SCHED, "Scheduler type KIST has been disabled by "
+ "the consensus.");
+ }
+#else /* HAVE_KIST_SUPPORT */
+ log_info(LD_SCHED, "Scheduler type KIST not built in");
+#endif /* HAVE_KIST_SUPPORT */
+ continue;
+ }
+ the_scheduler = get_kist_scheduler();
+ chosen_sched_type = "KIST";
+ scheduler_kist_set_full_mode();
+ goto end;
+ case SCHEDULER_KIST_LITE:
+ chosen_sched_type = "KISTLite";
+ the_scheduler = get_kist_scheduler();
+ scheduler_kist_set_lite_mode();
+ goto end;
+ default:
+ /* Our option validation should have caught this. */
+ tor_assert_unreached();
+ }
+ } SMARTLIST_FOREACH_END(type);
+
+ end:
+ log_notice(LD_CONFIG, "Scheduler type %s has been enabled.",
+ chosen_sched_type);
+}
+
/*
- * Scheduler event callback; this should get triggered once per event loop
- * if any scheduling work was created during the event loop.
+ * Little helper function called from a few different places. It changes the
+ * scheduler implementation, if necessary. And if it did, it then tells the
+ * old one to free its state and the new one to initialize.
*/
-
static void
-scheduler_evt_callback(evutil_socket_t fd, short events, void *arg)
+set_scheduler(void)
{
- (void)fd;
- (void)events;
- (void)arg;
- log_debug(LD_SCHED, "Scheduler event callback called");
+ const scheduler_t *old_scheduler = the_scheduler;
- tor_assert(run_sched_ev);
+ /* From the options, select the scheduler type to set. */
+ select_scheduler();
+
+ if (old_scheduler != the_scheduler) {
+ /* Allow the old scheduler to clean up, if needed. */
+ if (old_scheduler && old_scheduler->free_all) {
+ old_scheduler->free_all();
+ }
+ /* We don't clean up the old scheduler_t. We keep any type of scheduler
+ * we've allocated so we can do an easy switch back. */
+
+ /* Initialize the new scheduler. */
+ if (the_scheduler->init) {
+ the_scheduler->init();
+ }
+ }
+}
+
+/*
+ * This is how the scheduling system is notified of Tor's configuration
+ * changing. For example: a SIGHUP was issued.
+ */
+void
+scheduler_conf_changed(void)
+{
+ /* Let the scheduler decide what it should do. */
+ set_scheduler();
+
+ /* Then tell the (possibly new) scheduler that we have new options. */
+ if (the_scheduler->on_new_options) {
+ the_scheduler->on_new_options();
+ }
+}
- /* Run the scheduler */
- scheduler_run();
+/*
+ * Whenever we get a new consensus, this function is called.
+ */
+void
+scheduler_notify_networkstatus_changed(const networkstatus_t *old_c,
+ const networkstatus_t *new_c)
+{
+ /* Then tell the (possibly new) scheduler that we have a new consensus */
+ if (the_scheduler->on_new_consensus) {
+ the_scheduler->on_new_consensus(old_c, new_c);
+ }
+ /* Maybe the consensus param made us change the scheduler. */
+ set_scheduler();
+}
- /* Do we have more work to do? */
- if (scheduler_more_work()) scheduler_retrigger();
+/*
+ * Free everything scheduling-related from main.c. Note this is only called
+ * when Tor is shutting down, while scheduler_t->free_all() is called both when
+ * Tor is shutting down and when we are switching schedulers.
+ */
+void
+scheduler_free_all(void)
+{
+ log_debug(LD_SCHED, "Shutting down scheduler");
+
+ if (run_sched_ev) {
+ if (event_del(run_sched_ev) < 0) {
+ log_warn(LD_BUG, "Problem deleting run_sched_ev");
+ }
+ tor_event_free(run_sched_ev);
+ run_sched_ev = NULL;
+ }
+
+ if (channels_pending) {
+ /* We don't have ownership of the objects in this list. */
+ smartlist_free(channels_pending);
+ channels_pending = NULL;
+ }
+
+ if (the_scheduler && the_scheduler->free_all) {
+ the_scheduler->free_all();
+ }
+ the_scheduler = NULL;
}
/** Mark a channel as no longer ready to accept writes */
@@ -269,9 +414,12 @@ scheduler_evt_callback(evutil_socket_t fd, short events, void *arg)
MOCK_IMPL(void,
scheduler_channel_doesnt_want_writes,(channel_t *chan))
{
- tor_assert(chan);
-
- tor_assert(channels_pending);
+ IF_BUG_ONCE(!chan) {
+ return;
+ }
+ IF_BUG_ONCE(!channels_pending) {
+ return;
+ }
/* If it's already in pending, we can put it in waiting_to_write */
if (chan->scheduler_state == SCHED_CHAN_PENDING) {
@@ -309,10 +457,12 @@ scheduler_channel_doesnt_want_writes,(channel_t *chan))
MOCK_IMPL(void,
scheduler_channel_has_waiting_cells,(channel_t *chan))
{
- int became_pending = 0;
-
- tor_assert(chan);
- tor_assert(channels_pending);
+ IF_BUG_ONCE(!chan) {
+ return;
+ }
+ IF_BUG_ONCE(!channels_pending) {
+ return;
+ }
/* First, check if this one also writeable */
if (chan->scheduler_state == SCHED_CHAN_WAITING_FOR_CELLS) {
@@ -330,7 +480,9 @@ scheduler_channel_has_waiting_cells,(channel_t *chan))
"Channel " U64_FORMAT " at %p went from waiting_for_cells "
"to pending",
U64_PRINTF_ARG(chan->global_identifier), chan);
- became_pending = 1;
+ /* If we made a channel pending, we potentially have scheduling work to
+ * do. */
+ the_scheduler->schedule();
} else {
/*
* It's not in waiting_for_cells, so it can't become pending; it's
@@ -345,240 +497,104 @@ scheduler_channel_has_waiting_cells,(channel_t *chan))
U64_PRINTF_ARG(chan->global_identifier), chan);
}
}
+}
- /*
- * If we made a channel pending, we potentially have scheduling work
- * to do.
- */
- if (became_pending) scheduler_retrigger();
+/* Add the scheduler event to the set of pending events with next_run being
+ * the time up to which libevent should wait before triggering the event. */
+void
+scheduler_ev_add(const struct timeval *next_run)
+{
+ tor_assert(run_sched_ev);
+ tor_assert(next_run);
+ event_add(run_sched_ev, next_run);
}
-/** Set up the scheduling system */
+/* Make the scheduler event active with the given flags. */
+void
+scheduler_ev_active(int flags)
+{
+ tor_assert(run_sched_ev);
+ event_active(run_sched_ev, flags, 1);
+}
+/*
+ * Initialize everything scheduling-related from config.c. Note this is only
+ * called when Tor is starting up, while scheduler_t->init() is called both
+ * when Tor is starting up and when we are switching schedulers.
+ */
void
scheduler_init(void)
{
log_debug(LD_SCHED, "Initting scheduler");
- tor_assert(!run_sched_ev);
+ // Two '!' because we really do want to check if the pointer is non-NULL
+ IF_BUG_ONCE(!!run_sched_ev) {
+ log_warn(LD_SCHED, "We should not already have a libevent scheduler event."
+ "I'll clean the old one up, but this is odd.");
+ tor_event_free(run_sched_ev);
+ run_sched_ev = NULL;
+ }
run_sched_ev = tor_event_new(tor_libevent_get_base(), -1,
0, scheduler_evt_callback, NULL);
-
channels_pending = smartlist_new();
- queue_heuristic = 0;
- queue_heuristic_timestamp = approx_time();
-}
-
-/** Check if there's more scheduling work */
-
-static int
-scheduler_more_work(void)
-{
- tor_assert(channels_pending);
-
- return ((scheduler_get_queue_heuristic() < sched_q_low_water) &&
- ((smartlist_len(channels_pending) > 0))) ? 1 : 0;
-}
-
-/** Retrigger the scheduler in a way safe to use from the callback */
-static void
-scheduler_retrigger(void)
-{
- tor_assert(run_sched_ev);
- event_active(run_sched_ev, EV_TIMEOUT, 1);
+ set_scheduler();
}
-/** Notify the scheduler of a channel being closed */
-
+/*
+ * If a channel is going away, this is how the scheduling system is informed
+ * so it can do any freeing necessary. This ultimately calls
+ * scheduler_t->on_channel_free() so the current scheduler can release any
+ * state specific to this channel.
+ */
MOCK_IMPL(void,
scheduler_release_channel,(channel_t *chan))
{
- tor_assert(chan);
- tor_assert(channels_pending);
-
- if (chan->scheduler_state == SCHED_CHAN_PENDING) {
- smartlist_pqueue_remove(channels_pending,
- scheduler_compare_channels,
- offsetof(channel_t, sched_heap_idx),
- chan);
+ IF_BUG_ONCE(!chan) {
+ return;
+ }
+ IF_BUG_ONCE(!channels_pending) {
+ return;
}
- chan->scheduler_state = SCHED_CHAN_IDLE;
-}
-
-/** Run the scheduling algorithm if necessary */
-
-MOCK_IMPL(void,
-scheduler_run, (void))
-{
- int n_cells, n_chans_before, n_chans_after;
- uint64_t q_len_before, q_heur_before, q_len_after, q_heur_after;
- ssize_t flushed, flushed_this_time;
- smartlist_t *to_readd = NULL;
- channel_t *chan = NULL;
-
- log_debug(LD_SCHED, "We have a chance to run the scheduler");
-
- if (scheduler_get_queue_heuristic() < sched_q_low_water) {
- n_chans_before = smartlist_len(channels_pending);
- q_len_before = channel_get_global_queue_estimate();
- q_heur_before = scheduler_get_queue_heuristic();
-
- while (scheduler_get_queue_heuristic() <= sched_q_high_water &&
- smartlist_len(channels_pending) > 0) {
- /* Pop off a channel */
- chan = smartlist_pqueue_pop(channels_pending,
- scheduler_compare_channels,
- offsetof(channel_t, sched_heap_idx));
- tor_assert(chan);
-
- /* Figure out how many cells we can write */
- n_cells = channel_num_cells_writeable(chan);
- if (n_cells > 0) {
- log_debug(LD_SCHED,
- "Scheduler saw pending channel " U64_FORMAT " at %p with "
- "%d cells writeable",
- U64_PRINTF_ARG(chan->global_identifier), chan, n_cells);
-
- flushed = 0;
- while (flushed < n_cells &&
- scheduler_get_queue_heuristic() <= sched_q_high_water) {
- flushed_this_time =
- channel_flush_some_cells(chan,
- MIN(sched_max_flush_cells,
- (size_t) n_cells - flushed));
- if (flushed_this_time <= 0) break;
- flushed += flushed_this_time;
- }
-
- if (flushed < n_cells) {
- /* We ran out of cells to flush */
- chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
- log_debug(LD_SCHED,
- "Channel " U64_FORMAT " at %p "
- "entered waiting_for_cells from pending",
- U64_PRINTF_ARG(chan->global_identifier),
- chan);
- } else {
- /* The channel may still have some cells */
- if (channel_more_to_flush(chan)) {
- /* The channel goes to either pending or waiting_to_write */
- if (channel_num_cells_writeable(chan) > 0) {
- /* Add it back to pending later */
- if (!to_readd) to_readd = smartlist_new();
- smartlist_add(to_readd, chan);
- log_debug(LD_SCHED,
- "Channel " U64_FORMAT " at %p "
- "is still pending",
- U64_PRINTF_ARG(chan->global_identifier),
- chan);
- } else {
- /* It's waiting to be able to write more */
- chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
- log_debug(LD_SCHED,
- "Channel " U64_FORMAT " at %p "
- "entered waiting_to_write from pending",
- U64_PRINTF_ARG(chan->global_identifier),
- chan);
- }
- } else {
- /* No cells left; it can go to idle or waiting_for_cells */
- if (channel_num_cells_writeable(chan) > 0) {
- /*
- * It can still accept writes, so it goes to
- * waiting_for_cells
- */
- chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
- log_debug(LD_SCHED,
- "Channel " U64_FORMAT " at %p "
- "entered waiting_for_cells from pending",
- U64_PRINTF_ARG(chan->global_identifier),
- chan);
- } else {
- /*
- * We exactly filled up the output queue with all available
- * cells; go to idle.
- */
- chan->scheduler_state = SCHED_CHAN_IDLE;
- log_debug(LD_SCHED,
- "Channel " U64_FORMAT " at %p "
- "become idle from pending",
- U64_PRINTF_ARG(chan->global_identifier),
- chan);
- }
- }
- }
-
- log_debug(LD_SCHED,
- "Scheduler flushed %d cells onto pending channel "
- U64_FORMAT " at %p",
- (int)flushed, U64_PRINTF_ARG(chan->global_identifier),
- chan);
- } else {
- log_info(LD_SCHED,
- "Scheduler saw pending channel " U64_FORMAT " at %p with "
- "no cells writeable",
- U64_PRINTF_ARG(chan->global_identifier), chan);
- /* Put it back to WAITING_TO_WRITE */
- chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
- }
- }
-
- /* Readd any channels we need to */
- if (to_readd) {
- SMARTLIST_FOREACH_BEGIN(to_readd, channel_t *, readd_chan) {
- readd_chan->scheduler_state = SCHED_CHAN_PENDING;
- smartlist_pqueue_add(channels_pending,
- scheduler_compare_channels,
- offsetof(channel_t, sched_heap_idx),
- readd_chan);
- } SMARTLIST_FOREACH_END(readd_chan);
- smartlist_free(to_readd);
+ if (chan->scheduler_state == SCHED_CHAN_PENDING) {
+ if (smartlist_pos(channels_pending, chan) == -1) {
+ log_warn(LD_SCHED, "Scheduler asked to release channel %" PRIu64 " "
+ "but it wasn't in channels_pending",
+ chan->global_identifier);
+ } else {
+ smartlist_pqueue_remove(channels_pending,
+ scheduler_compare_channels,
+ offsetof(channel_t, sched_heap_idx),
+ chan);
}
-
- n_chans_after = smartlist_len(channels_pending);
- q_len_after = channel_get_global_queue_estimate();
- q_heur_after = scheduler_get_queue_heuristic();
- log_debug(LD_SCHED,
- "Scheduler handled %d of %d pending channels, queue size from "
- U64_FORMAT " to " U64_FORMAT ", queue heuristic from "
- U64_FORMAT " to " U64_FORMAT,
- n_chans_before - n_chans_after, n_chans_before,
- U64_PRINTF_ARG(q_len_before), U64_PRINTF_ARG(q_len_after),
- U64_PRINTF_ARG(q_heur_before), U64_PRINTF_ARG(q_heur_after));
}
-}
-/** Trigger the scheduling event so we run the scheduler later */
-
-#if 0
-static void
-scheduler_trigger(void)
-{
- log_debug(LD_SCHED, "Triggering scheduler event");
-
- tor_assert(run_sched_ev);
-
- event_add(run_sched_ev, EV_TIMEOUT, 1);
+ if (the_scheduler->on_channel_free) {
+ the_scheduler->on_channel_free(chan);
+ }
+ chan->scheduler_state = SCHED_CHAN_IDLE;
}
-#endif
/** Mark a channel as ready to accept writes */
void
scheduler_channel_wants_writes(channel_t *chan)
{
- int became_pending = 0;
-
- tor_assert(chan);
- tor_assert(channels_pending);
+ IF_BUG_ONCE(!chan) {
+ return;
+ }
+ IF_BUG_ONCE(!channels_pending) {
+ return;
+ }
/* If it's already in waiting_to_write, we can put it in pending */
if (chan->scheduler_state == SCHED_CHAN_WAITING_TO_WRITE) {
/*
* It can write now, so it goes to channels_pending.
*/
+ log_debug(LD_SCHED, "chan=%" PRIu64 " became pending",
+ chan->global_identifier);
smartlist_pqueue_add(channels_pending,
scheduler_compare_channels,
offsetof(channel_t, sched_heap_idx),
@@ -588,7 +604,8 @@ scheduler_channel_wants_writes(channel_t *chan)
"Channel " U64_FORMAT " at %p went from waiting_to_write "
"to pending",
U64_PRINTF_ARG(chan->global_identifier), chan);
- became_pending = 1;
+ /* We just made a channel pending, we have scheduling work to do. */
+ the_scheduler->schedule();
} else {
/*
* It's not in SCHED_CHAN_WAITING_TO_WRITE, so it can't become pending;
@@ -602,23 +619,19 @@ scheduler_channel_wants_writes(channel_t *chan)
U64_PRINTF_ARG(chan->global_identifier), chan);
}
}
-
- /*
- * If we made a channel pending, we potentially have scheduling work
- * to do.
- */
- if (became_pending) scheduler_retrigger();
}
-/**
- * Notify the scheduler that a channel's position in the pqueue may have
- * changed
- */
+#ifdef TOR_UNIT_TESTS
+/*
+ * Notify scheduler that a channel's queue position may have changed.
+ */
void
scheduler_touch_channel(channel_t *chan)
{
- tor_assert(chan);
+ IF_BUG_ONCE(!chan) {
+ return;
+ }
if (chan->scheduler_state == SCHED_CHAN_PENDING) {
/* Remove and re-add it */
@@ -634,105 +647,5 @@ scheduler_touch_channel(channel_t *chan)
/* else no-op, since it isn't in the queue */
}
-/**
- * Notify the scheduler of a queue size adjustment, to recalculate the
- * queue heuristic.
- */
-
-void
-scheduler_adjust_queue_size(channel_t *chan, int dir, uint64_t adj)
-{
- time_t now = approx_time();
-
- log_debug(LD_SCHED,
- "Queue size adjustment by %s" U64_FORMAT " for channel "
- U64_FORMAT,
- (dir >= 0) ? "+" : "-",
- U64_PRINTF_ARG(adj),
- U64_PRINTF_ARG(chan->global_identifier));
-
- /* Get the queue heuristic up to date */
- scheduler_update_queue_heuristic(now);
-
- /* Adjust as appropriate */
- if (dir >= 0) {
- /* Increasing it */
- queue_heuristic += adj;
- } else {
- /* Decreasing it */
- if (queue_heuristic > adj) queue_heuristic -= adj;
- else queue_heuristic = 0;
- }
-
- log_debug(LD_SCHED,
- "Queue heuristic is now " U64_FORMAT,
- U64_PRINTF_ARG(queue_heuristic));
-}
-
-/**
- * Query the current value of the queue heuristic
- */
-
-STATIC uint64_t
-scheduler_get_queue_heuristic(void)
-{
- time_t now = approx_time();
-
- scheduler_update_queue_heuristic(now);
-
- return queue_heuristic;
-}
-
-/**
- * Adjust the queue heuristic value to the present time
- */
-
-STATIC void
-scheduler_update_queue_heuristic(time_t now)
-{
- time_t diff;
-
- if (queue_heuristic_timestamp == 0) {
- /*
- * Nothing we can sensibly do; must not have been initted properly.
- * Oh well.
- */
- queue_heuristic_timestamp = now;
- } else if (queue_heuristic_timestamp < now) {
- diff = now - queue_heuristic_timestamp;
- /*
- * This is a simple exponential age-out; the other proposed alternative
- * was a linear age-out using the bandwidth history in rephist.c; I'm
- * going with this out of concern that if an adversary can jam the
- * scheduler long enough, it would cause the bandwidth to drop to
- * zero and render the aging mechanism ineffective thereafter.
- */
- if (0 <= diff && diff < 64) queue_heuristic >>= diff;
- else queue_heuristic = 0;
-
- queue_heuristic_timestamp = now;
-
- log_debug(LD_SCHED,
- "Queue heuristic is now " U64_FORMAT,
- U64_PRINTF_ARG(queue_heuristic));
- }
- /* else no update needed, or time went backward */
-}
-
-/**
- * Set scheduler watermarks and flush size
- */
-
-void
-scheduler_set_watermarks(uint32_t lo, uint32_t hi, uint32_t max_flush)
-{
- /* Sanity assertions - caller should ensure these are true */
- tor_assert(lo > 0);
- tor_assert(hi > lo);
- tor_assert(max_flush > 0);
-
- sched_q_low_water = lo;
- sched_q_high_water = hi;
- sched_max_flush_cells = max_flush;
-}
+#endif /* TOR_UNIT_TESTS */
diff --git a/src/or/scheduler.h b/src/or/scheduler.h
index e29c13de7e..98c3599817 100644
--- a/src/or/scheduler.h
+++ b/src/or/scheduler.h
@@ -1,9 +1,9 @@
-/* * Copyright (c) 2013-2017, The Tor Project, Inc. */
+/* * Copyright (c) 2017, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* \file scheduler.h
- * \brief Header file for scheduler.c
+ * \brief Header file for scheduler*.c
**/
#ifndef TOR_SCHEDULER_H
@@ -13,45 +13,192 @@
#include "channel.h"
#include "testsupport.h"
-/* Global-visibility scheduler functions */
+/*
+ * A scheduler implementation is a collection of function pointers. If you
+ * would like to add a new scheduler called foo, create scheduler_foo.c,
+ * implement at least the mandatory ones, and implement get_foo_scheduler()
+ * that returns a complete scheduler_t for your foo scheduler. See
+ * scheduler_kist.c for an example.
+ *
+ * These function pointers SHOULD NOT be used anywhere outside of the
+ * scheduling source files. The rest of Tor should communicate with the
+ * scheduling system through the functions near the bottom of this file, and
+ * those functions will call into the current scheduler implementation as
+ * necessary.
+ *
+ * If your scheduler doesn't need to implement something (for example: it
+ * doesn't create any state for itself, thus it has nothing to free when Tor
+ * is shutting down), then set that function pointer to NULL.
+ */
+typedef struct scheduler_s {
+ /* (Optional) To be called when we want to prepare a scheduler for use.
+ * Perhaps Tor just started and we are the lucky chosen scheduler, or
+ * perhaps Tor is switching to this scheduler. No matter the case, this is
+ * where we would prepare any state and initialize parameters. You might
+ * think of this as the opposite of free_all(). */
+ void (*init)(void);
-/* Set up and shut down the scheduler from main.c */
-void scheduler_free_all(void);
-void scheduler_init(void);
-MOCK_DECL(void, scheduler_run, (void));
+ /* (Optional) To be called when we want to tell the scheduler to delete all
+ * of its state (if any). Perhaps Tor is shutting down or perhaps we are
+ * switching schedulers. */
+ void (*free_all)(void);
-/* Mark channels as having cells or wanting/not wanting writes */
-MOCK_DECL(void,scheduler_channel_doesnt_want_writes,(channel_t *chan));
-MOCK_DECL(void,scheduler_channel_has_waiting_cells,(channel_t *chan));
-void scheduler_channel_wants_writes(channel_t *chan);
+ /* (Mandatory) Libevent controls the main event loop in Tor, and this is
+ * where we register with libevent the next execution of run_sched_ev [which
+ * ultimately calls run()]. */
+ void (*schedule)(void);
-/* Notify the scheduler of a channel being closed */
-MOCK_DECL(void,scheduler_release_channel,(channel_t *chan));
+ /* (Mandatory) This is the heart of a scheduler! This is where the
+ * excitement happens! Here libevent has given us the chance to execute, and
+ * we should do whatever we need to do in order to move some cells from
+ * their circuit queues to output buffers in an intelligent manner. We
+ * should do this quickly. When we are done, we'll try to schedule() ourself
+ * if more work needs to be done to set up the next scheduling run. */
+ void (*run)(void);
-/* Notify scheduler of queue size adjustments */
-void scheduler_adjust_queue_size(channel_t *chan, int dir, uint64_t adj);
+ /*
+ * External event not related to the scheduler but that can influence it.
+ */
-/* Notify scheduler that a channel's queue position may have changed */
-void scheduler_touch_channel(channel_t *chan);
+ /* (Optional) To be called whenever Tor finds out about a new consensus.
+ * First the scheduling system as a whole will react to the new consensus
+ * and change the scheduler if needed. After that, the current scheduler
+ * (which might be new) will call this so it has the chance to react to the
+ * new consensus too. If there's a consensus parameter that your scheduler
+ * wants to keep an eye on, this is where you should check for it. */
+ void (*on_new_consensus)(const networkstatus_t *old_c,
+ const networkstatus_t *new_c);
+
+ /* (Optional) To be called when a channel is being freed. Sometimes channels
+ * go away (for example: the relay on the other end is shutting down). If
+ * the scheduler keeps any channel-specific state and has memory to free
+ * when channels go away, implement this and free it here. */
+ void (*on_channel_free)(const channel_t *);
+
+ /* (Optional) To be called whenever Tor is reloading configuration options.
+ * For example: SIGHUP was issued and Tor is rereading its torrc. A
+ * scheduler should use this as an opportunity to parse and cache torrc
+ * options so that it doesn't have to call get_options() all the time. */
+ void (*on_new_options)(void);
+} scheduler_t;
+
+/** Scheduler type, we build an ordered list with those values from the
+ * parsed strings in Schedulers. The reason to do such a thing is so we can
+ * quickly and without parsing strings select the scheduler at anytime. */
+typedef enum {
+ SCHEDULER_VANILLA = 1,
+ SCHEDULER_KIST = 2,
+ SCHEDULER_KIST_LITE = 3,
+} scheduler_types_t;
-/* Adjust the watermarks from config file*/
-void scheduler_set_watermarks(uint32_t lo, uint32_t hi, uint32_t max_flush);
+/*****************************************************************************
+ * Globally visible scheduler variables/values
+ *
+ * These are variables/constants that all of Tor should be able to see.
+ *****************************************************************************/
-/* Things only scheduler.c and its test suite should see */
+/* Default interval that KIST runs (in ms). */
+#define KIST_SCHED_RUN_INTERVAL_DEFAULT 10
+/* Minimum interval that KIST runs. This value disables KIST. */
+#define KIST_SCHED_RUN_INTERVAL_MIN 0
+/* Maximum interval that KIST runs (in ms). */
+#define KIST_SCHED_RUN_INTERVAL_MAX 100
+/*****************************************************************************
+ * Globally visible scheduler functions
+ *
+ * These functions are how the rest of Tor communicates with the scheduling
+ * system.
+ *****************************************************************************/
+
+void scheduler_init(void);
+void scheduler_free_all(void);
+void scheduler_conf_changed(void);
+void scheduler_notify_networkstatus_changed(const networkstatus_t *old_c,
+ const networkstatus_t *new_c);
+MOCK_DECL(void, scheduler_release_channel, (channel_t *chan));
+
+/*
+ * Ways for a channel to interact with the scheduling system. A channel only
+ * really knows (i) whether or not it has cells it wants to send, and
+ * (ii) whether or not it would like to write.
+ */
+void scheduler_channel_wants_writes(channel_t *chan);
+MOCK_DECL(void, scheduler_channel_doesnt_want_writes, (channel_t *chan));
+MOCK_DECL(void, scheduler_channel_has_waiting_cells, (channel_t *chan));
+
+/*****************************************************************************
+ * Private scheduler functions
+ *
+ * These functions are only visible to the scheduling system, the current
+ * scheduler implementation, and tests.
+ *****************************************************************************/
#ifdef SCHEDULER_PRIVATE_
-MOCK_DECL(STATIC int, scheduler_compare_channels,
+
+/*********************************
+ * Defined in scheduler.c
+ *********************************/
+smartlist_t *get_channels_pending(void);
+MOCK_DECL(int, scheduler_compare_channels,
(const void *c1_v, const void *c2_v));
-STATIC uint64_t scheduler_get_queue_heuristic(void);
-STATIC void scheduler_update_queue_heuristic(time_t now);
+void scheduler_ev_active(int flags);
+void scheduler_ev_add(const struct timeval *next_run);
#ifdef TOR_UNIT_TESTS
extern smartlist_t *channels_pending;
extern struct event *run_sched_ev;
-extern uint64_t queue_heuristic;
-extern time_t queue_heuristic_timestamp;
-#endif
-#endif
+extern const scheduler_t *the_scheduler;
+void scheduler_touch_channel(channel_t *chan);
+#endif /* TOR_UNIT_TESTS */
+
+/*********************************
+ * Defined in scheduler_kist.c
+ *********************************/
+
+#ifdef SCHEDULER_KIST_PRIVATE
+
+/* Socket table entry which holds information of a channel's socket and kernel
+ * TCP information. Only used by KIST. */
+typedef struct socket_table_ent_s {
+ HT_ENTRY(socket_table_ent_s) node;
+ const channel_t *chan;
+ /* Amount written this scheduling run */
+ uint64_t written;
+ /* Amount that can be written this scheduling run */
+ uint64_t limit;
+ /* TCP info from the kernel */
+ uint32_t cwnd;
+ uint32_t unacked;
+ uint32_t mss;
+ uint32_t notsent;
+} socket_table_ent_t;
+
+typedef HT_HEAD(outbuf_table_s, outbuf_table_ent_s) outbuf_table_t;
+
+MOCK_DECL(int, channel_should_write_to_kernel,
+ (outbuf_table_t *table, channel_t *chan));
+MOCK_DECL(void, channel_write_to_kernel, (channel_t *chan));
+MOCK_DECL(void, update_socket_info_impl, (socket_table_ent_t *ent));
+
+int scheduler_can_use_kist(void);
+void scheduler_kist_set_full_mode(void);
+void scheduler_kist_set_lite_mode(void);
+scheduler_t *get_kist_scheduler(void);
+int32_t kist_scheduler_run_interval(const networkstatus_t *ns);
+
+#ifdef TOR_UNIT_TESTS
+extern int32_t sched_run_interval;
+#endif /* TOR_UNIT_TESTS */
+
+#endif /* SCHEDULER_KIST_PRIVATE */
+
+/*********************************
+ * Defined in scheduler_vanilla.c
+ *********************************/
+
+scheduler_t *get_vanilla_scheduler(void);
+
+#endif /* SCHEDULER_PRIVATE_ */
-#endif /* !defined(TOR_SCHEDULER_H) */
+#endif /* TOR_SCHEDULER_H */
diff --git a/src/or/scheduler_kist.c b/src/or/scheduler_kist.c
new file mode 100644
index 0000000000..913cb4dce6
--- /dev/null
+++ b/src/or/scheduler_kist.c
@@ -0,0 +1,774 @@
+/* Copyright (c) 2017, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#define SCHEDULER_KIST_PRIVATE
+
+#include <event2/event.h>
+#include <netinet/tcp.h>
+
+#include "or.h"
+#include "buffers.h"
+#include "config.h"
+#include "connection.h"
+#include "networkstatus.h"
+#define TOR_CHANNEL_INTERNAL_
+#include "channel.h"
+#include "channeltls.h"
+#define SCHEDULER_PRIVATE_
+#include "scheduler.h"
+
+#define TLS_PER_CELL_OVERHEAD 29
+
+#ifdef HAVE_KIST_SUPPORT
+/* Kernel interface needed for KIST. */
+#include <linux/sockios.h>
+#endif /* HAVE_KIST_SUPPORT */
+
+/*****************************************************************************
+ * Data structures and supporting functions
+ *****************************************************************************/
+
+#ifdef HAVE_KIST_SUPPORT
+/* Indicate if KIST lite mode is on or off. We can disable it at runtime.
+ * Important to have because of the KISTLite -> KIST possible transition. */
+static unsigned int kist_lite_mode = 0;
+/* Indicate if we don't have the kernel support. This can happen if the kernel
+ * changed and it doesn't recognized the values passed to the syscalls needed
+ * by KIST. In that case, fallback to the naive approach. */
+static unsigned int kist_no_kernel_support = 0;
+#else
+static unsigned int kist_no_kernel_support = 1;
+static unsigned int kist_lite_mode = 1;
+#endif
+
+/* Socket_table hash table stuff. The socket_table keeps track of per-socket
+ * limit information imposed by kist and used by kist. */
+
+static uint32_t
+socket_table_ent_hash(const socket_table_ent_t *ent)
+{
+ return (uint32_t)ent->chan->global_identifier;
+}
+
+static unsigned
+socket_table_ent_eq(const socket_table_ent_t *a, const socket_table_ent_t *b)
+{
+ return a->chan == b->chan;
+}
+
+typedef HT_HEAD(socket_table_s, socket_table_ent_s) socket_table_t;
+
+static socket_table_t socket_table = HT_INITIALIZER();
+
+HT_PROTOTYPE(socket_table_s, socket_table_ent_s, node, socket_table_ent_hash,
+ socket_table_ent_eq)
+HT_GENERATE2(socket_table_s, socket_table_ent_s, node, socket_table_ent_hash,
+ socket_table_ent_eq, 0.6, tor_reallocarray, tor_free_)
+
+/* outbuf_table hash table stuff. The outbuf_table keeps track of which
+ * channels have data sitting in their outbuf so the kist scheduler can force
+ * a write from outbuf to kernel periodically during a run and at the end of a
+ * run. */
+
+typedef struct outbuf_table_ent_s {
+ HT_ENTRY(outbuf_table_ent_s) node;
+ channel_t *chan;
+} outbuf_table_ent_t;
+
+static uint32_t
+outbuf_table_ent_hash(const outbuf_table_ent_t *ent)
+{
+ return (uint32_t)ent->chan->global_identifier;
+}
+
+static unsigned
+outbuf_table_ent_eq(const outbuf_table_ent_t *a, const outbuf_table_ent_t *b)
+{
+ return a->chan->global_identifier == b->chan->global_identifier;
+}
+
+HT_PROTOTYPE(outbuf_table_s, outbuf_table_ent_s, node, outbuf_table_ent_hash,
+ outbuf_table_ent_eq)
+HT_GENERATE2(outbuf_table_s, outbuf_table_ent_s, node, outbuf_table_ent_hash,
+ outbuf_table_ent_eq, 0.6, tor_reallocarray, tor_free_)
+
+/*****************************************************************************
+ * Other internal data
+ *****************************************************************************/
+
+/* Store the last time the scheduler was run so we can decide when to next run
+ * the scheduler based on it. */
+static monotime_t scheduler_last_run;
+/* This is a factor for the extra_space calculation in kist per-socket limits.
+ * It is the number of extra congestion windows we want to write to the kernel.
+ */
+static double sock_buf_size_factor = 1.0;
+/* How often the scheduler runs. */
+STATIC int32_t sched_run_interval = 10;
+
+/*****************************************************************************
+ * Internally called function implementations
+ *****************************************************************************/
+
+/* Little helper function to get the length of a channel's output buffer */
+static inline size_t
+channel_outbuf_length(channel_t *chan)
+{
+ /* In theory, this cannot happen because we cannot schedule a channel
+ * without a connection that has its outbuf initialized. Just in case, bug
+ * on this so we can understand a bit more why it happened. */
+ if (BUG(BASE_CHAN_TO_TLS(chan)->conn == NULL)) {
+ return 0;
+ }
+ return buf_datalen(TO_CONN(BASE_CHAN_TO_TLS(chan)->conn)->outbuf);
+}
+
+/* Little helper function for HT_FOREACH_FN. */
+static int
+each_channel_write_to_kernel(outbuf_table_ent_t *ent, void *data)
+{
+ (void) data; /* Make compiler happy. */
+ channel_write_to_kernel(ent->chan);
+ return 0; /* Returning non-zero removes the element from the table. */
+}
+
+/* Free the given outbuf table entry ent. */
+static int
+free_outbuf_info_by_ent(outbuf_table_ent_t *ent, void *data)
+{
+ (void) data; /* Make compiler happy. */
+ log_debug(LD_SCHED, "Freeing outbuf table entry from chan=%" PRIu64,
+ ent->chan->global_identifier);
+ tor_free(ent);
+ return 1; /* So HT_FOREACH_FN will remove the element */
+}
+
+/* Free the given socket table entry ent. */
+static int
+free_socket_info_by_ent(socket_table_ent_t *ent, void *data)
+{
+ (void) data; /* Make compiler happy. */
+ log_debug(LD_SCHED, "Freeing socket table entry from chan=%" PRIu64,
+ ent->chan->global_identifier);
+ tor_free(ent);
+ return 1; /* So HT_FOREACH_FN will remove the element */
+}
+
+/* Clean up socket_table. Probably because the KIST sched impl is going away */
+static void
+free_all_socket_info(void)
+{
+ HT_FOREACH_FN(socket_table_s, &socket_table, free_socket_info_by_ent, NULL);
+}
+
+static socket_table_ent_t *
+socket_table_search(socket_table_t *table, const channel_t *chan)
+{
+ socket_table_ent_t search, *ent = NULL;
+ search.chan = chan;
+ ent = HT_FIND(socket_table_s, table, &search);
+ return ent;
+}
+
+/* Free a socket entry in table for the given chan. */
+static void
+free_socket_info_by_chan(socket_table_t *table, const channel_t *chan)
+{
+ socket_table_ent_t *ent = NULL;
+ ent = socket_table_search(table, chan);
+ if (!ent)
+ return;
+ log_debug(LD_SCHED, "scheduler free socket info for chan=%" PRIu64,
+ chan->global_identifier);
+ HT_REMOVE(socket_table_s, table, ent);
+ free_socket_info_by_ent(ent, NULL);
+}
+
+/* Perform system calls for the given socket in order to calculate kist's
+ * per-socket limit as documented in the function body.
+ *
+ * On success, ent's cwnd, unacked, mss and notsent fields are refreshed from
+ * the kernel and ent->limit is set to the number of bytes KIST allows to be
+ * written. If KIST is in lite mode, kernel support has been flagged as
+ * missing, or one of the kernel calls fails, the TCP fields are zeroed and
+ * the limit is set to INT_MAX (see the fallback label). */
+MOCK_IMPL(void,
+update_socket_info_impl, (socket_table_ent_t *ent))
+{
+#ifdef HAVE_KIST_SUPPORT
+  int64_t tcp_space, extra_space;
+  const tor_socket_t sock =
+    TO_CONN(BASE_CHAN_TO_TLS((channel_t *) ent->chan)->conn)->s;
+  struct tcp_info tcp;
+  socklen_t tcp_info_len = sizeof(tcp);
+
+  if (kist_no_kernel_support || kist_lite_mode) {
+    goto fallback;
+  }
+
+  /* Gather information */
+  if (getsockopt(sock, SOL_TCP, TCP_INFO, (void *)&(tcp), &tcp_info_len) < 0) {
+    if (errno == EINVAL) {
+      /* Oops, this option is not provided by the kernel, we'll have to
+       * disable KIST entirely. This can happen if tor was built on a machine
+       * with the support previously or if the kernel was updated and lost the
+       * support. */
+      log_notice(LD_SCHED, "Looks like our kernel doesn't have the support "
+                           "for KIST anymore. We will fallback to the naive "
+                           "approach. Set KISTSchedRunInterval=-1 to disable "
+                           "KIST.");
+      kist_no_kernel_support = 1;
+    }
+    goto fallback;
+  }
+  if (ioctl(sock, SIOCOUTQNSD, &(ent->notsent)) < 0) {
+    if (errno == EINVAL) {
+      log_notice(LD_SCHED, "Looks like our kernel doesn't have the support "
+                           "for KIST anymore. We will fallback to the naive "
+                           "approach. Set KISTSchedRunInterval=-1 to disable "
+                           "KIST.");
+      /* Same reason as the above. */
+      kist_no_kernel_support = 1;
+    }
+    goto fallback;
+  }
+  ent->cwnd = tcp.tcpi_snd_cwnd;
+  ent->unacked = tcp.tcpi_unacked;
+  ent->mss = tcp.tcpi_snd_mss;
+
+  /* In order to reduce outbound kernel queuing delays and thus improve Tor's
+   * ability to prioritize circuits, KIST wants to set a socket write limit
+   * that is near the amount that the socket would be able to immediately send
+   * into the Internet.
+   *
+   * We first calculate how much the socket could send immediately (assuming
+   * completely full packets) according to the congestion window and the number
+   * of unacked packets.
+   *
+   * Then we add a little extra space in a controlled way. We do this so that
+   * when the kernel gets ACKs back for data currently sitting in the "TCP
+   * space", it will already have some more data to send immediately. It will
+   * not have to wait for the scheduler to run again. The amount of extra space
+   * is a factor of the current congestion window. With the suggested
+   * sock_buf_size_factor value of 1.0, we allow at most 2*cwnd bytes to sit in
+   * the kernel: 1 cwnd on the wire waiting for ACKs and 1 cwnd ready and
+   * waiting to be sent when those ACKs finally come.
+   *
+   * In the below diagram, we see some bytes in the TCP-space (denoted by '*')
+   * that have been sent onto the wire and are waiting for ACKs. We have a
+   * little more room in "TCP space" that we can fill with data that will be
+   * immediately sent. We also see the "extra space" KIST calculates. The sum
+   * of the empty "TCP space" and the "extra space" is the kist-imposed write
+   * limit for this socket.
+   *
+   * <----------------kernel-outbound-socket-queue----------------|
+   * <*********---------------------------------------------------|
+   * <----TCP-space-----|----extra-space-----|
+   * <------------------|
+   *                    ^ ((cwnd - unacked) * mss) bytes
+   *                    |--------------------|
+   *                                         ^ ((cwnd * mss) * factor) bytes
+   */
+
+  /* These kernel values are all uint32_t. Done in unsigned arithmetic, the
+   * subtraction would wrap to a huge positive value whenever unacked exceeds
+   * cwnd, and a "< 0" check on the result could never fire. So compare first,
+   * and widen to int64_t before multiplying so the product cannot overflow
+   * 32 bits either. */
+  if (ent->cwnd >= ent->unacked) {
+    tcp_space = (int64_t)(ent->cwnd - ent->unacked) * (int64_t)ent->mss;
+  } else {
+    tcp_space = 0;
+  }
+
+  /* The product is computed in double so that cwnd * mss cannot wrap in 32
+   * bits. The clamp_double_to_int64 makes sure the first part fits into an
+   * int64_t. In fact, if sock_buf_size_factor is still forced to be >= 0 in
+   * config.c, then it will be positive for sure. Then we subtract a uint32_t.
+   * At worst we end up negative, but then we just set extra_space to 0 in the
+   * sanity check. */
+  extra_space =
+    clamp_double_to_int64((double)ent->cwnd * (double)ent->mss *
+                          sock_buf_size_factor) -
+    (int64_t)ent->notsent;
+  if (extra_space < 0) {
+    extra_space = 0;
+  }
+
+  /* Finally we set the limit. Adding two positive int64_t together will always
+   * fit in an uint64_t. */
+  ent->limit = (uint64_t)tcp_space + (uint64_t)extra_space;
+  return;
+
+#else /* HAVE_KIST_SUPPORT */
+  goto fallback;
+#endif /* HAVE_KIST_SUPPORT */
+
+ fallback:
+  /* If all of a sudden we don't have kist support, we just zero out all the
+   * variables for this socket since we don't know what they should be.
+   * We also effectively allow the socket to write as much as it wants to the
+   * kernel, effectively returning it to vanilla scheduler behavior. Writes
+   * are still limited by the lower layers of Tor: socket blocking, full
+   * outbuf, etc. */
+  ent->cwnd = ent->unacked = ent->mss = ent->notsent = 0;
+  ent->limit = INT_MAX;
+}
+
+/* Given a socket that isn't in the table, add it.
+ * Given a socket that is in the table, reinit values that need init-ing
+ * every scheduling run
+ */
+static void
+init_socket_info(socket_table_t *table, const channel_t *chan)
+{
+ socket_table_ent_t *ent = NULL;
+ ent = socket_table_search(table, chan);
+ if (!ent) {
+ log_debug(LD_SCHED, "scheduler init socket info for chan=%" PRIu64,
+ chan->global_identifier);
+ ent = tor_malloc_zero(sizeof(*ent));
+ ent->chan = chan;
+ HT_INSERT(socket_table_s, table, ent);
+ }
+ ent->written = 0;
+}
+
+/* Add chan to the outbuf table if it isn't already in it. If it is, then don't
+ * do anything */
+static void
+outbuf_table_add(outbuf_table_t *table, channel_t *chan)
+{
+ outbuf_table_ent_t search, *ent;
+ search.chan = chan;
+ ent = HT_FIND(outbuf_table_s, table, &search);
+ if (!ent) {
+ log_debug(LD_SCHED, "scheduler init outbuf info for chan=%" PRIu64,
+ chan->global_identifier);
+ ent = tor_malloc_zero(sizeof(*ent));
+ ent->chan = chan;
+ HT_INSERT(outbuf_table_s, table, ent);
+ }
+}
+
+static void
+outbuf_table_remove(outbuf_table_t *table, channel_t *chan)
+{
+ outbuf_table_ent_t search, *ent;
+ search.chan = chan;
+ ent = HT_FIND(outbuf_table_s, table, &search);
+ if (ent) {
+ HT_REMOVE(outbuf_table_s, table, ent);
+ free_outbuf_info_by_ent(ent, NULL);
+ }
+}
+
+/* Set the scheduler running interval. */
+static void
+set_scheduler_run_interval(const networkstatus_t *ns)
+{
+ int32_t old_sched_run_interval = sched_run_interval;
+ sched_run_interval = kist_scheduler_run_interval(ns);
+ if (old_sched_run_interval != sched_run_interval) {
+ log_info(LD_SCHED, "Scheduler KIST changing its running interval "
+ "from %" PRId32 " to %" PRId32,
+ old_sched_run_interval, sched_run_interval);
+ }
+}
+
+/* Return true iff the channel's associated socket can write to the kernel,
+ * that is, it hasn't reached its limit. */
+static int
+socket_can_write(socket_table_t *table, const channel_t *chan)
+{
+ socket_table_ent_t *ent = NULL;
+ ent = socket_table_search(table, chan);
+ IF_BUG_ONCE(!ent) {
+ return 1; // Just return true, saying that kist wouldn't limit the socket
+ }
+
+ /* We previously calculated a write limit for this socket. In the below
+ * calculation, first determine how much room is left in bytes. Then divide
+ * that by the amount of space a cell takes. If there's room for at least 1
+ * cell, then KIST will allow the socket to write. */
+ int64_t kist_limit_space =
+ (int64_t) (ent->limit - ent->written) /
+ (CELL_MAX_NETWORK_SIZE + TLS_PER_CELL_OVERHEAD);
+ return kist_limit_space > 0;
+}
+
+/* Update the channel's socket kernel information. */
+static void
+update_socket_info(socket_table_t *table, const channel_t *chan)
+{
+ socket_table_ent_t *ent = NULL;
+ ent = socket_table_search(table, chan);
+ IF_BUG_ONCE(!ent) {
+ return; // Whelp. Entry didn't exist for some reason so nothing to do.
+ }
+ update_socket_info_impl(ent);
+}
+
+/* Increment the channel's socket written value by the number of bytes.
+ * If chan has no entry in table, IF_BUG_ONCE trips and nothing is updated. */
+static void
+update_socket_written(socket_table_t *table, channel_t *chan, size_t bytes)
+{
+  socket_table_ent_t *ent = NULL;
+  ent = socket_table_search(table, chan);
+  IF_BUG_ONCE(!ent) {
+    return; // Whelp. Entry didn't exist so nothing to do.
+  }
+
+  /* bytes is a size_t and ent->written is a uint64_t: cast the former so it
+   * matches %lu, and print the latter with the unsigned 64-bit specifier. */
+  log_debug(LD_SCHED, "chan=%" PRIu64 " wrote %lu bytes, old was %" PRIu64,
+            chan->global_identifier, (unsigned long) bytes, ent->written);
+
+  ent->written += bytes;
+}
+
+/*
+ * A naive KIST impl would write every single cell all the way to the kernel.
+ * That would take a lot of system calls. A less bad KIST impl would write a
+ * channel's outbuf to the kernel only when we are switching to a different
+ * channel. But if we have two channels with equal priority, we end up writing
+ * one cell for each and bouncing back and forth. This KIST impl avoids that
+ * by only writing a channel's outbuf to the kernel if it has 8 cells or more
+ * in it.
+ */
+MOCK_IMPL(int, channel_should_write_to_kernel,
+ (outbuf_table_t *table, channel_t *chan))
+{
+ outbuf_table_add(table, chan);
+ /* CELL_MAX_NETWORK_SIZE * 8 because we only want to write the outbuf to the
+ * kernel if there's 8 or more cells waiting */
+ return channel_outbuf_length(chan) > (CELL_MAX_NETWORK_SIZE * 8);
+}
+
+/* Little helper function to write a channel's outbuf all the way to the
+ * kernel */
+MOCK_IMPL(void, channel_write_to_kernel, (channel_t *chan))
+{
+  /* channel_outbuf_length() returns a size_t; cast it so that it matches the
+   * %lu conversion on platforms where size_t is not unsigned long. */
+  log_debug(LD_SCHED, "Writing %lu bytes to kernel for chan %" PRIu64,
+            (unsigned long) channel_outbuf_length(chan),
+            chan->global_identifier);
+  connection_handle_write(TO_CONN(BASE_CHAN_TO_TLS(chan)->conn), 0);
+}
+
+/* Return true iff the scheduler has work to perform. */
+static int
+have_work(void)
+{
+ smartlist_t *cp = get_channels_pending();
+ IF_BUG_ONCE(!cp) {
+ return 0; // channels_pending doesn't exist so... no work?
+ }
+ return smartlist_len(cp) > 0;
+}
+
+/* Function of the scheduler interface: free_all() */
+static void
+kist_free_all(void)
+{
+ free_all_socket_info();
+}
+
+/* Function of the scheduler interface: on_channel_free() */
+static void
+kist_on_channel_free(const channel_t *chan)
+{
+ free_socket_info_by_chan(&socket_table, chan);
+}
+
+/* Function of the scheduler interface: on_new_consensus() */
+static void
+kist_scheduler_on_new_consensus(const networkstatus_t *old_c,
+ const networkstatus_t *new_c)
+{
+ (void) old_c;
+ (void) new_c;
+
+ set_scheduler_run_interval(new_c);
+}
+
+/* Function of the scheduler interface: on_new_options() */
+static void
+kist_scheduler_on_new_options(void)
+{
+ sock_buf_size_factor = get_options()->KISTSockBufSizeFactor;
+
+ /* Calls kist_scheduler_run_interval which calls get_options(). */
+ set_scheduler_run_interval(NULL);
+}
+
+/* Function of the scheduler interface: init() */
+static void
+kist_scheduler_init(void)
+{
+ kist_scheduler_on_new_options();
+ IF_BUG_ONCE(sched_run_interval <= 0) {
+ log_warn(LD_SCHED, "We are initing the KIST scheduler and noticed the "
+ "KISTSchedRunInterval is telling us to not use KIST. That's "
+ "weird! We'll continue using KIST, but at %dms.",
+ KIST_SCHED_RUN_INTERVAL_DEFAULT);
+ sched_run_interval = KIST_SCHED_RUN_INTERVAL_DEFAULT;
+ }
+}
+
+/* Function of the scheduler interface: schedule() */
+static void
+kist_scheduler_schedule(void)
+{
+ struct monotime_t now;
+ struct timeval next_run;
+ int32_t diff;
+
+ if (!have_work()) {
+ return;
+ }
+ monotime_get(&now);
+ diff = (int32_t) monotime_diff_msec(&scheduler_last_run, &now);
+ if (diff < sched_run_interval) {
+ next_run.tv_sec = 0;
+ /* 1000 for ms -> us */
+ next_run.tv_usec = (sched_run_interval - diff) * 1000;
+ /* Readding an event reschedules it. It does not duplicate it. */
+ scheduler_ev_add(&next_run);
+ } else {
+ scheduler_ev_active(EV_TIMEOUT);
+ }
+}
+
+/* Function of the scheduler interface: run() */
+static void
+kist_scheduler_run(void)
+{
+ /* Define variables */
+ channel_t *chan = NULL; // current working channel
+ /* The last distinct chan served in a sched loop. */
+ channel_t *prev_chan = NULL;
+ int flush_result; // temporarily store results from flush calls
+ /* Channels to be re-added to pending at the end */
+ smartlist_t *to_readd = NULL;
+ smartlist_t *cp = get_channels_pending();
+
+ outbuf_table_t outbuf_table = HT_INITIALIZER();
+
+ /* For each pending channel, collect new kernel information */
+ SMARTLIST_FOREACH_BEGIN(cp, const channel_t *, pchan) {
+ init_socket_info(&socket_table, pchan);
+ update_socket_info(&socket_table, pchan);
+ } SMARTLIST_FOREACH_END(pchan);
+
+ log_debug(LD_SCHED, "Running the scheduler. %d channels pending",
+ smartlist_len(cp));
+
+ /* The main scheduling loop. Loop until there are no more pending channels */
+ while (smartlist_len(cp) > 0) {
+ /* get best channel */
+ chan = smartlist_pqueue_pop(cp, scheduler_compare_channels,
+ offsetof(channel_t, sched_heap_idx));
+ IF_BUG_ONCE(!chan) {
+ /* Some-freaking-how a NULL got into the channels_pending. That should
+ * never happen, but it should be harmless to ignore it and keep looping.
+ */
+ continue;
+ }
+ outbuf_table_add(&outbuf_table, chan);
+
+ /* if we have switched to a new channel, consider writing the previous
+ * channel's outbuf to the kernel. */
+ if (!prev_chan) {
+ prev_chan = chan;
+ }
+ if (prev_chan != chan) {
+ if (channel_should_write_to_kernel(&outbuf_table, prev_chan)) {
+ channel_write_to_kernel(prev_chan);
+ outbuf_table_remove(&outbuf_table, prev_chan);
+ }
+ prev_chan = chan;
+ }
+
+ /* Only flush and write if the per-socket limit hasn't been hit */
+ if (socket_can_write(&socket_table, chan)) {
+ /* flush to channel queue/outbuf */
+ flush_result = (int)channel_flush_some_cells(chan, 1); // 1 for num cells
+ /* flush_result has the # cells flushed */
+ if (flush_result > 0) {
+ update_socket_written(&socket_table, chan, flush_result *
+ (CELL_MAX_NETWORK_SIZE + TLS_PER_CELL_OVERHEAD));
+ }
+ /* XXX What if we didn't flush? */
+ }
+
+ /* Decide what to do with the channel now */
+
+ if (!channel_more_to_flush(chan) &&
+ !socket_can_write(&socket_table, chan)) {
+
+ /* Case 1: no more cells to send, and cannot write */
+
+ /*
+ * You might think we should put the channel in SCHED_CHAN_IDLE. And
+ * you're probably correct. While implementing KIST, we found that the
+ * scheduling system would sometimes lose track of channels when we did
+ * that. We suspect it has to do with the difference between "can't
+ * write because socket/outbuf is full" and KIST's "can't write because
+ * we've arbitrarily decided that that's enough for now." Sometimes
+ * channels run out of cells at the same time they hit their
+ * kist-imposed write limit and maybe the rest of Tor doesn't put the
+ * channel back in pending when it is supposed to.
+ *
+ * This should be investigated again. It is as simple as changing
+ * SCHED_CHAN_WAITING_FOR_CELLS to SCHED_CHAN_IDLE and seeing if Tor
+ * starts having serious throughput issues. Best done in shadow/chutney.
+ */
+ chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
+ log_debug(LD_SCHED, "chan=%" PRIu64 " now waiting_for_cells",
+ chan->global_identifier);
+ } else if (!channel_more_to_flush(chan)) {
+
+ /* Case 2: no more cells to send, but still open for writes */
+
+ chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
+ log_debug(LD_SCHED, "chan=%" PRIu64 " now waiting_for_cells",
+ chan->global_identifier);
+ } else if (!socket_can_write(&socket_table, chan)) {
+
+ /* Case 3: cells to send, but cannot write */
+
+ /*
+ * We want to write, but can't. If we left the channel in
+ * channels_pending, we would never exit the scheduling loop. We need to
+ * add it to a temporary list of channels to be added to channels_pending
+ * after the scheduling loop is over. They can hopefully be taken care of
+ * in the next scheduling round.
+ */
+ chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
+ if (!to_readd) {
+ to_readd = smartlist_new();
+ }
+ smartlist_add(to_readd, chan);
+ log_debug(LD_SCHED, "chan=%" PRIu64 " now waiting_to_write",
+ chan->global_identifier);
+ } else {
+
+ /* Case 4: cells to send, and still open for writes */
+
+ chan->scheduler_state = SCHED_CHAN_PENDING;
+ smartlist_pqueue_add(cp, scheduler_compare_channels,
+ offsetof(channel_t, sched_heap_idx), chan);
+ }
+ } /* End of main scheduling loop */
+
+ /* Write the outbuf of any channels that still have data */
+ HT_FOREACH_FN(outbuf_table_s, &outbuf_table, each_channel_write_to_kernel,
+ NULL);
+ /* We are done with it. */
+ HT_FOREACH_FN(outbuf_table_s, &outbuf_table, free_outbuf_info_by_ent, NULL);
+ HT_CLEAR(outbuf_table_s, &outbuf_table);
+
+ log_debug(LD_SCHED, "len pending=%d, len to_readd=%d",
+ smartlist_len(cp),
+ (to_readd ? smartlist_len(to_readd) : -1));
+
+ /* Readd any channels we need to */
+ if (to_readd) {
+ SMARTLIST_FOREACH_BEGIN(to_readd, channel_t *, readd_chan) {
+ readd_chan->scheduler_state = SCHED_CHAN_PENDING;
+ if (!smartlist_contains(cp, readd_chan)) {
+ smartlist_pqueue_add(cp, scheduler_compare_channels,
+ offsetof(channel_t, sched_heap_idx), readd_chan);
+ }
+ } SMARTLIST_FOREACH_END(readd_chan);
+ smartlist_free(to_readd);
+ }
+
+ monotime_get(&scheduler_last_run);
+}
+
+/*****************************************************************************
+ * Externally called function implementations not called through scheduler_t
+ *****************************************************************************/
+
+/* Table of the KIST implementations of the scheduler_t interface. Every
+ * callback slot is filled in. */
+static scheduler_t kist_scheduler = {
+  /* Setup and teardown. */
+  .init = kist_scheduler_init,
+  .free_all = kist_free_all,
+  /* Notifications from the rest of the program. */
+  .on_channel_free = kist_on_channel_free,
+  .on_new_consensus = kist_scheduler_on_new_consensus,
+  .on_new_options = kist_scheduler_on_new_options,
+  /* The scheduling work itself. */
+  .schedule = kist_scheduler_schedule,
+  .run = kist_scheduler_run,
+};
+
+/* Return a pointer to the KIST scheduler object. The object is a static
+ * table defined in this file, so nothing is allocated here and the caller
+ * must not free it. This function does not call its init() callback. */
+scheduler_t *
+get_kist_scheduler(void)
+{
+ return &kist_scheduler;
+}
+
+/* Work out the KIST scheduler run interval, in milliseconds.
+ *
+ * The torrc option KISTSchedRunInterval is looked at first:
+ *  - non-zero: that value is returned unchanged. A positive value means
+ *    "use KIST with this interval"; a negative value is passed through and
+ *    means KIST should not be used at all.
+ *  - zero: the "KISTSchedRunInterval" consensus parameter of <b>ns</b> is
+ *    consulted instead, with KIST_SCHED_RUN_INTERVAL_DEFAULT as the fallback
+ *    and KIST_SCHED_RUN_INTERVAL_MIN / KIST_SCHED_RUN_INTERVAL_MAX as bounds.
+ *    A positive result is returned as is; anything else becomes -1 (do not
+ *    use KIST).
+ */
+int32_t
+kist_scheduler_run_interval(const networkstatus_t *ns)
+{
+  const int32_t torrc_interval =
+    (int32_t)get_options()->KISTSchedRunInterval;
+  int32_t consensus_interval;
+
+  /* An explicit torrc setting always wins over the consensus. */
+  if (torrc_interval != 0) {
+    log_debug(LD_SCHED, "Found KISTSchedRunInterval in torrc. Using that.");
+    return torrc_interval;
+  }
+
+  log_debug(LD_SCHED, "Turning to the consensus for KISTSchedRunInterval");
+  consensus_interval =
+    networkstatus_get_param(ns, "KISTSchedRunInterval",
+                            KIST_SCHED_RUN_INTERVAL_DEFAULT,
+                            KIST_SCHED_RUN_INTERVAL_MIN,
+                            KIST_SCHED_RUN_INTERVAL_MAX);
+
+  /* The consensus disables KIST with a non-positive value. */
+  return (consensus_interval > 0) ? consensus_interval : -1;
+}
+
+/* Switch to KISTLite mode, that is KIST without kernel support: raise the
+ * kist_lite_mode flag and log the change at info level. */
+void
+scheduler_kist_set_lite_mode(void)
+{
+  kist_lite_mode = 1;
+  log_info(LD_SCHED, "Setting KIST scheduler without kernel support "
+           "(KISTLite mode)");
+}
+
+/* Switch to full KIST mode, that is KIST with kernel support: clear the
+ * kist_lite_mode flag and log the change at info level. */
+void
+scheduler_kist_set_full_mode(void)
+{
+  kist_lite_mode = 0;
+  log_info(LD_SCHED, "Setting KIST scheduler with kernel support "
+           "(KIST mode)");
+}
+
+#ifdef HAVE_KIST_SUPPORT
+
+/* Return true iff the scheduler subsystem should use KIST.
+ *
+ * KIST is usable only when kernel support was not found missing at runtime
+ * and the run interval computed without a consensus is positive. */
+int
+scheduler_can_use_kist(void)
+{
+  int64_t run_interval;
+  int usable;
+
+  /* Without kernel support there is nothing more to check. */
+  if (kist_no_kernel_support) {
+    return 0;
+  }
+
+  /* Kernel support is there; the configured interval has the final say,
+   * since a non-positive interval disables KIST. */
+  run_interval = kist_scheduler_run_interval(NULL);
+  usable = (run_interval > 0);
+  log_debug(LD_SCHED, "Determined KIST sched_run_interval should be "
+            "%" PRId64 ". Can%s use KIST.",
+            run_interval, (usable ? "" : " not"));
+  return usable;
+}
+
+#else /* HAVE_KIST_SUPPORT */
+
+/* Built without KIST support: KIST can never be used. */
+int
+scheduler_can_use_kist(void)
+{
+  return 0;
+}
+
+#endif /* HAVE_KIST_SUPPORT */
+
diff --git a/src/or/scheduler_vanilla.c b/src/or/scheduler_vanilla.c
new file mode 100644
index 0000000000..09653f445e
--- /dev/null
+++ b/src/or/scheduler_vanilla.c
@@ -0,0 +1,196 @@
+/* Copyright (c) 2017, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#include <event2/event.h>
+
+#include "or.h"
+#include "config.h"
+#define TOR_CHANNEL_INTERNAL_
+#include "channel.h"
+#define SCHEDULER_PRIVATE_
+#include "scheduler.h"
+
+/*****************************************************************************
+ * Other internal data
+ *****************************************************************************/
+
+/* Maximum cells to flush in a single call to channel_flush_some_cells(); */
+#define MAX_FLUSH_CELLS 1000
+
+/*****************************************************************************
+ * Externally called function implementations
+ *****************************************************************************/
+
+/* Return 1 if at least one channel is in the pending list, 0 otherwise. A
+ * missing pending list is reported once as a bug and counts as no work. */
+static int
+have_work(void)
+{
+  smartlist_t *cp = get_channels_pending();
+
+  IF_BUG_ONCE(!cp) {
+    return 0; // channels_pending doesn't exist so... no work?
+  }
+  if (smartlist_len(cp) > 0) {
+    return 1;
+  }
+  return 0;
+}
+
+/* Function of the scheduler interface: schedule()
+ *
+ * Retrigger the scheduler in a way that is safe to use from the callback:
+ * when channels are pending, activate the scheduler event so they get
+ * processed; otherwise do nothing. */
+static void
+vanilla_scheduler_schedule(void)
+{
+  if (have_work()) {
+    /* Activate our event so it can process channels. */
+    scheduler_ev_active(EV_TIMEOUT);
+  }
+}
+
+/* Function of the scheduler interface: run()
+ *
+ * Drain the pending channel queue. Each channel popped from the queue gets
+ * as many cells flushed as it reports being able to write, in batches of at
+ * most MAX_FLUSH_CELLS. Afterwards its scheduler state is updated from what
+ * is left: channels that still have cells and can still write are collected
+ * and put back into the pending queue once the loop is over; the others move
+ * to waiting_for_cells, waiting_to_write or idle.
+ */
+static void
+vanilla_scheduler_run(void)
+{
+ int n_cells, n_chans_before, n_chans_after;
+ ssize_t flushed, flushed_this_time;
+ smartlist_t *cp = get_channels_pending();
+ /* Channels to put back in pending after the loop; allocated lazily. */
+ smartlist_t *to_readd = NULL;
+ channel_t *chan = NULL;
+
+ log_debug(LD_SCHED, "We have a chance to run the scheduler");
+
+ n_chans_before = smartlist_len(cp);
+
+ while (smartlist_len(cp) > 0) {
+ /* Pop off a channel */
+ chan = smartlist_pqueue_pop(cp,
+ scheduler_compare_channels,
+ offsetof(channel_t, sched_heap_idx));
+ IF_BUG_ONCE(!chan) {
+ /* Some-freaking-how a NULL got into the channels_pending. That should
+ * never happen, but it should be harmless to ignore it and keep looping.
+ */
+ continue;
+ }
+
+ /* Figure out how many cells we can write */
+ n_cells = channel_num_cells_writeable(chan);
+ if (n_cells > 0) {
+ log_debug(LD_SCHED,
+ "Scheduler saw pending channel " U64_FORMAT " at %p with "
+ "%d cells writeable",
+ U64_PRINTF_ARG(chan->global_identifier), chan, n_cells);
+
+ /* Flush in batches until n_cells were flushed or a flush call makes no
+  * progress. */
+ flushed = 0;
+ while (flushed < n_cells) {
+ flushed_this_time =
+ channel_flush_some_cells(chan,
+ MIN(MAX_FLUSH_CELLS, (size_t) n_cells - flushed));
+ if (flushed_this_time <= 0) break;
+ flushed += flushed_this_time;
+ }
+
+ if (flushed < n_cells) {
+ /* We ran out of cells to flush */
+ chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
+ log_debug(LD_SCHED,
+ "Channel " U64_FORMAT " at %p "
+ "entered waiting_for_cells from pending",
+ U64_PRINTF_ARG(chan->global_identifier),
+ chan);
+ } else {
+ /* The channel may still have some cells */
+ if (channel_more_to_flush(chan)) {
+ /* The channel goes to either pending or waiting_to_write */
+ if (channel_num_cells_writeable(chan) > 0) {
+ /* Add it back to pending later */
+ if (!to_readd) to_readd = smartlist_new();
+ smartlist_add(to_readd, chan);
+ log_debug(LD_SCHED,
+ "Channel " U64_FORMAT " at %p "
+ "is still pending",
+ U64_PRINTF_ARG(chan->global_identifier),
+ chan);
+ } else {
+ /* It's waiting to be able to write more */
+ chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
+ log_debug(LD_SCHED,
+ "Channel " U64_FORMAT " at %p "
+ "entered waiting_to_write from pending",
+ U64_PRINTF_ARG(chan->global_identifier),
+ chan);
+ }
+ } else {
+ /* No cells left; it can go to idle or waiting_for_cells */
+ if (channel_num_cells_writeable(chan) > 0) {
+ /*
+ * It can still accept writes, so it goes to
+ * waiting_for_cells
+ */
+ chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
+ log_debug(LD_SCHED,
+ "Channel " U64_FORMAT " at %p "
+ "entered waiting_for_cells from pending",
+ U64_PRINTF_ARG(chan->global_identifier),
+ chan);
+ } else {
+ /*
+ * We exactly filled up the output queue with all available
+ * cells; go to idle.
+ */
+ chan->scheduler_state = SCHED_CHAN_IDLE;
+ log_debug(LD_SCHED,
+ "Channel " U64_FORMAT " at %p "
+ "become idle from pending",
+ U64_PRINTF_ARG(chan->global_identifier),
+ chan);
+ }
+ }
+ }
+
+ log_debug(LD_SCHED,
+ "Scheduler flushed %d cells onto pending channel "
+ U64_FORMAT " at %p",
+ (int)flushed, U64_PRINTF_ARG(chan->global_identifier),
+ chan);
+ } else {
+ /* The channel was pending but reports that nothing can be written. */
+ log_info(LD_SCHED,
+ "Scheduler saw pending channel " U64_FORMAT " at %p with "
+ "no cells writeable",
+ U64_PRINTF_ARG(chan->global_identifier), chan);
+ /* Put it back to WAITING_TO_WRITE */
+ chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
+ }
+ }
+
+ /* Re-add any channels we need to */
+ if (to_readd) {
+ SMARTLIST_FOREACH_BEGIN(to_readd, channel_t *, readd_chan) {
+ readd_chan->scheduler_state = SCHED_CHAN_PENDING;
+ smartlist_pqueue_add(cp,
+ scheduler_compare_channels,
+ offsetof(channel_t, sched_heap_idx),
+ readd_chan);
+ } SMARTLIST_FOREACH_END(readd_chan);
+ smartlist_free(to_readd);
+ }
+
+ /* Channels re-added above are back in the queue, so they are not counted
+  * as handled in the message below. */
+ n_chans_after = smartlist_len(cp);
+ log_debug(LD_SCHED, "Scheduler handled %d of %d pending channels",
+ n_chans_before - n_chans_after, n_chans_before);
+}
+
+/* Table of the vanilla implementations of the scheduler_t interface. Only
+ * schedule() and run() are provided; every other callback slot is NULL. */
+static scheduler_t vanilla_scheduler = {
+  /* The scheduling work itself. */
+  .schedule = vanilla_scheduler_schedule,
+  .run = vanilla_scheduler_run,
+  /* Setup and teardown: not provided. */
+  .init = NULL,
+  .free_all = NULL,
+  /* Notifications: not provided. */
+  .on_channel_free = NULL,
+  .on_new_consensus = NULL,
+  .on_new_options = NULL,
+};
+
+/* Return a pointer to the vanilla scheduler object. The object is a static
+ * table defined in this file, so nothing is allocated here and the caller
+ * must not free it. */
+scheduler_t *
+get_vanilla_scheduler(void)
+{
+ return &vanilla_scheduler;
+}
+