aboutsummaryrefslogtreecommitdiff
path: root/src/lib/evloop/workqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/evloop/workqueue.c')
-rw-r--r--src/lib/evloop/workqueue.c38
1 files changed, 15 insertions, 23 deletions
diff --git a/src/lib/evloop/workqueue.c b/src/lib/evloop/workqueue.c
index 931f65e710..603dddd5a3 100644
--- a/src/lib/evloop/workqueue.c
+++ b/src/lib/evloop/workqueue.c
@@ -15,7 +15,7 @@
*
* The main thread informs the worker threads of pending work by using a
* condition variable. The workers inform the main process of completed work
- * by using an alert_sockets_t object, as implemented in compat_threads.c.
+ * by using an alert_sockets_t object, as implemented in net/alertsock.c.
*
* The main thread can also queue an "update" that will be handled by all the
* workers. This is useful for updating state that all the workers share.
@@ -36,7 +36,7 @@
#include "lib/net/socket.h"
#include "lib/thread/threads.h"
-#include "tor_queue.h"
+#include "ext/tor_queue.h"
#include <event2/event.h>
#include <string.h>
@@ -44,13 +44,13 @@
#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)
-TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_s);
+TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_t);
typedef struct work_tailq_t work_tailq_t;
-struct threadpool_s {
+struct threadpool_t {
/** An array of pointers to workerthread_t: one for each running worker
* thread. */
- struct workerthread_s **threads;
+ struct workerthread_t **threads;
/** Condition variable that we wait on when we have no work, and which
* gets signaled when our queue becomes nonempty. */
@@ -59,9 +59,6 @@ struct threadpool_s {
* <b>p</b> is work[p]. */
work_tailq_t work[WORKQUEUE_N_PRIORITIES];
- /** Weak RNG, used to decide when to ignore priority. */
- tor_weak_rng_t weak_rng;
-
/** The current 'update generation' of the threadpool. Any thread that is
* at an earlier generation needs to run the update function. */
unsigned generation;
@@ -95,14 +92,14 @@ struct threadpool_s {
/** Number of bits needed to hold all legal values of workqueue_priority_t */
#define WORKQUEUE_PRIORITY_BITS 2
-struct workqueue_entry_s {
+struct workqueue_entry_t {
/** The next workqueue_entry_t that's pending on the same thread or
* reply queue. */
- TOR_TAILQ_ENTRY(workqueue_entry_s) next_work;
+ TOR_TAILQ_ENTRY(workqueue_entry_t) next_work;
/** The threadpool to which this workqueue_entry_t was assigned. This field
* is set when the workqueue_entry_t is created, and won't be cleared until
* after it's handled in the main thread. */
- struct threadpool_s *on_pool;
+ struct threadpool_t *on_pool;
/** True iff this entry is waiting for a worker to start processing it. */
uint8_t pending;
/** Priority of this entry. */
@@ -115,22 +112,22 @@ struct workqueue_entry_s {
void *arg;
};
-struct replyqueue_s {
+struct replyqueue_t {
/** Mutex to protect the answers field */
tor_mutex_t lock;
/** Doubly-linked list of answers that the reply queue needs to handle. */
- TOR_TAILQ_HEAD(, workqueue_entry_s) answers;
+ TOR_TAILQ_HEAD(, workqueue_entry_t) answers;
/** Mechanism to wake up the main thread when it is receiving answers. */
alert_sockets_t alert;
};
/** A worker thread represents a single thread in a thread pool. */
-typedef struct workerthread_s {
+typedef struct workerthread_t {
/** Which thread it this? In range 0..in_pool->n_threads-1 */
int index;
/** The pool this thread is a part of. */
- struct threadpool_s *in_pool;
+ struct threadpool_t *in_pool;
/** User-supplied state field that we pass to the worker functions of each
* work item. */
void *state;
@@ -238,7 +235,7 @@ worker_thread_extract_next_work(workerthread_t *thread)
this_queue = &pool->work[i];
if (!TOR_TAILQ_EMPTY(this_queue)) {
queue = this_queue;
- if (! tor_weak_random_one_in_n(&pool->weak_rng,
+ if (! crypto_fast_rng_one_in_n(get_thread_fast_rng(),
thread->lower_priority_chance)) {
/* Usually we'll just break now, so that we can get out of the loop
* and use the queue where we found work. But with a small
@@ -555,11 +552,6 @@ threadpool_new(int n_threads,
for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
TOR_TAILQ_INIT(&pool->work[i]);
}
- {
- unsigned seed;
- crypto_rand((void*)&seed, sizeof(seed));
- tor_init_weak_random(&pool->weak_rng, seed);
- }
pool->new_thread_state_fn = new_thread_state_fn;
pool->new_thread_state_arg = arg;
@@ -622,8 +614,8 @@ reply_event_cb(evutil_socket_t sock, short events, void *arg)
tp->reply_cb(tp);
}
-/** Register the threadpool <b>tp</b>'s reply queue with the libevent
- * mainloop of <b>base</b>. If <b>tp</b> is provided, it is run after
+/** Register the threadpool <b>tp</b>'s reply queue with Tor's global
+ * libevent mainloop. If <b>cb</b> is provided, it is run after
* each time there is work to process from the reply queue. Return 0 on
* success, -1 on failure.
*/