diff options
-rw-r--r-- | src/common/workqueue.c | 72 | ||||
-rw-r--r-- | src/common/workqueue.h | 18 | ||||
-rw-r--r-- | src/test/bench_workqueue.c | 24 |
3 files changed, 61 insertions, 53 deletions
diff --git a/src/common/workqueue.c b/src/common/workqueue.c index ea8dcb0f9b..80e061dfb5 100644 --- a/src/common/workqueue.c +++ b/src/common/workqueue.c @@ -31,16 +31,18 @@ keep array of threads; round-robin between them. */ -typedef struct workqueue_entry_s { - TOR_SIMPLEQ_ENTRY(workqueue_entry_s) next_work; - int (*fn)(int status, void *state, void *arg); +struct workqueue_entry_s { + TOR_TAILQ_ENTRY(workqueue_entry_s) next_work; + struct workerthread_s *on_thread; + uint8_t pending; + int (*fn)(void *state, void *arg); void (*reply_fn)(void *arg); void *arg; -} workqueue_entry_t; +}; struct replyqueue_s { tor_mutex_t lock; - TOR_SIMPLEQ_HEAD(, workqueue_entry_s) answers; + TOR_TAILQ_HEAD(, workqueue_entry_s) answers; void (*alert_fn)(struct replyqueue_s *); // lock not held on this, next 2. tor_socket_t write_sock; @@ -50,7 +52,7 @@ struct replyqueue_s { typedef struct workerthread_s { tor_mutex_t lock; tor_cond_t condition; - TOR_SIMPLEQ_HEAD(, workqueue_entry_s) work; + TOR_TAILQ_HEAD(, workqueue_entry_s) work; unsigned is_running; unsigned is_shut_down; unsigned waiting; @@ -76,7 +78,7 @@ struct threadpool_s { static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work); static workqueue_entry_t * -workqueue_entry_new(int (*fn)(int, void*, void*), +workqueue_entry_new(int (*fn)(void*, void*), void (*reply_fn)(void*), void *arg) { @@ -95,6 +97,23 @@ workqueue_entry_free(workqueue_entry_t *ent) tor_free(ent); } +int +workqueue_entry_cancel(workqueue_entry_t *ent) +{ + int cancelled = 0; + tor_mutex_acquire(&ent->on_thread->lock); + if (ent->pending) { + TOR_TAILQ_REMOVE(&ent->on_thread->work, ent, next_work); + cancelled = 1; + } + tor_mutex_release(&ent->on_thread->lock); + + if (cancelled) { + tor_free(ent); + } + return cancelled; +} + static void worker_thread_main(void *thread_) { @@ -107,20 +126,17 @@ worker_thread_main(void *thread_) thread->is_running = 1; while (1) { /* lock held. */ - while (!TOR_SIMPLEQ_EMPTY(&thread->work)) { + while (!TOR_TAILQ_EMPTY(&thread->work)) { /* lock held. */ - work = TOR_SIMPLEQ_FIRST(&thread->work); - TOR_SIMPLEQ_REMOVE_HEAD(&thread->work, next_work); + work = TOR_TAILQ_FIRST(&thread->work); + TOR_TAILQ_REMOVE(&thread->work, work, next_work); + work->pending = 0; tor_mutex_release(&thread->lock); - result = work->fn(WQ_CMD_RUN, thread->state, work->arg); + result = work->fn(thread->state, work->arg); - if (result == WQ_RPL_QUEUE) { - queue_reply(thread->reply_queue, work); - } else { - workqueue_entry_free(work); - } + queue_reply(thread->reply_queue, work); tor_mutex_acquire(&thread->lock); if (result >= WQ_RPL_ERROR) { @@ -148,8 +164,8 @@ queue_reply(replyqueue_t *queue, workqueue_entry_t *work) { int was_empty; tor_mutex_acquire(&queue->lock); - was_empty = TOR_SIMPLEQ_EMPTY(&queue->answers); - TOR_SIMPLEQ_INSERT_TAIL(&queue->answers, work, next_work); + was_empty = TOR_TAILQ_EMPTY(&queue->answers); + TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work); tor_mutex_release(&queue->lock); if (was_empty) { @@ -175,7 +191,7 @@ workerthread_new(void *state, replyqueue_t *replyqueue) workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t)); tor_mutex_init_for_cond(&thr->lock); tor_cond_init(&thr->condition); - TOR_SIMPLEQ_INIT(&thr->work); + TOR_TAILQ_INIT(&thr->work); thr->state = state; thr->reply_queue = replyqueue; @@ -187,9 +203,9 @@ workerthread_new(void *state, replyqueue_t *replyqueue) return thr; } -void * +workqueue_entry_t * threadpool_queue_work(threadpool_t *pool, - int (*fn)(int, void *, void *), + int (*fn)(void *, void *), void (*reply_fn)(void *), void *arg) { @@ -206,11 +222,12 @@ threadpool_queue_work(threadpool_t *pool, pool->next_for_work = 0; tor_mutex_release(&pool->lock); - ent = workqueue_entry_new(fn, reply_fn, arg); tor_mutex_acquire(&worker->lock); - TOR_SIMPLEQ_INSERT_TAIL(&worker->work, ent, next_work); + ent->on_thread = worker; + ent->pending = 1; + TOR_TAILQ_INSERT_TAIL(&worker->work, ent, next_work); if (worker->waiting) /* XXXX inside or outside of lock?? */ tor_cond_signal_one(&worker->condition); @@ -298,7 +315,7 @@ replyqueue_new(void) rq = tor_malloc_zero(sizeof(replyqueue_t)); tor_mutex_init(&rq->lock); - TOR_SIMPLEQ_INIT(&rq->answers); + TOR_TAILQ_INIT(&rq->answers); rq->read_sock = pair[0]; rq->write_sock = pair[1]; @@ -331,10 +348,10 @@ replyqueue_process(replyqueue_t *queue) /* XXXX freak out on r == 0, or r == "error, not retryable". */ tor_mutex_acquire(&queue->lock); - while (!TOR_SIMPLEQ_EMPTY(&queue->answers)) { + while (!TOR_TAILQ_EMPTY(&queue->answers)) { /* lock held. */ - workqueue_entry_t *work = TOR_SIMPLEQ_FIRST(&queue->answers); - TOR_SIMPLEQ_REMOVE_HEAD(&queue->answers, next_work); + workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers); + TOR_TAILQ_REMOVE(&queue->answers, work, next_work); tor_mutex_release(&queue->lock); work->reply_fn(work->arg); @@ -345,3 +362,4 @@ replyqueue_process(replyqueue_t *queue) tor_mutex_release(&queue->lock); } + diff --git a/src/common/workqueue.h b/src/common/workqueue.h index e502734b84..47753cff12 100644 --- a/src/common/workqueue.h +++ b/src/common/workqueue.h @@ -8,20 +8,20 @@ typedef struct replyqueue_s replyqueue_t; typedef struct threadpool_s threadpool_t; - +typedef struct workqueue_entry_s workqueue_entry_t; #define WQ_CMD_RUN 0 #define WQ_CMD_CANCEL 1 -#define WQ_RPL_QUEUE 0 -#define WQ_RPL_NOQUEUE 1 -#define WQ_RPL_ERROR 2 -#define WQ_RPL_SHUTDOWN 3 +#define WQ_RPL_REPLY 0 +#define WQ_RPL_ERROR 1 +#define WQ_RPL_SHUTDOWN 2 -void *threadpool_queue_work(threadpool_t *pool, - int (*fn)(int, void *, void *), - void (*reply_fn)(void *), - void *arg); +workqueue_entry_t *threadpool_queue_work(threadpool_t *pool, + int (*fn)(void *, void *), + void (*reply_fn)(void *), + void *arg); +int workqueue_entry_cancel(workqueue_entry_t *pending_work); int threadpool_start_threads(threadpool_t *pool, int n); threadpool_t *threadpool_new(int n_threads, replyqueue_t *replyqueue, diff --git a/src/test/bench_workqueue.c b/src/test/bench_workqueue.c index 1bdfbefb3e..f190c613e5 100644 --- a/src/test/bench_workqueue.c +++ b/src/test/bench_workqueue.c @@ -64,7 +64,7 @@ mark_handled(int serial) } static int -workqueue_do_rsa(int cmd, void *state, void *work) +workqueue_do_rsa(void *state, void *work) { rsa_work_t *rw = work; state_t *st = state; @@ -74,16 +74,11 @@ workqueue_do_rsa(int cmd, void *state, void *work) tor_assert(st->magic == 13371337); - if (cmd == WQ_CMD_CANCEL) { - tor_free(work); - return WQ_RPL_NOQUEUE; - } - len = crypto_pk_private_sign(rsa, (char*)sig, 256, (char*)rw->msg, rw->msglen); if (len < 0) { - tor_free(work); - return WQ_RPL_NOQUEUE; + rw->msglen = 0; + return WQ_RPL_ERROR; } memset(rw->msg, 0, sizeof(rw->msg)); @@ -93,12 +88,12 @@ workqueue_do_rsa(int cmd, void *state, void *work) mark_handled(rw->serial); - return WQ_RPL_QUEUE; + return WQ_RPL_REPLY; } #if 0 static int -workqueue_do_shutdown(int cmd, void *state, void *work) +workqueue_do_shutdown(void *state, void *work) { (void)state; (void)work; @@ -110,7 +105,7 @@ workqueue_do_shutdown(int cmd, void *state, void *work) #endif static int -workqueue_do_ecdh(int cmd, void *state, void *work) +workqueue_do_ecdh(void *state, void *work) { ecdh_work_t *ew = work; uint8_t output[CURVE25519_OUTPUT_LEN]; @@ -118,16 +113,11 @@ workqueue_do_ecdh(int cmd, void *state, void *work) tor_assert(st->magic == 13371337); - if (cmd == WQ_CMD_CANCEL) { - tor_free(work); - return WQ_RPL_NOQUEUE; - } - curve25519_handshake(output, &st->ecdh, &ew->u.pk); memcpy(ew->u.msg, output, CURVE25519_OUTPUT_LEN); ++st->n_handled; mark_handled(ew->serial); - return WQ_RPL_QUEUE; + return WQ_RPL_REPLY; } static void * |