diff options
Diffstat (limited to 'src/common/workqueue.c')
-rw-r--r-- | src/common/workqueue.c | 39 |
1 files changed, 24 insertions, 15 deletions
diff --git a/src/common/workqueue.c b/src/common/workqueue.c index c1bd6d4e8b..c467bdf43b 100644 --- a/src/common/workqueue.c +++ b/src/common/workqueue.c @@ -25,7 +25,7 @@ struct threadpool_s { unsigned generation; /** Function that should be run for updates on each thread. */ - int (*update_fn)(void *, void *); + workqueue_reply_t (*update_fn)(void *, void *); /** Function to free update arguments if they can't be run. */ void (*free_update_arg_fn)(void *); /** Array of n_threads update arguments. */ @@ -56,7 +56,7 @@ struct workqueue_entry_s { /** True iff this entry is waiting for a worker to start processing it. */ uint8_t pending; /** Function to run in the worker thread. */ - int (*fn)(void *state, void *arg); + workqueue_reply_t (*fn)(void *state, void *arg); /** Function to run while processing the reply queue. */ void (*reply_fn)(void *arg); /** Argument for the above functions. */ @@ -96,7 +96,7 @@ static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work); * <b>fn</b> in the worker thread, and <b>reply_fn</b> in the main * thread. See threadpool_queue_work() for full documentation. */ static workqueue_entry_t * -workqueue_entry_new(int (*fn)(void*, void*), +workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*), void (*reply_fn)(void*), void *arg) { @@ -172,7 +172,7 @@ worker_thread_main(void *thread_) workerthread_t *thread = thread_; threadpool_t *pool = thread->in_pool; workqueue_entry_t *work; - int result; + workqueue_reply_t result; tor_mutex_acquire(&pool->lock); while (1) { @@ -182,13 +182,14 @@ worker_thread_main(void *thread_) if (thread->in_pool->generation != thread->generation) { void *arg = thread->in_pool->update_args[thread->index]; thread->in_pool->update_args[thread->index] = NULL; - int (*update_fn)(void*,void*) = thread->in_pool->update_fn; + workqueue_reply_t (*update_fn)(void*,void*) = + thread->in_pool->update_fn; thread->generation = thread->in_pool->generation; tor_mutex_release(&pool->lock); - int r = update_fn(thread->state, arg); + workqueue_reply_t r = update_fn(thread->state, arg); - if (r < 0) { + if (r != WQ_RPL_REPLY) { return; } @@ -208,7 +209,7 @@ worker_thread_main(void *thread_) queue_reply(thread->reply_queue, work); /* We may need to exit the thread. */ - if (result >= WQ_RPL_ERROR) { + if (result != WQ_RPL_REPLY) { return; } tor_mutex_acquire(&pool->lock); @@ -255,6 +256,7 @@ workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue) if (spawn_func(worker_thread_main, thr) < 0) { log_err(LD_GENERAL, "Can't launch worker thread."); + tor_free(thr); return NULL; } @@ -280,7 +282,7 @@ workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue) */ workqueue_entry_t * threadpool_queue_work(threadpool_t *pool, - int (*fn)(void *, void *), + workqueue_reply_t (*fn)(void *, void *), void (*reply_fn)(void *), void *arg) { @@ -292,10 +294,10 @@ threadpool_queue_work(threadpool_t *pool, TOR_TAILQ_INSERT_TAIL(&pool->work, ent, next_work); - tor_mutex_release(&pool->lock); - tor_cond_signal_one(&pool->condition); + tor_mutex_release(&pool->lock); + return ent; } @@ -317,7 +319,7 @@ threadpool_queue_work(threadpool_t *pool, int threadpool_queue_update(threadpool_t *pool, void *(*dup_fn)(void *), - int (*fn)(void *, void *), + workqueue_reply_t (*fn)(void *, void *), void (*free_fn)(void *), void *arg) { @@ -344,10 +346,10 @@ threadpool_queue_update(threadpool_t *pool, pool->update_fn = fn; ++pool->generation; - tor_mutex_release(&pool->lock); - tor_cond_signal_all(&pool->condition); + tor_mutex_release(&pool->lock); + if (old_args) { for (i = 0; i < n_threads; ++i) { if (old_args[i] && old_args_free_fn) @@ -359,12 +361,17 @@ threadpool_queue_update(threadpool_t *pool, return 0; } +/** Don't have more than this many threads per pool. */ +#define MAX_THREADS 1024 + /** Launch threads until we have <b>n</b>. */ static int threadpool_start_threads(threadpool_t *pool, int n) { if (n < 0) return -1; + if (n > MAX_THREADS) + n = MAX_THREADS; tor_mutex_acquire(&pool->lock); @@ -377,6 +384,7 @@ threadpool_start_threads(threadpool_t *pool, int n) workerthread_t *thr = workerthread_new(state, pool, pool->reply_queue); if (!thr) { + pool->free_thread_state_fn(state); tor_mutex_release(&pool->lock); return -1; } @@ -473,7 +481,8 @@ replyqueue_process(replyqueue_t *queue) if (queue->alert.drain_fn(queue->alert.read_fd) < 0) { static ratelim_t warn_limit = RATELIM_INIT(7200); log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL, - "Failure from drain_fd"); + "Failure from drain_fd: %s", + tor_socket_strerror(tor_socket_errno(queue->alert.read_fd))); } tor_mutex_acquire(&queue->lock); |