summaryrefslogtreecommitdiff
path: root/src/common/workqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/common/workqueue.c')
-rw-r--r--src/common/workqueue.c72
1 files changed, 45 insertions, 27 deletions
diff --git a/src/common/workqueue.c b/src/common/workqueue.c
index ea8dcb0f9b..80e061dfb5 100644
--- a/src/common/workqueue.c
+++ b/src/common/workqueue.c
@@ -31,16 +31,18 @@ keep array of threads; round-robin between them.
*/
-typedef struct workqueue_entry_s {
- TOR_SIMPLEQ_ENTRY(workqueue_entry_s) next_work;
- int (*fn)(int status, void *state, void *arg);
+struct workqueue_entry_s {
+ TOR_TAILQ_ENTRY(workqueue_entry_s) next_work;
+ struct workerthread_s *on_thread;
+ uint8_t pending;
+ int (*fn)(void *state, void *arg);
void (*reply_fn)(void *arg);
void *arg;
-} workqueue_entry_t;
+};
struct replyqueue_s {
tor_mutex_t lock;
- TOR_SIMPLEQ_HEAD(, workqueue_entry_s) answers;
+ TOR_TAILQ_HEAD(, workqueue_entry_s) answers;
void (*alert_fn)(struct replyqueue_s *); // lock not held on this, next 2.
tor_socket_t write_sock;
@@ -50,7 +52,7 @@ struct replyqueue_s {
typedef struct workerthread_s {
tor_mutex_t lock;
tor_cond_t condition;
- TOR_SIMPLEQ_HEAD(, workqueue_entry_s) work;
+ TOR_TAILQ_HEAD(, workqueue_entry_s) work;
unsigned is_running;
unsigned is_shut_down;
unsigned waiting;
@@ -76,7 +78,7 @@ struct threadpool_s {
static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
static workqueue_entry_t *
-workqueue_entry_new(int (*fn)(int, void*, void*),
+workqueue_entry_new(int (*fn)(void*, void*),
void (*reply_fn)(void*),
void *arg)
{
@@ -95,6 +97,23 @@ workqueue_entry_free(workqueue_entry_t *ent)
tor_free(ent);
}
+int
+workqueue_entry_cancel(workqueue_entry_t *ent)
+{
+ int cancelled = 0;
+ tor_mutex_acquire(&ent->on_thread->lock);
+ if (ent->pending) {
+ TOR_TAILQ_REMOVE(&ent->on_thread->work, ent, next_work);
+ cancelled = 1;
+ }
+ tor_mutex_release(&ent->on_thread->lock);
+
+ if (cancelled) {
+ tor_free(ent);
+ }
+ return cancelled;
+}
+
static void
worker_thread_main(void *thread_)
{
@@ -107,20 +126,17 @@ worker_thread_main(void *thread_)
thread->is_running = 1;
while (1) {
/* lock held. */
- while (!TOR_SIMPLEQ_EMPTY(&thread->work)) {
+ while (!TOR_TAILQ_EMPTY(&thread->work)) {
/* lock held. */
- work = TOR_SIMPLEQ_FIRST(&thread->work);
- TOR_SIMPLEQ_REMOVE_HEAD(&thread->work, next_work);
+ work = TOR_TAILQ_FIRST(&thread->work);
+ TOR_TAILQ_REMOVE(&thread->work, work, next_work);
+ work->pending = 0;
tor_mutex_release(&thread->lock);
- result = work->fn(WQ_CMD_RUN, thread->state, work->arg);
+ result = work->fn(thread->state, work->arg);
- if (result == WQ_RPL_QUEUE) {
- queue_reply(thread->reply_queue, work);
- } else {
- workqueue_entry_free(work);
- }
+ queue_reply(thread->reply_queue, work);
tor_mutex_acquire(&thread->lock);
if (result >= WQ_RPL_ERROR) {
@@ -148,8 +164,8 @@ queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
{
int was_empty;
tor_mutex_acquire(&queue->lock);
- was_empty = TOR_SIMPLEQ_EMPTY(&queue->answers);
- TOR_SIMPLEQ_INSERT_TAIL(&queue->answers, work, next_work);
+ was_empty = TOR_TAILQ_EMPTY(&queue->answers);
+ TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work);
tor_mutex_release(&queue->lock);
if (was_empty) {
@@ -175,7 +191,7 @@ workerthread_new(void *state, replyqueue_t *replyqueue)
workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t));
tor_mutex_init_for_cond(&thr->lock);
tor_cond_init(&thr->condition);
- TOR_SIMPLEQ_INIT(&thr->work);
+ TOR_TAILQ_INIT(&thr->work);
thr->state = state;
thr->reply_queue = replyqueue;
@@ -187,9 +203,9 @@ workerthread_new(void *state, replyqueue_t *replyqueue)
return thr;
}
-void *
+workqueue_entry_t *
threadpool_queue_work(threadpool_t *pool,
- int (*fn)(int, void *, void *),
+ int (*fn)(void *, void *),
void (*reply_fn)(void *),
void *arg)
{
@@ -206,11 +222,12 @@ threadpool_queue_work(threadpool_t *pool,
pool->next_for_work = 0;
tor_mutex_release(&pool->lock);
-
ent = workqueue_entry_new(fn, reply_fn, arg);
tor_mutex_acquire(&worker->lock);
- TOR_SIMPLEQ_INSERT_TAIL(&worker->work, ent, next_work);
+ ent->on_thread = worker;
+ ent->pending = 1;
+ TOR_TAILQ_INSERT_TAIL(&worker->work, ent, next_work);
if (worker->waiting) /* XXXX inside or outside of lock?? */
tor_cond_signal_one(&worker->condition);
@@ -298,7 +315,7 @@ replyqueue_new(void)
rq = tor_malloc_zero(sizeof(replyqueue_t));
tor_mutex_init(&rq->lock);
- TOR_SIMPLEQ_INIT(&rq->answers);
+ TOR_TAILQ_INIT(&rq->answers);
rq->read_sock = pair[0];
rq->write_sock = pair[1];
@@ -331,10 +348,10 @@ replyqueue_process(replyqueue_t *queue)
/* XXXX freak out on r == 0, or r == "error, not retryable". */
tor_mutex_acquire(&queue->lock);
- while (!TOR_SIMPLEQ_EMPTY(&queue->answers)) {
+ while (!TOR_TAILQ_EMPTY(&queue->answers)) {
/* lock held. */
- workqueue_entry_t *work = TOR_SIMPLEQ_FIRST(&queue->answers);
- TOR_SIMPLEQ_REMOVE_HEAD(&queue->answers, next_work);
+ workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers);
+ TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
tor_mutex_release(&queue->lock);
work->reply_fn(work->arg);
@@ -345,3 +362,4 @@ replyqueue_process(replyqueue_t *queue)
tor_mutex_release(&queue->lock);
}
+