diff options
Diffstat (limited to 'src/common/workqueue.c')
-rw-r--r-- | src/common/workqueue.c | 49 |
1 files changed, 41 insertions, 8 deletions
diff --git a/src/common/workqueue.c b/src/common/workqueue.c index ec96959b7d..563a98af96 100644 --- a/src/common/workqueue.c +++ b/src/common/workqueue.c @@ -1,3 +1,4 @@ + /* copyright (c) 2013-2015, The Tor Project, Inc. */ /* See LICENSE for licensing information */ @@ -24,13 +25,16 @@ #include "orconfig.h" #include "compat.h" +#include "compat_libevent.h" #include "compat_threads.h" -#include "crypto.h" +#include "crypto_rand.h" #include "util.h" #include "workqueue.h" #include "tor_queue.h" #include "torlog.h" +#include <event2/event.h> + #define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH #define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW #define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1) @@ -63,6 +67,9 @@ struct threadpool_s { void (*free_update_arg_fn)(void *); /** Array of n_threads update arguments. */ void **update_args; + /** Event to notice when another thread has sent a reply. */ + struct event *reply_event; + void (*reply_cb)(threadpool_t *); /** Number of elements in threads. */ int n_threads; @@ -597,15 +604,41 @@ replyqueue_new(uint32_t alertsocks_flags) return rq; } -/** - * Return the "read socket" for a given reply queue. The main thread should - * listen for read events on this socket, and call replyqueue_process() every - * time it triggers. +/** Internal: Run from the libevent mainloop when there is work to handle in + * the reply queue handler. */ +static void +reply_event_cb(evutil_socket_t sock, short events, void *arg) +{ + threadpool_t *tp = arg; + (void) sock; + (void) events; + replyqueue_process(tp->reply_queue); + if (tp->reply_cb) + tp->reply_cb(tp); +} + +/** Register the threadpool <b>tp</b>'s reply queue with the libevent + * mainloop of <b>base</b>. If <b>tp</b> is provided, it is run after + * each time there is work to process from the reply queue. Return 0 on + * success, -1 on failure. */ -tor_socket_t -replyqueue_get_socket(replyqueue_t *rq) +int +threadpool_register_reply_event(threadpool_t *tp, + void (*cb)(threadpool_t *tp)) { - return rq->alert.read_fd; + struct event_base *base = tor_libevent_get_base(); + + if (tp->reply_event) { + tor_event_free(tp->reply_event); + } + tp->reply_event = tor_event_new(base, + tp->reply_queue->alert.read_fd, + EV_READ|EV_PERSIST, + reply_event_cb, + tp); + tor_assert(tp->reply_event); + tp->reply_cb = cb; + return event_add(tp->reply_event, NULL); } /** |