From a82604b526a2a258e057d6d515ac17429eb6fb67 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 23 Sep 2013 01:19:16 -0400 Subject: Initial workqueue implemention, with a simple test. It seems to be working, but more tuning is needed. --- src/test/include.am | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) (limited to 'src/test/include.am') diff --git a/src/test/include.am b/src/test/include.am index b9b381fdae..6ad1b552b7 100644 --- a/src/test/include.am +++ b/src/test/include.am @@ -1,6 +1,6 @@ TESTS += src/test/test -noinst_PROGRAMS+= src/test/bench +noinst_PROGRAMS+= src/test/bench src/test/bench_workqueue if UNITTESTS_ENABLED noinst_PROGRAMS+= src/test/test src/test/test-child endif @@ -62,6 +62,9 @@ src_test_test_CPPFLAGS= $(src_test_AM_CPPFLAGS) src_test_bench_SOURCES = \ src/test/bench.c +src_test_bench_workqueue_SOURCES = \ + src/test/bench_workqueue.c + src_test_test_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \ @TOR_LDFLAGS_libevent@ src_test_test_LDADD = src/or/libtor-testing.a src/common/libor-testing.a \ @@ -80,6 +83,14 @@ src_test_bench_LDADD = src/or/libtor.a src/common/libor.a \ @TOR_OPENSSL_LIBS@ @TOR_LIB_WS32@ @TOR_LIB_GDI@ @CURVE25519_LIBS@ \ @TOR_SYSTEMD_LIBS@ +src_test_bench_workqueue_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \ + @TOR_LDFLAGS_libevent@ +src_test_bench_workqueue_LDADD = src/or/libtor.a src/common/libor.a \ + src/common/libor-crypto.a $(LIBDONNA) \ + src/common/libor-event.a \ + @TOR_ZLIB_LIBS@ @TOR_LIB_MATH@ @TOR_LIBEVENT_LIBS@ \ + @TOR_OPENSSL_LIBS@ @TOR_LIB_WS32@ @TOR_LIB_GDI@ @CURVE25519_LIBS@ + noinst_HEADERS+= \ src/test/fakechans.h \ src/test/test.h \ -- cgit v1.2.3-54-g00ecf From 93ad89e9d219d6cea764652a05c236210c7de3fa Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Wed, 25 Sep 2013 11:36:02 -0400 Subject: Rename bench_workqueue -> test_workqueue and make it a unit test. --- .gitignore | 2 + src/test/bench_workqueue.c | 288 -------------------------------------- src/test/include.am | 19 +-- src/test/test_workqueue.c | 342 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 355 insertions(+), 296 deletions(-) delete mode 100644 src/test/bench_workqueue.c create mode 100644 src/test/test_workqueue.c (limited to 'src/test/include.am') diff --git a/.gitignore b/.gitignore index 9ddd0c5385..e63576cfd4 100644 --- a/.gitignore +++ b/.gitignore @@ -163,10 +163,12 @@ cscope.* /src/test/test-bt-cl /src/test/test-child /src/test/test-ntor-cl +/src/test/test_workqueue /src/test/test.exe /src/test/test-bt-cl.exe /src/test/test-child.exe /src/test/test-ntor-cl.exe +/src/test/test_workqueue.exe # /src/tools/ /src/tools/tor-checkkey diff --git a/src/test/bench_workqueue.c b/src/test/bench_workqueue.c deleted file mode 100644 index f190c613e5..0000000000 --- a/src/test/bench_workqueue.c +++ /dev/null @@ -1,288 +0,0 @@ -/* Copyright (c) 2001-2004, Roger Dingledine. - * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. - * Copyright (c) 2007-2013, The Tor Project, Inc. */ -/* See LICENSE for licensing information */ - -#include "or.h" -#include "compat_threads.h" -#include "onion.h" -#include "workqueue.h" -#include "crypto.h" -#include "crypto_curve25519.h" -#include "compat_libevent.h" - -#include -#ifdef HAVE_EVENT2_EVENT_H -#include -#else -#include -#endif - -#ifdef TRACK_RESPONSES -tor_mutex_t bitmap_mutex; -int handled_len; -bitarray_t *handled; -#endif - -#define N_ITEMS 10000 -#define N_INFLIGHT 1000 -#define RELAUNCH_AT 250 - -typedef struct state_s { - int magic; - int n_handled; - crypto_pk_t *rsa; - curve25519_secret_key_t ecdh; -} state_t; - -typedef struct rsa_work_s { - int serial; - uint8_t msg[128]; - uint8_t msglen; -} rsa_work_t; - -typedef struct ecdh_work_s { - int serial; - union { - curve25519_public_key_t pk; - uint8_t msg[32]; - } u; -} ecdh_work_t; - -static void -mark_handled(int serial) -{ -#ifdef TRACK_RESPONSES - tor_mutex_acquire(&bitmap_mutex); - tor_assert(serial < handled_len); - tor_assert(! bitarray_is_set(handled, serial)); - bitarray_set(handled, serial); - tor_mutex_release(&bitmap_mutex); -#else - (void)serial; -#endif -} - -static int -workqueue_do_rsa(void *state, void *work) -{ - rsa_work_t *rw = work; - state_t *st = state; - crypto_pk_t *rsa = st->rsa; - uint8_t sig[256]; - int len; - - tor_assert(st->magic == 13371337); - - len = crypto_pk_private_sign(rsa, (char*)sig, 256, - (char*)rw->msg, rw->msglen); - if (len < 0) { - rw->msglen = 0; - return WQ_RPL_ERROR; - } - - memset(rw->msg, 0, sizeof(rw->msg)); - rw->msglen = len; - memcpy(rw->msg, sig, len); - ++st->n_handled; - - mark_handled(rw->serial); - - return WQ_RPL_REPLY; -} - -#if 0 -static int -workqueue_do_shutdown(void *state, void *work) -{ - (void)state; - (void)work; - (void)cmd; - crypto_pk_free(((state_t*)state)->rsa); - tor_free(state); - return WQ_RPL_SHUTDOWN; -} -#endif - -static int -workqueue_do_ecdh(void *state, void *work) -{ - ecdh_work_t *ew = work; - uint8_t output[CURVE25519_OUTPUT_LEN]; - state_t *st = state; - - tor_assert(st->magic == 13371337); - - curve25519_handshake(output, &st->ecdh, &ew->u.pk); - memcpy(ew->u.msg, output, CURVE25519_OUTPUT_LEN); - ++st->n_handled; - mark_handled(ew->serial); - return WQ_RPL_REPLY; -} - -static void * -new_state(void *arg) -{ - state_t *st; - (void)arg; - - st = tor_malloc(sizeof(*st)); - /* Every thread gets its own keys. not a problem for benchmarking */ - st->rsa = crypto_pk_new(); - if (crypto_pk_generate_key_with_bits(st->rsa, 1024) < 0) { - puts("keygen failed"); - crypto_pk_free(st->rsa); - tor_free(st); - return NULL; - } - curve25519_secret_key_generate(&st->ecdh, 0); - st->magic = 13371337; - return st; -} - -static void -free_state(void *arg) -{ - state_t *st = arg; - crypto_pk_free(st->rsa); - tor_free(st); -} - -static tor_weak_rng_t weak_rng; -static int n_sent = 0; -static int rsa_sent = 0; -static int ecdh_sent = 0; -static int n_received = 0; - -#ifdef TRACK_RESPONSES -bitarray_t *received; -#endif - -static void -handle_reply(void *arg) -{ -#ifdef TRACK_RESPONSES - rsa_work_t *rw = arg; /* Naughty cast, but only looking at serial. */ - tor_assert(! bitarray_is_set(received, rw->serial)); - bitarray_set(received,rw->serial); -#endif - - tor_free(arg); - ++n_received; -} - -static int -add_work(threadpool_t *tp) -{ - int add_rsa = tor_weak_random_range(&weak_rng, 5) == 0; - if (add_rsa) { - rsa_work_t *w = tor_malloc_zero(sizeof(*w)); - w->serial = n_sent++; - crypto_rand((char*)w->msg, 20); - w->msglen = 20; - ++rsa_sent; - return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w) != NULL; - } else { - ecdh_work_t *w = tor_malloc_zero(sizeof(*w)); - w->serial = n_sent++; - /* Not strictly right, but this is just for benchmarks. */ - crypto_rand((char*)w->u.pk.public_key, 32); - ++ecdh_sent; - return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w) != NULL; - } -} - -static void -replysock_readable_cb(tor_socket_t sock, short what, void *arg) -{ - threadpool_t *tp = arg; - replyqueue_t *rq = threadpool_get_replyqueue(tp); - - int old_r = n_received; - (void) sock; - (void) what; - - replyqueue_process(rq); - if (old_r == n_received) - return; - - printf("%d / %d\n", n_received, n_sent); -#ifdef TRACK_RESPONSES - tor_mutex_acquire(&bitmap_mutex); - for (i = 0; i < N_ITEMS; ++i) { - if (bitarray_is_set(received, i)) - putc('o', stdout); - else if (bitarray_is_set(handled, i)) - putc('!', stdout); - else - putc('.', stdout); - } - puts(""); - tor_mutex_release(&bitmap_mutex); -#endif - - if (n_sent - n_received < RELAUNCH_AT) { - while (n_sent < n_received + N_INFLIGHT && n_sent < N_ITEMS) { - if (! add_work(tp)) { - puts("Couldn't add work."); - tor_event_base_loopexit(tor_libevent_get_base(), NULL); - } - } - } - - if (n_received == n_sent && n_sent >= N_ITEMS) { - tor_event_base_loopexit(tor_libevent_get_base(), NULL); - } -} - -int -main(int argc, char **argv) -{ - replyqueue_t *rq; - threadpool_t *tp; - int i; - tor_libevent_cfg evcfg; - struct event *ev; - - (void)argc; - (void)argv; - - init_logging(1); - crypto_global_init(1, NULL, NULL); - crypto_seed_rng(1); - - rq = replyqueue_new(); - tor_assert(rq); - tp = threadpool_new(16, - rq, new_state, free_state, NULL); - tor_assert(tp); - - crypto_seed_weak_rng(&weak_rng); - - memset(&evcfg, 0, sizeof(evcfg)); - tor_libevent_initialize(&evcfg); - - ev = tor_event_new(tor_libevent_get_base(), - replyqueue_get_socket(rq), EV_READ|EV_PERSIST, - replysock_readable_cb, tp); - - event_add(ev, NULL); - -#ifdef TRACK_RESPONSES - handled = bitarray_init_zero(N_ITEMS); - received = bitarray_init_zero(N_ITEMS); - tor_mutex_init(&bitmap_mutex); - handled_len = N_ITEMS; -#endif - - for (i = 0; i < N_INFLIGHT; ++i) { - if (! add_work(tp)) { - puts("Couldn't add work."); - return 1; - } - } - - event_base_loop(tor_libevent_get_base(), 0); - - return 0; -} diff --git a/src/test/include.am b/src/test/include.am index 6ad1b552b7..2badc47a47 100644 --- a/src/test/include.am +++ b/src/test/include.am @@ -1,8 +1,8 @@ TESTS += src/test/test -noinst_PROGRAMS+= src/test/bench src/test/bench_workqueue +noinst_PROGRAMS+= src/test/bench if UNITTESTS_ENABLED -noinst_PROGRAMS+= src/test/test src/test/test-child +noinst_PROGRAMS+= src/test/test src/test/test-child src/test/test_workqueue endif src_test_AM_CPPFLAGS = -DSHARE_DATADIR="\"$(datadir)\"" \ @@ -62,8 +62,10 @@ src_test_test_CPPFLAGS= $(src_test_AM_CPPFLAGS) src_test_bench_SOURCES = \ src/test/bench.c -src_test_bench_workqueue_SOURCES = \ - src/test/bench_workqueue.c +src_test_test_workqueue_SOURCES = \ + src/test/test_workqueue.c +src_test_test_workqueue_CPPFLAGS= $(src_test_AM_CPPFLAGS) +src_test_test_workqueue_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS) src_test_test_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \ @TOR_LDFLAGS_libevent@ @@ -83,11 +85,12 @@ src_test_bench_LDADD = src/or/libtor.a src/common/libor.a \ @TOR_OPENSSL_LIBS@ @TOR_LIB_WS32@ @TOR_LIB_GDI@ @CURVE25519_LIBS@ \ @TOR_SYSTEMD_LIBS@ -src_test_bench_workqueue_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \ +src_test_test_workqueue_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \ @TOR_LDFLAGS_libevent@ -src_test_bench_workqueue_LDADD = src/or/libtor.a src/common/libor.a \ - src/common/libor-crypto.a $(LIBDONNA) \ - src/common/libor-event.a \ +src_test_test_workqueue_LDADD = src/or/libtor-testing.a \ + src/common/libor-testing.a \ + src/common/libor-crypto-testing.a $(LIBDONNA) \ + src/common/libor-event-testing.a \ @TOR_ZLIB_LIBS@ @TOR_LIB_MATH@ @TOR_LIBEVENT_LIBS@ \ @TOR_OPENSSL_LIBS@ @TOR_LIB_WS32@ @TOR_LIB_GDI@ @CURVE25519_LIBS@ diff --git a/src/test/test_workqueue.c b/src/test/test_workqueue.c new file mode 100644 index 0000000000..4077fb27a8 --- /dev/null +++ b/src/test/test_workqueue.c @@ -0,0 +1,342 @@ +/* Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2013, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#include "or.h" +#include "compat_threads.h" +#include "onion.h" +#include "workqueue.h" +#include "crypto.h" +#include "crypto_curve25519.h" +#include "compat_libevent.h" + +#include +#ifdef HAVE_EVENT2_EVENT_H +#include +#else +#include +#endif + +static int opt_verbose = 0; +static int opt_n_threads = 8; +static int opt_n_items = 10000; +static int opt_n_inflight = 1000; +static int opt_n_lowwater = 250; +static int opt_ratio_rsa = 5; + +#ifdef TRACK_RESPONSES +tor_mutex_t bitmap_mutex; +int handled_len; +bitarray_t *handled; +#endif + +typedef struct state_s { + int magic; + int n_handled; + crypto_pk_t *rsa; + curve25519_secret_key_t ecdh; +} state_t; + +typedef struct rsa_work_s { + int serial; + uint8_t msg[128]; + uint8_t msglen; +} rsa_work_t; + +typedef struct ecdh_work_s { + int serial; + union { + curve25519_public_key_t pk; + uint8_t msg[32]; + } u; +} ecdh_work_t; + +static void +mark_handled(int serial) +{ +#ifdef TRACK_RESPONSES + tor_mutex_acquire(&bitmap_mutex); + tor_assert(serial < handled_len); + tor_assert(! bitarray_is_set(handled, serial)); + bitarray_set(handled, serial); + tor_mutex_release(&bitmap_mutex); +#else + (void)serial; +#endif +} + +static int +workqueue_do_rsa(void *state, void *work) +{ + rsa_work_t *rw = work; + state_t *st = state; + crypto_pk_t *rsa = st->rsa; + uint8_t sig[256]; + int len; + + tor_assert(st->magic == 13371337); + + len = crypto_pk_private_sign(rsa, (char*)sig, 256, + (char*)rw->msg, rw->msglen); + if (len < 0) { + rw->msglen = 0; + return WQ_RPL_ERROR; + } + + memset(rw->msg, 0, sizeof(rw->msg)); + rw->msglen = len; + memcpy(rw->msg, sig, len); + ++st->n_handled; + + mark_handled(rw->serial); + + return WQ_RPL_REPLY; +} + +#if 0 +static int +workqueue_do_shutdown(void *state, void *work) +{ + (void)state; + (void)work; + (void)cmd; + crypto_pk_free(((state_t*)state)->rsa); + tor_free(state); + return WQ_RPL_SHUTDOWN; +} +#endif + +static int +workqueue_do_ecdh(void *state, void *work) +{ + ecdh_work_t *ew = work; + uint8_t output[CURVE25519_OUTPUT_LEN]; + state_t *st = state; + + tor_assert(st->magic == 13371337); + + curve25519_handshake(output, &st->ecdh, &ew->u.pk); + memcpy(ew->u.msg, output, CURVE25519_OUTPUT_LEN); + ++st->n_handled; + mark_handled(ew->serial); + return WQ_RPL_REPLY; +} + +static void * +new_state(void *arg) +{ + state_t *st; + (void)arg; + + st = tor_malloc(sizeof(*st)); + /* Every thread gets its own keys. not a problem for benchmarking */ + st->rsa = crypto_pk_new(); + if (crypto_pk_generate_key_with_bits(st->rsa, 1024) < 0) { + puts("keygen failed"); + crypto_pk_free(st->rsa); + tor_free(st); + return NULL; + } + curve25519_secret_key_generate(&st->ecdh, 0); + st->magic = 13371337; + return st; +} + +static void +free_state(void *arg) +{ + state_t *st = arg; + crypto_pk_free(st->rsa); + tor_free(st); +} + +static tor_weak_rng_t weak_rng; +static int n_sent = 0; +static int rsa_sent = 0; +static int ecdh_sent = 0; +static int n_received = 0; + +#ifdef TRACK_RESPONSES +bitarray_t *received; +#endif + +static void +handle_reply(void *arg) +{ +#ifdef TRACK_RESPONSES + rsa_work_t *rw = arg; /* Naughty cast, but only looking at serial. */ + tor_assert(! bitarray_is_set(received, rw->serial)); + bitarray_set(received,rw->serial); +#endif + + tor_free(arg); + ++n_received; +} + +static int +add_work(threadpool_t *tp) +{ + int add_rsa = + opt_ratio_rsa == 0 || + tor_weak_random_range(&weak_rng, opt_ratio_rsa) == 0; + if (add_rsa) { + rsa_work_t *w = tor_malloc_zero(sizeof(*w)); + w->serial = n_sent++; + crypto_rand((char*)w->msg, 20); + w->msglen = 20; + ++rsa_sent; + return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w) != NULL; + } else { + ecdh_work_t *w = tor_malloc_zero(sizeof(*w)); + w->serial = n_sent++; + /* Not strictly right, but this is just for benchmarks. */ + crypto_rand((char*)w->u.pk.public_key, 32); + ++ecdh_sent; + return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w) != NULL; + } +} + +static void +replysock_readable_cb(tor_socket_t sock, short what, void *arg) +{ + threadpool_t *tp = arg; + replyqueue_t *rq = threadpool_get_replyqueue(tp); + + int old_r = n_received; + (void) sock; + (void) what; + + replyqueue_process(rq); + if (old_r == n_received) + return; + + if (opt_verbose) + printf("%d / %d\n", n_received, n_sent); +#ifdef TRACK_RESPONSES + tor_mutex_acquire(&bitmap_mutex); + for (i = 0; i < opt_n_items; ++i) { + if (bitarray_is_set(received, i)) + putc('o', stdout); + else if (bitarray_is_set(handled, i)) + putc('!', stdout); + else + putc('.', stdout); + } + puts(""); + tor_mutex_release(&bitmap_mutex); +#endif + + if (n_sent - n_received < opt_n_lowwater) { + while (n_sent < n_received + opt_n_inflight && n_sent < opt_n_items) { + if (! add_work(tp)) { + puts("Couldn't add work."); + tor_event_base_loopexit(tor_libevent_get_base(), NULL); + } + } + } + + if (n_received == n_sent && n_sent >= opt_n_items) { + tor_event_base_loopexit(tor_libevent_get_base(), NULL); + } +} + +static void +help(void) +{ + puts( + "Options:\n" + " -N Run this many items of work\n" + " -T Use this many threads\n" + " -I Have no more than this many requests queued at once\n" + " -L Add items whenever fewer than this many are pending.\n" + " -R Make one out of this many items be a slow (RSA) one"); +} + +int +main(int argc, char **argv) +{ + replyqueue_t *rq; + threadpool_t *tp; + int i; + tor_libevent_cfg evcfg; + struct event *ev; + + for (i = 1; i < argc; ++i) { + if (!strcmp(argv[i], "-v")) { + opt_verbose = 1; + } else if (!strcmp(argv[i], "-T") && i+1 Date: Wed, 25 Sep 2013 14:50:01 -0400 Subject: Move thread tests into their own module --- src/test/include.am | 1 + src/test/test.c | 2 + src/test/test_threads.c | 154 ++++++++++++++++++++++++++++++++++++++++++++++++ src/test/test_util.c | 137 ------------------------------------------ 4 files changed, 157 insertions(+), 137 deletions(-) create mode 100644 src/test/test_threads.c (limited to 'src/test/include.am') diff --git a/src/test/include.am b/src/test/include.am index 2badc47a47..2e13454983 100644 --- a/src/test/include.am +++ b/src/test/include.am @@ -46,6 +46,7 @@ src_test_test_SOURCES = \ src/test/test_routerkeys.c \ src/test/test_scheduler.c \ src/test/test_socks.c \ + src/test/test_threads.c \ src/test/test_util.c \ src/test/test_config.c \ src/test/test_hs.c \ diff --git a/src/test/test.c b/src/test/test.c index de6efaf873..edc28cd2d4 100644 --- a/src/test/test.c +++ b/src/test/test.c @@ -1297,6 +1297,7 @@ extern struct testcase_t cell_queue_tests[]; extern struct testcase_t options_tests[]; extern struct testcase_t socks_tests[]; extern struct testcase_t entrynodes_tests[]; +extern struct testcase_t thread_tests[]; extern struct testcase_t extorport_tests[]; extern struct testcase_t controller_event_tests[]; extern struct testcase_t logging_tests[]; @@ -1323,6 +1324,7 @@ static struct testgroup_t testgroups[] = { { "container/", container_tests }, { "util/", util_tests }, { "util/logging/", logging_tests }, + { "util/thread/", thread_tests }, { "cellfmt/", cell_format_tests }, { "cellqueue/", cell_queue_tests }, { "dir/", dir_tests }, diff --git a/src/test/test_threads.c b/src/test/test_threads.c new file mode 100644 index 0000000000..2b4c93393f --- /dev/null +++ b/src/test/test_threads.c @@ -0,0 +1,154 @@ +/* Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2013, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#include "orconfig.h" +#include "or.h" +#include "compat_threads.h" +#include "test.h" + +/** mutex for thread test to stop the threads hitting data at the same time. */ +static tor_mutex_t *thread_test_mutex_ = NULL; +/** mutexes for the thread test to make sure that the threads have to + * interleave somewhat. */ +static tor_mutex_t *thread_test_start1_ = NULL, + *thread_test_start2_ = NULL; +/** Shared strmap for the thread test. */ +static strmap_t *thread_test_strmap_ = NULL; +/** The name of thread1 for the thread test */ +static char *thread1_name_ = NULL; +/** The name of thread2 for the thread test */ +static char *thread2_name_ = NULL; + +static void thread_test_func_(void* _s) ATTR_NORETURN; + +/** How many iterations have the threads in the unit test run? */ +static int t1_count = 0, t2_count = 0; + +/** Helper function for threading unit tests: This function runs in a + * subthread. It grabs its own mutex (start1 or start2) to make sure that it + * should start, then it repeatedly alters _test_thread_strmap protected by + * thread_test_mutex_. */ +static void +thread_test_func_(void* _s) +{ + char *s = _s; + int i, *count; + tor_mutex_t *m; + char buf[64]; + char **cp; + if (!strcmp(s, "thread 1")) { + m = thread_test_start1_; + cp = &thread1_name_; + count = &t1_count; + } else { + m = thread_test_start2_; + cp = &thread2_name_; + count = &t2_count; + } + + tor_snprintf(buf, sizeof(buf), "%lu", tor_get_thread_id()); + *cp = tor_strdup(buf); + + tor_mutex_acquire(m); + + for (i=0; i<10000; ++i) { + tor_mutex_acquire(thread_test_mutex_); + strmap_set(thread_test_strmap_, "last to run", *cp); + ++*count; + tor_mutex_release(thread_test_mutex_); + } + tor_mutex_acquire(thread_test_mutex_); + strmap_set(thread_test_strmap_, s, *cp); + tor_mutex_release(thread_test_mutex_); + + tor_mutex_release(m); + + spawn_exit(); +} + +/** Run unit tests for threading logic. */ +static void +test_threads_basic(void *arg) +{ + char *s1 = NULL, *s2 = NULL; + int done = 0, timedout = 0; + time_t started; +#ifndef _WIN32 + struct timeval tv; + tv.tv_sec=0; + tv.tv_usec=100*1000; +#endif + (void)arg; + thread_test_mutex_ = tor_mutex_new(); + thread_test_start1_ = tor_mutex_new(); + thread_test_start2_ = tor_mutex_new(); + thread_test_strmap_ = strmap_new(); + s1 = tor_strdup("thread 1"); + s2 = tor_strdup("thread 2"); + tor_mutex_acquire(thread_test_start1_); + tor_mutex_acquire(thread_test_start2_); + spawn_func(thread_test_func_, s1); + spawn_func(thread_test_func_, s2); + tor_mutex_release(thread_test_start2_); + tor_mutex_release(thread_test_start1_); + started = time(NULL); + while (!done) { + tor_mutex_acquire(thread_test_mutex_); + strmap_assert_ok(thread_test_strmap_); + if (strmap_get(thread_test_strmap_, "thread 1") && + strmap_get(thread_test_strmap_, "thread 2")) { + done = 1; + } else if (time(NULL) > started + 150) { + timedout = done = 1; + } + tor_mutex_release(thread_test_mutex_); +#ifndef _WIN32 + /* Prevent the main thread from starving the worker threads. */ + select(0, NULL, NULL, NULL, &tv); +#endif + } + tor_mutex_acquire(thread_test_start1_); + tor_mutex_release(thread_test_start1_); + tor_mutex_acquire(thread_test_start2_); + tor_mutex_release(thread_test_start2_); + + tor_mutex_free(thread_test_mutex_); + + if (timedout) { + printf("\nTimed out: %d %d", t1_count, t2_count); + tt_assert(strmap_get(thread_test_strmap_, "thread 1")); + tt_assert(strmap_get(thread_test_strmap_, "thread 2")); + tt_assert(!timedout); + } + + /* different thread IDs. */ + tt_assert(strcmp(strmap_get(thread_test_strmap_, "thread 1"), + strmap_get(thread_test_strmap_, "thread 2"))); + tt_assert(!strcmp(strmap_get(thread_test_strmap_, "thread 1"), + strmap_get(thread_test_strmap_, "last to run")) || + !strcmp(strmap_get(thread_test_strmap_, "thread 2"), + strmap_get(thread_test_strmap_, "last to run"))); + + done: + tor_free(s1); + tor_free(s2); + tor_free(thread1_name_); + tor_free(thread2_name_); + if (thread_test_strmap_) + strmap_free(thread_test_strmap_, NULL); + if (thread_test_start1_) + tor_mutex_free(thread_test_start1_); + if (thread_test_start2_) + tor_mutex_free(thread_test_start2_); +} + +#define THREAD_TEST(name) \ + { #name, test_threads_##name, TT_FORK, NULL, NULL } + +struct testcase_t thread_tests[] = { + THREAD_TEST(basic), + END_OF_TESTCASES +}; + diff --git a/src/test/test_util.c b/src/test/test_util.c index 15470e8efa..b4ee934698 100644 --- a/src/test/test_util.c +++ b/src/test/test_util.c @@ -1607,142 +1607,6 @@ test_util_pow2(void *arg) ; } -/** mutex for thread test to stop the threads hitting data at the same time. */ -static tor_mutex_t *thread_test_mutex_ = NULL; -/** mutexes for the thread test to make sure that the threads have to - * interleave somewhat. */ -static tor_mutex_t *thread_test_start1_ = NULL, - *thread_test_start2_ = NULL; -/** Shared strmap for the thread test. */ -static strmap_t *thread_test_strmap_ = NULL; -/** The name of thread1 for the thread test */ -static char *thread1_name_ = NULL; -/** The name of thread2 for the thread test */ -static char *thread2_name_ = NULL; - -static void thread_test_func_(void* _s) ATTR_NORETURN; - -/** How many iterations have the threads in the unit test run? */ -static int t1_count = 0, t2_count = 0; - -/** Helper function for threading unit tests: This function runs in a - * subthread. It grabs its own mutex (start1 or start2) to make sure that it - * should start, then it repeatedly alters _test_thread_strmap protected by - * thread_test_mutex_. */ -static void -thread_test_func_(void* _s) -{ - char *s = _s; - int i, *count; - tor_mutex_t *m; - char buf[64]; - char **cp; - if (!strcmp(s, "thread 1")) { - m = thread_test_start1_; - cp = &thread1_name_; - count = &t1_count; - } else { - m = thread_test_start2_; - cp = &thread2_name_; - count = &t2_count; - } - - tor_snprintf(buf, sizeof(buf), "%lu", tor_get_thread_id()); - *cp = tor_strdup(buf); - - tor_mutex_acquire(m); - - for (i=0; i<10000; ++i) { - tor_mutex_acquire(thread_test_mutex_); - strmap_set(thread_test_strmap_, "last to run", *cp); - ++*count; - tor_mutex_release(thread_test_mutex_); - } - tor_mutex_acquire(thread_test_mutex_); - strmap_set(thread_test_strmap_, s, *cp); - tor_mutex_release(thread_test_mutex_); - - tor_mutex_release(m); - - spawn_exit(); -} - -/** Run unit tests for threading logic. */ -static void -test_util_threads(void *arg) -{ - char *s1 = NULL, *s2 = NULL; - int done = 0, timedout = 0; - time_t started; -#ifndef _WIN32 - struct timeval tv; - tv.tv_sec=0; - tv.tv_usec=100*1000; -#endif - (void)arg; - thread_test_mutex_ = tor_mutex_new(); - thread_test_start1_ = tor_mutex_new(); - thread_test_start2_ = tor_mutex_new(); - thread_test_strmap_ = strmap_new(); - s1 = tor_strdup("thread 1"); - s2 = tor_strdup("thread 2"); - tor_mutex_acquire(thread_test_start1_); - tor_mutex_acquire(thread_test_start2_); - spawn_func(thread_test_func_, s1); - spawn_func(thread_test_func_, s2); - tor_mutex_release(thread_test_start2_); - tor_mutex_release(thread_test_start1_); - started = time(NULL); - while (!done) { - tor_mutex_acquire(thread_test_mutex_); - strmap_assert_ok(thread_test_strmap_); - if (strmap_get(thread_test_strmap_, "thread 1") && - strmap_get(thread_test_strmap_, "thread 2")) { - done = 1; - } else if (time(NULL) > started + 150) { - timedout = done = 1; - } - tor_mutex_release(thread_test_mutex_); -#ifndef _WIN32 - /* Prevent the main thread from starving the worker threads. */ - select(0, NULL, NULL, NULL, &tv); -#endif - } - tor_mutex_acquire(thread_test_start1_); - tor_mutex_release(thread_test_start1_); - tor_mutex_acquire(thread_test_start2_); - tor_mutex_release(thread_test_start2_); - - tor_mutex_free(thread_test_mutex_); - - if (timedout) { - printf("\nTimed out: %d %d", t1_count, t2_count); - tt_assert(strmap_get(thread_test_strmap_, "thread 1")); - tt_assert(strmap_get(thread_test_strmap_, "thread 2")); - tt_assert(!timedout); - } - - /* different thread IDs. */ - tt_assert(strcmp(strmap_get(thread_test_strmap_, "thread 1"), - strmap_get(thread_test_strmap_, "thread 2"))); - tt_assert(!strcmp(strmap_get(thread_test_strmap_, "thread 1"), - strmap_get(thread_test_strmap_, "last to run")) || - !strcmp(strmap_get(thread_test_strmap_, "thread 2"), - strmap_get(thread_test_strmap_, "last to run"))); - - done: - tor_free(s1); - tor_free(s2); - tor_free(thread1_name_); - tor_free(thread2_name_); - if (thread_test_strmap_) - strmap_free(thread_test_strmap_, NULL); - if (thread_test_start1_) - tor_mutex_free(thread_test_start1_); - if (thread_test_start2_) - tor_mutex_free(thread_test_start2_); -} - /** Run unit tests for compression functions */ static void test_util_gzip(void *arg) @@ -4927,7 +4791,6 @@ struct testcase_t util_tests[] = { UTIL_LEGACY(memarea), UTIL_LEGACY(control_formats), UTIL_LEGACY(mmap), - UTIL_LEGACY(threads), UTIL_LEGACY(sscanf), UTIL_LEGACY(format_time_interval), UTIL_LEGACY(path_is_relative), -- cgit v1.2.3-54-g00ecf