diff options
-rw-r--r-- | src/common/compat_pthreads.c | 1 | ||||
-rw-r--r-- | src/common/workqueue.c | 206 | ||||
-rw-r--r-- | src/common/workqueue.h | 10 | ||||
-rw-r--r-- | src/or/cpuworker.c | 15 | ||||
-rw-r--r-- | src/test/test_workqueue.c | 23 |
5 files changed, 141 insertions, 114 deletions
diff --git a/src/common/compat_pthreads.c b/src/common/compat_pthreads.c index 848bfe0973..f43480539f 100644 --- a/src/common/compat_pthreads.c +++ b/src/common/compat_pthreads.c @@ -5,6 +5,7 @@ #include "orconfig.h" #include <pthread.h> +#include <signal.h> #include "compat.h" #include "torlog.h" diff --git a/src/common/workqueue.c b/src/common/workqueue.c index 7fa8967580..5ba29e3b26 100644 --- a/src/common/workqueue.c +++ b/src/common/workqueue.c @@ -1,4 +1,4 @@ -/* Copyright (c) 2013, The Tor Project, Inc. */ +/* Copyright (c) 2013-2015, The Tor Project, Inc. */ /* See LICENSE for licensing information */ #include "orconfig.h" @@ -13,8 +13,23 @@ struct threadpool_s { /** An array of pointers to workerthread_t: one for each running worker * thread. */ struct workerthread_s **threads; - /** Index of the next thread that we'll give work to.*/ - int next_for_work; + + /** Condition variable that we wait on when we have no work, and which + * gets signaled when our queue becomes nonempty. */ + tor_cond_t condition; + /** Queue of pending work that we have to do. */ + TOR_TAILQ_HEAD(, workqueue_entry_s) work; + + /** The current 'update generation' of the threadpool. Any thread that is + * at an earlier generation needs to run the update function. */ + unsigned generation; + + /** Function that should be run for updates on each thread. */ + int (*update_fn)(void *, void *); + /** Function to free update arguments if they can't be run. */ + void (*free_update_arg_fn)(void *); + /** Array of n_threads update arguments. */ + void **update_args; /** Number of elements in threads. */ int n_threads; @@ -34,10 +49,10 @@ struct workqueue_entry_s { /** The next workqueue_entry_t that's pending on the same thread or * reply queue. */ TOR_TAILQ_ENTRY(workqueue_entry_s) next_work; - /** The thread to which this workqueue_entry_t was assigned. This field + /** The threadpool to which this workqueue_entry_t was assigned. This field * is set when the workqueue_entry_t is created, and won't be cleared until * after it's handled in the main thread. */ - struct workerthread_s *on_thread; + struct threadpool_s *on_pool; /** True iff this entry is waiting for a worker to start processing it. */ uint8_t pending; /** Function to run in the worker thread. */ @@ -62,13 +77,10 @@ struct replyqueue_s { * contention, each gets its own queue. This breaks the guarantee that that * queued work will get executed strictly in order. */ typedef struct workerthread_s { - /** Lock to protect all fields of this thread and its queue. */ - tor_mutex_t lock; - /** Condition variable that we wait on when we have no work, and which - * gets signaled when our queue becomes nonempty. */ - tor_cond_t condition; - /** Queue of pending work that we have to do. */ - TOR_TAILQ_HEAD(, workqueue_entry_s) work; + /** Which thread it this? In range 0..in_pool->n_threads-1 */ + int index; + /** The pool this thread is a part of. */ + struct threadpool_s *in_pool; /** True iff this thread is currently in its loop. (Not currently used.) */ unsigned is_running; /** True iff this thread has crashed or is shut down for some reason. (Not @@ -81,6 +93,8 @@ typedef struct workerthread_s { void *state; /** Reply queue to which we pass our results. */ replyqueue_t *reply_queue; + /** The current update generation of this thread */ + unsigned generation; } workerthread_t; static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work); @@ -132,13 +146,13 @@ workqueue_entry_cancel(workqueue_entry_t *ent) { int cancelled = 0; void *result = NULL; - tor_mutex_acquire(&ent->on_thread->lock); + tor_mutex_acquire(&ent->on_pool->lock); if (ent->pending) { - TOR_TAILQ_REMOVE(&ent->on_thread->work, ent, next_work); + TOR_TAILQ_REMOVE(&ent->on_pool->work, ent, next_work); cancelled = 1; result = ent->arg; } - tor_mutex_release(&ent->on_thread->lock); + tor_mutex_release(&ent->on_pool->lock); if (cancelled) { tor_free(ent); @@ -146,6 +160,16 @@ workqueue_entry_cancel(workqueue_entry_t *ent) return result; } +/**DOCDOC + + must hold lock */ +static int +worker_thread_has_work(workerthread_t *thread) +{ + return !TOR_TAILQ_EMPTY(&thread->in_pool->work) || + thread->generation != thread->in_pool->generation; +} + /** * Main function for the worker thread. */ @@ -153,20 +177,39 @@ static void worker_thread_main(void *thread_) { workerthread_t *thread = thread_; + threadpool_t *pool = thread->in_pool; workqueue_entry_t *work; int result; - tor_mutex_acquire(&thread->lock); thread->is_running = 1; + + tor_mutex_acquire(&pool->lock); while (1) { /* lock must be held at this point. */ - while (!TOR_TAILQ_EMPTY(&thread->work)) { + while (worker_thread_has_work(thread)) { /* lock must be held at this point. */ - - work = TOR_TAILQ_FIRST(&thread->work); - TOR_TAILQ_REMOVE(&thread->work, work, next_work); + if (thread->in_pool->generation != thread->generation) { + void *arg = thread->in_pool->update_args[thread->index]; + thread->in_pool->update_args[thread->index] = NULL; + int (*update_fn)(void*,void*) = thread->in_pool->update_fn; + thread->generation = thread->in_pool->generation; + tor_mutex_release(&pool->lock); + + int r = update_fn(thread->state, arg); + + if (r < 0) { + thread->is_running = 0; + thread->is_shut_down = 1; + return; + } + + tor_mutex_acquire(&pool->lock); + continue; + } + work = TOR_TAILQ_FIRST(&pool->work); + TOR_TAILQ_REMOVE(&pool->work, work, next_work); work->pending = 0; - tor_mutex_release(&thread->lock); + tor_mutex_release(&pool->lock); /* We run the work function without holding the thread lock. This * is the main thread's first opportunity to give us more work. */ @@ -175,25 +218,23 @@ worker_thread_main(void *thread_) /* Queue the reply for the main thread. */ queue_reply(thread->reply_queue, work); - tor_mutex_acquire(&thread->lock); /* We may need to exit the thread. */ if (result >= WQ_RPL_ERROR) { thread->is_running = 0; thread->is_shut_down = 1; - tor_mutex_release(&thread->lock); return; } + tor_mutex_acquire(&pool->lock); } /* At this point the lock is held, and there is no work in this thread's * queue. */ - /* TODO: Try work-stealing. */ /* TODO: support an idle-function */ /* Okay. Now, wait till somebody has work for us. */ /* XXXX we could just omit waiting and instead */ thread->waiting = 1; - if (tor_cond_wait(&thread->condition, &thread->lock, NULL) < 0) { + if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) { /* XXXX ERROR */ } thread->waiting = 0; @@ -221,14 +262,12 @@ queue_reply(replyqueue_t *queue, workqueue_entry_t *work) /** Allocate and start a new worker thread to use state object <b>state</b>, * and send responses to <b>replyqueue</b>. */ static workerthread_t * -workerthread_new(void *state, replyqueue_t *replyqueue) +workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue) { workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t)); - tor_mutex_init_for_cond(&thr->lock); - tor_cond_init(&thr->condition); - TOR_TAILQ_INIT(&thr->work); thr->state = state; thr->reply_queue = replyqueue; + thr->in_pool = pool; if (spawn_func(worker_thread_main, thr) < 0) { log_err(LD_GENERAL, "Can't launch worker thread."); @@ -239,30 +278,6 @@ workerthread_new(void *state, replyqueue_t *replyqueue) } /** - * Add an item of work to a single worker thread. See threadpool_queue_work(*) - * for arguments. - */ -static workqueue_entry_t * -workerthread_queue_work(workerthread_t *worker, - int (*fn)(void *, void *), - void (*reply_fn)(void *), - void *arg) -{ - workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg); - - tor_mutex_acquire(&worker->lock); - ent->on_thread = worker; - ent->pending = 1; - TOR_TAILQ_INSERT_TAIL(&worker->work, ent, next_work); - - if (worker->waiting) /* XXXX inside or outside of lock?? */ - tor_cond_signal_one(&worker->condition); - - tor_mutex_release(&worker->lock); - return ent; -} - -/** * Queue an item of work for a thread in a thread pool. The function * <b>fn</b> will be run in a worker thread, and will receive as arguments the * thread's state object, and the provided object <b>arg</b>. It must return @@ -285,20 +300,19 @@ threadpool_queue_work(threadpool_t *pool, void (*reply_fn)(void *), void *arg) { - workerthread_t *worker; + workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg); + ent->on_pool = pool; + ent->pending = 1; tor_mutex_acquire(&pool->lock); - /* Pick the next thread in random-access order. */ - worker = pool->threads[pool->next_for_work++]; - if (!worker) { - tor_mutex_release(&pool->lock); - return NULL; - } - if (pool->next_for_work >= pool->n_threads) - pool->next_for_work = 0; + + TOR_TAILQ_INSERT_TAIL(&pool->work, ent, next_work); + tor_mutex_release(&pool->lock); - return workerthread_queue_work(worker, fn, reply_fn, arg); + tor_cond_signal_one(&pool->condition); + + return ent; } /** @@ -309,30 +323,56 @@ threadpool_queue_work(threadpool_t *pool, * <b>arg</b> value is passed to <b>dup_fn</b> once per each thread to * make a copy of it. * + * UPDATE FUNCTIONS MUST BE IDEMPOTENT. We do not guarantee that every update + * will be run. If a new update is scheduled before the old update finishes + * running, then the new will replace the old in any threads that haven't run + * it yet. + * * Return 0 on success, -1 on failure. */ int -threadpool_queue_for_all(threadpool_t *pool, +threadpool_queue_update(threadpool_t *pool, void *(*dup_fn)(void *), int (*fn)(void *, void *), - void (*reply_fn)(void *), + void (*free_fn)(void *), void *arg) { - int i = 0; - workerthread_t *worker; - void *arg_copy; - while (1) { - tor_mutex_acquire(&pool->lock); - if (i >= pool->n_threads) { - tor_mutex_release(&pool->lock); - return 0; - } - worker = pool->threads[i++]; - tor_mutex_release(&pool->lock); + int i, n_threads; + void (*old_args_free_fn)(void *arg); + void **old_args; + void **new_args; + + tor_mutex_acquire(&pool->lock); + n_threads = pool->n_threads; + old_args = pool->update_args; + old_args_free_fn = pool->free_update_arg_fn; + + new_args = tor_calloc(n_threads, sizeof(void*)); + for (i = 0; i < n_threads; ++i) { + if (dup_fn) + new_args[i] = dup_fn(arg); + else + new_args[i] = arg; + } + + pool->update_args = new_args; + pool->free_update_arg_fn = free_fn; + pool->update_fn = fn; + ++pool->generation; - arg_copy = dup_fn ? dup_fn(arg) : arg; - /* CHECK*/ workerthread_queue_work(worker, fn, reply_fn, arg_copy); + tor_mutex_release(&pool->lock); + + tor_cond_signal_all(&pool->condition); + + if (old_args) { + for (i = 0; i < n_threads; ++i) { + if (old_args[i] && old_args_free_fn) + old_args_free_fn(old_args[i]); + } + tor_free(old_args); } + + return 0; } /** Launch threads until we have <b>n</b>. */ @@ -346,7 +386,8 @@ threadpool_start_threads(threadpool_t *pool, int n) while (pool->n_threads < n) { void *state = pool->new_thread_state_fn(pool->new_thread_state_arg); - workerthread_t *thr = workerthread_new(state, pool->reply_queue); + workerthread_t *thr = workerthread_new(state, pool, pool->reply_queue); + thr->index = pool->n_threads; if (!thr) { tor_mutex_release(&pool->lock); @@ -375,7 +416,10 @@ threadpool_new(int n_threads, { threadpool_t *pool; pool = tor_malloc_zero(sizeof(threadpool_t)); - tor_mutex_init(&pool->lock); + tor_mutex_init_nonrecursive(&pool->lock); + tor_cond_init(&pool->condition); + TOR_TAILQ_INIT(&pool->work); + pool->new_thread_state_fn = new_thread_state_fn; pool->new_thread_state_arg = arg; pool->free_thread_state_fn = free_thread_state_fn; @@ -447,7 +491,7 @@ replyqueue_process(replyqueue_t *queue) workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers); TOR_TAILQ_REMOVE(&queue->answers, work, next_work); tor_mutex_release(&queue->lock); - work->on_thread = NULL; + work->on_pool = NULL; work->reply_fn(work->arg); workqueue_entry_free(work); diff --git a/src/common/workqueue.h b/src/common/workqueue.h index aa1bcc518a..92e82b8a48 100644 --- a/src/common/workqueue.h +++ b/src/common/workqueue.h @@ -27,11 +27,11 @@ workqueue_entry_t *threadpool_queue_work(threadpool_t *pool, int (*fn)(void *, void *), void (*reply_fn)(void *), void *arg); -int threadpool_queue_for_all(threadpool_t *pool, - void *(*dup_fn)(void *), - int (*fn)(void *, void *), - void (*reply_fn)(void *), - void *arg); +int threadpool_queue_update(threadpool_t *pool, + void *(*dup_fn)(void *), + int (*fn)(void *, void *), + void (*free_fn)(void *), + void *arg); void *workqueue_entry_cancel(workqueue_entry_t *pending_work); threadpool_t *threadpool_new(int n_threads, replyqueue_t *replyqueue, diff --git a/src/or/cpuworker.c b/src/or/cpuworker.c index 36ca505fe3..3f129ded99 100644 --- a/src/or/cpuworker.c +++ b/src/or/cpuworker.c @@ -170,11 +170,6 @@ update_state_threadfn(void *state_, void *work_) ++state->generation; return WQ_RPL_REPLY; } -static void -update_state_replyfn(void *work_) -{ - tor_free(work_); -} /** Called when the onion key has changed and we need to spawn new * cpuworkers. Close all currently idle cpuworkers, and mark the last @@ -183,11 +178,11 @@ update_state_replyfn(void *work_) void cpuworkers_rotate_keyinfo(void) { - if (threadpool_queue_for_all(threadpool, - worker_state_new, - update_state_threadfn, - update_state_replyfn, - NULL)) { + if (threadpool_queue_update(threadpool, + worker_state_new, + update_state_threadfn, + worker_state_free, + NULL)) { log_warn(LD_OR, "Failed to queue key update for worker threads."); } } diff --git a/src/test/test_workqueue.c b/src/test/test_workqueue.c index 410f43cce4..8ce4405062 100644 --- a/src/test/test_workqueue.c +++ b/src/test/test_workqueue.c @@ -132,7 +132,6 @@ new_state(void *arg) /* Every thread gets its own keys. not a problem for benchmarking */ st->rsa = crypto_pk_new(); if (crypto_pk_generate_key_with_bits(st->rsa, 1024) < 0) { - puts("keygen failed"); crypto_pk_free(st->rsa); tor_free(st); return NULL; @@ -213,7 +212,6 @@ add_n_work_items(threadpool_t *tp, int n) while (n_queued++ < n) { ent = add_work(tp); if (! ent) { - puts("Couldn't add work."); tor_event_base_loopexit(tor_libevent_get_base(), NULL); return -1; } @@ -238,18 +236,6 @@ add_n_work_items(threadpool_t *tp, int n) } static int shutting_down = 0; -static int n_shutdowns_done = 0; - -static void -shutdown_reply(void *arg) -{ - (void)arg; - tor_assert(shutting_down); - ++n_shutdowns_done; - if (n_shutdowns_done == opt_n_threads) { - tor_event_base_loopexit(tor_libevent_get_base(), NULL); - } -} static void replysock_readable_cb(tor_socket_t sock, short what, void *arg) @@ -297,8 +283,8 @@ replysock_readable_cb(tor_socket_t sock, short what, void *arg) n_received+n_successful_cancel == n_sent && n_sent >= opt_n_items) { shutting_down = 1; - threadpool_queue_for_all(tp, NULL, - workqueue_do_shutdown, shutdown_reply, NULL); + threadpool_queue_update(tp, NULL, + workqueue_do_shutdown, NULL, NULL); } } @@ -410,8 +396,9 @@ main(int argc, char **argv) event_base_loop(tor_libevent_get_base(), 0); - if (n_sent != opt_n_items || n_received+n_successful_cancel != n_sent || - n_shutdowns_done != opt_n_threads) { + if (n_sent != opt_n_items || n_received+n_successful_cancel != n_sent) { + printf("%d vs %d\n", n_sent, opt_n_items); + printf("%d+%d vs %d\n", n_received, n_successful_cancel, n_sent); puts("FAIL"); return 1; } else { |