summaryrefslogtreecommitdiff
path: root/src/common/workqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/common/workqueue.c')
-rw-r--r--src/common/workqueue.c49
1 files changed, 41 insertions, 8 deletions
diff --git a/src/common/workqueue.c b/src/common/workqueue.c
index ec96959b7d..563a98af96 100644
--- a/src/common/workqueue.c
+++ b/src/common/workqueue.c
@@ -1,3 +1,4 @@
+
/* copyright (c) 2013-2015, The Tor Project, Inc. */
/* See LICENSE for licensing information */
@@ -24,13 +25,16 @@
#include "orconfig.h"
#include "compat.h"
+#include "compat_libevent.h"
#include "compat_threads.h"
-#include "crypto.h"
+#include "crypto_rand.h"
#include "util.h"
#include "workqueue.h"
#include "tor_queue.h"
#include "torlog.h"
+#include <event2/event.h>
+
#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)
@@ -63,6 +67,9 @@ struct threadpool_s {
void (*free_update_arg_fn)(void *);
/** Array of n_threads update arguments. */
void **update_args;
+ /** Event to notice when another thread has sent a reply. */
+ struct event *reply_event;
+ void (*reply_cb)(threadpool_t *);
/** Number of elements in threads. */
int n_threads;
@@ -597,15 +604,41 @@ replyqueue_new(uint32_t alertsocks_flags)
return rq;
}
-/**
- * Return the "read socket" for a given reply queue. The main thread should
- * listen for read events on this socket, and call replyqueue_process() every
- * time it triggers.
+/** Internal: Run from the libevent mainloop when there is work to handle in
+ * the reply queue handler. */
+static void
+reply_event_cb(evutil_socket_t sock, short events, void *arg)
+{
+ threadpool_t *tp = arg;
+ (void) sock;
+ (void) events;
+ replyqueue_process(tp->reply_queue);
+ if (tp->reply_cb)
+ tp->reply_cb(tp);
+}
+
+/** Register the threadpool <b>tp</b>'s reply queue with the libevent
+ * mainloop of <b>base</b>. If <b>tp</b> is provided, it is run after
+ * each time there is work to process from the reply queue. Return 0 on
+ * success, -1 on failure.
*/
-tor_socket_t
-replyqueue_get_socket(replyqueue_t *rq)
+int
+threadpool_register_reply_event(threadpool_t *tp,
+ void (*cb)(threadpool_t *tp))
{
- return rq->alert.read_fd;
+ struct event_base *base = tor_libevent_get_base();
+
+ if (tp->reply_event) {
+ tor_event_free(tp->reply_event);
+ }
+ tp->reply_event = tor_event_new(base,
+ tp->reply_queue->alert.read_fd,
+ EV_READ|EV_PERSIST,
+ reply_event_cb,
+ tp);
+ tor_assert(tp->reply_event);
+ tp->reply_cb = cb;
+ return event_add(tp->reply_event, NULL);
}
/**