diff options
-rw-r--r-- | src/common/workqueue.c | 57 | ||||
-rw-r--r-- | src/common/workqueue.h | 6 |
2 files changed, 49 insertions, 14 deletions
diff --git a/src/common/workqueue.c b/src/common/workqueue.c index ed70d5e5fc..c4b64de58b 100644 --- a/src/common/workqueue.c +++ b/src/common/workqueue.c @@ -186,13 +186,32 @@ workerthread_new(void *state, replyqueue_t *replyqueue) return thr; } +static workqueue_entry_t * +workerthread_queue_work(workerthread_t *worker, + int (*fn)(void *, void *), + void (*reply_fn)(void *), + void *arg) +{ + workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg); + + tor_mutex_acquire(&worker->lock); + ent->on_thread = worker; + ent->pending = 1; + TOR_TAILQ_INSERT_TAIL(&worker->work, ent, next_work); + + if (worker->waiting) /* XXXX inside or outside of lock?? */ + tor_cond_signal_one(&worker->condition); + + tor_mutex_release(&worker->lock); + return ent; +} + workqueue_entry_t * threadpool_queue_work(threadpool_t *pool, int (*fn)(void *, void *), void (*reply_fn)(void *), void *arg) { - workqueue_entry_t *ent; workerthread_t *worker; tor_mutex_acquire(&pool->lock); @@ -205,22 +224,34 @@ threadpool_queue_work(threadpool_t *pool, pool->next_for_work = 0; tor_mutex_release(&pool->lock); - ent = workqueue_entry_new(fn, reply_fn, arg); - - tor_mutex_acquire(&worker->lock); - ent->on_thread = worker; - ent->pending = 1; - TOR_TAILQ_INSERT_TAIL(&worker->work, ent, next_work); - - if (worker->waiting) /* XXXX inside or outside of lock?? */ - tor_cond_signal_one(&worker->condition); + return workerthread_queue_work(worker, fn, reply_fn, arg); +} - tor_mutex_release(&worker->lock); +int +threadpool_queue_for_all(threadpool_t *pool, + void *(*dup_fn)(void *), + int (*fn)(void *, void *), + void (*reply_fn)(void *), + void *arg) +{ + int i = 0; + workerthread_t *worker; + void *arg_copy; + while (1) { + tor_mutex_acquire(&pool->lock); + if (i >= pool->n_threads) { + tor_mutex_release(&pool->lock); + return 0; + } + worker = pool->threads[i++]; + tor_mutex_release(&pool->lock); - return ent; + arg_copy = dup_fn ? dup_fn(arg) : arg; + /* CHECK*/ workerthread_queue_work(worker, fn, reply_fn, arg_copy); + } } -int +static int threadpool_start_threads(threadpool_t *pool, int n) { tor_mutex_acquire(&pool->lock); diff --git a/src/common/workqueue.h b/src/common/workqueue.h index 47753cff12..684fb192ba 100644 --- a/src/common/workqueue.h +++ b/src/common/workqueue.h @@ -21,8 +21,12 @@ workqueue_entry_t *threadpool_queue_work(threadpool_t *pool, int (*fn)(void *, void *), void (*reply_fn)(void *), void *arg); +int threadpool_queue_for_all(threadpool_t *pool, + void *(*dup_fn)(void *), + int (*fn)(void *, void *), + void (*reply_fn)(void *), + void *arg); int workqueue_entry_cancel(workqueue_entry_t *pending_work); -int threadpool_start_threads(threadpool_t *pool, int n); threadpool_t *threadpool_new(int n_threads, replyqueue_t *replyqueue, void *(*new_thread_state_fn)(void*), |