diff options
-rw-r--r-- | changes/multi-priority | 5 | ||||
-rw-r--r-- | src/common/workqueue.c | 118 | ||||
-rw-r--r-- | src/common/workqueue.h | 14 | ||||
-rw-r--r-- | src/test/test_workqueue.c | 4 |
4 files changed, 126 insertions, 15 deletions
diff --git a/changes/multi-priority b/changes/multi-priority new file mode 100644 index 0000000000..6f19314b53 --- /dev/null +++ b/changes/multi-priority @@ -0,0 +1,5 @@ + o Minor features (relay, thread pool): + - Allow background work to be queued with different priorities, so + that a big pile of slow low-priority jobs will not starve out + higher priority jobs. This lays the groundwork for a fix for bug + 22883. diff --git a/src/common/workqueue.c b/src/common/workqueue.c index ec6e257b4c..aae55589ac 100644 --- a/src/common/workqueue.c +++ b/src/common/workqueue.c @@ -25,11 +25,19 @@ #include "orconfig.h" #include "compat.h" #include "compat_threads.h" +#include "crypto.h" #include "util.h" #include "workqueue.h" #include "tor_queue.h" #include "torlog.h" +#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH +#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW +#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1) + +TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_s); +typedef struct work_tailq_t work_tailq_t; + struct threadpool_s { /** An array of pointers to workerthread_t: one for each running worker * thread. */ @@ -38,8 +46,12 @@ struct threadpool_s { /** Condition variable that we wait on when we have no work, and which * gets signaled when our queue becomes nonempty. */ tor_cond_t condition; - /** Queue of pending work that we have to do. */ - TOR_TAILQ_HEAD(, workqueue_entry_s) work; + /** Queues of pending work that we have to do. The queue with priority + * <b>p</b> is work[p]. */ + work_tailq_t work[WORKQUEUE_N_PRIORITIES]; + + /** Weak RNG, used to decide when to ignore priority. */ + tor_weak_rng_t weak_rng; /** The current 'update generation' of the threadpool. Any thread that is * at an earlier generation needs to run the update function. */ @@ -66,6 +78,11 @@ struct threadpool_s { void *new_thread_state_arg; }; +/** Used to put a workqueue_priority_t value into a bitfield. */ +#define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t) +/** Number of bits needed to hold all legal values of workqueue_priority_t */ +#define WORKQUEUE_PRIORITY_BITS 2 + struct workqueue_entry_s { /** The next workqueue_entry_t that's pending on the same thread or * reply queue. */ @@ -76,6 +93,8 @@ struct workqueue_entry_s { struct threadpool_s *on_pool; /** True iff this entry is waiting for a worker to start processing it. */ uint8_t pending; + /** Priority of this entry. */ + workqueue_priority_bitfield_t priority : WORKQUEUE_PRIORITY_BITS; /** Function to run in the worker thread. */ workqueue_reply_t (*fn)(void *state, void *arg); /** Function to run while processing the reply queue. */ @@ -125,6 +144,7 @@ workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*), ent->fn = fn; ent->reply_fn = reply_fn; ent->arg = arg; + ent->priority = WQ_PRI_HIGH; return ent; } @@ -161,8 +181,9 @@ workqueue_entry_cancel(workqueue_entry_t *ent) int cancelled = 0; void *result = NULL; tor_mutex_acquire(&ent->on_pool->lock); + workqueue_priority_t prio = ent->priority; if (ent->pending) { - TOR_TAILQ_REMOVE(&ent->on_pool->work, ent, next_work); + TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work); cancelled = 1; result = ent->arg; } @@ -180,8 +201,50 @@ workqueue_entry_cancel(workqueue_entry_t *ent) static int worker_thread_has_work(workerthread_t *thread) { - return !TOR_TAILQ_EMPTY(&thread->in_pool->work) || - thread->generation != thread->in_pool->generation; + unsigned i; + for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) { + if (!TOR_TAILQ_EMPTY(&thread->in_pool->work[i])) + return 1; + } + return thread->generation != thread->in_pool->generation; +} + +/** With this probability, we'll consider taking work from a lower-priority + * queue when we already have higher-priority work. This keeps priority from + * becoming completely inverted. */ +#define LOWER_PRIORITY_CHANCE 37 + +/** Extract the next workqueue_entry_t from the the thread's pool, removing + * it from the relevant queues and marking it as non-pending. + * + * The caller must hold the lock. */ +static workqueue_entry_t * +worker_thread_extract_next_work(workerthread_t *thread) +{ + threadpool_t *pool = thread->in_pool; + work_tailq_t *queue = NULL, *this_queue; + unsigned i; + for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) { + this_queue = &pool->work[i]; + if (!TOR_TAILQ_EMPTY(this_queue)) { + queue = this_queue; + if (! tor_weak_random_one_in_n(&pool->weak_rng, LOWER_PRIORITY_CHANCE)) { + /* Usually we'll just break now, so that we can get out of the loop + * and use the queue where we found work. But with a small + * probability, we'll keep looking for lower priority work, so that + * we don't ignore our low-priority queues entirely. */ + break; + } + } + } + + if (queue == NULL) + return NULL; + + workqueue_entry_t *work = TOR_TAILQ_FIRST(queue); + TOR_TAILQ_REMOVE(queue, work, next_work); + work->pending = 0; + return work; } /** @@ -217,9 +280,9 @@ worker_thread_main(void *thread_) tor_mutex_acquire(&pool->lock); continue; } - work = TOR_TAILQ_FIRST(&pool->work); - TOR_TAILQ_REMOVE(&pool->work, work, next_work); - work->pending = 0; + work = worker_thread_extract_next_work(thread); + if (BUG(work == NULL)) + break; tor_mutex_release(&pool->lock); /* We run the work function without holding the thread lock. This @@ -301,22 +364,31 @@ workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue) * On success, return a workqueue_entry_t object that can be passed to * workqueue_entry_cancel(). On failure, return NULL. * + * Items are executed in a loose priority order -- each thread will usually + * take from the queued work with the highest prioirity, but will occasionally + * visit lower-priority queues to keep them from starving completely. + * * Note that because each thread has its own work queue, work items may not * be executed strictly in order. */ workqueue_entry_t * -threadpool_queue_work(threadpool_t *pool, - workqueue_reply_t (*fn)(void *, void *), - void (*reply_fn)(void *), - void *arg) +threadpool_queue_work_priority(threadpool_t *pool, + workqueue_priority_t prio, + workqueue_reply_t (*fn)(void *, void *), + void (*reply_fn)(void *), + void *arg) { + tor_assert(prio >= WORKQUEUE_PRIORITY_FIRST && + prio <= WORKQUEUE_PRIORITY_LAST); + workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg); ent->on_pool = pool; ent->pending = 1; + ent->priority = prio; tor_mutex_acquire(&pool->lock); - TOR_TAILQ_INSERT_TAIL(&pool->work, ent, next_work); + TOR_TAILQ_INSERT_TAIL(&pool->work[prio], ent, next_work); tor_cond_signal_one(&pool->condition); @@ -325,6 +397,16 @@ threadpool_queue_work(threadpool_t *pool, return ent; } +/** As threadpool_queue_work_priority(), but assumes WQ_PRI_HIGH */ +workqueue_entry_t * +threadpool_queue_work(threadpool_t *pool, + workqueue_reply_t (*fn)(void *, void *), + void (*reply_fn)(void *), + void *arg) +{ + return threadpool_queue_work_priority(pool, WQ_PRI_HIGH, fn, reply_fn, arg); +} + /** * Queue a copy of a work item for every thread in a pool. This can be used, * for example, to tell the threads to update some parameter in their states. @@ -441,7 +523,15 @@ threadpool_new(int n_threads, pool = tor_malloc_zero(sizeof(threadpool_t)); tor_mutex_init_nonrecursive(&pool->lock); tor_cond_init(&pool->condition); - TOR_TAILQ_INIT(&pool->work); + unsigned i; + for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) { + TOR_TAILQ_INIT(&pool->work[i]); + } + { + unsigned seed; + crypto_rand((void*)&seed, sizeof(seed)); + tor_init_weak_random(&pool->weak_rng, seed); + } pool->new_thread_state_fn = new_thread_state_fn; pool->new_thread_state_arg = arg; diff --git a/src/common/workqueue.h b/src/common/workqueue.h index 7b483eb7ac..d2508f5329 100644 --- a/src/common/workqueue.h +++ b/src/common/workqueue.h @@ -22,6 +22,20 @@ typedef enum workqueue_reply_t { WQ_RPL_SHUTDOWN = 2, /** indicates thread is shutting down */ } workqueue_reply_t; +/** Possible priorities for work. Lower numeric values are more important. */ +typedef enum workqueue_priority_t { + WQ_PRI_HIGH = 0, + WQ_PRI_MED = 1, + WQ_PRI_LOW = 2, +} workqueue_priority_t; + +workqueue_entry_t *threadpool_queue_work_priority(threadpool_t *pool, + workqueue_priority_t prio, + workqueue_reply_t (*fn)(void *, + void *), + void (*reply_fn)(void *), + void *arg); + workqueue_entry_t *threadpool_queue_work(threadpool_t *pool, workqueue_reply_t (*fn)(void *, void *), diff --git a/src/test/test_workqueue.c b/src/test/test_workqueue.c index 10714457f7..6fa46f90d4 100644 --- a/src/test/test_workqueue.c +++ b/src/test/test_workqueue.c @@ -200,7 +200,9 @@ add_work(threadpool_t *tp) crypto_rand((char*)w->msg, 20); w->msglen = 20; ++rsa_sent; - return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w); + return threadpool_queue_work_priority(tp, + WQ_PRI_MED, + workqueue_do_rsa, handle_reply, w); } else { ecdh_work_t *w = tor_malloc_zero(sizeof(*w)); w->serial = n_sent++; |