diff options
Diffstat (limited to 'src/common/workqueue.c')
-rw-r--r-- | src/common/workqueue.c | 31 |
1 files changed, 29 insertions, 2 deletions
diff --git a/src/common/workqueue.c b/src/common/workqueue.c index 0a38550de0..e1fb663a2a 100644 --- a/src/common/workqueue.c +++ b/src/common/workqueue.c @@ -6,6 +6,20 @@ * * \brief Implements worker threads, queues of work for them, and mechanisms * for them to send answers back to the main thread. + * + * The main structure here is a threadpool_t : it manages a set of worker + * threads, a queue of pending work, and a reply queue. Every piece of work + * is a workqueue_entry_t, containing data to process and a function to + * process it with. + * + * The main thread informs the worker threads of pending work by using a + * condition variable. The workers inform the main process of completed work + * by using an alert_sockets_t object, as implemented in compat_threads.c. + * + * The main thread can also queue an "update" that will be handled by all the + * workers. This is useful for updating state that all the workers share. + * + * In Tor today, there is currently only one thread pool, used in cpuworker.c. */ #include "orconfig.h" @@ -262,9 +276,12 @@ workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue) thr->in_pool = pool; if (spawn_func(worker_thread_main, thr) < 0) { + //LCOV_EXCL_START + tor_assert_nonfatal_unreached(); log_err(LD_GENERAL, "Can't launch worker thread."); tor_free(thr); return NULL; + //LCOV_EXCL_STOP } return thr; @@ -375,8 +392,8 @@ threadpool_queue_update(threadpool_t *pool, static int threadpool_start_threads(threadpool_t *pool, int n) { - if (n < 0) - return -1; + if (BUG(n < 0)) + return -1; // LCOV_EXCL_LINE if (n > MAX_THREADS) n = MAX_THREADS; @@ -391,9 +408,12 @@ threadpool_start_threads(threadpool_t *pool, int n) workerthread_t *thr = workerthread_new(state, pool, pool->reply_queue); if (!thr) { + //LCOV_EXCL_START + tor_assert_nonfatal_unreached(); pool->free_thread_state_fn(state); tor_mutex_release(&pool->lock); return -1; + //LCOV_EXCL_STOP } thr->index = pool->n_threads; pool->threads[pool->n_threads++] = thr; @@ -429,10 +449,13 @@ threadpool_new(int n_threads, pool->reply_queue = replyqueue; if (threadpool_start_threads(pool, n_threads) < 0) { + //LCOV_EXCL_START + tor_assert_nonfatal_unreached(); tor_cond_uninit(&pool->condition); tor_mutex_uninit(&pool->lock); tor_free(pool); return NULL; + //LCOV_EXCL_STOP } return pool; @@ -456,8 +479,10 @@ replyqueue_new(uint32_t alertsocks_flags) rq = tor_malloc_zero(sizeof(replyqueue_t)); if (alert_sockets_create(&rq->alert, alertsocks_flags) < 0) { + //LCOV_EXCL_START tor_free(rq); return NULL; + //LCOV_EXCL_STOP } tor_mutex_init(&rq->lock); @@ -486,10 +511,12 @@ void replyqueue_process(replyqueue_t *queue) { if (queue->alert.drain_fn(queue->alert.read_fd) < 0) { + //LCOV_EXCL_START static ratelim_t warn_limit = RATELIM_INIT(7200); log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL, "Failure from drain_fd: %s", tor_socket_strerror(tor_socket_errno(queue->alert.read_fd))); + //LCOV_EXCL_STOP } tor_mutex_acquire(&queue->lock); |