aboutsummaryrefslogtreecommitdiff
path: root/src/common/workqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/common/workqueue.c')
-rw-r--r--src/common/workqueue.c31
1 files changed, 29 insertions, 2 deletions
diff --git a/src/common/workqueue.c b/src/common/workqueue.c
index 0a38550de0..e1fb663a2a 100644
--- a/src/common/workqueue.c
+++ b/src/common/workqueue.c
@@ -6,6 +6,20 @@
*
* \brief Implements worker threads, queues of work for them, and mechanisms
* for them to send answers back to the main thread.
+ *
+ * The main structure here is a threadpool_t : it manages a set of worker
+ * threads, a queue of pending work, and a reply queue. Every piece of work
+ * is a workqueue_entry_t, containing data to process and a function to
+ * process it with.
+ *
+ * The main thread informs the worker threads of pending work by using a
+ * condition variable. The workers inform the main process of completed work
+ * by using an alert_sockets_t object, as implemented in compat_threads.c.
+ *
+ * The main thread can also queue an "update" that will be handled by all the
+ * workers. This is useful for updating state that all the workers share.
+ *
+ * In Tor today, there is currently only one thread pool, used in cpuworker.c.
*/
#include "orconfig.h"
@@ -262,9 +276,12 @@ workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue)
thr->in_pool = pool;
if (spawn_func(worker_thread_main, thr) < 0) {
+ //LCOV_EXCL_START
+ tor_assert_nonfatal_unreached();
log_err(LD_GENERAL, "Can't launch worker thread.");
tor_free(thr);
return NULL;
+ //LCOV_EXCL_STOP
}
return thr;
@@ -375,8 +392,8 @@ threadpool_queue_update(threadpool_t *pool,
static int
threadpool_start_threads(threadpool_t *pool, int n)
{
- if (n < 0)
- return -1;
+ if (BUG(n < 0))
+ return -1; // LCOV_EXCL_LINE
if (n > MAX_THREADS)
n = MAX_THREADS;
@@ -391,9 +408,12 @@ threadpool_start_threads(threadpool_t *pool, int n)
workerthread_t *thr = workerthread_new(state, pool, pool->reply_queue);
if (!thr) {
+ //LCOV_EXCL_START
+ tor_assert_nonfatal_unreached();
pool->free_thread_state_fn(state);
tor_mutex_release(&pool->lock);
return -1;
+ //LCOV_EXCL_STOP
}
thr->index = pool->n_threads;
pool->threads[pool->n_threads++] = thr;
@@ -429,10 +449,13 @@ threadpool_new(int n_threads,
pool->reply_queue = replyqueue;
if (threadpool_start_threads(pool, n_threads) < 0) {
+ //LCOV_EXCL_START
+ tor_assert_nonfatal_unreached();
tor_cond_uninit(&pool->condition);
tor_mutex_uninit(&pool->lock);
tor_free(pool);
return NULL;
+ //LCOV_EXCL_STOP
}
return pool;
@@ -456,8 +479,10 @@ replyqueue_new(uint32_t alertsocks_flags)
rq = tor_malloc_zero(sizeof(replyqueue_t));
if (alert_sockets_create(&rq->alert, alertsocks_flags) < 0) {
+ //LCOV_EXCL_START
tor_free(rq);
return NULL;
+ //LCOV_EXCL_STOP
}
tor_mutex_init(&rq->lock);
@@ -486,10 +511,12 @@ void
replyqueue_process(replyqueue_t *queue)
{
if (queue->alert.drain_fn(queue->alert.read_fd) < 0) {
+ //LCOV_EXCL_START
static ratelim_t warn_limit = RATELIM_INIT(7200);
log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL,
"Failure from drain_fd: %s",
tor_socket_strerror(tor_socket_errno(queue->alert.read_fd)));
+ //LCOV_EXCL_STOP
}
tor_mutex_acquire(&queue->lock);