aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--changes/ticket409914
-rw-r--r--src/lib/evloop/workqueue.c118
2 files changed, 10 insertions, 112 deletions
diff --git a/changes/ticket40991 b/changes/ticket40991
deleted file mode 100644
index 3ee26efb47..0000000000
--- a/changes/ticket40991
+++ /dev/null
@@ -1,4 +0,0 @@
- o Minor bugfixes (threads, memory):
- - Rework start and exit of worker threads.
- Improvements in cleanup of resources used by threads.
- Fixes bug 40991; bugfix on 0.4.8.13.
diff --git a/src/lib/evloop/workqueue.c b/src/lib/evloop/workqueue.c
index e61c838b22..17ab44e3ab 100644
--- a/src/lib/evloop/workqueue.c
+++ b/src/lib/evloop/workqueue.c
@@ -78,8 +78,6 @@ struct threadpool_t {
/** Number of elements in threads. */
int n_threads;
- /** Number of elements to be created in threads. */
- int n_threads_max;
/** Mutex to protect all the above fields. */
tor_mutex_t lock;
@@ -90,11 +88,6 @@ struct threadpool_t {
void *(*new_thread_state_fn)(void*);
void (*free_thread_state_fn)(void*);
void *new_thread_state_arg;
-
- /** Used for signalling the worker threads to exit. */
- int exit;
- /** Mutex for controlling worker threads' startup and exit. */
- tor_mutex_t control_lock;
};
/** Used to put a workqueue_priority_t value into a bitfield. */
@@ -277,25 +270,13 @@ worker_thread_extract_next_work(workerthread_t *thread)
static void
worker_thread_main(void *thread_)
{
- static int n_worker_threads_running = 0;
workerthread_t *thread = thread_;
threadpool_t *pool = thread->in_pool;
workqueue_entry_t *work;
workqueue_reply_t result;
tor_mutex_acquire(&pool->lock);
- log_debug(LD_GENERAL, "Worker thread (TID: %lu) has started.",
- tor_get_thread_id());
-
- if (++n_worker_threads_running == pool->n_threads_max)
- /* Let the main thread know, the last worker thread has started. */
- tor_cond_signal_one(&pool->condition);
-
while (1) {
- /* Exit thread when signaled to exit */
- if (pool->exit)
- break;
-
/* lock must be held at this point. */
while (worker_thread_has_work(thread)) {
/* lock must be held at this point. */
@@ -310,7 +291,7 @@ worker_thread_main(void *thread_)
workqueue_reply_t r = update_fn(thread->state, arg);
if (r != WQ_RPL_REPLY) {
- break;
+ return;
}
tor_mutex_acquire(&pool->lock);
@@ -330,7 +311,7 @@ worker_thread_main(void *thread_)
/* We may need to exit the thread. */
if (result != WQ_RPL_REPLY) {
- break;
+ return;
}
tor_mutex_acquire(&pool->lock);
}
@@ -344,14 +325,6 @@ worker_thread_main(void *thread_)
log_warn(LD_GENERAL, "Fail tor_cond_wait.");
}
}
-
- log_debug(LD_GENERAL, "Worker thread (TID: %lu) has exited.",
- tor_get_thread_id());
-
- if (--n_worker_threads_running == 0)
- tor_mutex_release(&pool->control_lock);
-
- tor_mutex_release(&pool->lock);
}
/** Put a reply on the reply queue. The reply must not currently be on
@@ -549,9 +522,6 @@ threadpool_start_threads(threadpool_t *pool, int n)
pool->threads = tor_reallocarray(pool->threads,
sizeof(workerthread_t*), n);
- pool->n_threads_max = n;
- log_debug(LD_GENERAL, "Starting worker threads...");
-
while (pool->n_threads < n) {
/* For half of our threads, we'll choose lower priorities permissively;
* for the other half, we'll stick more strictly to higher priorities.
@@ -573,36 +543,9 @@ threadpool_start_threads(threadpool_t *pool, int n)
thr->index = pool->n_threads;
pool->threads[pool->n_threads++] = thr;
}
-
tor_mutex_release(&pool->lock);
- tor_mutex_acquire(&pool->control_lock);
-
- struct timeval tv = {.tv_sec = 30, .tv_usec = 0};
-
- /* Wait for the last launched thread to confirm us, it has started.
- * Wait max 30 seconds */
- switch (tor_cond_wait(&pool->condition, &pool->control_lock, &tv)) {
- case -1:
- log_warn(LD_GENERAL, "Failed to confirm worker threads' start up.");
- goto error;
- case 1:
- log_warn(LD_GENERAL, "Failed to confirm worker threads' "
- "start up after timeout.");
- goto error;
- case 0:
- log_debug(LD_GENERAL, "Starting worker threads finished.");
- break;
- default:;
- }
-
- /* On success control lock stays locked. This is required for the
- * main thread to wait for the worker threads to exit on shutdown. */
return 0;
-
-error:
- tor_mutex_release(&pool->control_lock);
- return -1;
}
/**
@@ -623,9 +566,6 @@ threadpool_new(int n_threads,
pool = tor_malloc_zero(sizeof(threadpool_t));
tor_mutex_init_nonrecursive(&pool->lock);
tor_cond_init(&pool->condition);
- tor_mutex_init_nonrecursive(&pool->control_lock);
- pool->exit = 0;
-
unsigned i;
for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
TOR_TAILQ_INIT(&pool->work[i]);
@@ -639,6 +579,8 @@ threadpool_new(int n_threads,
if (threadpool_start_threads(pool, n_threads) < 0) {
//LCOV_EXCL_START
tor_assert_nonfatal_unreached();
+ tor_cond_uninit(&pool->condition);
+ tor_mutex_uninit(&pool->lock);
threadpool_free(pool);
return NULL;
//LCOV_EXCL_STOP
@@ -656,32 +598,6 @@ threadpool_free_(threadpool_t *pool)
if (!pool)
return;
- tor_mutex_acquire(&pool->lock);
- /* Signal the worker threads to exit */
- pool->exit = 1;
- /* If worker threads are waiting for work, let them continue to exit */
- tor_cond_signal_all(&pool->condition);
-
- log_debug(LD_GENERAL, "Signaled worker threads to exit. "
- "Waiting for them to exit...");
- tor_mutex_release(&pool->lock);
-
- /* Wait until all worker threads have exited.
- * pool->control_lock must be prelocked here. */
- tor_mutex_acquire(&pool->control_lock);
- /* Unlock required, else main thread hangs on
- * tor_mutex_uninit(&pool->control_lock) */
- tor_mutex_release(&pool->control_lock);
-
- /* If this message appears in the log before all threads have confirmed
- * their exit, then pool->control_lock wasn't prelocked for some reason. */
- log_debug(LD_GENERAL, "All worker threads have exited. "
- "Beginning to clean up...");
-
- tor_cond_uninit(&pool->condition);
- tor_mutex_uninit(&pool->lock);
- tor_mutex_uninit(&pool->control_lock);
-
if (pool->threads) {
for (int i = 0; i != pool->n_threads; ++i)
workerthread_free(pool->threads[i]);
@@ -689,35 +605,21 @@ threadpool_free_(threadpool_t *pool)
tor_free(pool->threads);
}
- if (pool->update_args) {
- if (!pool->free_update_arg_fn)
- log_warn(LD_GENERAL, "Freeing pool->update_args not possible. "
- "pool->free_update_arg_fn is not set.");
- else
- pool->free_update_arg_fn(pool->update_args);
- }
+ if (pool->update_args)
+ pool->free_update_arg_fn(pool->update_args);
if (pool->reply_event) {
- if (tor_event_del(pool->reply_event) == -1)
- log_warn(LD_GENERAL, "libevent error: deleting reply event failed.");
- else
- tor_event_free(pool->reply_event);
+ tor_event_del(pool->reply_event);
+ tor_event_free(pool->reply_event);
}
if (pool->reply_queue)
replyqueue_free(pool->reply_queue);
- if (pool->new_thread_state_arg) {
- if (!pool->free_thread_state_fn)
- log_warn(LD_GENERAL, "Freeing pool->new_thread_state_arg not possible. "
- "pool->free_thread_state_fn is not set.");
- else
- pool->free_thread_state_fn(pool->new_thread_state_arg);
- }
+ if (pool->new_thread_state_arg)
+ pool->free_thread_state_fn(pool->new_thread_state_arg);
tor_free(pool);
-
- log_debug(LD_GENERAL, "Cleanup finished.");
}
/** Return the reply queue associated with a given thread pool. */