aboutsummaryrefslogtreecommitdiff
path: root/src/common/workqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/common/workqueue.c')
-rw-r--r--src/common/workqueue.c150
1 files changed, 127 insertions, 23 deletions
diff --git a/src/common/workqueue.c b/src/common/workqueue.c
index e1fb663a2a..42723224d3 100644
--- a/src/common/workqueue.c
+++ b/src/common/workqueue.c
@@ -25,11 +25,19 @@
#include "orconfig.h"
#include "compat.h"
#include "compat_threads.h"
+#include "crypto.h"
#include "util.h"
#include "workqueue.h"
#include "tor_queue.h"
#include "torlog.h"
+#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
+#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
+#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)
+
+TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_s);
+typedef struct work_tailq_t work_tailq_t;
+
struct threadpool_s {
/** An array of pointers to workerthread_t: one for each running worker
* thread. */
@@ -38,8 +46,12 @@ struct threadpool_s {
/** Condition variable that we wait on when we have no work, and which
* gets signaled when our queue becomes nonempty. */
tor_cond_t condition;
- /** Queue of pending work that we have to do. */
- TOR_TAILQ_HEAD(, workqueue_entry_s) work;
+ /** Queues of pending work that we have to do. The queue with priority
+ * <b>p</b> is work[p]. */
+ work_tailq_t work[WORKQUEUE_N_PRIORITIES];
+
+ /** Weak RNG, used to decide when to ignore priority. */
+ tor_weak_rng_t weak_rng;
/** The current 'update generation' of the threadpool. Any thread that is
* at an earlier generation needs to run the update function. */
@@ -66,6 +78,11 @@ struct threadpool_s {
void *new_thread_state_arg;
};
+/** Used to put a workqueue_priority_t value into a bitfield. */
+#define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t)
+/** Number of bits needed to hold all legal values of workqueue_priority_t */
+#define WORKQUEUE_PRIORITY_BITS 2
+
struct workqueue_entry_s {
/** The next workqueue_entry_t that's pending on the same thread or
* reply queue. */
@@ -76,6 +93,8 @@ struct workqueue_entry_s {
struct threadpool_s *on_pool;
/** True iff this entry is waiting for a worker to start processing it. */
uint8_t pending;
+ /** Priority of this entry. */
+ workqueue_priority_bitfield_t priority : WORKQUEUE_PRIORITY_BITS;
/** Function to run in the worker thread. */
workqueue_reply_t (*fn)(void *state, void *arg);
/** Function to run while processing the reply queue. */
@@ -94,9 +113,7 @@ struct replyqueue_s {
alert_sockets_t alert;
};
-/** A worker thread represents a single thread in a thread pool. To avoid
- * contention, each gets its own queue. This breaks the guarantee that that
- * queued work will get executed strictly in order. */
+/** A worker thread represents a single thread in a thread pool. */
typedef struct workerthread_s {
/** Which thread it this? In range 0..in_pool->n_threads-1 */
int index;
@@ -109,6 +126,8 @@ typedef struct workerthread_s {
replyqueue_t *reply_queue;
/** The current update generation of this thread */
unsigned generation;
+ /** One over the probability of taking work from a lower-priority queue. */
+ int32_t lower_priority_chance;
} workerthread_t;
static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
@@ -125,6 +144,7 @@ workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*),
ent->fn = fn;
ent->reply_fn = reply_fn;
ent->arg = arg;
+ ent->priority = WQ_PRI_HIGH;
return ent;
}
@@ -161,8 +181,9 @@ workqueue_entry_cancel(workqueue_entry_t *ent)
int cancelled = 0;
void *result = NULL;
tor_mutex_acquire(&ent->on_pool->lock);
+ workqueue_priority_t prio = ent->priority;
if (ent->pending) {
- TOR_TAILQ_REMOVE(&ent->on_pool->work, ent, next_work);
+ TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work);
cancelled = 1;
result = ent->arg;
}
@@ -180,8 +201,46 @@ workqueue_entry_cancel(workqueue_entry_t *ent)
static int
worker_thread_has_work(workerthread_t *thread)
{
- return !TOR_TAILQ_EMPTY(&thread->in_pool->work) ||
- thread->generation != thread->in_pool->generation;
+ unsigned i;
+ for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
+ if (!TOR_TAILQ_EMPTY(&thread->in_pool->work[i]))
+ return 1;
+ }
+ return thread->generation != thread->in_pool->generation;
+}
+
+/** Extract the next workqueue_entry_t from the the thread's pool, removing
+ * it from the relevant queues and marking it as non-pending.
+ *
+ * The caller must hold the lock. */
+static workqueue_entry_t *
+worker_thread_extract_next_work(workerthread_t *thread)
+{
+ threadpool_t *pool = thread->in_pool;
+ work_tailq_t *queue = NULL, *this_queue;
+ unsigned i;
+ for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
+ this_queue = &pool->work[i];
+ if (!TOR_TAILQ_EMPTY(this_queue)) {
+ queue = this_queue;
+ if (! tor_weak_random_one_in_n(&pool->weak_rng,
+ thread->lower_priority_chance)) {
+ /* Usually we'll just break now, so that we can get out of the loop
+ * and use the queue where we found work. But with a small
+ * probability, we'll keep looking for lower priority work, so that
+ * we don't ignore our low-priority queues entirely. */
+ break;
+ }
+ }
+ }
+
+ if (queue == NULL)
+ return NULL;
+
+ workqueue_entry_t *work = TOR_TAILQ_FIRST(queue);
+ TOR_TAILQ_REMOVE(queue, work, next_work);
+ work->pending = 0;
+ return work;
}
/**
@@ -217,9 +276,9 @@ worker_thread_main(void *thread_)
tor_mutex_acquire(&pool->lock);
continue;
}
- work = TOR_TAILQ_FIRST(&pool->work);
- TOR_TAILQ_REMOVE(&pool->work, work, next_work);
- work->pending = 0;
+ work = worker_thread_extract_next_work(thread);
+ if (BUG(work == NULL))
+ break;
tor_mutex_release(&pool->lock);
/* We run the work function without holding the thread lock. This
@@ -268,12 +327,14 @@ queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
/** Allocate and start a new worker thread to use state object <b>state</b>,
* and send responses to <b>replyqueue</b>. */
static workerthread_t *
-workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue)
+workerthread_new(int32_t lower_priority_chance,
+ void *state, threadpool_t *pool, replyqueue_t *replyqueue)
{
workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t));
thr->state = state;
thr->reply_queue = replyqueue;
thr->in_pool = pool;
+ thr->lower_priority_chance = lower_priority_chance;
if (spawn_func(worker_thread_main, thr) < 0) {
//LCOV_EXCL_START
@@ -299,24 +360,34 @@ workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue)
* function's responsibility to free the work object.
*
* On success, return a workqueue_entry_t object that can be passed to
- * workqueue_entry_cancel(). On failure, return NULL.
+ * workqueue_entry_cancel(). On failure, return NULL. (Failure is not
+ * currently possible, but callers should check anyway.)
+ *
+ * Items are executed in a loose priority order -- each thread will usually
+ * take from the queued work with the highest prioirity, but will occasionally
+ * visit lower-priority queues to keep them from starving completely.
*
- * Note that because each thread has its own work queue, work items may not
+ * Note that because of priorities and thread behavior, work items may not
* be executed strictly in order.
*/
workqueue_entry_t *
-threadpool_queue_work(threadpool_t *pool,
- workqueue_reply_t (*fn)(void *, void *),
- void (*reply_fn)(void *),
- void *arg)
+threadpool_queue_work_priority(threadpool_t *pool,
+ workqueue_priority_t prio,
+ workqueue_reply_t (*fn)(void *, void *),
+ void (*reply_fn)(void *),
+ void *arg)
{
+ tor_assert(((int)prio) >= WORKQUEUE_PRIORITY_FIRST &&
+ ((int)prio) <= WORKQUEUE_PRIORITY_LAST);
+
workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg);
ent->on_pool = pool;
ent->pending = 1;
+ ent->priority = prio;
tor_mutex_acquire(&pool->lock);
- TOR_TAILQ_INSERT_TAIL(&pool->work, ent, next_work);
+ TOR_TAILQ_INSERT_TAIL(&pool->work[prio], ent, next_work);
tor_cond_signal_one(&pool->condition);
@@ -325,6 +396,16 @@ threadpool_queue_work(threadpool_t *pool,
return ent;
}
+/** As threadpool_queue_work_priority(), but assumes WQ_PRI_HIGH */
+workqueue_entry_t *
+threadpool_queue_work(threadpool_t *pool,
+ workqueue_reply_t (*fn)(void *, void *),
+ void (*reply_fn)(void *),
+ void *arg)
+{
+ return threadpool_queue_work_priority(pool, WQ_PRI_HIGH, fn, reply_fn, arg);
+}
+
/**
* Queue a copy of a work item for every thread in a pool. This can be used,
* for example, to tell the threads to update some parameter in their states.
@@ -388,6 +469,14 @@ threadpool_queue_update(threadpool_t *pool,
/** Don't have more than this many threads per pool. */
#define MAX_THREADS 1024
+/** For half of our threads, choose lower priority queues with probability
+ * 1/N for each of these values. Both are chosen somewhat arbitrarily. If
+ * CHANCE_PERMISSIVE is too low, then we have a risk of low-priority tasks
+ * stalling forever. If it's too high, we have a risk of low-priority tasks
+ * grabbing half of the threads. */
+#define CHANCE_PERMISSIVE 37
+#define CHANCE_STRICT INT32_MAX
+
/** Launch threads until we have <b>n</b>. */
static int
threadpool_start_threads(threadpool_t *pool, int n)
@@ -404,8 +493,14 @@ threadpool_start_threads(threadpool_t *pool, int n)
sizeof(workerthread_t*), n);
while (pool->n_threads < n) {
+ /* For half of our threads, we'll choose lower priorities permissively;
+ * for the other half, we'll stick more strictly to higher priorities.
+ * This keeps slow low-priority tasks from taking over completely. */
+ int32_t chance = (pool->n_threads & 1) ? CHANCE_STRICT : CHANCE_PERMISSIVE;
+
void *state = pool->new_thread_state_fn(pool->new_thread_state_arg);
- workerthread_t *thr = workerthread_new(state, pool, pool->reply_queue);
+ workerthread_t *thr = workerthread_new(chance,
+ state, pool, pool->reply_queue);
if (!thr) {
//LCOV_EXCL_START
@@ -441,7 +536,15 @@ threadpool_new(int n_threads,
pool = tor_malloc_zero(sizeof(threadpool_t));
tor_mutex_init_nonrecursive(&pool->lock);
tor_cond_init(&pool->condition);
- TOR_TAILQ_INIT(&pool->work);
+ unsigned i;
+ for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
+ TOR_TAILQ_INIT(&pool->work[i]);
+ }
+ {
+ unsigned seed;
+ crypto_rand((void*)&seed, sizeof(seed));
+ tor_init_weak_random(&pool->weak_rng, seed);
+ }
pool->new_thread_state_fn = new_thread_state_fn;
pool->new_thread_state_arg = arg;
@@ -510,12 +613,13 @@ replyqueue_get_socket(replyqueue_t *rq)
void
replyqueue_process(replyqueue_t *queue)
{
- if (queue->alert.drain_fn(queue->alert.read_fd) < 0) {
+ int r = queue->alert.drain_fn(queue->alert.read_fd);
+ if (r < 0) {
//LCOV_EXCL_START
static ratelim_t warn_limit = RATELIM_INIT(7200);
log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL,
"Failure from drain_fd: %s",
- tor_socket_strerror(tor_socket_errno(queue->alert.read_fd)));
+ tor_socket_strerror(-r));
//LCOV_EXCL_STOP
}