diff options
Diffstat (limited to 'src/common/workqueue.c')
-rw-r--r-- | src/common/workqueue.c | 72 |
1 files changed, 45 insertions, 27 deletions
diff --git a/src/common/workqueue.c b/src/common/workqueue.c index ea8dcb0f9b..80e061dfb5 100644 --- a/src/common/workqueue.c +++ b/src/common/workqueue.c @@ -31,16 +31,18 @@ keep array of threads; round-robin between them. */ -typedef struct workqueue_entry_s { - TOR_SIMPLEQ_ENTRY(workqueue_entry_s) next_work; - int (*fn)(int status, void *state, void *arg); +struct workqueue_entry_s { + TOR_TAILQ_ENTRY(workqueue_entry_s) next_work; + struct workerthread_s *on_thread; + uint8_t pending; + int (*fn)(void *state, void *arg); void (*reply_fn)(void *arg); void *arg; -} workqueue_entry_t; +}; struct replyqueue_s { tor_mutex_t lock; - TOR_SIMPLEQ_HEAD(, workqueue_entry_s) answers; + TOR_TAILQ_HEAD(, workqueue_entry_s) answers; void (*alert_fn)(struct replyqueue_s *); // lock not held on this, next 2. tor_socket_t write_sock; @@ -50,7 +52,7 @@ struct replyqueue_s { typedef struct workerthread_s { tor_mutex_t lock; tor_cond_t condition; - TOR_SIMPLEQ_HEAD(, workqueue_entry_s) work; + TOR_TAILQ_HEAD(, workqueue_entry_s) work; unsigned is_running; unsigned is_shut_down; unsigned waiting; @@ -76,7 +78,7 @@ struct threadpool_s { static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work); static workqueue_entry_t * -workqueue_entry_new(int (*fn)(int, void*, void*), +workqueue_entry_new(int (*fn)(void*, void*), void (*reply_fn)(void*), void *arg) { @@ -95,6 +97,23 @@ workqueue_entry_free(workqueue_entry_t *ent) tor_free(ent); } +int +workqueue_entry_cancel(workqueue_entry_t *ent) +{ + int cancelled = 0; + tor_mutex_acquire(&ent->on_thread->lock); + if (ent->pending) { + TOR_TAILQ_REMOVE(&ent->on_thread->work, ent, next_work); + cancelled = 1; + } + tor_mutex_release(&ent->on_thread->lock); + + if (cancelled) { + tor_free(ent); + } + return cancelled; +} + static void worker_thread_main(void *thread_) { @@ -107,20 +126,17 @@ worker_thread_main(void *thread_) thread->is_running = 1; while (1) { /* lock held. */ - while (!TOR_SIMPLEQ_EMPTY(&thread->work)) { + while (!TOR_TAILQ_EMPTY(&thread->work)) { /* lock held. */ - work = TOR_SIMPLEQ_FIRST(&thread->work); - TOR_SIMPLEQ_REMOVE_HEAD(&thread->work, next_work); + work = TOR_TAILQ_FIRST(&thread->work); + TOR_TAILQ_REMOVE(&thread->work, work, next_work); + work->pending = 0; tor_mutex_release(&thread->lock); - result = work->fn(WQ_CMD_RUN, thread->state, work->arg); + result = work->fn(thread->state, work->arg); - if (result == WQ_RPL_QUEUE) { - queue_reply(thread->reply_queue, work); - } else { - workqueue_entry_free(work); - } + queue_reply(thread->reply_queue, work); tor_mutex_acquire(&thread->lock); if (result >= WQ_RPL_ERROR) { @@ -148,8 +164,8 @@ queue_reply(replyqueue_t *queue, workqueue_entry_t *work) { int was_empty; tor_mutex_acquire(&queue->lock); - was_empty = TOR_SIMPLEQ_EMPTY(&queue->answers); - TOR_SIMPLEQ_INSERT_TAIL(&queue->answers, work, next_work); + was_empty = TOR_TAILQ_EMPTY(&queue->answers); + TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work); tor_mutex_release(&queue->lock); if (was_empty) { @@ -175,7 +191,7 @@ workerthread_new(void *state, replyqueue_t *replyqueue) workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t)); tor_mutex_init_for_cond(&thr->lock); tor_cond_init(&thr->condition); - TOR_SIMPLEQ_INIT(&thr->work); + TOR_TAILQ_INIT(&thr->work); thr->state = state; thr->reply_queue = replyqueue; @@ -187,9 +203,9 @@ workerthread_new(void *state, replyqueue_t *replyqueue) return thr; } -void * +workqueue_entry_t * threadpool_queue_work(threadpool_t *pool, - int (*fn)(int, void *, void *), + int (*fn)(void *, void *), void (*reply_fn)(void *), void *arg) { @@ -206,11 +222,12 @@ threadpool_queue_work(threadpool_t *pool, pool->next_for_work = 0; tor_mutex_release(&pool->lock); - ent = workqueue_entry_new(fn, reply_fn, arg); tor_mutex_acquire(&worker->lock); - TOR_SIMPLEQ_INSERT_TAIL(&worker->work, ent, next_work); + ent->on_thread = worker; + ent->pending = 1; + TOR_TAILQ_INSERT_TAIL(&worker->work, ent, next_work); if (worker->waiting) /* XXXX inside or outside of lock?? */ tor_cond_signal_one(&worker->condition); @@ -298,7 +315,7 @@ replyqueue_new(void) rq = tor_malloc_zero(sizeof(replyqueue_t)); tor_mutex_init(&rq->lock); - TOR_SIMPLEQ_INIT(&rq->answers); + TOR_TAILQ_INIT(&rq->answers); rq->read_sock = pair[0]; rq->write_sock = pair[1]; @@ -331,10 +348,10 @@ replyqueue_process(replyqueue_t *queue) /* XXXX freak out on r == 0, or r == "error, not retryable". */ tor_mutex_acquire(&queue->lock); - while (!TOR_SIMPLEQ_EMPTY(&queue->answers)) { + while (!TOR_TAILQ_EMPTY(&queue->answers)) { /* lock held. */ - workqueue_entry_t *work = TOR_SIMPLEQ_FIRST(&queue->answers); - TOR_SIMPLEQ_REMOVE_HEAD(&queue->answers, next_work); + workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers); + TOR_TAILQ_REMOVE(&queue->answers, work, next_work); tor_mutex_release(&queue->lock); work->reply_fn(work->arg); @@ -345,3 +362,4 @@ replyqueue_process(replyqueue_t *queue) tor_mutex_release(&queue->lock); } + |