diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/common/include.am | 2 | ||||
-rw-r--r-- | src/common/workqueue.c | 347 | ||||
-rw-r--r-- | src/common/workqueue.h | 37 | ||||
-rw-r--r-- | src/test/bench_workqueue.c | 298 | ||||
-rw-r--r-- | src/test/include.am | 13 |
5 files changed, 696 insertions, 1 deletions
diff --git a/src/common/include.am b/src/common/include.am index e4eeba6bbf..14838ab555 100644 --- a/src/common/include.am +++ b/src/common/include.am @@ -74,6 +74,7 @@ LIBOR_A_SOURCES = \ src/common/util_codedigest.c \ src/common/util_process.c \ src/common/sandbox.c \ + src/common/workqueue.c \ src/ext/csiphash.c \ src/ext/trunnel/trunnel.c \ $(libor_extra_source) \ @@ -137,6 +138,7 @@ COMMONHEADERS = \ src/common/tortls.h \ src/common/util.h \ src/common/util_process.h \ + src/common/workqueue.h \ $(libor_mempool_header) noinst_HEADERS+= $(COMMONHEADERS) diff --git a/src/common/workqueue.c b/src/common/workqueue.c new file mode 100644 index 0000000000..ea8dcb0f9b --- /dev/null +++ b/src/common/workqueue.c @@ -0,0 +1,347 @@ +/* Copyright (c) 2013, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#include "orconfig.h" +#include "compat.h" +#include "compat_threads.h" +#include "util.h" +#include "workqueue.h" +#include "tor_queue.h" +#include "torlog.h" + +#ifdef HAVE_UNISTD_H +// XXXX move wherever we move the write/send stuff +#include <unistd.h> +#endif + +/* + design: + + each thread has its own queue, try to keep at least elements min..max cycles + worth of work on each queue. + +keep array of threads; round-robin between them. + + When out of work, work-steal. + + alert threads with condition variables. + + alert main thread with fd, since it's libevent. + + + */ + +typedef struct workqueue_entry_s { + TOR_SIMPLEQ_ENTRY(workqueue_entry_s) next_work; + int (*fn)(int status, void *state, void *arg); + void (*reply_fn)(void *arg); + void *arg; +} workqueue_entry_t; + +struct replyqueue_s { + tor_mutex_t lock; + TOR_SIMPLEQ_HEAD(, workqueue_entry_s) answers; + + void (*alert_fn)(struct replyqueue_s *); // lock not held on this, next 2. + tor_socket_t write_sock; + tor_socket_t read_sock; +}; + +typedef struct workerthread_s { + tor_mutex_t lock; + tor_cond_t condition; + TOR_SIMPLEQ_HEAD(, workqueue_entry_s) work; + unsigned is_running; + unsigned is_shut_down; + unsigned waiting; + void *state; + replyqueue_t *reply_queue; +} workerthread_t; + +struct threadpool_s { + workerthread_t **threads; + int next_for_work; + + tor_mutex_t lock; + int n_threads; + + replyqueue_t *reply_queue; + + void *(*new_thread_state_fn)(void*); + void (*free_thread_state_fn)(void*); + void *new_thread_state_arg; + +}; + +static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work); + +static workqueue_entry_t * +workqueue_entry_new(int (*fn)(int, void*, void*), + void (*reply_fn)(void*), + void *arg) +{ + workqueue_entry_t *ent = tor_malloc_zero(sizeof(workqueue_entry_t)); + ent->fn = fn; + ent->reply_fn = reply_fn; + ent->arg = arg; + return ent; +} + +static void +workqueue_entry_free(workqueue_entry_t *ent) +{ + if (!ent) + return; + tor_free(ent); +} + +static void +worker_thread_main(void *thread_) +{ + workerthread_t *thread = thread_; + workqueue_entry_t *work; + int result; + + tor_mutex_acquire(&thread->lock); + + thread->is_running = 1; + while (1) { + /* lock held. */ + while (!TOR_SIMPLEQ_EMPTY(&thread->work)) { + /* lock held. */ + + work = TOR_SIMPLEQ_FIRST(&thread->work); + TOR_SIMPLEQ_REMOVE_HEAD(&thread->work, next_work); + tor_mutex_release(&thread->lock); + + result = work->fn(WQ_CMD_RUN, thread->state, work->arg); + + if (result == WQ_RPL_QUEUE) { + queue_reply(thread->reply_queue, work); + } else { + workqueue_entry_free(work); + } + + tor_mutex_acquire(&thread->lock); + if (result >= WQ_RPL_ERROR) { + thread->is_running = 0; + thread->is_shut_down = 1; + tor_mutex_release(&thread->lock); + return; + } + } + /* Lock held; no work in this thread's queue. */ + + /* TODO: Try work-stealing. */ + + /* TODO: support an idle-function */ + + thread->waiting = 1; + if (tor_cond_wait(&thread->condition, &thread->lock, NULL) < 0) + /* ERR */ + thread->waiting = 0; + } +} + +static void +queue_reply(replyqueue_t *queue, workqueue_entry_t *work) +{ + int was_empty; + tor_mutex_acquire(&queue->lock); + was_empty = TOR_SIMPLEQ_EMPTY(&queue->answers); + TOR_SIMPLEQ_INSERT_TAIL(&queue->answers, work, next_work); + tor_mutex_release(&queue->lock); + + if (was_empty) { + queue->alert_fn(queue); + } +} + + +static void +alert_by_fd(replyqueue_t *queue) +{ + /* XXX extract this into new function */ +#ifndef _WIN32 + (void) send(queue->write_sock, "x", 1, 0); +#else + (void) write(queue->write_sock, "x", 1); +#endif +} + +static workerthread_t * +workerthread_new(void *state, replyqueue_t *replyqueue) +{ + workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t)); + tor_mutex_init_for_cond(&thr->lock); + tor_cond_init(&thr->condition); + TOR_SIMPLEQ_INIT(&thr->work); + thr->state = state; + thr->reply_queue = replyqueue; + + if (spawn_func(worker_thread_main, thr) < 0) { + log_err(LD_GENERAL, "Can't launch worker thread."); + return NULL; + } + + return thr; +} + +void * +threadpool_queue_work(threadpool_t *pool, + int (*fn)(int, void *, void *), + void (*reply_fn)(void *), + void *arg) +{ + workqueue_entry_t *ent; + workerthread_t *worker; + + tor_mutex_acquire(&pool->lock); + worker = pool->threads[pool->next_for_work++]; + if (!worker) { + tor_mutex_release(&pool->lock); + return NULL; + } + if (pool->next_for_work >= pool->n_threads) + pool->next_for_work = 0; + tor_mutex_release(&pool->lock); + + + ent = workqueue_entry_new(fn, reply_fn, arg); + + tor_mutex_acquire(&worker->lock); + TOR_SIMPLEQ_INSERT_TAIL(&worker->work, ent, next_work); + + if (worker->waiting) /* XXXX inside or outside of lock?? */ + tor_cond_signal_one(&worker->condition); + + tor_mutex_release(&worker->lock); + + return ent; +} + +int +threadpool_start_threads(threadpool_t *pool, int n) +{ + tor_mutex_acquire(&pool->lock); + + if (pool->n_threads < n) + pool->threads = tor_realloc(pool->threads, sizeof(workerthread_t*)*n); + + while (pool->n_threads < n) { + void *state = pool->new_thread_state_fn(pool->new_thread_state_arg); + workerthread_t *thr = workerthread_new(state, pool->reply_queue); + + if (!thr) { + tor_mutex_release(&pool->lock); + return -1; + } + pool->threads[pool->n_threads++] = thr; + } + tor_mutex_release(&pool->lock); + + return 0; +} + +threadpool_t * +threadpool_new(int n_threads, + replyqueue_t *replyqueue, + void *(*new_thread_state_fn)(void*), + void (*free_thread_state_fn)(void*), + void *arg) +{ + threadpool_t *pool; + pool = tor_malloc_zero(sizeof(threadpool_t)); + tor_mutex_init(&pool->lock); + pool->new_thread_state_fn = new_thread_state_fn; + pool->new_thread_state_arg = arg; + pool->free_thread_state_fn = free_thread_state_fn; + pool->reply_queue = replyqueue; + + if (threadpool_start_threads(pool, n_threads) < 0) { + tor_mutex_uninit(&pool->lock); + tor_free(pool); + return NULL; + } + + return pool; +} + +replyqueue_t * +threadpool_get_replyqueue(threadpool_t *tp) +{ + return tp->reply_queue; +} + +replyqueue_t * +replyqueue_new(void) +{ + tor_socket_t pair[2]; + replyqueue_t *rq; + int r; + + /* XXX extract this into new function */ +#ifdef _WIN32 + r = tor_socketpair(AF_UNIX, SOCK_STREAM, 0, pair); +#else + r = pipe(pair); +#endif + if (r < 0) + return NULL; + + set_socket_nonblocking(pair[0]); /* the read-size should be nonblocking. */ +#if defined(FD_CLOEXEC) + fcntl(pair[0], F_SETFD, FD_CLOEXEC); + fcntl(pair[1], F_SETFD, FD_CLOEXEC); +#endif + + rq = tor_malloc_zero(sizeof(replyqueue_t)); + + tor_mutex_init(&rq->lock); + TOR_SIMPLEQ_INIT(&rq->answers); + + rq->read_sock = pair[0]; + rq->write_sock = pair[1]; + rq->alert_fn = alert_by_fd; + + return rq; +} + +tor_socket_t +replyqueue_get_socket(replyqueue_t *rq) +{ + return rq->read_sock; +} + +void +replyqueue_process(replyqueue_t *queue) +{ + ssize_t r; + + /* XXX extract this into new function */ + do { + char buf[64]; +#ifdef _WIN32 + r = recv(queue->read_sock, buf, sizeof(buf), 0); +#else + r = read(queue->read_sock, buf, sizeof(buf)); +#endif + } while (r > 0); + + /* XXXX freak out on r == 0, or r == "error, not retryable". */ + + tor_mutex_acquire(&queue->lock); + while (!TOR_SIMPLEQ_EMPTY(&queue->answers)) { + /* lock held. */ + workqueue_entry_t *work = TOR_SIMPLEQ_FIRST(&queue->answers); + TOR_SIMPLEQ_REMOVE_HEAD(&queue->answers, next_work); + tor_mutex_release(&queue->lock); + + work->reply_fn(work->arg); + workqueue_entry_free(work); + + tor_mutex_acquire(&queue->lock); + } + + tor_mutex_release(&queue->lock); +} diff --git a/src/common/workqueue.h b/src/common/workqueue.h new file mode 100644 index 0000000000..e502734b84 --- /dev/null +++ b/src/common/workqueue.h @@ -0,0 +1,37 @@ +/* Copyright (c) 2013, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#ifndef TOR_WORKQUEUE_H +#define TOR_WORKQUEUE_H + +#include "compat.h" + +typedef struct replyqueue_s replyqueue_t; +typedef struct threadpool_s threadpool_t; + + +#define WQ_CMD_RUN 0 +#define WQ_CMD_CANCEL 1 + +#define WQ_RPL_QUEUE 0 +#define WQ_RPL_NOQUEUE 1 +#define WQ_RPL_ERROR 2 +#define WQ_RPL_SHUTDOWN 3 + +void *threadpool_queue_work(threadpool_t *pool, + int (*fn)(int, void *, void *), + void (*reply_fn)(void *), + void *arg); +int threadpool_start_threads(threadpool_t *pool, int n); +threadpool_t *threadpool_new(int n_threads, + replyqueue_t *replyqueue, + void *(*new_thread_state_fn)(void*), + void (*free_thread_state_fn)(void*), + void *arg); +replyqueue_t *threadpool_get_replyqueue(threadpool_t *tp); + +replyqueue_t *replyqueue_new(void); +tor_socket_t replyqueue_get_socket(replyqueue_t *rq); +void replyqueue_process(replyqueue_t *queue); + +#endif diff --git a/src/test/bench_workqueue.c b/src/test/bench_workqueue.c new file mode 100644 index 0000000000..1bdfbefb3e --- /dev/null +++ b/src/test/bench_workqueue.c @@ -0,0 +1,298 @@ +/* Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2013, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#include "or.h" +#include "compat_threads.h" +#include "onion.h" +#include "workqueue.h" +#include "crypto.h" +#include "crypto_curve25519.h" +#include "compat_libevent.h" + +#include <stdio.h> +#ifdef HAVE_EVENT2_EVENT_H +#include <event2/event.h> +#else +#include <event.h> +#endif + +#ifdef TRACK_RESPONSES +tor_mutex_t bitmap_mutex; +int handled_len; +bitarray_t *handled; +#endif + +#define N_ITEMS 10000 +#define N_INFLIGHT 1000 +#define RELAUNCH_AT 250 + +typedef struct state_s { + int magic; + int n_handled; + crypto_pk_t *rsa; + curve25519_secret_key_t ecdh; +} state_t; + +typedef struct rsa_work_s { + int serial; + uint8_t msg[128]; + uint8_t msglen; +} rsa_work_t; + +typedef struct ecdh_work_s { + int serial; + union { + curve25519_public_key_t pk; + uint8_t msg[32]; + } u; +} ecdh_work_t; + +static void +mark_handled(int serial) +{ +#ifdef TRACK_RESPONSES + tor_mutex_acquire(&bitmap_mutex); + tor_assert(serial < handled_len); + tor_assert(! bitarray_is_set(handled, serial)); + bitarray_set(handled, serial); + tor_mutex_release(&bitmap_mutex); +#else + (void)serial; +#endif +} + +static int +workqueue_do_rsa(int cmd, void *state, void *work) +{ + rsa_work_t *rw = work; + state_t *st = state; + crypto_pk_t *rsa = st->rsa; + uint8_t sig[256]; + int len; + + tor_assert(st->magic == 13371337); + + if (cmd == WQ_CMD_CANCEL) { + tor_free(work); + return WQ_RPL_NOQUEUE; + } + + len = crypto_pk_private_sign(rsa, (char*)sig, 256, + (char*)rw->msg, rw->msglen); + if (len < 0) { + tor_free(work); + return WQ_RPL_NOQUEUE; + } + + memset(rw->msg, 0, sizeof(rw->msg)); + rw->msglen = len; + memcpy(rw->msg, sig, len); + ++st->n_handled; + + mark_handled(rw->serial); + + return WQ_RPL_QUEUE; +} + +#if 0 +static int +workqueue_do_shutdown(int cmd, void *state, void *work) +{ + (void)state; + (void)work; + (void)cmd; + crypto_pk_free(((state_t*)state)->rsa); + tor_free(state); + return WQ_RPL_SHUTDOWN; +} +#endif + +static int +workqueue_do_ecdh(int cmd, void *state, void *work) +{ + ecdh_work_t *ew = work; + uint8_t output[CURVE25519_OUTPUT_LEN]; + state_t *st = state; + + tor_assert(st->magic == 13371337); + + if (cmd == WQ_CMD_CANCEL) { + tor_free(work); + return WQ_RPL_NOQUEUE; + } + + curve25519_handshake(output, &st->ecdh, &ew->u.pk); + memcpy(ew->u.msg, output, CURVE25519_OUTPUT_LEN); + ++st->n_handled; + mark_handled(ew->serial); + return WQ_RPL_QUEUE; +} + +static void * +new_state(void *arg) +{ + state_t *st; + (void)arg; + + st = tor_malloc(sizeof(*st)); + /* Every thread gets its own keys. not a problem for benchmarking */ + st->rsa = crypto_pk_new(); + if (crypto_pk_generate_key_with_bits(st->rsa, 1024) < 0) { + puts("keygen failed"); + crypto_pk_free(st->rsa); + tor_free(st); + return NULL; + } + curve25519_secret_key_generate(&st->ecdh, 0); + st->magic = 13371337; + return st; +} + +static void +free_state(void *arg) +{ + state_t *st = arg; + crypto_pk_free(st->rsa); + tor_free(st); +} + +static tor_weak_rng_t weak_rng; +static int n_sent = 0; +static int rsa_sent = 0; +static int ecdh_sent = 0; +static int n_received = 0; + +#ifdef TRACK_RESPONSES +bitarray_t *received; +#endif + +static void +handle_reply(void *arg) +{ +#ifdef TRACK_RESPONSES + rsa_work_t *rw = arg; /* Naughty cast, but only looking at serial. */ + tor_assert(! bitarray_is_set(received, rw->serial)); + bitarray_set(received,rw->serial); +#endif + + tor_free(arg); + ++n_received; +} + +static int +add_work(threadpool_t *tp) +{ + int add_rsa = tor_weak_random_range(&weak_rng, 5) == 0; + if (add_rsa) { + rsa_work_t *w = tor_malloc_zero(sizeof(*w)); + w->serial = n_sent++; + crypto_rand((char*)w->msg, 20); + w->msglen = 20; + ++rsa_sent; + return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w) != NULL; + } else { + ecdh_work_t *w = tor_malloc_zero(sizeof(*w)); + w->serial = n_sent++; + /* Not strictly right, but this is just for benchmarks. */ + crypto_rand((char*)w->u.pk.public_key, 32); + ++ecdh_sent; + return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w) != NULL; + } +} + +static void +replysock_readable_cb(tor_socket_t sock, short what, void *arg) +{ + threadpool_t *tp = arg; + replyqueue_t *rq = threadpool_get_replyqueue(tp); + + int old_r = n_received; + (void) sock; + (void) what; + + replyqueue_process(rq); + if (old_r == n_received) + return; + + printf("%d / %d\n", n_received, n_sent); +#ifdef TRACK_RESPONSES + tor_mutex_acquire(&bitmap_mutex); + for (i = 0; i < N_ITEMS; ++i) { + if (bitarray_is_set(received, i)) + putc('o', stdout); + else if (bitarray_is_set(handled, i)) + putc('!', stdout); + else + putc('.', stdout); + } + puts(""); + tor_mutex_release(&bitmap_mutex); +#endif + + if (n_sent - n_received < RELAUNCH_AT) { + while (n_sent < n_received + N_INFLIGHT && n_sent < N_ITEMS) { + if (! add_work(tp)) { + puts("Couldn't add work."); + tor_event_base_loopexit(tor_libevent_get_base(), NULL); + } + } + } + + if (n_received == n_sent && n_sent >= N_ITEMS) { + tor_event_base_loopexit(tor_libevent_get_base(), NULL); + } +} + +int +main(int argc, char **argv) +{ + replyqueue_t *rq; + threadpool_t *tp; + int i; + tor_libevent_cfg evcfg; + struct event *ev; + + (void)argc; + (void)argv; + + init_logging(1); + crypto_global_init(1, NULL, NULL); + crypto_seed_rng(1); + + rq = replyqueue_new(); + tor_assert(rq); + tp = threadpool_new(16, + rq, new_state, free_state, NULL); + tor_assert(tp); + + crypto_seed_weak_rng(&weak_rng); + + memset(&evcfg, 0, sizeof(evcfg)); + tor_libevent_initialize(&evcfg); + + ev = tor_event_new(tor_libevent_get_base(), + replyqueue_get_socket(rq), EV_READ|EV_PERSIST, + replysock_readable_cb, tp); + + event_add(ev, NULL); + +#ifdef TRACK_RESPONSES + handled = bitarray_init_zero(N_ITEMS); + received = bitarray_init_zero(N_ITEMS); + tor_mutex_init(&bitmap_mutex); + handled_len = N_ITEMS; +#endif + + for (i = 0; i < N_INFLIGHT; ++i) { + if (! add_work(tp)) { + puts("Couldn't add work."); + return 1; + } + } + + event_base_loop(tor_libevent_get_base(), 0); + + return 0; +} diff --git a/src/test/include.am b/src/test/include.am index b9b381fdae..6ad1b552b7 100644 --- a/src/test/include.am +++ b/src/test/include.am @@ -1,6 +1,6 @@ TESTS += src/test/test -noinst_PROGRAMS+= src/test/bench +noinst_PROGRAMS+= src/test/bench src/test/bench_workqueue if UNITTESTS_ENABLED noinst_PROGRAMS+= src/test/test src/test/test-child endif @@ -62,6 +62,9 @@ src_test_test_CPPFLAGS= $(src_test_AM_CPPFLAGS) src_test_bench_SOURCES = \ src/test/bench.c +src_test_bench_workqueue_SOURCES = \ + src/test/bench_workqueue.c + src_test_test_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \ @TOR_LDFLAGS_libevent@ src_test_test_LDADD = src/or/libtor-testing.a src/common/libor-testing.a \ @@ -80,6 +83,14 @@ src_test_bench_LDADD = src/or/libtor.a src/common/libor.a \ @TOR_OPENSSL_LIBS@ @TOR_LIB_WS32@ @TOR_LIB_GDI@ @CURVE25519_LIBS@ \ @TOR_SYSTEMD_LIBS@ +src_test_bench_workqueue_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \ + @TOR_LDFLAGS_libevent@ +src_test_bench_workqueue_LDADD = src/or/libtor.a src/common/libor.a \ + src/common/libor-crypto.a $(LIBDONNA) \ + src/common/libor-event.a \ + @TOR_ZLIB_LIBS@ @TOR_LIB_MATH@ @TOR_LIBEVENT_LIBS@ \ + @TOR_OPENSSL_LIBS@ @TOR_LIB_WS32@ @TOR_LIB_GDI@ @CURVE25519_LIBS@ + noinst_HEADERS+= \ src/test/fakechans.h \ src/test/test.h \ |