diff options
author | Nick Mathewson <nickm@torproject.org> | 2015-08-21 10:37:01 -0400 |
---|---|---|
committer | Nick Mathewson <nickm@torproject.org> | 2015-08-21 10:37:01 -0400 |
commit | 3b6d2f9bf407ee560bafaeabb02b3bc88caa680a (patch) | |
tree | f20d08ea0db20ec548e53ae1fea7221ea897cd96 /src | |
parent | b58dfba76f2d34d8401130137705f2ab1f9bf8c9 (diff) | |
parent | 1633d1ad1d334b183e123192ccd8edc6277dad22 (diff) | |
download | tor-3b6d2f9bf407ee560bafaeabb02b3bc88caa680a.tar.gz tor-3b6d2f9bf407ee560bafaeabb02b3bc88caa680a.zip |
Merge branch 'workqueue_squashed'
Diffstat (limited to 'src')
-rw-r--r-- | src/common/compat.c | 1 | ||||
-rw-r--r-- | src/common/container.h | 3 | ||||
-rw-r--r-- | src/common/workqueue.c | 21 | ||||
-rw-r--r-- | src/common/workqueue.h | 19 | ||||
-rw-r--r-- | src/or/control.c | 1 | ||||
-rw-r--r-- | src/or/cpuworker.c | 4 | ||||
-rw-r--r-- | src/or/rendcache.c | 2 | ||||
-rw-r--r-- | src/or/rendclient.c | 9 | ||||
-rw-r--r-- | src/test/test_util.c | 1 | ||||
-rw-r--r-- | src/test/test_workqueue.c | 29 |
10 files changed, 59 insertions, 31 deletions
diff --git a/src/common/compat.c b/src/common/compat.c index 76f9bcb97e..7d72b4b7fd 100644 --- a/src/common/compat.c +++ b/src/common/compat.c @@ -3424,3 +3424,4 @@ tor_get_avail_disk_space(const char *path) return -1; #endif } + diff --git a/src/common/container.h b/src/common/container.h index 5abd8b48d9..bf4f04762c 100644 --- a/src/common/container.h +++ b/src/common/container.h @@ -110,7 +110,8 @@ void smartlist_sort_digests256(smartlist_t *sl); void smartlist_sort_pointers(smartlist_t *sl); const char *smartlist_get_most_frequent_string(smartlist_t *sl); -const char *smartlist_get_most_frequent_string_(smartlist_t *sl, int *count_out); +const char *smartlist_get_most_frequent_string_(smartlist_t *sl, + int *count_out); const uint8_t *smartlist_get_most_frequent_digest256(smartlist_t *sl); void smartlist_uniq_strings(smartlist_t *sl); diff --git a/src/common/workqueue.c b/src/common/workqueue.c index b0b004dc25..c467bdf43b 100644 --- a/src/common/workqueue.c +++ b/src/common/workqueue.c @@ -25,7 +25,7 @@ struct threadpool_s { unsigned generation; /** Function that should be run for updates on each thread. */ - int (*update_fn)(void *, void *); + workqueue_reply_t (*update_fn)(void *, void *); /** Function to free update arguments if they can't be run. */ void (*free_update_arg_fn)(void *); /** Array of n_threads update arguments. */ @@ -56,7 +56,7 @@ struct workqueue_entry_s { /** True iff this entry is waiting for a worker to start processing it. */ uint8_t pending; /** Function to run in the worker thread. */ - int (*fn)(void *state, void *arg); + workqueue_reply_t (*fn)(void *state, void *arg); /** Function to run while processing the reply queue. */ void (*reply_fn)(void *arg); /** Argument for the above functions. */ @@ -96,7 +96,7 @@ static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work); * <b>fn</b> in the worker thread, and <b>reply_fn</b> in the main * thread. See threadpool_queue_work() for full documentation. */ static workqueue_entry_t * -workqueue_entry_new(int (*fn)(void*, void*), +workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*), void (*reply_fn)(void*), void *arg) { @@ -172,7 +172,7 @@ worker_thread_main(void *thread_) workerthread_t *thread = thread_; threadpool_t *pool = thread->in_pool; workqueue_entry_t *work; - int result; + workqueue_reply_t result; tor_mutex_acquire(&pool->lock); while (1) { @@ -182,13 +182,14 @@ worker_thread_main(void *thread_) if (thread->in_pool->generation != thread->generation) { void *arg = thread->in_pool->update_args[thread->index]; thread->in_pool->update_args[thread->index] = NULL; - int (*update_fn)(void*,void*) = thread->in_pool->update_fn; + workqueue_reply_t (*update_fn)(void*,void*) = + thread->in_pool->update_fn; thread->generation = thread->in_pool->generation; tor_mutex_release(&pool->lock); - int r = update_fn(thread->state, arg); + workqueue_reply_t r = update_fn(thread->state, arg); - if (r < 0) { + if (r != WQ_RPL_REPLY) { return; } @@ -208,7 +209,7 @@ worker_thread_main(void *thread_) queue_reply(thread->reply_queue, work); /* We may need to exit the thread. */ - if (result >= WQ_RPL_ERROR) { + if (result != WQ_RPL_REPLY) { return; } tor_mutex_acquire(&pool->lock); @@ -281,7 +282,7 @@ workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue) */ workqueue_entry_t * threadpool_queue_work(threadpool_t *pool, - int (*fn)(void *, void *), + workqueue_reply_t (*fn)(void *, void *), void (*reply_fn)(void *), void *arg) { @@ -318,7 +319,7 @@ threadpool_queue_work(threadpool_t *pool, int threadpool_queue_update(threadpool_t *pool, void *(*dup_fn)(void *), - int (*fn)(void *, void *), + workqueue_reply_t (*fn)(void *, void *), void (*free_fn)(void *), void *arg) { diff --git a/src/common/workqueue.h b/src/common/workqueue.h index 92e82b8a48..9ce1eadafc 100644 --- a/src/common/workqueue.h +++ b/src/common/workqueue.h @@ -15,21 +15,22 @@ typedef struct threadpool_s threadpool_t; * pool. */ typedef struct workqueue_entry_s workqueue_entry_t; -/** Possible return value from a work function: indicates success. */ -#define WQ_RPL_REPLY 0 -/** Possible return value from a work function: indicates fatal error */ -#define WQ_RPL_ERROR 1 -/** Possible return value from a work function: indicates thread is shutting - * down. */ -#define WQ_RPL_SHUTDOWN 2 +/** Possible return value from a work function: */ +typedef enum { + WQ_RPL_REPLY = 0, /** indicates success */ + WQ_RPL_ERROR = 1, /** indicates fatal error */ + WQ_RPL_SHUTDOWN = 2, /** indicates thread is shutting down */ +} workqueue_reply_t; workqueue_entry_t *threadpool_queue_work(threadpool_t *pool, - int (*fn)(void *, void *), + workqueue_reply_t (*fn)(void *, + void *), void (*reply_fn)(void *), void *arg); + int threadpool_queue_update(threadpool_t *pool, void *(*dup_fn)(void *), - int (*fn)(void *, void *), + workqueue_reply_t (*fn)(void *, void *), void (*free_fn)(void *), void *arg); void *workqueue_entry_cancel(workqueue_entry_t *pending_work); diff --git a/src/or/control.c b/src/or/control.c index ed2fedac32..220e7e514f 100644 --- a/src/or/control.c +++ b/src/or/control.c @@ -6478,3 +6478,4 @@ control_testing_set_global_event_mask(uint64_t mask) global_event_mask = mask; } #endif + diff --git a/src/or/cpuworker.c b/src/or/cpuworker.c index d511ecf84c..76d97e05f2 100644 --- a/src/or/cpuworker.c +++ b/src/or/cpuworker.c @@ -160,7 +160,7 @@ typedef struct cpuworker_job_u { } u; } cpuworker_job_t; -static int +static workqueue_reply_t update_state_threadfn(void *state_, void *work_) { worker_state_t *state = state_; @@ -387,7 +387,7 @@ cpuworker_onion_handshake_replyfn(void *work_) } /** Implementation function for onion handshake requests. */ -static int +static workqueue_reply_t cpuworker_onion_handshake_threadfn(void *state_, void *work_) { worker_state_t *state = state_; diff --git a/src/or/rendcache.c b/src/or/rendcache.c index 9a33046fb6..fe9a1344f6 100644 --- a/src/or/rendcache.c +++ b/src/or/rendcache.c @@ -330,7 +330,7 @@ cache_failure_intro_lookup(const uint8_t *identity, const char *service_id, *intro_entry = intro_elem; } return 1; -not_found: + not_found: return 0; } diff --git a/src/or/rendclient.c b/src/or/rendclient.c index c6f29a7707..a39e518e99 100644 --- a/src/or/rendclient.c +++ b/src/or/rendclient.c @@ -1021,7 +1021,7 @@ rend_client_report_intro_point_failure(extend_info_t *failed_intro, /* fall through */ case INTRO_POINT_FAILURE_GENERIC: rend_cache_intro_failure_note(failure_type, - (uint8_t *) failed_intro->identity_digest, + (uint8_t *)failed_intro->identity_digest, rend_query->onion_address); rend_intro_point_free(intro); smartlist_del(ent->parsed->intro_nodes, i); @@ -1038,9 +1038,10 @@ rend_client_report_intro_point_failure(extend_info_t *failed_intro, intro->unreachable_count, zap_intro_point ? " Removing from descriptor.": ""); if (zap_intro_point) { - rend_cache_intro_failure_note(failure_type, - (uint8_t *) failed_intro->identity_digest, - rend_query->onion_address); + rend_cache_intro_failure_note( + failure_type, + (uint8_t *) failed_intro->identity_digest, + rend_query->onion_address); rend_intro_point_free(intro); smartlist_del(ent->parsed->intro_nodes, i); } diff --git a/src/test/test_util.c b/src/test/test_util.c index 8b4513d34c..0a5783e9f5 100644 --- a/src/test/test_util.c +++ b/src/test/test_util.c @@ -3654,7 +3654,6 @@ test_util_di_map(void *arg) dimap_free(dimap, tor_free_); } - /** * Test counting high bits */ diff --git a/src/test/test_workqueue.c b/src/test/test_workqueue.c index 7f20642041..0d79733cf0 100644 --- a/src/test/test_workqueue.c +++ b/src/test/test_workqueue.c @@ -70,7 +70,7 @@ mark_handled(int serial) #endif } -static int +static workqueue_reply_t workqueue_do_rsa(void *state, void *work) { rsa_work_t *rw = work; @@ -98,7 +98,7 @@ workqueue_do_rsa(void *state, void *work) return WQ_RPL_REPLY; } -static int +static workqueue_reply_t workqueue_do_shutdown(void *state, void *work) { (void)state; @@ -108,7 +108,7 @@ workqueue_do_shutdown(void *state, void *work) return WQ_RPL_SHUTDOWN; } -static int +static workqueue_reply_t workqueue_do_ecdh(void *state, void *work) { ecdh_work_t *ew = work; @@ -124,6 +124,14 @@ workqueue_do_ecdh(void *state, void *work) return WQ_RPL_REPLY; } +static workqueue_reply_t +workqueue_shutdown_error(void *state, void *work) +{ + (void)state; + (void)work; + return WQ_RPL_REPLY; +} + static void * new_state(void *arg) { @@ -156,6 +164,7 @@ static int n_sent = 0; static int rsa_sent = 0; static int ecdh_sent = 0; static int n_received = 0; +static int no_shutdown = 0; #ifdef TRACK_RESPONSES bitarray_t *received; @@ -174,6 +183,14 @@ handle_reply(void *arg) ++n_received; } +/* This should never get called. */ +static void +handle_reply_shutdown(void *arg) +{ + (void)arg; + no_shutdown = 1; +} + static workqueue_entry_t * add_work(threadpool_t *tp) { @@ -288,6 +305,9 @@ replysock_readable_cb(tor_socket_t sock, short what, void *arg) shutting_down = 1; threadpool_queue_update(tp, NULL, workqueue_do_shutdown, NULL, NULL); + // Anything we add after starting the shutdown must not be executed. + threadpool_queue_work(tp, workqueue_shutdown_error, + handle_reply_shutdown, NULL); { struct timeval limit = { 2, 0 }; tor_event_base_loopexit(tor_libevent_get_base(), &limit); @@ -416,6 +436,9 @@ main(int argc, char **argv) printf("%d+%d vs %d\n", n_received, n_successful_cancel, n_sent); puts("FAIL"); return 1; + } else if (no_shutdown) { + puts("Accepted work after shutdown\n"); + puts("FAIL"); } else { puts("OK"); return 0; |