summaryrefslogtreecommitdiff
path: root/src/common/workqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/common/workqueue.c')
-rw-r--r--src/common/workqueue.c57
1 files changed, 44 insertions, 13 deletions
diff --git a/src/common/workqueue.c b/src/common/workqueue.c
index ed70d5e5fc..c4b64de58b 100644
--- a/src/common/workqueue.c
+++ b/src/common/workqueue.c
@@ -186,13 +186,32 @@ workerthread_new(void *state, replyqueue_t *replyqueue)
return thr;
}
+static workqueue_entry_t *
+workerthread_queue_work(workerthread_t *worker,
+ int (*fn)(void *, void *),
+ void (*reply_fn)(void *),
+ void *arg)
+{
+ workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg);
+
+ tor_mutex_acquire(&worker->lock);
+ ent->on_thread = worker;
+ ent->pending = 1;
+ TOR_TAILQ_INSERT_TAIL(&worker->work, ent, next_work);
+
+ if (worker->waiting) /* XXXX inside or outside of lock?? */
+ tor_cond_signal_one(&worker->condition);
+
+ tor_mutex_release(&worker->lock);
+ return ent;
+}
+
workqueue_entry_t *
threadpool_queue_work(threadpool_t *pool,
int (*fn)(void *, void *),
void (*reply_fn)(void *),
void *arg)
{
- workqueue_entry_t *ent;
workerthread_t *worker;
tor_mutex_acquire(&pool->lock);
@@ -205,22 +224,34 @@ threadpool_queue_work(threadpool_t *pool,
pool->next_for_work = 0;
tor_mutex_release(&pool->lock);
- ent = workqueue_entry_new(fn, reply_fn, arg);
-
- tor_mutex_acquire(&worker->lock);
- ent->on_thread = worker;
- ent->pending = 1;
- TOR_TAILQ_INSERT_TAIL(&worker->work, ent, next_work);
-
- if (worker->waiting) /* XXXX inside or outside of lock?? */
- tor_cond_signal_one(&worker->condition);
+ return workerthread_queue_work(worker, fn, reply_fn, arg);
+}
- tor_mutex_release(&worker->lock);
+int
+threadpool_queue_for_all(threadpool_t *pool,
+ void *(*dup_fn)(void *),
+ int (*fn)(void *, void *),
+ void (*reply_fn)(void *),
+ void *arg)
+{
+ int i = 0;
+ workerthread_t *worker;
+ void *arg_copy;
+ while (1) {
+ tor_mutex_acquire(&pool->lock);
+ if (i >= pool->n_threads) {
+ tor_mutex_release(&pool->lock);
+ return 0;
+ }
+ worker = pool->threads[i++];
+ tor_mutex_release(&pool->lock);
- return ent;
+ arg_copy = dup_fn ? dup_fn(arg) : arg;
+ /* CHECK*/ workerthread_queue_work(worker, fn, reply_fn, arg_copy);
+ }
}
-int
+static int
threadpool_start_threads(threadpool_t *pool, int n)
{
tor_mutex_acquire(&pool->lock);