From a82604b526a2a258e057d6d515ac17429eb6fb67 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 23 Sep 2013 01:19:16 -0400 Subject: Initial workqueue implemention, with a simple test. It seems to be working, but more tuning is needed. --- src/test/bench_workqueue.c | 298 +++++++++++++++++++++++++++++++++++++++++++++ src/test/include.am | 13 +- 2 files changed, 310 insertions(+), 1 deletion(-) create mode 100644 src/test/bench_workqueue.c (limited to 'src/test') diff --git a/src/test/bench_workqueue.c b/src/test/bench_workqueue.c new file mode 100644 index 0000000000..1bdfbefb3e --- /dev/null +++ b/src/test/bench_workqueue.c @@ -0,0 +1,298 @@ +/* Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2013, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#include "or.h" +#include "compat_threads.h" +#include "onion.h" +#include "workqueue.h" +#include "crypto.h" +#include "crypto_curve25519.h" +#include "compat_libevent.h" + +#include +#ifdef HAVE_EVENT2_EVENT_H +#include +#else +#include +#endif + +#ifdef TRACK_RESPONSES +tor_mutex_t bitmap_mutex; +int handled_len; +bitarray_t *handled; +#endif + +#define N_ITEMS 10000 +#define N_INFLIGHT 1000 +#define RELAUNCH_AT 250 + +typedef struct state_s { + int magic; + int n_handled; + crypto_pk_t *rsa; + curve25519_secret_key_t ecdh; +} state_t; + +typedef struct rsa_work_s { + int serial; + uint8_t msg[128]; + uint8_t msglen; +} rsa_work_t; + +typedef struct ecdh_work_s { + int serial; + union { + curve25519_public_key_t pk; + uint8_t msg[32]; + } u; +} ecdh_work_t; + +static void +mark_handled(int serial) +{ +#ifdef TRACK_RESPONSES + tor_mutex_acquire(&bitmap_mutex); + tor_assert(serial < handled_len); + tor_assert(! bitarray_is_set(handled, serial)); + bitarray_set(handled, serial); + tor_mutex_release(&bitmap_mutex); +#else + (void)serial; +#endif +} + +static int +workqueue_do_rsa(int cmd, void *state, void *work) +{ + rsa_work_t *rw = work; + state_t *st = state; + crypto_pk_t *rsa = st->rsa; + uint8_t sig[256]; + int len; + + tor_assert(st->magic == 13371337); + + if (cmd == WQ_CMD_CANCEL) { + tor_free(work); + return WQ_RPL_NOQUEUE; + } + + len = crypto_pk_private_sign(rsa, (char*)sig, 256, + (char*)rw->msg, rw->msglen); + if (len < 0) { + tor_free(work); + return WQ_RPL_NOQUEUE; + } + + memset(rw->msg, 0, sizeof(rw->msg)); + rw->msglen = len; + memcpy(rw->msg, sig, len); + ++st->n_handled; + + mark_handled(rw->serial); + + return WQ_RPL_QUEUE; +} + +#if 0 +static int +workqueue_do_shutdown(int cmd, void *state, void *work) +{ + (void)state; + (void)work; + (void)cmd; + crypto_pk_free(((state_t*)state)->rsa); + tor_free(state); + return WQ_RPL_SHUTDOWN; +} +#endif + +static int +workqueue_do_ecdh(int cmd, void *state, void *work) +{ + ecdh_work_t *ew = work; + uint8_t output[CURVE25519_OUTPUT_LEN]; + state_t *st = state; + + tor_assert(st->magic == 13371337); + + if (cmd == WQ_CMD_CANCEL) { + tor_free(work); + return WQ_RPL_NOQUEUE; + } + + curve25519_handshake(output, &st->ecdh, &ew->u.pk); + memcpy(ew->u.msg, output, CURVE25519_OUTPUT_LEN); + ++st->n_handled; + mark_handled(ew->serial); + return WQ_RPL_QUEUE; +} + +static void * +new_state(void *arg) +{ + state_t *st; + (void)arg; + + st = tor_malloc(sizeof(*st)); + /* Every thread gets its own keys. not a problem for benchmarking */ + st->rsa = crypto_pk_new(); + if (crypto_pk_generate_key_with_bits(st->rsa, 1024) < 0) { + puts("keygen failed"); + crypto_pk_free(st->rsa); + tor_free(st); + return NULL; + } + curve25519_secret_key_generate(&st->ecdh, 0); + st->magic = 13371337; + return st; +} + +static void +free_state(void *arg) +{ + state_t *st = arg; + crypto_pk_free(st->rsa); + tor_free(st); +} + +static tor_weak_rng_t weak_rng; +static int n_sent = 0; +static int rsa_sent = 0; +static int ecdh_sent = 0; +static int n_received = 0; + +#ifdef TRACK_RESPONSES +bitarray_t *received; +#endif + +static void +handle_reply(void *arg) +{ +#ifdef TRACK_RESPONSES + rsa_work_t *rw = arg; /* Naughty cast, but only looking at serial. */ + tor_assert(! bitarray_is_set(received, rw->serial)); + bitarray_set(received,rw->serial); +#endif + + tor_free(arg); + ++n_received; +} + +static int +add_work(threadpool_t *tp) +{ + int add_rsa = tor_weak_random_range(&weak_rng, 5) == 0; + if (add_rsa) { + rsa_work_t *w = tor_malloc_zero(sizeof(*w)); + w->serial = n_sent++; + crypto_rand((char*)w->msg, 20); + w->msglen = 20; + ++rsa_sent; + return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w) != NULL; + } else { + ecdh_work_t *w = tor_malloc_zero(sizeof(*w)); + w->serial = n_sent++; + /* Not strictly right, but this is just for benchmarks. */ + crypto_rand((char*)w->u.pk.public_key, 32); + ++ecdh_sent; + return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w) != NULL; + } +} + +static void +replysock_readable_cb(tor_socket_t sock, short what, void *arg) +{ + threadpool_t *tp = arg; + replyqueue_t *rq = threadpool_get_replyqueue(tp); + + int old_r = n_received; + (void) sock; + (void) what; + + replyqueue_process(rq); + if (old_r == n_received) + return; + + printf("%d / %d\n", n_received, n_sent); +#ifdef TRACK_RESPONSES + tor_mutex_acquire(&bitmap_mutex); + for (i = 0; i < N_ITEMS; ++i) { + if (bitarray_is_set(received, i)) + putc('o', stdout); + else if (bitarray_is_set(handled, i)) + putc('!', stdout); + else + putc('.', stdout); + } + puts(""); + tor_mutex_release(&bitmap_mutex); +#endif + + if (n_sent - n_received < RELAUNCH_AT) { + while (n_sent < n_received + N_INFLIGHT && n_sent < N_ITEMS) { + if (! add_work(tp)) { + puts("Couldn't add work."); + tor_event_base_loopexit(tor_libevent_get_base(), NULL); + } + } + } + + if (n_received == n_sent && n_sent >= N_ITEMS) { + tor_event_base_loopexit(tor_libevent_get_base(), NULL); + } +} + +int +main(int argc, char **argv) +{ + replyqueue_t *rq; + threadpool_t *tp; + int i; + tor_libevent_cfg evcfg; + struct event *ev; + + (void)argc; + (void)argv; + + init_logging(1); + crypto_global_init(1, NULL, NULL); + crypto_seed_rng(1); + + rq = replyqueue_new(); + tor_assert(rq); + tp = threadpool_new(16, + rq, new_state, free_state, NULL); + tor_assert(tp); + + crypto_seed_weak_rng(&weak_rng); + + memset(&evcfg, 0, sizeof(evcfg)); + tor_libevent_initialize(&evcfg); + + ev = tor_event_new(tor_libevent_get_base(), + replyqueue_get_socket(rq), EV_READ|EV_PERSIST, + replysock_readable_cb, tp); + + event_add(ev, NULL); + +#ifdef TRACK_RESPONSES + handled = bitarray_init_zero(N_ITEMS); + received = bitarray_init_zero(N_ITEMS); + tor_mutex_init(&bitmap_mutex); + handled_len = N_ITEMS; +#endif + + for (i = 0; i < N_INFLIGHT; ++i) { + if (! add_work(tp)) { + puts("Couldn't add work."); + return 1; + } + } + + event_base_loop(tor_libevent_get_base(), 0); + + return 0; +} diff --git a/src/test/include.am b/src/test/include.am index b9b381fdae..6ad1b552b7 100644 --- a/src/test/include.am +++ b/src/test/include.am @@ -1,6 +1,6 @@ TESTS += src/test/test -noinst_PROGRAMS+= src/test/bench +noinst_PROGRAMS+= src/test/bench src/test/bench_workqueue if UNITTESTS_ENABLED noinst_PROGRAMS+= src/test/test src/test/test-child endif @@ -62,6 +62,9 @@ src_test_test_CPPFLAGS= $(src_test_AM_CPPFLAGS) src_test_bench_SOURCES = \ src/test/bench.c +src_test_bench_workqueue_SOURCES = \ + src/test/bench_workqueue.c + src_test_test_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \ @TOR_LDFLAGS_libevent@ src_test_test_LDADD = src/or/libtor-testing.a src/common/libor-testing.a \ @@ -80,6 +83,14 @@ src_test_bench_LDADD = src/or/libtor.a src/common/libor.a \ @TOR_OPENSSL_LIBS@ @TOR_LIB_WS32@ @TOR_LIB_GDI@ @CURVE25519_LIBS@ \ @TOR_SYSTEMD_LIBS@ +src_test_bench_workqueue_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \ + @TOR_LDFLAGS_libevent@ +src_test_bench_workqueue_LDADD = src/or/libtor.a src/common/libor.a \ + src/common/libor-crypto.a $(LIBDONNA) \ + src/common/libor-event.a \ + @TOR_ZLIB_LIBS@ @TOR_LIB_MATH@ @TOR_LIBEVENT_LIBS@ \ + @TOR_OPENSSL_LIBS@ @TOR_LIB_WS32@ @TOR_LIB_GDI@ @CURVE25519_LIBS@ + noinst_HEADERS+= \ src/test/fakechans.h \ src/test/test.h \ -- cgit v1.2.3-54-g00ecf From c7eebe237ddf0555a99b2ef10fd95def2a4bbbd4 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 24 Sep 2013 16:57:40 -0400 Subject: Make pending work cancellable. --- src/common/workqueue.c | 72 +++++++++++++++++++++++++++++----------------- src/common/workqueue.h | 18 ++++++------ src/test/bench_workqueue.c | 24 +++++----------- 3 files changed, 61 insertions(+), 53 deletions(-) (limited to 'src/test') diff --git a/src/common/workqueue.c b/src/common/workqueue.c index ea8dcb0f9b..80e061dfb5 100644 --- a/src/common/workqueue.c +++ b/src/common/workqueue.c @@ -31,16 +31,18 @@ keep array of threads; round-robin between them. */ -typedef struct workqueue_entry_s { - TOR_SIMPLEQ_ENTRY(workqueue_entry_s) next_work; - int (*fn)(int status, void *state, void *arg); +struct workqueue_entry_s { + TOR_TAILQ_ENTRY(workqueue_entry_s) next_work; + struct workerthread_s *on_thread; + uint8_t pending; + int (*fn)(void *state, void *arg); void (*reply_fn)(void *arg); void *arg; -} workqueue_entry_t; +}; struct replyqueue_s { tor_mutex_t lock; - TOR_SIMPLEQ_HEAD(, workqueue_entry_s) answers; + TOR_TAILQ_HEAD(, workqueue_entry_s) answers; void (*alert_fn)(struct replyqueue_s *); // lock not held on this, next 2. tor_socket_t write_sock; @@ -50,7 +52,7 @@ struct replyqueue_s { typedef struct workerthread_s { tor_mutex_t lock; tor_cond_t condition; - TOR_SIMPLEQ_HEAD(, workqueue_entry_s) work; + TOR_TAILQ_HEAD(, workqueue_entry_s) work; unsigned is_running; unsigned is_shut_down; unsigned waiting; @@ -76,7 +78,7 @@ struct threadpool_s { static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work); static workqueue_entry_t * -workqueue_entry_new(int (*fn)(int, void*, void*), +workqueue_entry_new(int (*fn)(void*, void*), void (*reply_fn)(void*), void *arg) { @@ -95,6 +97,23 @@ workqueue_entry_free(workqueue_entry_t *ent) tor_free(ent); } +int +workqueue_entry_cancel(workqueue_entry_t *ent) +{ + int cancelled = 0; + tor_mutex_acquire(&ent->on_thread->lock); + if (ent->pending) { + TOR_TAILQ_REMOVE(&ent->on_thread->work, ent, next_work); + cancelled = 1; + } + tor_mutex_release(&ent->on_thread->lock); + + if (cancelled) { + tor_free(ent); + } + return cancelled; +} + static void worker_thread_main(void *thread_) { @@ -107,20 +126,17 @@ worker_thread_main(void *thread_) thread->is_running = 1; while (1) { /* lock held. */ - while (!TOR_SIMPLEQ_EMPTY(&thread->work)) { + while (!TOR_TAILQ_EMPTY(&thread->work)) { /* lock held. */ - work = TOR_SIMPLEQ_FIRST(&thread->work); - TOR_SIMPLEQ_REMOVE_HEAD(&thread->work, next_work); + work = TOR_TAILQ_FIRST(&thread->work); + TOR_TAILQ_REMOVE(&thread->work, work, next_work); + work->pending = 0; tor_mutex_release(&thread->lock); - result = work->fn(WQ_CMD_RUN, thread->state, work->arg); + result = work->fn(thread->state, work->arg); - if (result == WQ_RPL_QUEUE) { - queue_reply(thread->reply_queue, work); - } else { - workqueue_entry_free(work); - } + queue_reply(thread->reply_queue, work); tor_mutex_acquire(&thread->lock); if (result >= WQ_RPL_ERROR) { @@ -148,8 +164,8 @@ queue_reply(replyqueue_t *queue, workqueue_entry_t *work) { int was_empty; tor_mutex_acquire(&queue->lock); - was_empty = TOR_SIMPLEQ_EMPTY(&queue->answers); - TOR_SIMPLEQ_INSERT_TAIL(&queue->answers, work, next_work); + was_empty = TOR_TAILQ_EMPTY(&queue->answers); + TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work); tor_mutex_release(&queue->lock); if (was_empty) { @@ -175,7 +191,7 @@ workerthread_new(void *state, replyqueue_t *replyqueue) workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t)); tor_mutex_init_for_cond(&thr->lock); tor_cond_init(&thr->condition); - TOR_SIMPLEQ_INIT(&thr->work); + TOR_TAILQ_INIT(&thr->work); thr->state = state; thr->reply_queue = replyqueue; @@ -187,9 +203,9 @@ workerthread_new(void *state, replyqueue_t *replyqueue) return thr; } -void * +workqueue_entry_t * threadpool_queue_work(threadpool_t *pool, - int (*fn)(int, void *, void *), + int (*fn)(void *, void *), void (*reply_fn)(void *), void *arg) { @@ -206,11 +222,12 @@ threadpool_queue_work(threadpool_t *pool, pool->next_for_work = 0; tor_mutex_release(&pool->lock); - ent = workqueue_entry_new(fn, reply_fn, arg); tor_mutex_acquire(&worker->lock); - TOR_SIMPLEQ_INSERT_TAIL(&worker->work, ent, next_work); + ent->on_thread = worker; + ent->pending = 1; + TOR_TAILQ_INSERT_TAIL(&worker->work, ent, next_work); if (worker->waiting) /* XXXX inside or outside of lock?? */ tor_cond_signal_one(&worker->condition); @@ -298,7 +315,7 @@ replyqueue_new(void) rq = tor_malloc_zero(sizeof(replyqueue_t)); tor_mutex_init(&rq->lock); - TOR_SIMPLEQ_INIT(&rq->answers); + TOR_TAILQ_INIT(&rq->answers); rq->read_sock = pair[0]; rq->write_sock = pair[1]; @@ -331,10 +348,10 @@ replyqueue_process(replyqueue_t *queue) /* XXXX freak out on r == 0, or r == "error, not retryable". */ tor_mutex_acquire(&queue->lock); - while (!TOR_SIMPLEQ_EMPTY(&queue->answers)) { + while (!TOR_TAILQ_EMPTY(&queue->answers)) { /* lock held. */ - workqueue_entry_t *work = TOR_SIMPLEQ_FIRST(&queue->answers); - TOR_SIMPLEQ_REMOVE_HEAD(&queue->answers, next_work); + workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers); + TOR_TAILQ_REMOVE(&queue->answers, work, next_work); tor_mutex_release(&queue->lock); work->reply_fn(work->arg); @@ -345,3 +362,4 @@ replyqueue_process(replyqueue_t *queue) tor_mutex_release(&queue->lock); } + diff --git a/src/common/workqueue.h b/src/common/workqueue.h index e502734b84..47753cff12 100644 --- a/src/common/workqueue.h +++ b/src/common/workqueue.h @@ -8,20 +8,20 @@ typedef struct replyqueue_s replyqueue_t; typedef struct threadpool_s threadpool_t; - +typedef struct workqueue_entry_s workqueue_entry_t; #define WQ_CMD_RUN 0 #define WQ_CMD_CANCEL 1 -#define WQ_RPL_QUEUE 0 -#define WQ_RPL_NOQUEUE 1 -#define WQ_RPL_ERROR 2 -#define WQ_RPL_SHUTDOWN 3 +#define WQ_RPL_REPLY 0 +#define WQ_RPL_ERROR 1 +#define WQ_RPL_SHUTDOWN 2 -void *threadpool_queue_work(threadpool_t *pool, - int (*fn)(int, void *, void *), - void (*reply_fn)(void *), - void *arg); +workqueue_entry_t *threadpool_queue_work(threadpool_t *pool, + int (*fn)(void *, void *), + void (*reply_fn)(void *), + void *arg); +int workqueue_entry_cancel(workqueue_entry_t *pending_work); int threadpool_start_threads(threadpool_t *pool, int n); threadpool_t *threadpool_new(int n_threads, replyqueue_t *replyqueue, diff --git a/src/test/bench_workqueue.c b/src/test/bench_workqueue.c index 1bdfbefb3e..f190c613e5 100644 --- a/src/test/bench_workqueue.c +++ b/src/test/bench_workqueue.c @@ -64,7 +64,7 @@ mark_handled(int serial) } static int -workqueue_do_rsa(int cmd, void *state, void *work) +workqueue_do_rsa(void *state, void *work) { rsa_work_t *rw = work; state_t *st = state; @@ -74,16 +74,11 @@ workqueue_do_rsa(int cmd, void *state, void *work) tor_assert(st->magic == 13371337); - if (cmd == WQ_CMD_CANCEL) { - tor_free(work); - return WQ_RPL_NOQUEUE; - } - len = crypto_pk_private_sign(rsa, (char*)sig, 256, (char*)rw->msg, rw->msglen); if (len < 0) { - tor_free(work); - return WQ_RPL_NOQUEUE; + rw->msglen = 0; + return WQ_RPL_ERROR; } memset(rw->msg, 0, sizeof(rw->msg)); @@ -93,12 +88,12 @@ workqueue_do_rsa(int cmd, void *state, void *work) mark_handled(rw->serial); - return WQ_RPL_QUEUE; + return WQ_RPL_REPLY; } #if 0 static int -workqueue_do_shutdown(int cmd, void *state, void *work) +workqueue_do_shutdown(void *state, void *work) { (void)state; (void)work; @@ -110,7 +105,7 @@ workqueue_do_shutdown(int cmd, void *state, void *work) #endif static int -workqueue_do_ecdh(int cmd, void *state, void *work) +workqueue_do_ecdh(void *state, void *work) { ecdh_work_t *ew = work; uint8_t output[CURVE25519_OUTPUT_LEN]; @@ -118,16 +113,11 @@ workqueue_do_ecdh(int cmd, void *state, void *work) tor_assert(st->magic == 13371337); - if (cmd == WQ_CMD_CANCEL) { - tor_free(work); - return WQ_RPL_NOQUEUE; - } - curve25519_handshake(output, &st->ecdh, &ew->u.pk); memcpy(ew->u.msg, output, CURVE25519_OUTPUT_LEN); ++st->n_handled; mark_handled(ew->serial); - return WQ_RPL_QUEUE; + return WQ_RPL_REPLY; } static void * -- cgit v1.2.3-54-g00ecf From 93ad89e9d219d6cea764652a05c236210c7de3fa Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Wed, 25 Sep 2013 11:36:02 -0400 Subject: Rename bench_workqueue -> test_workqueue and make it a unit test. --- .gitignore | 2 + src/test/bench_workqueue.c | 288 -------------------------------------- src/test/include.am | 19 +-- src/test/test_workqueue.c | 342 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 355 insertions(+), 296 deletions(-) delete mode 100644 src/test/bench_workqueue.c create mode 100644 src/test/test_workqueue.c (limited to 'src/test') diff --git a/.gitignore b/.gitignore index 9ddd0c5385..e63576cfd4 100644 --- a/.gitignore +++ b/.gitignore @@ -163,10 +163,12 @@ cscope.* /src/test/test-bt-cl /src/test/test-child /src/test/test-ntor-cl +/src/test/test_workqueue /src/test/test.exe /src/test/test-bt-cl.exe /src/test/test-child.exe /src/test/test-ntor-cl.exe +/src/test/test_workqueue.exe # /src/tools/ /src/tools/tor-checkkey diff --git a/src/test/bench_workqueue.c b/src/test/bench_workqueue.c deleted file mode 100644 index f190c613e5..0000000000 --- a/src/test/bench_workqueue.c +++ /dev/null @@ -1,288 +0,0 @@ -/* Copyright (c) 2001-2004, Roger Dingledine. - * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. - * Copyright (c) 2007-2013, The Tor Project, Inc. */ -/* See LICENSE for licensing information */ - -#include "or.h" -#include "compat_threads.h" -#include "onion.h" -#include "workqueue.h" -#include "crypto.h" -#include "crypto_curve25519.h" -#include "compat_libevent.h" - -#include -#ifdef HAVE_EVENT2_EVENT_H -#include -#else -#include -#endif - -#ifdef TRACK_RESPONSES -tor_mutex_t bitmap_mutex; -int handled_len; -bitarray_t *handled; -#endif - -#define N_ITEMS 10000 -#define N_INFLIGHT 1000 -#define RELAUNCH_AT 250 - -typedef struct state_s { - int magic; - int n_handled; - crypto_pk_t *rsa; - curve25519_secret_key_t ecdh; -} state_t; - -typedef struct rsa_work_s { - int serial; - uint8_t msg[128]; - uint8_t msglen; -} rsa_work_t; - -typedef struct ecdh_work_s { - int serial; - union { - curve25519_public_key_t pk; - uint8_t msg[32]; - } u; -} ecdh_work_t; - -static void -mark_handled(int serial) -{ -#ifdef TRACK_RESPONSES - tor_mutex_acquire(&bitmap_mutex); - tor_assert(serial < handled_len); - tor_assert(! bitarray_is_set(handled, serial)); - bitarray_set(handled, serial); - tor_mutex_release(&bitmap_mutex); -#else - (void)serial; -#endif -} - -static int -workqueue_do_rsa(void *state, void *work) -{ - rsa_work_t *rw = work; - state_t *st = state; - crypto_pk_t *rsa = st->rsa; - uint8_t sig[256]; - int len; - - tor_assert(st->magic == 13371337); - - len = crypto_pk_private_sign(rsa, (char*)sig, 256, - (char*)rw->msg, rw->msglen); - if (len < 0) { - rw->msglen = 0; - return WQ_RPL_ERROR; - } - - memset(rw->msg, 0, sizeof(rw->msg)); - rw->msglen = len; - memcpy(rw->msg, sig, len); - ++st->n_handled; - - mark_handled(rw->serial); - - return WQ_RPL_REPLY; -} - -#if 0 -static int -workqueue_do_shutdown(void *state, void *work) -{ - (void)state; - (void)work; - (void)cmd; - crypto_pk_free(((state_t*)state)->rsa); - tor_free(state); - return WQ_RPL_SHUTDOWN; -} -#endif - -static int -workqueue_do_ecdh(void *state, void *work) -{ - ecdh_work_t *ew = work; - uint8_t output[CURVE25519_OUTPUT_LEN]; - state_t *st = state; - - tor_assert(st->magic == 13371337); - - curve25519_handshake(output, &st->ecdh, &ew->u.pk); - memcpy(ew->u.msg, output, CURVE25519_OUTPUT_LEN); - ++st->n_handled; - mark_handled(ew->serial); - return WQ_RPL_REPLY; -} - -static void * -new_state(void *arg) -{ - state_t *st; - (void)arg; - - st = tor_malloc(sizeof(*st)); - /* Every thread gets its own keys. not a problem for benchmarking */ - st->rsa = crypto_pk_new(); - if (crypto_pk_generate_key_with_bits(st->rsa, 1024) < 0) { - puts("keygen failed"); - crypto_pk_free(st->rsa); - tor_free(st); - return NULL; - } - curve25519_secret_key_generate(&st->ecdh, 0); - st->magic = 13371337; - return st; -} - -static void -free_state(void *arg) -{ - state_t *st = arg; - crypto_pk_free(st->rsa); - tor_free(st); -} - -static tor_weak_rng_t weak_rng; -static int n_sent = 0; -static int rsa_sent = 0; -static int ecdh_sent = 0; -static int n_received = 0; - -#ifdef TRACK_RESPONSES -bitarray_t *received; -#endif - -static void -handle_reply(void *arg) -{ -#ifdef TRACK_RESPONSES - rsa_work_t *rw = arg; /* Naughty cast, but only looking at serial. */ - tor_assert(! bitarray_is_set(received, rw->serial)); - bitarray_set(received,rw->serial); -#endif - - tor_free(arg); - ++n_received; -} - -static int -add_work(threadpool_t *tp) -{ - int add_rsa = tor_weak_random_range(&weak_rng, 5) == 0; - if (add_rsa) { - rsa_work_t *w = tor_malloc_zero(sizeof(*w)); - w->serial = n_sent++; - crypto_rand((char*)w->msg, 20); - w->msglen = 20; - ++rsa_sent; - return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w) != NULL; - } else { - ecdh_work_t *w = tor_malloc_zero(sizeof(*w)); - w->serial = n_sent++; - /* Not strictly right, but this is just for benchmarks. */ - crypto_rand((char*)w->u.pk.public_key, 32); - ++ecdh_sent; - return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w) != NULL; - } -} - -static void -replysock_readable_cb(tor_socket_t sock, short what, void *arg) -{ - threadpool_t *tp = arg; - replyqueue_t *rq = threadpool_get_replyqueue(tp); - - int old_r = n_received; - (void) sock; - (void) what; - - replyqueue_process(rq); - if (old_r == n_received) - return; - - printf("%d / %d\n", n_received, n_sent); -#ifdef TRACK_RESPONSES - tor_mutex_acquire(&bitmap_mutex); - for (i = 0; i < N_ITEMS; ++i) { - if (bitarray_is_set(received, i)) - putc('o', stdout); - else if (bitarray_is_set(handled, i)) - putc('!', stdout); - else - putc('.', stdout); - } - puts(""); - tor_mutex_release(&bitmap_mutex); -#endif - - if (n_sent - n_received < RELAUNCH_AT) { - while (n_sent < n_received + N_INFLIGHT && n_sent < N_ITEMS) { - if (! add_work(tp)) { - puts("Couldn't add work."); - tor_event_base_loopexit(tor_libevent_get_base(), NULL); - } - } - } - - if (n_received == n_sent && n_sent >= N_ITEMS) { - tor_event_base_loopexit(tor_libevent_get_base(), NULL); - } -} - -int -main(int argc, char **argv) -{ - replyqueue_t *rq; - threadpool_t *tp; - int i; - tor_libevent_cfg evcfg; - struct event *ev; - - (void)argc; - (void)argv; - - init_logging(1); - crypto_global_init(1, NULL, NULL); - crypto_seed_rng(1); - - rq = replyqueue_new(); - tor_assert(rq); - tp = threadpool_new(16, - rq, new_state, free_state, NULL); - tor_assert(tp); - - crypto_seed_weak_rng(&weak_rng); - - memset(&evcfg, 0, sizeof(evcfg)); - tor_libevent_initialize(&evcfg); - - ev = tor_event_new(tor_libevent_get_base(), - replyqueue_get_socket(rq), EV_READ|EV_PERSIST, - replysock_readable_cb, tp); - - event_add(ev, NULL); - -#ifdef TRACK_RESPONSES - handled = bitarray_init_zero(N_ITEMS); - received = bitarray_init_zero(N_ITEMS); - tor_mutex_init(&bitmap_mutex); - handled_len = N_ITEMS; -#endif - - for (i = 0; i < N_INFLIGHT; ++i) { - if (! add_work(tp)) { - puts("Couldn't add work."); - return 1; - } - } - - event_base_loop(tor_libevent_get_base(), 0); - - return 0; -} diff --git a/src/test/include.am b/src/test/include.am index 6ad1b552b7..2badc47a47 100644 --- a/src/test/include.am +++ b/src/test/include.am @@ -1,8 +1,8 @@ TESTS += src/test/test -noinst_PROGRAMS+= src/test/bench src/test/bench_workqueue +noinst_PROGRAMS+= src/test/bench if UNITTESTS_ENABLED -noinst_PROGRAMS+= src/test/test src/test/test-child +noinst_PROGRAMS+= src/test/test src/test/test-child src/test/test_workqueue endif src_test_AM_CPPFLAGS = -DSHARE_DATADIR="\"$(datadir)\"" \ @@ -62,8 +62,10 @@ src_test_test_CPPFLAGS= $(src_test_AM_CPPFLAGS) src_test_bench_SOURCES = \ src/test/bench.c -src_test_bench_workqueue_SOURCES = \ - src/test/bench_workqueue.c +src_test_test_workqueue_SOURCES = \ + src/test/test_workqueue.c +src_test_test_workqueue_CPPFLAGS= $(src_test_AM_CPPFLAGS) +src_test_test_workqueue_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS) src_test_test_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \ @TOR_LDFLAGS_libevent@ @@ -83,11 +85,12 @@ src_test_bench_LDADD = src/or/libtor.a src/common/libor.a \ @TOR_OPENSSL_LIBS@ @TOR_LIB_WS32@ @TOR_LIB_GDI@ @CURVE25519_LIBS@ \ @TOR_SYSTEMD_LIBS@ -src_test_bench_workqueue_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \ +src_test_test_workqueue_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \ @TOR_LDFLAGS_libevent@ -src_test_bench_workqueue_LDADD = src/or/libtor.a src/common/libor.a \ - src/common/libor-crypto.a $(LIBDONNA) \ - src/common/libor-event.a \ +src_test_test_workqueue_LDADD = src/or/libtor-testing.a \ + src/common/libor-testing.a \ + src/common/libor-crypto-testing.a $(LIBDONNA) \ + src/common/libor-event-testing.a \ @TOR_ZLIB_LIBS@ @TOR_LIB_MATH@ @TOR_LIBEVENT_LIBS@ \ @TOR_OPENSSL_LIBS@ @TOR_LIB_WS32@ @TOR_LIB_GDI@ @CURVE25519_LIBS@ diff --git a/src/test/test_workqueue.c b/src/test/test_workqueue.c new file mode 100644 index 0000000000..4077fb27a8 --- /dev/null +++ b/src/test/test_workqueue.c @@ -0,0 +1,342 @@ +/* Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2013, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#include "or.h" +#include "compat_threads.h" +#include "onion.h" +#include "workqueue.h" +#include "crypto.h" +#include "crypto_curve25519.h" +#include "compat_libevent.h" + +#include +#ifdef HAVE_EVENT2_EVENT_H +#include +#else +#include +#endif + +static int opt_verbose = 0; +static int opt_n_threads = 8; +static int opt_n_items = 10000; +static int opt_n_inflight = 1000; +static int opt_n_lowwater = 250; +static int opt_ratio_rsa = 5; + +#ifdef TRACK_RESPONSES +tor_mutex_t bitmap_mutex; +int handled_len; +bitarray_t *handled; +#endif + +typedef struct state_s { + int magic; + int n_handled; + crypto_pk_t *rsa; + curve25519_secret_key_t ecdh; +} state_t; + +typedef struct rsa_work_s { + int serial; + uint8_t msg[128]; + uint8_t msglen; +} rsa_work_t; + +typedef struct ecdh_work_s { + int serial; + union { + curve25519_public_key_t pk; + uint8_t msg[32]; + } u; +} ecdh_work_t; + +static void +mark_handled(int serial) +{ +#ifdef TRACK_RESPONSES + tor_mutex_acquire(&bitmap_mutex); + tor_assert(serial < handled_len); + tor_assert(! bitarray_is_set(handled, serial)); + bitarray_set(handled, serial); + tor_mutex_release(&bitmap_mutex); +#else + (void)serial; +#endif +} + +static int +workqueue_do_rsa(void *state, void *work) +{ + rsa_work_t *rw = work; + state_t *st = state; + crypto_pk_t *rsa = st->rsa; + uint8_t sig[256]; + int len; + + tor_assert(st->magic == 13371337); + + len = crypto_pk_private_sign(rsa, (char*)sig, 256, + (char*)rw->msg, rw->msglen); + if (len < 0) { + rw->msglen = 0; + return WQ_RPL_ERROR; + } + + memset(rw->msg, 0, sizeof(rw->msg)); + rw->msglen = len; + memcpy(rw->msg, sig, len); + ++st->n_handled; + + mark_handled(rw->serial); + + return WQ_RPL_REPLY; +} + +#if 0 +static int +workqueue_do_shutdown(void *state, void *work) +{ + (void)state; + (void)work; + (void)cmd; + crypto_pk_free(((state_t*)state)->rsa); + tor_free(state); + return WQ_RPL_SHUTDOWN; +} +#endif + +static int +workqueue_do_ecdh(void *state, void *work) +{ + ecdh_work_t *ew = work; + uint8_t output[CURVE25519_OUTPUT_LEN]; + state_t *st = state; + + tor_assert(st->magic == 13371337); + + curve25519_handshake(output, &st->ecdh, &ew->u.pk); + memcpy(ew->u.msg, output, CURVE25519_OUTPUT_LEN); + ++st->n_handled; + mark_handled(ew->serial); + return WQ_RPL_REPLY; +} + +static void * +new_state(void *arg) +{ + state_t *st; + (void)arg; + + st = tor_malloc(sizeof(*st)); + /* Every thread gets its own keys. not a problem for benchmarking */ + st->rsa = crypto_pk_new(); + if (crypto_pk_generate_key_with_bits(st->rsa, 1024) < 0) { + puts("keygen failed"); + crypto_pk_free(st->rsa); + tor_free(st); + return NULL; + } + curve25519_secret_key_generate(&st->ecdh, 0); + st->magic = 13371337; + return st; +} + +static void +free_state(void *arg) +{ + state_t *st = arg; + crypto_pk_free(st->rsa); + tor_free(st); +} + +static tor_weak_rng_t weak_rng; +static int n_sent = 0; +static int rsa_sent = 0; +static int ecdh_sent = 0; +static int n_received = 0; + +#ifdef TRACK_RESPONSES +bitarray_t *received; +#endif + +static void +handle_reply(void *arg) +{ +#ifdef TRACK_RESPONSES + rsa_work_t *rw = arg; /* Naughty cast, but only looking at serial. */ + tor_assert(! bitarray_is_set(received, rw->serial)); + bitarray_set(received,rw->serial); +#endif + + tor_free(arg); + ++n_received; +} + +static int +add_work(threadpool_t *tp) +{ + int add_rsa = + opt_ratio_rsa == 0 || + tor_weak_random_range(&weak_rng, opt_ratio_rsa) == 0; + if (add_rsa) { + rsa_work_t *w = tor_malloc_zero(sizeof(*w)); + w->serial = n_sent++; + crypto_rand((char*)w->msg, 20); + w->msglen = 20; + ++rsa_sent; + return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w) != NULL; + } else { + ecdh_work_t *w = tor_malloc_zero(sizeof(*w)); + w->serial = n_sent++; + /* Not strictly right, but this is just for benchmarks. */ + crypto_rand((char*)w->u.pk.public_key, 32); + ++ecdh_sent; + return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w) != NULL; + } +} + +static void +replysock_readable_cb(tor_socket_t sock, short what, void *arg) +{ + threadpool_t *tp = arg; + replyqueue_t *rq = threadpool_get_replyqueue(tp); + + int old_r = n_received; + (void) sock; + (void) what; + + replyqueue_process(rq); + if (old_r == n_received) + return; + + if (opt_verbose) + printf("%d / %d\n", n_received, n_sent); +#ifdef TRACK_RESPONSES + tor_mutex_acquire(&bitmap_mutex); + for (i = 0; i < opt_n_items; ++i) { + if (bitarray_is_set(received, i)) + putc('o', stdout); + else if (bitarray_is_set(handled, i)) + putc('!', stdout); + else + putc('.', stdout); + } + puts(""); + tor_mutex_release(&bitmap_mutex); +#endif + + if (n_sent - n_received < opt_n_lowwater) { + while (n_sent < n_received + opt_n_inflight && n_sent < opt_n_items) { + if (! add_work(tp)) { + puts("Couldn't add work."); + tor_event_base_loopexit(tor_libevent_get_base(), NULL); + } + } + } + + if (n_received == n_sent && n_sent >= opt_n_items) { + tor_event_base_loopexit(tor_libevent_get_base(), NULL); + } +} + +static void +help(void) +{ + puts( + "Options:\n" + " -N Run this many items of work\n" + " -T Use this many threads\n" + " -I Have no more than this many requests queued at once\n" + " -L Add items whenever fewer than this many are pending.\n" + " -R Make one out of this many items be a slow (RSA) one"); +} + +int +main(int argc, char **argv) +{ + replyqueue_t *rq; + threadpool_t *tp; + int i; + tor_libevent_cfg evcfg; + struct event *ev; + + for (i = 1; i < argc; ++i) { + if (!strcmp(argv[i], "-v")) { + opt_verbose = 1; + } else if (!strcmp(argv[i], "-T") && i+1 Date: Wed, 25 Sep 2013 14:31:59 -0400 Subject: Test a little more of compat_threads.c --- src/common/compat_threads.c | 20 +++++++++++--------- src/common/compat_threads.h | 9 ++++++++- src/common/workqueue.c | 4 ++-- src/common/workqueue.h | 2 +- src/test/test_workqueue.c | 19 ++++++++++++++++--- 5 files changed, 38 insertions(+), 16 deletions(-) (limited to 'src/test') diff --git a/src/common/compat_threads.c b/src/common/compat_threads.c index f2a516a4a3..648eaa2d80 100644 --- a/src/common/compat_threads.c +++ b/src/common/compat_threads.c @@ -155,19 +155,18 @@ sock_drain(tor_socket_t fd) /** Allocate a new set of alert sockets, and set the appropriate function * pointers, in socks_out. */ int -alert_sockets_create(alert_sockets_t *socks_out) +alert_sockets_create(alert_sockets_t *socks_out, uint32_t flags) { - tor_socket_t socks[2]; + tor_socket_t socks[2] = { TOR_INVALID_SOCKET, TOR_INVALID_SOCKET }; #ifdef HAVE_EVENTFD /* First, we try the Linux eventfd() syscall. This gives a 64-bit counter * associated with a single file descriptor. */ #if defined(EFD_CLOEXEC) && defined(EFD_NONBLOCK) - socks[0] = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK); -#else - socks[0] = -1; + if (!(flags & ASOCKS_NOEVENTFD2)) + socks[0] = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK); #endif - if (socks[0] < 0) { + if (socks[0] < 0 && !(flags & ASOCKS_NOEVENTFD)) { socks[0] = eventfd(0,0); if (socks[0] >= 0) { if (fcntl(socks[0], F_SETFD, FD_CLOEXEC) < 0 || @@ -188,7 +187,8 @@ alert_sockets_create(alert_sockets_t *socks_out) #ifdef HAVE_PIPE2 /* Now we're going to try pipes. First type the pipe2() syscall, if we * have it, so we can save some calls... */ - if (pipe2(socks, O_NONBLOCK|O_CLOEXEC) == 0) { + if (!(flags & ASOCKS_NOPIPE2) && + pipe2(socks, O_NONBLOCK|O_CLOEXEC) == 0) { socks_out->read_fd = socks[0]; socks_out->write_fd = socks[1]; socks_out->alert_fn = pipe_alert; @@ -200,7 +200,8 @@ alert_sockets_create(alert_sockets_t *socks_out) #ifdef HAVE_PIPE /* Now try the regular pipe() syscall. Pipes have a bit lower overhead than * socketpairs, fwict. */ - if (pipe(socks) == 0) { + if (!(flags & ASOCKS_NOPIPE) && + pipe(socks) == 0) { if (fcntl(socks[0], F_SETFD, FD_CLOEXEC) < 0 || fcntl(socks[1], F_SETFD, FD_CLOEXEC) < 0 || set_socket_nonblocking(socks[0]) < 0 || @@ -218,7 +219,8 @@ alert_sockets_create(alert_sockets_t *socks_out) #endif /* If nothing else worked, fall back on socketpair(). */ - if (tor_socketpair(AF_UNIX, SOCK_STREAM, 0, socks) == 0) { + if (!(flags & ASOCKS_NOSOCKETPAIR) && + tor_socketpair(AF_UNIX, SOCK_STREAM, 0, socks) == 0) { if (set_socket_nonblocking(socks[0]) < 0 || set_socket_nonblocking(socks[1])) { tor_close_socket(socks[0]); diff --git a/src/common/compat_threads.h b/src/common/compat_threads.h index 245df76178..1b59391d3b 100644 --- a/src/common/compat_threads.h +++ b/src/common/compat_threads.h @@ -102,7 +102,14 @@ typedef struct alert_sockets_s { int (*drain_fn)(tor_socket_t read_fd); } alert_sockets_t; -int alert_sockets_create(alert_sockets_t *socks_out); +/* Flags to disable one or more alert_sockets backends. */ +#define ASOCKS_NOEVENTFD2 (1u<<0) +#define ASOCKS_NOEVENTFD (1u<<1) +#define ASOCKS_NOPIPE2 (1u<<2) +#define ASOCKS_NOPIPE (1u<<3) +#define ASOCKS_NOSOCKETPAIR (1u<<4) + +int alert_sockets_create(alert_sockets_t *socks_out, uint32_t flags); void alert_sockets_close(alert_sockets_t *socks); #endif diff --git a/src/common/workqueue.c b/src/common/workqueue.c index e07787b404..9293e1f9f0 100644 --- a/src/common/workqueue.c +++ b/src/common/workqueue.c @@ -397,12 +397,12 @@ threadpool_get_replyqueue(threadpool_t *tp) * IO-centric event loop, it needs to get woken up with means other than a * condition variable. */ replyqueue_t * -replyqueue_new(void) +replyqueue_new(uint32_t alertsocks_flags) { replyqueue_t *rq; rq = tor_malloc_zero(sizeof(replyqueue_t)); - if (alert_sockets_create(&rq->alert) < 0) { + if (alert_sockets_create(&rq->alert, alertsocks_flags) < 0) { tor_free(rq); return NULL; } diff --git a/src/common/workqueue.h b/src/common/workqueue.h index dca947e915..5a6cd80fb0 100644 --- a/src/common/workqueue.h +++ b/src/common/workqueue.h @@ -40,7 +40,7 @@ threadpool_t *threadpool_new(int n_threads, void *arg); replyqueue_t *threadpool_get_replyqueue(threadpool_t *tp); -replyqueue_t *replyqueue_new(void); +replyqueue_t *replyqueue_new(uint32_t alertsocks_flags); tor_socket_t replyqueue_get_socket(replyqueue_t *rq); void replyqueue_process(replyqueue_t *queue); diff --git a/src/test/test_workqueue.c b/src/test/test_workqueue.c index 4077fb27a8..7ef54ef22b 100644 --- a/src/test/test_workqueue.c +++ b/src/test/test_workqueue.c @@ -249,8 +249,10 @@ help(void) " -N Run this many items of work\n" " -T Use this many threads\n" " -I Have no more than this many requests queued at once\n" - " -L Add items whenever fewer than this many are pending.\n" - " -R Make one out of this many items be a slow (RSA) one"); + " -L Add items whenever fewer than this many are pending\n" + " -R Make one out of this many items be a slow (RSA) one\n" + " --no-{eventfd2,eventfd,pipe2,pipe,socketpair}\n" + " Disable one of the alert_socket backends."); } int @@ -261,6 +263,7 @@ main(int argc, char **argv) int i; tor_libevent_cfg evcfg; struct event *ev; + uint32_t as_flags = 0; for (i = 1; i < argc; ++i) { if (!strcmp(argv[i], "-v")) { @@ -275,6 +278,16 @@ main(int argc, char **argv) opt_n_lowwater = atoi(argv[++i]); } else if (!strcmp(argv[i], "-R") && i+1 Date: Wed, 25 Sep 2013 14:50:01 -0400 Subject: Move thread tests into their own module --- src/test/include.am | 1 + src/test/test.c | 2 + src/test/test_threads.c | 154 ++++++++++++++++++++++++++++++++++++++++++++++++ src/test/test_util.c | 137 ------------------------------------------ 4 files changed, 157 insertions(+), 137 deletions(-) create mode 100644 src/test/test_threads.c (limited to 'src/test') diff --git a/src/test/include.am b/src/test/include.am index 2badc47a47..2e13454983 100644 --- a/src/test/include.am +++ b/src/test/include.am @@ -46,6 +46,7 @@ src_test_test_SOURCES = \ src/test/test_routerkeys.c \ src/test/test_scheduler.c \ src/test/test_socks.c \ + src/test/test_threads.c \ src/test/test_util.c \ src/test/test_config.c \ src/test/test_hs.c \ diff --git a/src/test/test.c b/src/test/test.c index de6efaf873..edc28cd2d4 100644 --- a/src/test/test.c +++ b/src/test/test.c @@ -1297,6 +1297,7 @@ extern struct testcase_t cell_queue_tests[]; extern struct testcase_t options_tests[]; extern struct testcase_t socks_tests[]; extern struct testcase_t entrynodes_tests[]; +extern struct testcase_t thread_tests[]; extern struct testcase_t extorport_tests[]; extern struct testcase_t controller_event_tests[]; extern struct testcase_t logging_tests[]; @@ -1323,6 +1324,7 @@ static struct testgroup_t testgroups[] = { { "container/", container_tests }, { "util/", util_tests }, { "util/logging/", logging_tests }, + { "util/thread/", thread_tests }, { "cellfmt/", cell_format_tests }, { "cellqueue/", cell_queue_tests }, { "dir/", dir_tests }, diff --git a/src/test/test_threads.c b/src/test/test_threads.c new file mode 100644 index 0000000000..2b4c93393f --- /dev/null +++ b/src/test/test_threads.c @@ -0,0 +1,154 @@ +/* Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2013, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#include "orconfig.h" +#include "or.h" +#include "compat_threads.h" +#include "test.h" + +/** mutex for thread test to stop the threads hitting data at the same time. */ +static tor_mutex_t *thread_test_mutex_ = NULL; +/** mutexes for the thread test to make sure that the threads have to + * interleave somewhat. */ +static tor_mutex_t *thread_test_start1_ = NULL, + *thread_test_start2_ = NULL; +/** Shared strmap for the thread test. */ +static strmap_t *thread_test_strmap_ = NULL; +/** The name of thread1 for the thread test */ +static char *thread1_name_ = NULL; +/** The name of thread2 for the thread test */ +static char *thread2_name_ = NULL; + +static void thread_test_func_(void* _s) ATTR_NORETURN; + +/** How many iterations have the threads in the unit test run? */ +static int t1_count = 0, t2_count = 0; + +/** Helper function for threading unit tests: This function runs in a + * subthread. It grabs its own mutex (start1 or start2) to make sure that it + * should start, then it repeatedly alters _test_thread_strmap protected by + * thread_test_mutex_. */ +static void +thread_test_func_(void* _s) +{ + char *s = _s; + int i, *count; + tor_mutex_t *m; + char buf[64]; + char **cp; + if (!strcmp(s, "thread 1")) { + m = thread_test_start1_; + cp = &thread1_name_; + count = &t1_count; + } else { + m = thread_test_start2_; + cp = &thread2_name_; + count = &t2_count; + } + + tor_snprintf(buf, sizeof(buf), "%lu", tor_get_thread_id()); + *cp = tor_strdup(buf); + + tor_mutex_acquire(m); + + for (i=0; i<10000; ++i) { + tor_mutex_acquire(thread_test_mutex_); + strmap_set(thread_test_strmap_, "last to run", *cp); + ++*count; + tor_mutex_release(thread_test_mutex_); + } + tor_mutex_acquire(thread_test_mutex_); + strmap_set(thread_test_strmap_, s, *cp); + tor_mutex_release(thread_test_mutex_); + + tor_mutex_release(m); + + spawn_exit(); +} + +/** Run unit tests for threading logic. */ +static void +test_threads_basic(void *arg) +{ + char *s1 = NULL, *s2 = NULL; + int done = 0, timedout = 0; + time_t started; +#ifndef _WIN32 + struct timeval tv; + tv.tv_sec=0; + tv.tv_usec=100*1000; +#endif + (void)arg; + thread_test_mutex_ = tor_mutex_new(); + thread_test_start1_ = tor_mutex_new(); + thread_test_start2_ = tor_mutex_new(); + thread_test_strmap_ = strmap_new(); + s1 = tor_strdup("thread 1"); + s2 = tor_strdup("thread 2"); + tor_mutex_acquire(thread_test_start1_); + tor_mutex_acquire(thread_test_start2_); + spawn_func(thread_test_func_, s1); + spawn_func(thread_test_func_, s2); + tor_mutex_release(thread_test_start2_); + tor_mutex_release(thread_test_start1_); + started = time(NULL); + while (!done) { + tor_mutex_acquire(thread_test_mutex_); + strmap_assert_ok(thread_test_strmap_); + if (strmap_get(thread_test_strmap_, "thread 1") && + strmap_get(thread_test_strmap_, "thread 2")) { + done = 1; + } else if (time(NULL) > started + 150) { + timedout = done = 1; + } + tor_mutex_release(thread_test_mutex_); +#ifndef _WIN32 + /* Prevent the main thread from starving the worker threads. */ + select(0, NULL, NULL, NULL, &tv); +#endif + } + tor_mutex_acquire(thread_test_start1_); + tor_mutex_release(thread_test_start1_); + tor_mutex_acquire(thread_test_start2_); + tor_mutex_release(thread_test_start2_); + + tor_mutex_free(thread_test_mutex_); + + if (timedout) { + printf("\nTimed out: %d %d", t1_count, t2_count); + tt_assert(strmap_get(thread_test_strmap_, "thread 1")); + tt_assert(strmap_get(thread_test_strmap_, "thread 2")); + tt_assert(!timedout); + } + + /* different thread IDs. */ + tt_assert(strcmp(strmap_get(thread_test_strmap_, "thread 1"), + strmap_get(thread_test_strmap_, "thread 2"))); + tt_assert(!strcmp(strmap_get(thread_test_strmap_, "thread 1"), + strmap_get(thread_test_strmap_, "last to run")) || + !strcmp(strmap_get(thread_test_strmap_, "thread 2"), + strmap_get(thread_test_strmap_, "last to run"))); + + done: + tor_free(s1); + tor_free(s2); + tor_free(thread1_name_); + tor_free(thread2_name_); + if (thread_test_strmap_) + strmap_free(thread_test_strmap_, NULL); + if (thread_test_start1_) + tor_mutex_free(thread_test_start1_); + if (thread_test_start2_) + tor_mutex_free(thread_test_start2_); +} + +#define THREAD_TEST(name) \ + { #name, test_threads_##name, TT_FORK, NULL, NULL } + +struct testcase_t thread_tests[] = { + THREAD_TEST(basic), + END_OF_TESTCASES +}; + diff --git a/src/test/test_util.c b/src/test/test_util.c index 15470e8efa..b4ee934698 100644 --- a/src/test/test_util.c +++ b/src/test/test_util.c @@ -1607,142 +1607,6 @@ test_util_pow2(void *arg) ; } -/** mutex for thread test to stop the threads hitting data at the same time. */ -static tor_mutex_t *thread_test_mutex_ = NULL; -/** mutexes for the thread test to make sure that the threads have to - * interleave somewhat. */ -static tor_mutex_t *thread_test_start1_ = NULL, - *thread_test_start2_ = NULL; -/** Shared strmap for the thread test. */ -static strmap_t *thread_test_strmap_ = NULL; -/** The name of thread1 for the thread test */ -static char *thread1_name_ = NULL; -/** The name of thread2 for the thread test */ -static char *thread2_name_ = NULL; - -static void thread_test_func_(void* _s) ATTR_NORETURN; - -/** How many iterations have the threads in the unit test run? */ -static int t1_count = 0, t2_count = 0; - -/** Helper function for threading unit tests: This function runs in a - * subthread. It grabs its own mutex (start1 or start2) to make sure that it - * should start, then it repeatedly alters _test_thread_strmap protected by - * thread_test_mutex_. */ -static void -thread_test_func_(void* _s) -{ - char *s = _s; - int i, *count; - tor_mutex_t *m; - char buf[64]; - char **cp; - if (!strcmp(s, "thread 1")) { - m = thread_test_start1_; - cp = &thread1_name_; - count = &t1_count; - } else { - m = thread_test_start2_; - cp = &thread2_name_; - count = &t2_count; - } - - tor_snprintf(buf, sizeof(buf), "%lu", tor_get_thread_id()); - *cp = tor_strdup(buf); - - tor_mutex_acquire(m); - - for (i=0; i<10000; ++i) { - tor_mutex_acquire(thread_test_mutex_); - strmap_set(thread_test_strmap_, "last to run", *cp); - ++*count; - tor_mutex_release(thread_test_mutex_); - } - tor_mutex_acquire(thread_test_mutex_); - strmap_set(thread_test_strmap_, s, *cp); - tor_mutex_release(thread_test_mutex_); - - tor_mutex_release(m); - - spawn_exit(); -} - -/** Run unit tests for threading logic. */ -static void -test_util_threads(void *arg) -{ - char *s1 = NULL, *s2 = NULL; - int done = 0, timedout = 0; - time_t started; -#ifndef _WIN32 - struct timeval tv; - tv.tv_sec=0; - tv.tv_usec=100*1000; -#endif - (void)arg; - thread_test_mutex_ = tor_mutex_new(); - thread_test_start1_ = tor_mutex_new(); - thread_test_start2_ = tor_mutex_new(); - thread_test_strmap_ = strmap_new(); - s1 = tor_strdup("thread 1"); - s2 = tor_strdup("thread 2"); - tor_mutex_acquire(thread_test_start1_); - tor_mutex_acquire(thread_test_start2_); - spawn_func(thread_test_func_, s1); - spawn_func(thread_test_func_, s2); - tor_mutex_release(thread_test_start2_); - tor_mutex_release(thread_test_start1_); - started = time(NULL); - while (!done) { - tor_mutex_acquire(thread_test_mutex_); - strmap_assert_ok(thread_test_strmap_); - if (strmap_get(thread_test_strmap_, "thread 1") && - strmap_get(thread_test_strmap_, "thread 2")) { - done = 1; - } else if (time(NULL) > started + 150) { - timedout = done = 1; - } - tor_mutex_release(thread_test_mutex_); -#ifndef _WIN32 - /* Prevent the main thread from starving the worker threads. */ - select(0, NULL, NULL, NULL, &tv); -#endif - } - tor_mutex_acquire(thread_test_start1_); - tor_mutex_release(thread_test_start1_); - tor_mutex_acquire(thread_test_start2_); - tor_mutex_release(thread_test_start2_); - - tor_mutex_free(thread_test_mutex_); - - if (timedout) { - printf("\nTimed out: %d %d", t1_count, t2_count); - tt_assert(strmap_get(thread_test_strmap_, "thread 1")); - tt_assert(strmap_get(thread_test_strmap_, "thread 2")); - tt_assert(!timedout); - } - - /* different thread IDs. */ - tt_assert(strcmp(strmap_get(thread_test_strmap_, "thread 1"), - strmap_get(thread_test_strmap_, "thread 2"))); - tt_assert(!strcmp(strmap_get(thread_test_strmap_, "thread 1"), - strmap_get(thread_test_strmap_, "last to run")) || - !strcmp(strmap_get(thread_test_strmap_, "thread 2"), - strmap_get(thread_test_strmap_, "last to run"))); - - done: - tor_free(s1); - tor_free(s2); - tor_free(thread1_name_); - tor_free(thread2_name_); - if (thread_test_strmap_) - strmap_free(thread_test_strmap_, NULL); - if (thread_test_start1_) - tor_mutex_free(thread_test_start1_); - if (thread_test_start2_) - tor_mutex_free(thread_test_start2_); -} - /** Run unit tests for compression functions */ static void test_util_gzip(void *arg) @@ -4927,7 +4791,6 @@ struct testcase_t util_tests[] = { UTIL_LEGACY(memarea), UTIL_LEGACY(control_formats), UTIL_LEGACY(mmap), - UTIL_LEGACY(threads), UTIL_LEGACY(sscanf), UTIL_LEGACY(format_time_interval), UTIL_LEGACY(path_is_relative), -- cgit v1.2.3-54-g00ecf From 7a63005220938b30df41b51334942d7d79c14cf9 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Fri, 27 Sep 2013 23:15:53 -0400 Subject: Basic unit test for condition variables. --- configure.ac | 2 +- src/test/test.c | 18 ++++++ src/test/test.h | 2 + src/test/test_crypto.c | 36 ++++-------- src/test/test_threads.c | 150 +++++++++++++++++++++++++++++++++++++++++++++++- src/test/test_util.c | 21 +------ 6 files changed, 183 insertions(+), 46 deletions(-) (limited to 'src/test') diff --git a/configure.ac b/configure.ac index 69a266a717..929b701594 100644 --- a/configure.ac +++ b/configure.ac @@ -437,7 +437,7 @@ AC_CHECK_FUNCS( sysconf \ sysctl \ uname \ - usleep \ + usleep \ vasprintf \ _vscprintf ) diff --git a/src/test/test.c b/src/test/test.c index edc28cd2d4..9171306d18 100644 --- a/src/test/test.c +++ b/src/test/test.c @@ -1258,6 +1258,24 @@ test_stats(void *arg) tor_free(s); } + +static void * +passthrough_test_setup(const struct testcase_t *testcase) +{ + return testcase->setup_data; +} +static int +passthrough_test_cleanup(const struct testcase_t *testcase, void *ptr) +{ + (void)testcase; + (void)ptr; + return 1; +} + +const struct testcase_setup_t passthrough_setup = { + passthrough_test_setup, passthrough_test_cleanup +}; + #define ENT(name) \ { #name, test_ ## name , 0, NULL, NULL } #define FORK(name) \ diff --git a/src/test/test.h b/src/test/test.h index 48037a5ba3..b8057c59bf 100644 --- a/src/test/test.h +++ b/src/test/test.h @@ -158,5 +158,7 @@ crypto_pk_t *pk_generate(int idx); #define NS_MOCK(name) MOCK(name, NS(name)) #define NS_UNMOCK(name) UNMOCK(name) +extern const struct testcase_setup_t passthrough_setup; + #endif diff --git a/src/test/test_crypto.c b/src/test/test_crypto.c index 4a5a12c50a..8426c715a4 100644 --- a/src/test/test_crypto.c +++ b/src/test/test_crypto.c @@ -1975,30 +1975,14 @@ test_crypto_siphash(void *arg) ; } -static void * -pass_data_setup_fn(const struct testcase_t *testcase) -{ - return testcase->setup_data; -} -static int -pass_data_cleanup_fn(const struct testcase_t *testcase, void *ptr) -{ - (void)ptr; - (void)testcase; - return 1; -} -static const struct testcase_setup_t pass_data = { - pass_data_setup_fn, pass_data_cleanup_fn -}; - #define CRYPTO_LEGACY(name) \ { #name, test_crypto_ ## name , 0, NULL, NULL } struct testcase_t crypto_tests[] = { CRYPTO_LEGACY(formats), CRYPTO_LEGACY(rng), - { "aes_AES", test_crypto_aes, TT_FORK, &pass_data, (void*)"aes" }, - { "aes_EVP", test_crypto_aes, TT_FORK, &pass_data, (void*)"evp" }, + { "aes_AES", test_crypto_aes, TT_FORK, &passthrough_setup, (void*)"aes" }, + { "aes_EVP", test_crypto_aes, TT_FORK, &passthrough_setup, (void*)"evp" }, CRYPTO_LEGACY(sha), CRYPTO_LEGACY(pk), { "pk_fingerprints", test_crypto_pk_fingerprints, TT_FORK, NULL, NULL }, @@ -2006,23 +1990,25 @@ struct testcase_t crypto_tests[] = { CRYPTO_LEGACY(dh), CRYPTO_LEGACY(s2k_rfc2440), #ifdef HAVE_LIBSCRYPT_H - { "s2k_scrypt", test_crypto_s2k_general, 0, &pass_data, + { "s2k_scrypt", test_crypto_s2k_general, 0, &passthrough_setup, (void*)"scrypt" }, - { "s2k_scrypt_low", test_crypto_s2k_general, 0, &pass_data, + { "s2k_scrypt_low", test_crypto_s2k_general, 0, &passthrough_setup, (void*)"scrypt-low" }, #endif - { "s2k_pbkdf2", test_crypto_s2k_general, 0, &pass_data, + { "s2k_pbkdf2", test_crypto_s2k_general, 0, &passthrough_setup, (void*)"pbkdf2" }, - { "s2k_rfc2440_general", test_crypto_s2k_general, 0, &pass_data, + { "s2k_rfc2440_general", test_crypto_s2k_general, 0, &passthrough_setup, (void*)"rfc2440" }, - { "s2k_rfc2440_legacy", test_crypto_s2k_general, 0, &pass_data, + { "s2k_rfc2440_legacy", test_crypto_s2k_general, 0, &passthrough_setup, (void*)"rfc2440-legacy" }, { "s2k_errors", test_crypto_s2k_errors, 0, NULL, NULL }, { "scrypt_vectors", test_crypto_scrypt_vectors, 0, NULL, NULL }, { "pbkdf2_vectors", test_crypto_pbkdf2_vectors, 0, NULL, NULL }, { "pwbox", test_crypto_pwbox, 0, NULL, NULL }, - { "aes_iv_AES", test_crypto_aes_iv, TT_FORK, &pass_data, (void*)"aes" }, - { "aes_iv_EVP", test_crypto_aes_iv, TT_FORK, &pass_data, (void*)"evp" }, + { "aes_iv_AES", test_crypto_aes_iv, TT_FORK, &passthrough_setup, + (void*)"aes" }, + { "aes_iv_EVP", test_crypto_aes_iv, TT_FORK, &passthrough_setup, + (void*)"evp" }, CRYPTO_LEGACY(base32_decode), { "kdf_TAP", test_crypto_kdf_TAP, 0, NULL, NULL }, { "hkdf_sha256", test_crypto_hkdf_sha256, 0, NULL, NULL }, diff --git a/src/test/test_threads.c b/src/test/test_threads.c index 2b4c93393f..d2a61a17d0 100644 --- a/src/test/test_threads.c +++ b/src/test/test_threads.c @@ -144,11 +144,159 @@ test_threads_basic(void *arg) tor_mutex_free(thread_test_start2_); } -#define THREAD_TEST(name) \ +typedef struct cv_testinfo_s { + tor_cond_t *cond; + tor_mutex_t *mutex; + int value; + int addend; + int shutdown; + int n_shutdown; + int n_wakeups; + int n_timeouts; + int n_threads; + const struct timeval *tv; +} cv_testinfo_t; + +static cv_testinfo_t * +cv_testinfo_new(void) +{ + cv_testinfo_t *i = tor_malloc_zero(sizeof(*i)); + i->cond = tor_cond_new(); + i->mutex = tor_mutex_new_nonrecursive(); + return i; +} + +static void +cv_testinfo_free(cv_testinfo_t *i) +{ + if (!i) + return; + tor_cond_free(i->cond); + tor_mutex_free(i->mutex); + tor_free(i); +} + +static void cv_test_thr_fn_(void *arg) ATTR_NORETURN; + +static void +cv_test_thr_fn_(void *arg) +{ + cv_testinfo_t *i = arg; + int tid, r; + + tor_mutex_acquire(i->mutex); + tid = i->n_threads++; + tor_mutex_release(i->mutex); + (void) tid; + + tor_mutex_acquire(i->mutex); + while (1) { + if (i->addend) { + i->value += i->addend; + i->addend = 0; + } + + if (i->shutdown) { + ++i->n_shutdown; + i->shutdown = 0; + tor_mutex_release(i->mutex); + spawn_exit(); + } + r = tor_cond_wait(i->cond, i->mutex, i->tv); + ++i->n_wakeups; + if (r == 1) { + ++i->n_timeouts; + tor_mutex_release(i->mutex); + spawn_exit(); + } + } +} + +static void +test_threads_conditionvar(void *arg) +{ + cv_testinfo_t *ti=NULL; + const struct timeval msec100 = { 0, 100*1000 }; + const int timeout = !strcmp(arg, "tv"); + + ti = cv_testinfo_new(); + if (timeout) { + ti->tv = &msec100; + } + spawn_func(cv_test_thr_fn_, ti); + spawn_func(cv_test_thr_fn_, ti); + spawn_func(cv_test_thr_fn_, ti); + spawn_func(cv_test_thr_fn_, ti); + + tor_mutex_acquire(ti->mutex); + ti->addend = 7; + ti->shutdown = 1; + tor_cond_signal_one(ti->cond); + tor_mutex_release(ti->mutex); + +#define SPIN() \ + while (1) { \ + tor_mutex_acquire(ti->mutex); \ + if (ti->addend == 0) { \ + break; \ + } \ + tor_mutex_release(ti->mutex); \ + } + + SPIN(); + + ti->addend = 30; + ti->shutdown = 1; + tor_cond_signal_all(ti->cond); + tor_mutex_release(ti->mutex); + SPIN(); + + ti->addend = 1000; + if (! timeout) ti->shutdown = 1; + tor_cond_signal_one(ti->cond); + tor_mutex_release(ti->mutex); + SPIN(); + ti->addend = 300; + if (! timeout) ti->shutdown = 1; + tor_cond_signal_all(ti->cond); + tor_mutex_release(ti->mutex); + + SPIN(); + tor_mutex_release(ti->mutex); + + tt_int_op(ti->value, ==, 1337); + if (!timeout) { + tt_int_op(ti->n_shutdown, ==, 4); + } else { +#ifdef _WIN32 + Sleep(500); /* msec */ +#elif defined(HAVE_USLEEP) + usleep(500*1000); /* usec */ +#else + { + struct tv = { 0, 500*1000 }; + select(0, NULL, NULL, NULL, &tv); + } +#endif + tor_mutex_acquire(ti->mutex); + tt_int_op(ti->n_shutdown, ==, 2); + tt_int_op(ti->n_timeouts, ==, 2); + tor_mutex_release(ti->mutex); + } + + done: + cv_testinfo_free(ti); +} + +#define THREAD_TEST(name) \ { #name, test_threads_##name, TT_FORK, NULL, NULL } struct testcase_t thread_tests[] = { THREAD_TEST(basic), + { "conditionvar", test_threads_conditionvar, TT_FORK, + &passthrough_setup, (void*)"no-tv" }, + { "conditionvar_timeout", test_threads_conditionvar, TT_FORK, + &passthrough_setup, (void*)"tv" }, END_OF_TESTCASES }; diff --git a/src/test/test_util.c b/src/test/test_util.c index b4ee934698..97cf3870f4 100644 --- a/src/test/test_util.c +++ b/src/test/test_util.c @@ -4646,23 +4646,6 @@ test_util_socket(void *arg) tor_close_socket(fd4); } -static void * -socketpair_test_setup(const struct testcase_t *testcase) -{ - return testcase->setup_data; -} -static int -socketpair_test_cleanup(const struct testcase_t *testcase, void *ptr) -{ - (void)testcase; - (void)ptr; - return 1; -} - -static const struct testcase_setup_t socketpair_setup = { - socketpair_test_setup, socketpair_test_cleanup -}; - /* Test for socketpair and ersatz_socketpair(). We test them both, since * the latter is a tolerably good way to exersize tor_accept_socket(). */ static void @@ -4837,10 +4820,10 @@ struct testcase_t util_tests[] = { UTIL_TEST(mathlog, 0), UTIL_TEST(weak_random, 0), UTIL_TEST(socket, TT_FORK), - { "socketpair", test_util_socketpair, TT_FORK, &socketpair_setup, + { "socketpair", test_util_socketpair, TT_FORK, &passthrough_setup, (void*)"0" }, { "socketpair_ersatz", test_util_socketpair, TT_FORK, - &socketpair_setup, (void*)"1" }, + &passthrough_setup, (void*)"1" }, UTIL_TEST(max_mem, 0), UTIL_TEST(hostname_validation, 0), UTIL_TEST(ipv4_validation, 0), -- cgit v1.2.3-54-g00ecf From 81354b081b7bb9deabd6c53e48623190b01aab1c Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Fri, 27 Sep 2013 23:20:22 -0400 Subject: Add unit test for thread IDs. --- src/test/test_threads.c | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) (limited to 'src/test') diff --git a/src/test/test_threads.c b/src/test/test_threads.c index d2a61a17d0..2bc24e1edc 100644 --- a/src/test/test_threads.c +++ b/src/test/test_threads.c @@ -21,6 +21,10 @@ static char *thread1_name_ = NULL; /** The name of thread2 for the thread test */ static char *thread2_name_ = NULL; +static int thread_fns_failed = 0; + +static unsigned long thread_fn_tid1, thread_fn_tid2; + static void thread_test_func_(void* _s) ATTR_NORETURN; /** How many iterations have the threads in the unit test run? */ @@ -42,10 +46,12 @@ thread_test_func_(void* _s) m = thread_test_start1_; cp = &thread1_name_; count = &t1_count; + thread_fn_tid1 = tor_get_thread_id(); } else { m = thread_test_start2_; cp = &thread2_name_; count = &t2_count; + thread_fn_tid2 = tor_get_thread_id(); } tor_snprintf(buf, sizeof(buf), "%lu", tor_get_thread_id()); @@ -61,6 +67,8 @@ thread_test_func_(void* _s) } tor_mutex_acquire(thread_test_mutex_); strmap_set(thread_test_strmap_, s, *cp); + if (in_main_thread()) + ++thread_fns_failed; tor_mutex_release(thread_test_mutex_); tor_mutex_release(m); @@ -80,7 +88,10 @@ test_threads_basic(void *arg) tv.tv_sec=0; tv.tv_usec=100*1000; #endif - (void)arg; + (void) arg; + + set_main_thread(); + thread_test_mutex_ = tor_mutex_new(); thread_test_start1_ = tor_mutex_new(); thread_test_start2_ = tor_mutex_new(); @@ -131,6 +142,9 @@ test_threads_basic(void *arg) !strcmp(strmap_get(thread_test_strmap_, "thread 2"), strmap_get(thread_test_strmap_, "last to run"))); + tt_int_op(thread_fns_failed, ==, 0); + tt_int_op(thread_fn_tid1, !=, thread_fn_tid2); + done: tor_free(s1); tor_free(s2); @@ -188,7 +202,7 @@ cv_test_thr_fn_(void *arg) tid = i->n_threads++; tor_mutex_release(i->mutex); (void) tid; - + tor_mutex_acquire(i->mutex); while (1) { if (i->addend) { @@ -299,4 +313,3 @@ struct testcase_t thread_tests[] = { &passthrough_setup, (void*)"tv" }, END_OF_TESTCASES }; - -- cgit v1.2.3-54-g00ecf From ebbc177005eaf9bd949daba657b2c703a7bd1769 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Sat, 28 Sep 2013 00:09:20 -0400 Subject: Add shutdown and broadcast support to test_workqueue. --- src/test/test_workqueue.c | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) (limited to 'src/test') diff --git a/src/test/test_workqueue.c b/src/test/test_workqueue.c index 7ef54ef22b..cbf9d81950 100644 --- a/src/test/test_workqueue.c +++ b/src/test/test_workqueue.c @@ -36,6 +36,7 @@ typedef struct state_s { int n_handled; crypto_pk_t *rsa; curve25519_secret_key_t ecdh; + int is_shutdown; } state_t; typedef struct rsa_work_s { @@ -94,18 +95,15 @@ workqueue_do_rsa(void *state, void *work) return WQ_RPL_REPLY; } -#if 0 static int workqueue_do_shutdown(void *state, void *work) { (void)state; (void)work; - (void)cmd; crypto_pk_free(((state_t*)state)->rsa); tor_free(state); return WQ_RPL_SHUTDOWN; } -#endif static int workqueue_do_ecdh(void *state, void *work) @@ -197,6 +195,20 @@ add_work(threadpool_t *tp) } } +static int shutting_down = 0; +static int n_shutdowns_done = 0; + +static void +shutdown_reply(void *arg) +{ + (void)arg; + tor_assert(shutting_down); + ++n_shutdowns_done; + if (n_shutdowns_done == opt_n_threads) { + tor_event_base_loopexit(tor_libevent_get_base(), NULL); + } +} + static void replysock_readable_cb(tor_socket_t sock, short what, void *arg) { @@ -236,8 +248,9 @@ replysock_readable_cb(tor_socket_t sock, short what, void *arg) } } - if (n_received == n_sent && n_sent >= opt_n_items) { - tor_event_base_loopexit(tor_libevent_get_base(), NULL); + if (shutting_down == 0 && n_received == n_sent && n_sent >= opt_n_items) { + shutting_down = 1; + threadpool_queue_for_all(tp, NULL, workqueue_do_shutdown, shutdown_reply, NULL); } } @@ -345,7 +358,8 @@ main(int argc, char **argv) event_base_loop(tor_libevent_get_base(), 0); - if (n_sent != opt_n_items || n_received != n_sent) { + if (n_sent != opt_n_items || n_received != n_sent || + n_shutdowns_done != opt_n_threads) { puts("FAIL"); return 1; } else { -- cgit v1.2.3-54-g00ecf From e5f8c772f4c468a20da8b9176c2b276ac76bbe78 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Sat, 28 Sep 2013 00:33:10 -0400 Subject: Test and fix workqueue_entry_cancel(). --- src/common/workqueue.c | 12 +++++--- src/common/workqueue.h | 2 +- src/test/test_workqueue.c | 77 ++++++++++++++++++++++++++++++++++++++--------- 3 files changed, 71 insertions(+), 20 deletions(-) (limited to 'src/test') diff --git a/src/common/workqueue.c b/src/common/workqueue.c index 9293e1f9f0..44cf98d0dc 100644 --- a/src/common/workqueue.c +++ b/src/common/workqueue.c @@ -119,27 +119,29 @@ workqueue_entry_free(workqueue_entry_t *ent) * executed in the main thread; that will cause undefined behavior (probably, * a crash). * - * If the work is cancelled, this function return 1. It is the caller's - * responsibility to free any storage in the work function's arguments. + * If the work is cancelled, this function return the argument passed to the + * work function. It is the caller's responsibility to free this storage. * * This function will have no effect if the worker thread has already executed - * or begun to execute the work item. In that case, it will return 0. + * or begun to execute the work item. In that case, it will return NULL. */ -int +void * workqueue_entry_cancel(workqueue_entry_t *ent) { int cancelled = 0; + void *result = NULL; tor_mutex_acquire(&ent->on_thread->lock); if (ent->pending) { TOR_TAILQ_REMOVE(&ent->on_thread->work, ent, next_work); cancelled = 1; + result = ent->arg; } tor_mutex_release(&ent->on_thread->lock); if (cancelled) { tor_free(ent); } - return cancelled; + return result; } /** diff --git a/src/common/workqueue.h b/src/common/workqueue.h index 5a6cd80fb0..ec1f7c9000 100644 --- a/src/common/workqueue.h +++ b/src/common/workqueue.h @@ -32,7 +32,7 @@ int threadpool_queue_for_all(threadpool_t *pool, int (*fn)(void *, void *), void (*reply_fn)(void *), void *arg); -int workqueue_entry_cancel(workqueue_entry_t *pending_work); +void *workqueue_entry_cancel(workqueue_entry_t *pending_work); threadpool_t *threadpool_new(int n_threads, replyqueue_t *replyqueue, void *(*new_thread_state_fn)(void*), diff --git a/src/test/test_workqueue.c b/src/test/test_workqueue.c index cbf9d81950..6de6f03c33 100644 --- a/src/test/test_workqueue.c +++ b/src/test/test_workqueue.c @@ -23,6 +23,7 @@ static int opt_n_threads = 8; static int opt_n_items = 10000; static int opt_n_inflight = 1000; static int opt_n_lowwater = 250; +static int opt_n_cancel = 0; static int opt_ratio_rsa = 5; #ifdef TRACK_RESPONSES @@ -172,27 +173,68 @@ handle_reply(void *arg) ++n_received; } -static int +static workqueue_entry_t * add_work(threadpool_t *tp) { int add_rsa = opt_ratio_rsa == 0 || tor_weak_random_range(&weak_rng, opt_ratio_rsa) == 0; + if (add_rsa) { rsa_work_t *w = tor_malloc_zero(sizeof(*w)); w->serial = n_sent++; crypto_rand((char*)w->msg, 20); w->msglen = 20; ++rsa_sent; - return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w) != NULL; + return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w); } else { ecdh_work_t *w = tor_malloc_zero(sizeof(*w)); w->serial = n_sent++; /* Not strictly right, but this is just for benchmarks. */ crypto_rand((char*)w->u.pk.public_key, 32); ++ecdh_sent; - return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w) != NULL; + return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w); + } +} + +static int n_failed_cancel = 0; +static int n_successful_cancel = 0; + +static int +add_n_work_items(threadpool_t *tp, int n) +{ + int n_queued = 0; + int n_try_cancel = 0, i; + workqueue_entry_t **to_cancel; + workqueue_entry_t *ent; + + to_cancel = tor_malloc(sizeof(workqueue_entry_t*) * opt_n_cancel); + + while (n_queued++ < n) { + ent = add_work(tp); + if (! ent) { + puts("Couldn't add work."); + tor_event_base_loopexit(tor_libevent_get_base(), NULL); + return -1; + } + if (n_try_cancel < opt_n_cancel && + tor_weak_random_range(&weak_rng, n) < opt_n_cancel) { + to_cancel[n_try_cancel++] = ent; + } + } + + for (i = 0; i < n_try_cancel; ++i) { + void *work = workqueue_entry_cancel(to_cancel[i]); + if (! work) { + n_failed_cancel++; + } else { + n_successful_cancel++; + tor_free(work); + } } + + tor_free(to_cancel); + return 0; } static int shutting_down = 0; @@ -223,8 +265,13 @@ replysock_readable_cb(tor_socket_t sock, short what, void *arg) if (old_r == n_received) return; - if (opt_verbose) - printf("%d / %d\n", n_received, n_sent); + if (opt_verbose) { + printf("%d / %d", n_received, n_sent); + if (opt_n_cancel) + printf(" (%d cancelled, %d uncancellable)", + n_successful_cancel, n_failed_cancel); + puts(""); + } #ifdef TRACK_RESPONSES tor_mutex_acquire(&bitmap_mutex); for (i = 0; i < opt_n_items; ++i) { @@ -239,16 +286,14 @@ replysock_readable_cb(tor_socket_t sock, short what, void *arg) tor_mutex_release(&bitmap_mutex); #endif - if (n_sent - n_received < opt_n_lowwater) { - while (n_sent < n_received + opt_n_inflight && n_sent < opt_n_items) { - if (! add_work(tp)) { - puts("Couldn't add work."); - tor_event_base_loopexit(tor_libevent_get_base(), NULL); - } - } + if (n_sent - (n_received+n_successful_cancel) < opt_n_lowwater) { + int n_to_send = n_received + opt_n_inflight - n_sent; + if (n_to_send > opt_n_items - n_sent) + n_to_send = opt_n_items - n_sent; + add_n_work_items(tp, n_to_send); } - if (shutting_down == 0 && n_received == n_sent && n_sent >= opt_n_items) { + if (shutting_down == 0 && n_received+n_successful_cancel == n_sent && n_sent >= opt_n_items) { shutting_down = 1; threadpool_queue_for_all(tp, NULL, workqueue_do_shutdown, shutdown_reply, NULL); } @@ -263,6 +308,7 @@ help(void) " -T Use this many threads\n" " -I Have no more than this many requests queued at once\n" " -L Add items whenever fewer than this many are pending\n" + " -C Try to cancel N items of every batch that we add\n" " -R Make one out of this many items be a slow (RSA) one\n" " --no-{eventfd2,eventfd,pipe2,pipe,socketpair}\n" " Disable one of the alert_socket backends."); @@ -291,6 +337,8 @@ main(int argc, char **argv) opt_n_lowwater = atoi(argv[++i]); } else if (!strcmp(argv[i], "-R") && i+1 opt_n_inflight || opt_ratio_rsa < 0) { help(); return 1; @@ -358,7 +407,7 @@ main(int argc, char **argv) event_base_loop(tor_libevent_get_base(), 0); - if (n_sent != opt_n_items || n_received != n_sent || + if (n_sent != opt_n_items || n_received+n_successful_cancel != n_sent || n_shutdowns_done != opt_n_threads) { puts("FAIL"); return 1; -- cgit v1.2.3-54-g00ecf From cc6529e9bb7d7e01a25b5632d6d6c2424c6fc2b4 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Sat, 28 Sep 2013 00:52:28 -0400 Subject: Fix check-spaces --- src/common/compat_pthreads.c | 1 + src/common/compat_threads.c | 1 + src/common/compat_threads.h | 2 +- src/common/compat_winthreads.c | 3 +-- src/common/workqueue.h | 1 + src/test/test.c | 1 - src/test/test_threads.c | 1 + src/test/test_workqueue.c | 8 ++++++-- 8 files changed, 12 insertions(+), 6 deletions(-) (limited to 'src/test') diff --git a/src/common/compat_pthreads.c b/src/common/compat_pthreads.c index 69f7bac9c9..848bfe0973 100644 --- a/src/common/compat_pthreads.c +++ b/src/common/compat_pthreads.c @@ -240,3 +240,4 @@ tor_threads_init(void) set_main_thread(); } } + diff --git a/src/common/compat_threads.c b/src/common/compat_threads.c index f018475e18..79440070a2 100644 --- a/src/common/compat_threads.c +++ b/src/common/compat_threads.c @@ -253,3 +253,4 @@ alert_sockets_close(alert_sockets_t *socks) } socks->read_fd = socks->write_fd = -1; } + diff --git a/src/common/compat_threads.h b/src/common/compat_threads.h index a5db72d45f..acf3083f37 100644 --- a/src/common/compat_threads.h +++ b/src/common/compat_threads.h @@ -44,7 +44,6 @@ typedef struct tor_mutex_t { #endif } tor_mutex_t; - tor_mutex_t *tor_mutex_new(void); tor_mutex_t *tor_mutex_new_nonrecursive(void); void tor_mutex_init(tor_mutex_t *m); @@ -113,3 +112,4 @@ int alert_sockets_create(alert_sockets_t *socks_out, uint32_t flags); void alert_sockets_close(alert_sockets_t *socks); #endif + diff --git a/src/common/compat_winthreads.c b/src/common/compat_winthreads.c index 4820eb3481..71b994c4e4 100644 --- a/src/common/compat_winthreads.c +++ b/src/common/compat_winthreads.c @@ -33,7 +33,6 @@ spawn_func(void (*func)(void *), void *data) return 0; } - /** End the current thread/process. */ void @@ -46,7 +45,6 @@ spawn_exit(void) _exit(0); } - void tor_mutex_init(tor_mutex_t *m) { @@ -195,3 +193,4 @@ tor_threads_init(void) { set_main_thread(); } + diff --git a/src/common/workqueue.h b/src/common/workqueue.h index ec1f7c9000..aa8620ddb7 100644 --- a/src/common/workqueue.h +++ b/src/common/workqueue.h @@ -45,3 +45,4 @@ tor_socket_t replyqueue_get_socket(replyqueue_t *rq); void replyqueue_process(replyqueue_t *queue); #endif + diff --git a/src/test/test.c b/src/test/test.c index 9171306d18..2c2328c197 100644 --- a/src/test/test.c +++ b/src/test/test.c @@ -1258,7 +1258,6 @@ test_stats(void *arg) tor_free(s); } - static void * passthrough_test_setup(const struct testcase_t *testcase) { diff --git a/src/test/test_threads.c b/src/test/test_threads.c index 2bc24e1edc..c0293048fe 100644 --- a/src/test/test_threads.c +++ b/src/test/test_threads.c @@ -313,3 +313,4 @@ struct testcase_t thread_tests[] = { &passthrough_setup, (void*)"tv" }, END_OF_TESTCASES }; + diff --git a/src/test/test_workqueue.c b/src/test/test_workqueue.c index 6de6f03c33..410f43cce4 100644 --- a/src/test/test_workqueue.c +++ b/src/test/test_workqueue.c @@ -293,9 +293,12 @@ replysock_readable_cb(tor_socket_t sock, short what, void *arg) add_n_work_items(tp, n_to_send); } - if (shutting_down == 0 && n_received+n_successful_cancel == n_sent && n_sent >= opt_n_items) { + if (shutting_down == 0 && + n_received+n_successful_cancel == n_sent && + n_sent >= opt_n_items) { shutting_down = 1; - threadpool_queue_for_all(tp, NULL, workqueue_do_shutdown, shutdown_reply, NULL); + threadpool_queue_for_all(tp, NULL, + workqueue_do_shutdown, shutdown_reply, NULL); } } @@ -416,3 +419,4 @@ main(int argc, char **argv) return 0; } } + -- cgit v1.2.3-54-g00ecf From a52e549124adb09ad0b49b7d2b5b3fb79bfe7aeb Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Wed, 14 Jan 2015 13:29:58 -0500 Subject: Update workqueue implementation to use a single queue for the work Previously I used one queue per worker; now I use one queue for everyone. The "broadcast" code is gone, replaced with an idempotent 'update' operation. --- src/common/compat_pthreads.c | 1 + src/common/workqueue.c | 206 ++++++++++++++++++++++++++----------------- src/common/workqueue.h | 10 +-- src/or/cpuworker.c | 15 ++-- src/test/test_workqueue.c | 23 ++--- 5 files changed, 141 insertions(+), 114 deletions(-) (limited to 'src/test') diff --git a/src/common/compat_pthreads.c b/src/common/compat_pthreads.c index 848bfe0973..f43480539f 100644 --- a/src/common/compat_pthreads.c +++ b/src/common/compat_pthreads.c @@ -5,6 +5,7 @@ #include "orconfig.h" #include +#include #include "compat.h" #include "torlog.h" diff --git a/src/common/workqueue.c b/src/common/workqueue.c index 7fa8967580..5ba29e3b26 100644 --- a/src/common/workqueue.c +++ b/src/common/workqueue.c @@ -1,4 +1,4 @@ -/* Copyright (c) 2013, The Tor Project, Inc. */ +/* Copyright (c) 2013-2015, The Tor Project, Inc. */ /* See LICENSE for licensing information */ #include "orconfig.h" @@ -13,8 +13,23 @@ struct threadpool_s { /** An array of pointers to workerthread_t: one for each running worker * thread. */ struct workerthread_s **threads; - /** Index of the next thread that we'll give work to.*/ - int next_for_work; + + /** Condition variable that we wait on when we have no work, and which + * gets signaled when our queue becomes nonempty. */ + tor_cond_t condition; + /** Queue of pending work that we have to do. */ + TOR_TAILQ_HEAD(, workqueue_entry_s) work; + + /** The current 'update generation' of the threadpool. Any thread that is + * at an earlier generation needs to run the update function. */ + unsigned generation; + + /** Function that should be run for updates on each thread. */ + int (*update_fn)(void *, void *); + /** Function to free update arguments if they can't be run. */ + void (*free_update_arg_fn)(void *); + /** Array of n_threads update arguments. */ + void **update_args; /** Number of elements in threads. */ int n_threads; @@ -34,10 +49,10 @@ struct workqueue_entry_s { /** The next workqueue_entry_t that's pending on the same thread or * reply queue. */ TOR_TAILQ_ENTRY(workqueue_entry_s) next_work; - /** The thread to which this workqueue_entry_t was assigned. This field + /** The threadpool to which this workqueue_entry_t was assigned. This field * is set when the workqueue_entry_t is created, and won't be cleared until * after it's handled in the main thread. */ - struct workerthread_s *on_thread; + struct threadpool_s *on_pool; /** True iff this entry is waiting for a worker to start processing it. */ uint8_t pending; /** Function to run in the worker thread. */ @@ -62,13 +77,10 @@ struct replyqueue_s { * contention, each gets its own queue. This breaks the guarantee that that * queued work will get executed strictly in order. */ typedef struct workerthread_s { - /** Lock to protect all fields of this thread and its queue. */ - tor_mutex_t lock; - /** Condition variable that we wait on when we have no work, and which - * gets signaled when our queue becomes nonempty. */ - tor_cond_t condition; - /** Queue of pending work that we have to do. */ - TOR_TAILQ_HEAD(, workqueue_entry_s) work; + /** Which thread it this? In range 0..in_pool->n_threads-1 */ + int index; + /** The pool this thread is a part of. */ + struct threadpool_s *in_pool; /** True iff this thread is currently in its loop. (Not currently used.) */ unsigned is_running; /** True iff this thread has crashed or is shut down for some reason. (Not @@ -81,6 +93,8 @@ typedef struct workerthread_s { void *state; /** Reply queue to which we pass our results. */ replyqueue_t *reply_queue; + /** The current update generation of this thread */ + unsigned generation; } workerthread_t; static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work); @@ -132,13 +146,13 @@ workqueue_entry_cancel(workqueue_entry_t *ent) { int cancelled = 0; void *result = NULL; - tor_mutex_acquire(&ent->on_thread->lock); + tor_mutex_acquire(&ent->on_pool->lock); if (ent->pending) { - TOR_TAILQ_REMOVE(&ent->on_thread->work, ent, next_work); + TOR_TAILQ_REMOVE(&ent->on_pool->work, ent, next_work); cancelled = 1; result = ent->arg; } - tor_mutex_release(&ent->on_thread->lock); + tor_mutex_release(&ent->on_pool->lock); if (cancelled) { tor_free(ent); @@ -146,6 +160,16 @@ workqueue_entry_cancel(workqueue_entry_t *ent) return result; } +/**DOCDOC + + must hold lock */ +static int +worker_thread_has_work(workerthread_t *thread) +{ + return !TOR_TAILQ_EMPTY(&thread->in_pool->work) || + thread->generation != thread->in_pool->generation; +} + /** * Main function for the worker thread. */ @@ -153,20 +177,39 @@ static void worker_thread_main(void *thread_) { workerthread_t *thread = thread_; + threadpool_t *pool = thread->in_pool; workqueue_entry_t *work; int result; - tor_mutex_acquire(&thread->lock); thread->is_running = 1; + + tor_mutex_acquire(&pool->lock); while (1) { /* lock must be held at this point. */ - while (!TOR_TAILQ_EMPTY(&thread->work)) { + while (worker_thread_has_work(thread)) { /* lock must be held at this point. */ - - work = TOR_TAILQ_FIRST(&thread->work); - TOR_TAILQ_REMOVE(&thread->work, work, next_work); + if (thread->in_pool->generation != thread->generation) { + void *arg = thread->in_pool->update_args[thread->index]; + thread->in_pool->update_args[thread->index] = NULL; + int (*update_fn)(void*,void*) = thread->in_pool->update_fn; + thread->generation = thread->in_pool->generation; + tor_mutex_release(&pool->lock); + + int r = update_fn(thread->state, arg); + + if (r < 0) { + thread->is_running = 0; + thread->is_shut_down = 1; + return; + } + + tor_mutex_acquire(&pool->lock); + continue; + } + work = TOR_TAILQ_FIRST(&pool->work); + TOR_TAILQ_REMOVE(&pool->work, work, next_work); work->pending = 0; - tor_mutex_release(&thread->lock); + tor_mutex_release(&pool->lock); /* We run the work function without holding the thread lock. This * is the main thread's first opportunity to give us more work. */ @@ -175,25 +218,23 @@ worker_thread_main(void *thread_) /* Queue the reply for the main thread. */ queue_reply(thread->reply_queue, work); - tor_mutex_acquire(&thread->lock); /* We may need to exit the thread. */ if (result >= WQ_RPL_ERROR) { thread->is_running = 0; thread->is_shut_down = 1; - tor_mutex_release(&thread->lock); return; } + tor_mutex_acquire(&pool->lock); } /* At this point the lock is held, and there is no work in this thread's * queue. */ - /* TODO: Try work-stealing. */ /* TODO: support an idle-function */ /* Okay. Now, wait till somebody has work for us. */ /* XXXX we could just omit waiting and instead */ thread->waiting = 1; - if (tor_cond_wait(&thread->condition, &thread->lock, NULL) < 0) { + if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) { /* XXXX ERROR */ } thread->waiting = 0; @@ -221,14 +262,12 @@ queue_reply(replyqueue_t *queue, workqueue_entry_t *work) /** Allocate and start a new worker thread to use state object state, * and send responses to replyqueue. */ static workerthread_t * -workerthread_new(void *state, replyqueue_t *replyqueue) +workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue) { workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t)); - tor_mutex_init_for_cond(&thr->lock); - tor_cond_init(&thr->condition); - TOR_TAILQ_INIT(&thr->work); thr->state = state; thr->reply_queue = replyqueue; + thr->in_pool = pool; if (spawn_func(worker_thread_main, thr) < 0) { log_err(LD_GENERAL, "Can't launch worker thread."); @@ -238,30 +277,6 @@ workerthread_new(void *state, replyqueue_t *replyqueue) return thr; } -/** - * Add an item of work to a single worker thread. See threadpool_queue_work(*) - * for arguments. - */ -static workqueue_entry_t * -workerthread_queue_work(workerthread_t *worker, - int (*fn)(void *, void *), - void (*reply_fn)(void *), - void *arg) -{ - workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg); - - tor_mutex_acquire(&worker->lock); - ent->on_thread = worker; - ent->pending = 1; - TOR_TAILQ_INSERT_TAIL(&worker->work, ent, next_work); - - if (worker->waiting) /* XXXX inside or outside of lock?? */ - tor_cond_signal_one(&worker->condition); - - tor_mutex_release(&worker->lock); - return ent; -} - /** * Queue an item of work for a thread in a thread pool. The function * fn will be run in a worker thread, and will receive as arguments the @@ -285,20 +300,19 @@ threadpool_queue_work(threadpool_t *pool, void (*reply_fn)(void *), void *arg) { - workerthread_t *worker; + workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg); + ent->on_pool = pool; + ent->pending = 1; tor_mutex_acquire(&pool->lock); - /* Pick the next thread in random-access order. */ - worker = pool->threads[pool->next_for_work++]; - if (!worker) { - tor_mutex_release(&pool->lock); - return NULL; - } - if (pool->next_for_work >= pool->n_threads) - pool->next_for_work = 0; + + TOR_TAILQ_INSERT_TAIL(&pool->work, ent, next_work); + tor_mutex_release(&pool->lock); - return workerthread_queue_work(worker, fn, reply_fn, arg); + tor_cond_signal_one(&pool->condition); + + return ent; } /** @@ -309,30 +323,56 @@ threadpool_queue_work(threadpool_t *pool, * arg value is passed to dup_fn once per each thread to * make a copy of it. * + * UPDATE FUNCTIONS MUST BE IDEMPOTENT. We do not guarantee that every update + * will be run. If a new update is scheduled before the old update finishes + * running, then the new will replace the old in any threads that haven't run + * it yet. + * * Return 0 on success, -1 on failure. */ int -threadpool_queue_for_all(threadpool_t *pool, +threadpool_queue_update(threadpool_t *pool, void *(*dup_fn)(void *), int (*fn)(void *, void *), - void (*reply_fn)(void *), + void (*free_fn)(void *), void *arg) { - int i = 0; - workerthread_t *worker; - void *arg_copy; - while (1) { - tor_mutex_acquire(&pool->lock); - if (i >= pool->n_threads) { - tor_mutex_release(&pool->lock); - return 0; - } - worker = pool->threads[i++]; - tor_mutex_release(&pool->lock); + int i, n_threads; + void (*old_args_free_fn)(void *arg); + void **old_args; + void **new_args; + + tor_mutex_acquire(&pool->lock); + n_threads = pool->n_threads; + old_args = pool->update_args; + old_args_free_fn = pool->free_update_arg_fn; + + new_args = tor_calloc(n_threads, sizeof(void*)); + for (i = 0; i < n_threads; ++i) { + if (dup_fn) + new_args[i] = dup_fn(arg); + else + new_args[i] = arg; + } + + pool->update_args = new_args; + pool->free_update_arg_fn = free_fn; + pool->update_fn = fn; + ++pool->generation; - arg_copy = dup_fn ? dup_fn(arg) : arg; - /* CHECK*/ workerthread_queue_work(worker, fn, reply_fn, arg_copy); + tor_mutex_release(&pool->lock); + + tor_cond_signal_all(&pool->condition); + + if (old_args) { + for (i = 0; i < n_threads; ++i) { + if (old_args[i] && old_args_free_fn) + old_args_free_fn(old_args[i]); + } + tor_free(old_args); } + + return 0; } /** Launch threads until we have n. */ @@ -346,7 +386,8 @@ threadpool_start_threads(threadpool_t *pool, int n) while (pool->n_threads < n) { void *state = pool->new_thread_state_fn(pool->new_thread_state_arg); - workerthread_t *thr = workerthread_new(state, pool->reply_queue); + workerthread_t *thr = workerthread_new(state, pool, pool->reply_queue); + thr->index = pool->n_threads; if (!thr) { tor_mutex_release(&pool->lock); @@ -375,7 +416,10 @@ threadpool_new(int n_threads, { threadpool_t *pool; pool = tor_malloc_zero(sizeof(threadpool_t)); - tor_mutex_init(&pool->lock); + tor_mutex_init_nonrecursive(&pool->lock); + tor_cond_init(&pool->condition); + TOR_TAILQ_INIT(&pool->work); + pool->new_thread_state_fn = new_thread_state_fn; pool->new_thread_state_arg = arg; pool->free_thread_state_fn = free_thread_state_fn; @@ -447,7 +491,7 @@ replyqueue_process(replyqueue_t *queue) workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers); TOR_TAILQ_REMOVE(&queue->answers, work, next_work); tor_mutex_release(&queue->lock); - work->on_thread = NULL; + work->on_pool = NULL; work->reply_fn(work->arg); workqueue_entry_free(work); diff --git a/src/common/workqueue.h b/src/common/workqueue.h index aa1bcc518a..92e82b8a48 100644 --- a/src/common/workqueue.h +++ b/src/common/workqueue.h @@ -27,11 +27,11 @@ workqueue_entry_t *threadpool_queue_work(threadpool_t *pool, int (*fn)(void *, void *), void (*reply_fn)(void *), void *arg); -int threadpool_queue_for_all(threadpool_t *pool, - void *(*dup_fn)(void *), - int (*fn)(void *, void *), - void (*reply_fn)(void *), - void *arg); +int threadpool_queue_update(threadpool_t *pool, + void *(*dup_fn)(void *), + int (*fn)(void *, void *), + void (*free_fn)(void *), + void *arg); void *workqueue_entry_cancel(workqueue_entry_t *pending_work); threadpool_t *threadpool_new(int n_threads, replyqueue_t *replyqueue, diff --git a/src/or/cpuworker.c b/src/or/cpuworker.c index 36ca505fe3..3f129ded99 100644 --- a/src/or/cpuworker.c +++ b/src/or/cpuworker.c @@ -170,11 +170,6 @@ update_state_threadfn(void *state_, void *work_) ++state->generation; return WQ_RPL_REPLY; } -static void -update_state_replyfn(void *work_) -{ - tor_free(work_); -} /** Called when the onion key has changed and we need to spawn new * cpuworkers. Close all currently idle cpuworkers, and mark the last @@ -183,11 +178,11 @@ update_state_replyfn(void *work_) void cpuworkers_rotate_keyinfo(void) { - if (threadpool_queue_for_all(threadpool, - worker_state_new, - update_state_threadfn, - update_state_replyfn, - NULL)) { + if (threadpool_queue_update(threadpool, + worker_state_new, + update_state_threadfn, + worker_state_free, + NULL)) { log_warn(LD_OR, "Failed to queue key update for worker threads."); } } diff --git a/src/test/test_workqueue.c b/src/test/test_workqueue.c index 410f43cce4..8ce4405062 100644 --- a/src/test/test_workqueue.c +++ b/src/test/test_workqueue.c @@ -132,7 +132,6 @@ new_state(void *arg) /* Every thread gets its own keys. not a problem for benchmarking */ st->rsa = crypto_pk_new(); if (crypto_pk_generate_key_with_bits(st->rsa, 1024) < 0) { - puts("keygen failed"); crypto_pk_free(st->rsa); tor_free(st); return NULL; @@ -213,7 +212,6 @@ add_n_work_items(threadpool_t *tp, int n) while (n_queued++ < n) { ent = add_work(tp); if (! ent) { - puts("Couldn't add work."); tor_event_base_loopexit(tor_libevent_get_base(), NULL); return -1; } @@ -238,18 +236,6 @@ add_n_work_items(threadpool_t *tp, int n) } static int shutting_down = 0; -static int n_shutdowns_done = 0; - -static void -shutdown_reply(void *arg) -{ - (void)arg; - tor_assert(shutting_down); - ++n_shutdowns_done; - if (n_shutdowns_done == opt_n_threads) { - tor_event_base_loopexit(tor_libevent_get_base(), NULL); - } -} static void replysock_readable_cb(tor_socket_t sock, short what, void *arg) @@ -297,8 +283,8 @@ replysock_readable_cb(tor_socket_t sock, short what, void *arg) n_received+n_successful_cancel == n_sent && n_sent >= opt_n_items) { shutting_down = 1; - threadpool_queue_for_all(tp, NULL, - workqueue_do_shutdown, shutdown_reply, NULL); + threadpool_queue_update(tp, NULL, + workqueue_do_shutdown, NULL, NULL); } } @@ -410,8 +396,9 @@ main(int argc, char **argv) event_base_loop(tor_libevent_get_base(), 0); - if (n_sent != opt_n_items || n_received+n_successful_cancel != n_sent || - n_shutdowns_done != opt_n_threads) { + if (n_sent != opt_n_items || n_received+n_successful_cancel != n_sent) { + printf("%d vs %d\n", n_sent, opt_n_items); + printf("%d+%d vs %d\n", n_received, n_successful_cancel, n_sent); puts("FAIL"); return 1; } else { -- cgit v1.2.3-54-g00ecf From f52ac5be74b3cb6f657a6d7a1fa7db2c9595728d Mon Sep 17 00:00:00 2001 From: David Goulet Date: Wed, 21 Jan 2015 13:58:18 -0500 Subject: Fix: change copyright year in workqueue and thread tests Signed-off-by: David Goulet --- src/test/test_threads.c | 2 +- src/test/test_workqueue.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'src/test') diff --git a/src/test/test_threads.c b/src/test/test_threads.c index c0293048fe..2ac08d4d28 100644 --- a/src/test/test_threads.c +++ b/src/test/test_threads.c @@ -1,6 +1,6 @@ /* Copyright (c) 2001-2004, Roger Dingledine. * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. - * Copyright (c) 2007-2013, The Tor Project, Inc. */ + * Copyright (c) 2007-2015, The Tor Project, Inc. */ /* See LICENSE for licensing information */ #include "orconfig.h" diff --git a/src/test/test_workqueue.c b/src/test/test_workqueue.c index 8ce4405062..aaff5069be 100644 --- a/src/test/test_workqueue.c +++ b/src/test/test_workqueue.c @@ -1,6 +1,6 @@ /* Copyright (c) 2001-2004, Roger Dingledine. * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. - * Copyright (c) 2007-2013, The Tor Project, Inc. */ + * Copyright (c) 2007-2015, The Tor Project, Inc. */ /* See LICENSE for licensing information */ #include "or.h" -- cgit v1.2.3-54-g00ecf