aboutsummaryrefslogtreecommitdiff
path: root/src/common/workqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/common/workqueue.c')
-rw-r--r--src/common/workqueue.c39
1 files changed, 24 insertions, 15 deletions
diff --git a/src/common/workqueue.c b/src/common/workqueue.c
index c1bd6d4e8b..c467bdf43b 100644
--- a/src/common/workqueue.c
+++ b/src/common/workqueue.c
@@ -25,7 +25,7 @@ struct threadpool_s {
unsigned generation;
/** Function that should be run for updates on each thread. */
- int (*update_fn)(void *, void *);
+ workqueue_reply_t (*update_fn)(void *, void *);
/** Function to free update arguments if they can't be run. */
void (*free_update_arg_fn)(void *);
/** Array of n_threads update arguments. */
@@ -56,7 +56,7 @@ struct workqueue_entry_s {
/** True iff this entry is waiting for a worker to start processing it. */
uint8_t pending;
/** Function to run in the worker thread. */
- int (*fn)(void *state, void *arg);
+ workqueue_reply_t (*fn)(void *state, void *arg);
/** Function to run while processing the reply queue. */
void (*reply_fn)(void *arg);
/** Argument for the above functions. */
@@ -96,7 +96,7 @@ static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
* <b>fn</b> in the worker thread, and <b>reply_fn</b> in the main
* thread. See threadpool_queue_work() for full documentation. */
static workqueue_entry_t *
-workqueue_entry_new(int (*fn)(void*, void*),
+workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*),
void (*reply_fn)(void*),
void *arg)
{
@@ -172,7 +172,7 @@ worker_thread_main(void *thread_)
workerthread_t *thread = thread_;
threadpool_t *pool = thread->in_pool;
workqueue_entry_t *work;
- int result;
+ workqueue_reply_t result;
tor_mutex_acquire(&pool->lock);
while (1) {
@@ -182,13 +182,14 @@ worker_thread_main(void *thread_)
if (thread->in_pool->generation != thread->generation) {
void *arg = thread->in_pool->update_args[thread->index];
thread->in_pool->update_args[thread->index] = NULL;
- int (*update_fn)(void*,void*) = thread->in_pool->update_fn;
+ workqueue_reply_t (*update_fn)(void*,void*) =
+ thread->in_pool->update_fn;
thread->generation = thread->in_pool->generation;
tor_mutex_release(&pool->lock);
- int r = update_fn(thread->state, arg);
+ workqueue_reply_t r = update_fn(thread->state, arg);
- if (r < 0) {
+ if (r != WQ_RPL_REPLY) {
return;
}
@@ -208,7 +209,7 @@ worker_thread_main(void *thread_)
queue_reply(thread->reply_queue, work);
/* We may need to exit the thread. */
- if (result >= WQ_RPL_ERROR) {
+ if (result != WQ_RPL_REPLY) {
return;
}
tor_mutex_acquire(&pool->lock);
@@ -255,6 +256,7 @@ workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue)
if (spawn_func(worker_thread_main, thr) < 0) {
log_err(LD_GENERAL, "Can't launch worker thread.");
+ tor_free(thr);
return NULL;
}
@@ -280,7 +282,7 @@ workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue)
*/
workqueue_entry_t *
threadpool_queue_work(threadpool_t *pool,
- int (*fn)(void *, void *),
+ workqueue_reply_t (*fn)(void *, void *),
void (*reply_fn)(void *),
void *arg)
{
@@ -292,10 +294,10 @@ threadpool_queue_work(threadpool_t *pool,
TOR_TAILQ_INSERT_TAIL(&pool->work, ent, next_work);
- tor_mutex_release(&pool->lock);
-
tor_cond_signal_one(&pool->condition);
+ tor_mutex_release(&pool->lock);
+
return ent;
}
@@ -317,7 +319,7 @@ threadpool_queue_work(threadpool_t *pool,
int
threadpool_queue_update(threadpool_t *pool,
void *(*dup_fn)(void *),
- int (*fn)(void *, void *),
+ workqueue_reply_t (*fn)(void *, void *),
void (*free_fn)(void *),
void *arg)
{
@@ -344,10 +346,10 @@ threadpool_queue_update(threadpool_t *pool,
pool->update_fn = fn;
++pool->generation;
- tor_mutex_release(&pool->lock);
-
tor_cond_signal_all(&pool->condition);
+ tor_mutex_release(&pool->lock);
+
if (old_args) {
for (i = 0; i < n_threads; ++i) {
if (old_args[i] && old_args_free_fn)
@@ -359,12 +361,17 @@ threadpool_queue_update(threadpool_t *pool,
return 0;
}
+/** Don't have more than this many threads per pool. */
+#define MAX_THREADS 1024
+
/** Launch threads until we have <b>n</b>. */
static int
threadpool_start_threads(threadpool_t *pool, int n)
{
if (n < 0)
return -1;
+ if (n > MAX_THREADS)
+ n = MAX_THREADS;
tor_mutex_acquire(&pool->lock);
@@ -377,6 +384,7 @@ threadpool_start_threads(threadpool_t *pool, int n)
workerthread_t *thr = workerthread_new(state, pool, pool->reply_queue);
if (!thr) {
+ pool->free_thread_state_fn(state);
tor_mutex_release(&pool->lock);
return -1;
}
@@ -473,7 +481,8 @@ replyqueue_process(replyqueue_t *queue)
if (queue->alert.drain_fn(queue->alert.read_fd) < 0) {
static ratelim_t warn_limit = RATELIM_INIT(7200);
log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL,
- "Failure from drain_fd");
+ "Failure from drain_fd: %s",
+ tor_socket_strerror(tor_socket_errno(queue->alert.read_fd)));
}
tor_mutex_acquire(&queue->lock);