/* copyright (c) 2013-2024, The Tor Project, Inc. */ /* See LICENSE for licensing information */ /** * \file workqueue.c * * \brief Implements worker threads, queues of work for them, and mechanisms * for them to send answers back to the main thread. * * The main structure here is a threadpool_t : it manages a set of worker * threads, a queue of pending work, and a reply queue. Every piece of work * is a workqueue_entry_t, containing data to process and a function to * process it with. * * The main thread informs the worker threads of pending work by using a * condition variable. The workers inform the main process of completed work * by using an alert_sockets_t object, as implemented in net/alertsock.c. * * The main thread can also queue an "update" that will be handled by all the * workers. This is useful for updating state that all the workers share. * * In Tor today, there is currently only one thread pool, managed * in cpuworker.c and handling a variety of types of work, from the original * "onion skin" circuit handshakes, to consensus diff computation, to * client-side onion service PoW generation. */ #include "orconfig.h" #include "lib/evloop/compat_libevent.h" #include "lib/evloop/workqueue.h" #include "lib/crypt_ops/crypto_rand.h" #include "lib/intmath/weakrng.h" #include "lib/log/ratelim.h" #include "lib/log/log.h" #include "lib/log/util_bug.h" #include "lib/net/alertsock.h" #include "lib/net/socket.h" #include "lib/thread/threads.h" #include "ext/tor_queue.h" #include #include #define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH #define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW #define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1) TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_t); typedef struct work_tailq_t work_tailq_t; struct threadpool_t { /** An array of pointers to workerthread_t: one for each running worker * thread. */ struct workerthread_t **threads; /** Condition variable that we wait on when we have no work, and which * gets signaled when our queue becomes nonempty. */ tor_cond_t condition; /** Queues of pending work that we have to do. The queue with priority * p is work[p]. */ work_tailq_t work[WORKQUEUE_N_PRIORITIES]; /** The current 'update generation' of the threadpool. Any thread that is * at an earlier generation needs to run the update function. */ unsigned generation; /** Function that should be run for updates on each thread. */ workqueue_reply_t (*update_fn)(void *, void *); /** Function to free update arguments if they can't be run. */ void (*free_update_arg_fn)(void *); /** Array of n_threads update arguments. */ void **update_args; /** Event to notice when another thread has sent a reply. */ struct event *reply_event; void (*reply_cb)(threadpool_t *); /** Number of elements in threads. */ int n_threads; /** Number of elements to be created in threads. */ int n_threads_max; /** Mutex to protect all the above fields. */ tor_mutex_t lock; /** A reply queue to use when constructing new threads. */ replyqueue_t *reply_queue; /** Functions used to allocate and free thread state. */ void *(*new_thread_state_fn)(void*); void (*free_thread_state_fn)(void*); void *new_thread_state_arg; /** Used for signalling the worker threads to exit. */ int exit; /** Mutex for controlling worker threads' startup and exit. */ tor_mutex_t control_lock; }; /** Used to put a workqueue_priority_t value into a bitfield. */ #define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t) /** Number of bits needed to hold all legal values of workqueue_priority_t */ #define WORKQUEUE_PRIORITY_BITS 2 struct workqueue_entry_t { /** The next workqueue_entry_t that's pending on the same thread or * reply queue. */ TOR_TAILQ_ENTRY(workqueue_entry_t) next_work; /** The threadpool to which this workqueue_entry_t was assigned. This field * is set when the workqueue_entry_t is created, and won't be cleared until * after it's handled in the main thread. */ struct threadpool_t *on_pool; /** True iff this entry is waiting for a worker to start processing it. */ uint8_t pending; /** Priority of this entry. */ workqueue_priority_bitfield_t priority : WORKQUEUE_PRIORITY_BITS; /** Function to run in the worker thread. */ workqueue_reply_t (*fn)(void *state, void *arg); /** Function to run while processing the reply queue. */ void (*reply_fn)(void *arg); /** Argument for the above functions. */ void *arg; }; struct replyqueue_t { /** Mutex to protect the answers field */ tor_mutex_t lock; /** Doubly-linked list of answers that the reply queue needs to handle. */ TOR_TAILQ_HEAD(, workqueue_entry_t) answers; /** Mechanism to wake up the main thread when it is receiving answers. */ alert_sockets_t alert; }; /** A worker thread represents a single thread in a thread pool. */ typedef struct workerthread_t { /** Which thread it this? In range 0..in_pool->n_threads-1 */ int index; /** The pool this thread is a part of. */ struct threadpool_t *in_pool; /** User-supplied state field that we pass to the worker functions of each * work item. */ void *state; /** Reply queue to which we pass our results. */ replyqueue_t *reply_queue; /** The current update generation of this thread */ unsigned generation; /** One over the probability of taking work from a lower-priority queue. */ int32_t lower_priority_chance; } workerthread_t; static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work); static void workerthread_free_(workerthread_t *thread); #define workerthread_free(thread) \ FREE_AND_NULL(workerthread_t, workerthread_free_, (thread)) static void replyqueue_free_(replyqueue_t *queue); #define replyqueue_free(queue) \ FREE_AND_NULL(replyqueue_t, replyqueue_free_, (queue)) /** Allocate and return a new workqueue_entry_t, set up to run the function * fn in the worker thread, and reply_fn in the main * thread. See threadpool_queue_work() for full documentation. */ static workqueue_entry_t * workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*), void (*reply_fn)(void*), void *arg) { workqueue_entry_t *ent = tor_malloc_zero(sizeof(workqueue_entry_t)); ent->fn = fn; ent->reply_fn = reply_fn; ent->arg = arg; ent->priority = WQ_PRI_HIGH; return ent; } #define workqueue_entry_free(ent) \ FREE_AND_NULL(workqueue_entry_t, workqueue_entry_free_, (ent)) /** * Release all storage held in ent. Call only when ent is not on * any queue. */ static void workqueue_entry_free_(workqueue_entry_t *ent) { if (!ent) return; memset(ent, 0xf0, sizeof(*ent)); tor_free(ent); } /** * Cancel a workqueue_entry_t that has been returned from * threadpool_queue_work. * * You must not call this function on any work whose reply function has been * executed in the main thread; that will cause undefined behavior (probably, * a crash). * * If the work is cancelled, this function return the argument passed to the * work function. It is the caller's responsibility to free this storage. * * This function will have no effect if the worker thread has already executed * or begun to execute the work item. In that case, it will return NULL. */ void * workqueue_entry_cancel(workqueue_entry_t *ent) { int cancelled = 0; void *result = NULL; tor_mutex_acquire(&ent->on_pool->lock); workqueue_priority_t prio = ent->priority; if (ent->pending) { TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work); cancelled = 1; result = ent->arg; } tor_mutex_release(&ent->on_pool->lock); if (cancelled) { workqueue_entry_free(ent); } return result; } /**DOCDOC must hold lock */ static int worker_thread_has_work(workerthread_t *thread) { unsigned i; for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) { if (!TOR_TAILQ_EMPTY(&thread->in_pool->work[i])) return 1; } return thread->generation != thread->in_pool->generation; } /** Extract the next workqueue_entry_t from the the thread's pool, removing * it from the relevant queues and marking it as non-pending. * * The caller must hold the lock. */ static workqueue_entry_t * worker_thread_extract_next_work(workerthread_t *thread) { threadpool_t *pool = thread->in_pool; work_tailq_t *queue = NULL, *this_queue; unsigned i; for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) { this_queue = &pool->work[i]; if (!TOR_TAILQ_EMPTY(this_queue)) { queue = this_queue; if (! crypto_fast_rng_one_in_n(get_thread_fast_rng(), thread->lower_priority_chance)) { /* Usually we'll just break now, so that we can get out of the loop * and use the queue where we found work. But with a small * probability, we'll keep looking for lower priority work, so that * we don't ignore our low-priority queues entirely. */ break; } } } if (queue == NULL) return NULL; workqueue_entry_t *work = TOR_TAILQ_FIRST(queue); TOR_TAILQ_REMOVE(queue, work, next_work); work->pending = 0; return work; } /** * Main function for the worker thread. */ static void worker_thread_main(void *thread_) { static int n_worker_threads_running = 0; workerthread_t *thread = thread_; threadpool_t *pool = thread->in_pool; workqueue_entry_t *work; workqueue_reply_t result; tor_mutex_acquire(&pool->lock); log_debug(LD_GENERAL, "Worker thread (TID: %lu) has started.", tor_get_thread_id()); if (++n_worker_threads_running == pool->n_threads_max) /* Let the main thread know, the last worker thread has started. */ tor_cond_signal_one(&pool->condition); while (1) { /* Exit thread when signaled to exit */ if (pool->exit) break; /* lock must be held at this point. */ while (worker_thread_has_work(thread)) { /* lock must be held at this point. */ if (thread->in_pool->generation != thread->generation) { void *arg = thread->in_pool->update_args[thread->index]; thread->in_pool->update_args[thread->index] = NULL; workqueue_reply_t (*update_fn)(void*,void*) = thread->in_pool->update_fn; thread->generation = thread->in_pool->generation; tor_mutex_release(&pool->lock); workqueue_reply_t r = update_fn(thread->state, arg); if (r != WQ_RPL_REPLY) { break; } tor_mutex_acquire(&pool->lock); continue; } work = worker_thread_extract_next_work(thread); if (BUG(work == NULL)) break; tor_mutex_release(&pool->lock); /* We run the work function without holding the thread lock. This * is the main thread's first opportunity to give us more work. */ result = work->fn(thread->state, work->arg); /* Queue the reply for the main thread. */ queue_reply(thread->reply_queue, work); /* We may need to exit the thread. */ if (result != WQ_RPL_REPLY) { break; } tor_mutex_acquire(&pool->lock); } /* At this point the lock is held, and there is no work in this thread's * queue. */ /* TODO: support an idle-function */ /* Okay. Now, wait till somebody has work for us. */ if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) { log_warn(LD_GENERAL, "Fail tor_cond_wait."); } } log_debug(LD_GENERAL, "Worker thread (TID: %lu) has exited.", tor_get_thread_id()); if (--n_worker_threads_running == 0) tor_mutex_release(&pool->control_lock); tor_mutex_release(&pool->lock); } /** Put a reply on the reply queue. The reply must not currently be on * any thread's work queue. */ static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work) { int was_empty; tor_mutex_acquire(&queue->lock); was_empty = TOR_TAILQ_EMPTY(&queue->answers); TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work); tor_mutex_release(&queue->lock); if (was_empty) { if (queue->alert.alert_fn(queue->alert.write_fd) < 0) { /* XXXX complain! */ } } } /** Allocate and start a new worker thread to use state object state, * and send responses to replyqueue. */ static workerthread_t * workerthread_new(int32_t lower_priority_chance, void *state, threadpool_t *pool, replyqueue_t *replyqueue) { workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t)); thr->state = state; thr->reply_queue = replyqueue; thr->in_pool = pool; thr->lower_priority_chance = lower_priority_chance; if (spawn_func(worker_thread_main, thr) < 0) { //LCOV_EXCL_START tor_assert_nonfatal_unreached(); log_err(LD_GENERAL, "Can't launch worker thread."); workerthread_free(thr); return NULL; //LCOV_EXCL_STOP } return thr; } /** * Free up the resources allocated by a worker thread. */ static void workerthread_free_(workerthread_t *thread) { tor_free(thread); } /** * Queue an item of work for a thread in a thread pool. The function * fn will be run in a worker thread, and will receive as arguments the * thread's state object, and the provided object arg. It must return * one of WQ_RPL_REPLY, WQ_RPL_ERROR, or WQ_RPL_SHUTDOWN. * * Regardless of its return value, the function reply_fn will later be * run in the main thread when it invokes replyqueue_process(), and will * receive as its argument the same arg object. It's the reply * function's responsibility to free the work object. * * On success, return a workqueue_entry_t object that can be passed to * workqueue_entry_cancel(). On failure, return NULL. (Failure is not * currently possible, but callers should check anyway.) * * Items are executed in a loose priority order -- each thread will usually * take from the queued work with the highest prioirity, but will occasionally * visit lower-priority queues to keep them from starving completely. * * Note that because of priorities and thread behavior, work items may not * be executed strictly in order. */ workqueue_entry_t * threadpool_queue_work_priority(threadpool_t *pool, workqueue_priority_t prio, workqueue_reply_t (*fn)(void *, void *), void (*reply_fn)(void *), void *arg) { tor_assert(((int)prio) >= WORKQUEUE_PRIORITY_FIRST && ((int)prio) <= WORKQUEUE_PRIORITY_LAST); workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg); ent->on_pool = pool; ent->pending = 1; ent->priority = prio; tor_mutex_acquire(&pool->lock); TOR_TAILQ_INSERT_TAIL(&pool->work[prio], ent, next_work); tor_cond_signal_one(&pool->condition); tor_mutex_release(&pool->lock); return ent; } /** As threadpool_queue_work_priority(), but assumes WQ_PRI_HIGH */ workqueue_entry_t * threadpool_queue_work(threadpool_t *pool, workqueue_reply_t (*fn)(void *, void *), void (*reply_fn)(void *), void *arg) { return threadpool_queue_work_priority(pool, WQ_PRI_HIGH, fn, reply_fn, arg); } /** * Queue a copy of a work item for every thread in a pool. This can be used, * for example, to tell the threads to update some parameter in their states. * * Arguments are as for threadpool_queue_work, except that the * arg value is passed to dup_fn once per each thread to * make a copy of it. * * UPDATE FUNCTIONS MUST BE IDEMPOTENT. We do not guarantee that every update * will be run. If a new update is scheduled before the old update finishes * running, then the new will replace the old in any threads that haven't run * it yet. * * Return 0 on success, -1 on failure. */ int threadpool_queue_update(threadpool_t *pool, void *(*dup_fn)(void *), workqueue_reply_t (*fn)(void *, void *), void (*free_fn)(void *), void *arg) { int i, n_threads; void (*old_args_free_fn)(void *arg); void **old_args; void **new_args; tor_mutex_acquire(&pool->lock); n_threads = pool->n_threads; old_args = pool->update_args; old_args_free_fn = pool->free_update_arg_fn; new_args = tor_calloc(n_threads, sizeof(void*)); for (i = 0; i < n_threads; ++i) { if (dup_fn) new_args[i] = dup_fn(arg); else new_args[i] = arg; } pool->update_args = new_args; pool->free_update_arg_fn = free_fn; pool->update_fn = fn; ++pool->generation; tor_cond_signal_all(&pool->condition); tor_mutex_release(&pool->lock); if (old_args) { for (i = 0; i < n_threads; ++i) { if (old_args[i] && old_args_free_fn) old_args_free_fn(old_args[i]); } tor_free(old_args); } return 0; } /** Don't have more than this many threads per pool. */ #define MAX_THREADS 1024 /** For half of our threads, choose lower priority queues with probability * 1/N for each of these values. Both are chosen somewhat arbitrarily. If * CHANCE_PERMISSIVE is too low, then we have a risk of low-priority tasks * stalling forever. If it's too high, we have a risk of low-priority tasks * grabbing half of the threads. */ #define CHANCE_PERMISSIVE 37 #define CHANCE_STRICT INT32_MAX /** Launch threads until we have n. */ static int threadpool_start_threads(threadpool_t *pool, int n) { if (BUG(n < 0)) return -1; // LCOV_EXCL_LINE if (n > MAX_THREADS) n = MAX_THREADS; tor_mutex_acquire(&pool->lock); if (pool->n_threads < n) pool->threads = tor_reallocarray(pool->threads, sizeof(workerthread_t*), n); pool->n_threads_max = n; log_debug(LD_GENERAL, "Starting worker threads..."); while (pool->n_threads < n) { /* For half of our threads, we'll choose lower priorities permissively; * for the other half, we'll stick more strictly to higher priorities. * This keeps slow low-priority tasks from taking over completely. */ int32_t chance = (pool->n_threads & 1) ? CHANCE_STRICT : CHANCE_PERMISSIVE; void *state = pool->new_thread_state_fn(pool->new_thread_state_arg); workerthread_t *thr = workerthread_new(chance, state, pool, pool->reply_queue); if (!thr) { //LCOV_EXCL_START tor_assert_nonfatal_unreached(); pool->free_thread_state_fn(state); tor_mutex_release(&pool->lock); return -1; //LCOV_EXCL_STOP } thr->index = pool->n_threads; pool->threads[pool->n_threads++] = thr; } tor_mutex_release(&pool->lock); tor_mutex_acquire(&pool->control_lock); struct timeval tv = {.tv_sec = 30, .tv_usec = 0}; /* Wait for the last launched thread to confirm us, it has started. * Wait max 30 seconds */ switch (tor_cond_wait(&pool->condition, &pool->control_lock, &tv)) { case -1: log_warn(LD_GENERAL, "Failed to confirm worker threads' start up."); goto error; case 1: log_warn(LD_GENERAL, "Failed to confirm worker threads' " "start up after timeout."); goto error; case 0: log_debug(LD_GENERAL, "Starting worker threads finished."); break; default:; } /* On success control lock stays locked. This is required for the * main thread to wait for the worker threads to exit on shutdown. */ return 0; error: tor_mutex_release(&pool->control_lock); return -1; } /** * Construct a new thread pool with n worker threads, configured to * send their output to replyqueue. The threads' states will be * constructed with the new_thread_state_fn call, receiving arg * as its argument. When the threads close, they will call * free_thread_state_fn on their states. */ threadpool_t * threadpool_new(int n_threads, replyqueue_t *replyqueue, void *(*new_thread_state_fn)(void*), void (*free_thread_state_fn)(void*), void *arg) { threadpool_t *pool; pool = tor_malloc_zero(sizeof(threadpool_t)); tor_mutex_init_nonrecursive(&pool->lock); tor_cond_init(&pool->condition); tor_mutex_init_nonrecursive(&pool->control_lock); pool->exit = 0; unsigned i; for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) { TOR_TAILQ_INIT(&pool->work[i]); } pool->new_thread_state_fn = new_thread_state_fn; pool->new_thread_state_arg = arg; pool->free_thread_state_fn = free_thread_state_fn; pool->reply_queue = replyqueue; if (threadpool_start_threads(pool, n_threads) < 0) { //LCOV_EXCL_START tor_assert_nonfatal_unreached(); threadpool_free(pool); return NULL; //LCOV_EXCL_STOP } return pool; } /** * Free up the resources allocated by worker threads, worker thread pool, ... */ void threadpool_free_(threadpool_t *pool) { if (!pool) return; tor_mutex_acquire(&pool->lock); /* Signal the worker threads to exit */ pool->exit = 1; /* If worker threads are waiting for work, let them continue to exit */ tor_cond_signal_all(&pool->condition); log_debug(LD_GENERAL, "Signaled worker threads to exit. " "Waiting for them to exit..."); tor_mutex_release(&pool->lock); /* Wait until all worker threads have exited. * pool->control_lock must be prelocked here. */ tor_mutex_acquire(&pool->control_lock); /* Unlock required, else main thread hangs on * tor_mutex_uninit(&pool->control_lock) */ tor_mutex_release(&pool->control_lock); /* If this message appears in the log before all threads have confirmed * their exit, then pool->control_lock wasn't prelocked for some reason. */ log_debug(LD_GENERAL, "All worker threads have exited. " "Beginning to clean up..."); tor_cond_uninit(&pool->condition); tor_mutex_uninit(&pool->lock); tor_mutex_uninit(&pool->control_lock); if (pool->threads) { for (int i = 0; i != pool->n_threads; ++i) workerthread_free(pool->threads[i]); tor_free(pool->threads); } if (pool->update_args) { if (!pool->free_update_arg_fn) log_warn(LD_GENERAL, "Freeing pool->update_args not possible. " "pool->free_update_arg_fn is not set."); else pool->free_update_arg_fn(pool->update_args); } if (pool->reply_event) { if (tor_event_del(pool->reply_event) == -1) log_warn(LD_GENERAL, "libevent error: deleting reply event failed."); else tor_event_free(pool->reply_event); } if (pool->reply_queue) replyqueue_free(pool->reply_queue); if (pool->new_thread_state_arg) { if (!pool->free_thread_state_fn) log_warn(LD_GENERAL, "Freeing pool->new_thread_state_arg not possible. " "pool->free_thread_state_fn is not set."); else pool->free_thread_state_fn(pool->new_thread_state_arg); } tor_free(pool); log_debug(LD_GENERAL, "Cleanup finished."); } /** Return the reply queue associated with a given thread pool. */ replyqueue_t * threadpool_get_replyqueue(threadpool_t *tp) { return tp->reply_queue; } /** Allocate a new reply queue. Reply queues are used to pass results from * worker threads to the main thread. Since the main thread is running an * IO-centric event loop, it needs to get woken up with means other than a * condition variable. */ replyqueue_t * replyqueue_new(uint32_t alertsocks_flags) { replyqueue_t *rq; rq = tor_malloc_zero(sizeof(replyqueue_t)); if (alert_sockets_create(&rq->alert, alertsocks_flags) < 0) { //LCOV_EXCL_START replyqueue_free(rq); return NULL; //LCOV_EXCL_STOP } tor_mutex_init(&rq->lock); TOR_TAILQ_INIT(&rq->answers); return rq; } /** * Free up the resources allocated by a reply queue. */ static void replyqueue_free_(replyqueue_t *queue) { if (!queue) return; workqueue_entry_t *work; while (!TOR_TAILQ_EMPTY(&queue->answers)) { work = TOR_TAILQ_FIRST(&queue->answers); TOR_TAILQ_REMOVE(&queue->answers, work, next_work); workqueue_entry_free(work); } tor_free(queue); } /** Internal: Run from the libevent mainloop when there is work to handle in * the reply queue handler. */ static void reply_event_cb(evutil_socket_t sock, short events, void *arg) { threadpool_t *tp = arg; (void) sock; (void) events; replyqueue_process(tp->reply_queue); if (tp->reply_cb) tp->reply_cb(tp); } /** Register the threadpool tp's reply queue with Tor's global * libevent mainloop. If cb is provided, it is run after * each time there is work to process from the reply queue. Return 0 on * success, -1 on failure. */ int threadpool_register_reply_event(threadpool_t *tp, void (*cb)(threadpool_t *tp)) { struct event_base *base = tor_libevent_get_base(); if (tp->reply_event) { tor_event_free(tp->reply_event); } tp->reply_event = tor_event_new(base, tp->reply_queue->alert.read_fd, EV_READ|EV_PERSIST, reply_event_cb, tp); tor_assert(tp->reply_event); tp->reply_cb = cb; return event_add(tp->reply_event, NULL); } /** * Process all pending replies on a reply queue. The main thread should call * this function every time the socket returned by replyqueue_get_socket() is * readable. */ void replyqueue_process(replyqueue_t *queue) { int r = queue->alert.drain_fn(queue->alert.read_fd); if (r < 0) { //LCOV_EXCL_START static ratelim_t warn_limit = RATELIM_INIT(7200); log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL, "Failure from drain_fd: %s", tor_socket_strerror(-r)); //LCOV_EXCL_STOP } tor_mutex_acquire(&queue->lock); while (!TOR_TAILQ_EMPTY(&queue->answers)) { /* lock must be held at this point.*/ workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers); TOR_TAILQ_REMOVE(&queue->answers, work, next_work); tor_mutex_release(&queue->lock); work->on_pool = NULL; work->reply_fn(work->arg); workqueue_entry_free(work); tor_mutex_acquire(&queue->lock); } tor_mutex_release(&queue->lock); } /** Return the number of threads configured for the given pool. */ unsigned int threadpool_get_n_threads(threadpool_t *tp) { tor_assert(tp); return tp->n_threads; }