summaryrefslogtreecommitdiff
path: root/src/test/test_workqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/test_workqueue.c')
-rw-r--r--src/test/test_workqueue.c342
1 files changed, 342 insertions, 0 deletions
diff --git a/src/test/test_workqueue.c b/src/test/test_workqueue.c
new file mode 100644
index 0000000000..4077fb27a8
--- /dev/null
+++ b/src/test/test_workqueue.c
@@ -0,0 +1,342 @@
+/* Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2013, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#include "or.h"
+#include "compat_threads.h"
+#include "onion.h"
+#include "workqueue.h"
+#include "crypto.h"
+#include "crypto_curve25519.h"
+#include "compat_libevent.h"
+
+#include <stdio.h>
+#ifdef HAVE_EVENT2_EVENT_H
+#include <event2/event.h>
+#else
+#include <event.h>
+#endif
+
+static int opt_verbose = 0;
+static int opt_n_threads = 8;
+static int opt_n_items = 10000;
+static int opt_n_inflight = 1000;
+static int opt_n_lowwater = 250;
+static int opt_ratio_rsa = 5;
+
+#ifdef TRACK_RESPONSES
+tor_mutex_t bitmap_mutex;
+int handled_len;
+bitarray_t *handled;
+#endif
+
+typedef struct state_s {
+ int magic;
+ int n_handled;
+ crypto_pk_t *rsa;
+ curve25519_secret_key_t ecdh;
+} state_t;
+
+typedef struct rsa_work_s {
+ int serial;
+ uint8_t msg[128];
+ uint8_t msglen;
+} rsa_work_t;
+
+typedef struct ecdh_work_s {
+ int serial;
+ union {
+ curve25519_public_key_t pk;
+ uint8_t msg[32];
+ } u;
+} ecdh_work_t;
+
+static void
+mark_handled(int serial)
+{
+#ifdef TRACK_RESPONSES
+ tor_mutex_acquire(&bitmap_mutex);
+ tor_assert(serial < handled_len);
+ tor_assert(! bitarray_is_set(handled, serial));
+ bitarray_set(handled, serial);
+ tor_mutex_release(&bitmap_mutex);
+#else
+ (void)serial;
+#endif
+}
+
+static int
+workqueue_do_rsa(void *state, void *work)
+{
+ rsa_work_t *rw = work;
+ state_t *st = state;
+ crypto_pk_t *rsa = st->rsa;
+ uint8_t sig[256];
+ int len;
+
+ tor_assert(st->magic == 13371337);
+
+ len = crypto_pk_private_sign(rsa, (char*)sig, 256,
+ (char*)rw->msg, rw->msglen);
+ if (len < 0) {
+ rw->msglen = 0;
+ return WQ_RPL_ERROR;
+ }
+
+ memset(rw->msg, 0, sizeof(rw->msg));
+ rw->msglen = len;
+ memcpy(rw->msg, sig, len);
+ ++st->n_handled;
+
+ mark_handled(rw->serial);
+
+ return WQ_RPL_REPLY;
+}
+
+#if 0
+static int
+workqueue_do_shutdown(void *state, void *work)
+{
+ (void)state;
+ (void)work;
+ (void)cmd;
+ crypto_pk_free(((state_t*)state)->rsa);
+ tor_free(state);
+ return WQ_RPL_SHUTDOWN;
+}
+#endif
+
+static int
+workqueue_do_ecdh(void *state, void *work)
+{
+ ecdh_work_t *ew = work;
+ uint8_t output[CURVE25519_OUTPUT_LEN];
+ state_t *st = state;
+
+ tor_assert(st->magic == 13371337);
+
+ curve25519_handshake(output, &st->ecdh, &ew->u.pk);
+ memcpy(ew->u.msg, output, CURVE25519_OUTPUT_LEN);
+ ++st->n_handled;
+ mark_handled(ew->serial);
+ return WQ_RPL_REPLY;
+}
+
+static void *
+new_state(void *arg)
+{
+ state_t *st;
+ (void)arg;
+
+ st = tor_malloc(sizeof(*st));
+ /* Every thread gets its own keys. not a problem for benchmarking */
+ st->rsa = crypto_pk_new();
+ if (crypto_pk_generate_key_with_bits(st->rsa, 1024) < 0) {
+ puts("keygen failed");
+ crypto_pk_free(st->rsa);
+ tor_free(st);
+ return NULL;
+ }
+ curve25519_secret_key_generate(&st->ecdh, 0);
+ st->magic = 13371337;
+ return st;
+}
+
+static void
+free_state(void *arg)
+{
+ state_t *st = arg;
+ crypto_pk_free(st->rsa);
+ tor_free(st);
+}
+
+static tor_weak_rng_t weak_rng;
+static int n_sent = 0;
+static int rsa_sent = 0;
+static int ecdh_sent = 0;
+static int n_received = 0;
+
+#ifdef TRACK_RESPONSES
+bitarray_t *received;
+#endif
+
+static void
+handle_reply(void *arg)
+{
+#ifdef TRACK_RESPONSES
+ rsa_work_t *rw = arg; /* Naughty cast, but only looking at serial. */
+ tor_assert(! bitarray_is_set(received, rw->serial));
+ bitarray_set(received,rw->serial);
+#endif
+
+ tor_free(arg);
+ ++n_received;
+}
+
+static int
+add_work(threadpool_t *tp)
+{
+ int add_rsa =
+ opt_ratio_rsa == 0 ||
+ tor_weak_random_range(&weak_rng, opt_ratio_rsa) == 0;
+ if (add_rsa) {
+ rsa_work_t *w = tor_malloc_zero(sizeof(*w));
+ w->serial = n_sent++;
+ crypto_rand((char*)w->msg, 20);
+ w->msglen = 20;
+ ++rsa_sent;
+ return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w) != NULL;
+ } else {
+ ecdh_work_t *w = tor_malloc_zero(sizeof(*w));
+ w->serial = n_sent++;
+ /* Not strictly right, but this is just for benchmarks. */
+ crypto_rand((char*)w->u.pk.public_key, 32);
+ ++ecdh_sent;
+ return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w) != NULL;
+ }
+}
+
+static void
+replysock_readable_cb(tor_socket_t sock, short what, void *arg)
+{
+ threadpool_t *tp = arg;
+ replyqueue_t *rq = threadpool_get_replyqueue(tp);
+
+ int old_r = n_received;
+ (void) sock;
+ (void) what;
+
+ replyqueue_process(rq);
+ if (old_r == n_received)
+ return;
+
+ if (opt_verbose)
+ printf("%d / %d\n", n_received, n_sent);
+#ifdef TRACK_RESPONSES
+ tor_mutex_acquire(&bitmap_mutex);
+ for (i = 0; i < opt_n_items; ++i) {
+ if (bitarray_is_set(received, i))
+ putc('o', stdout);
+ else if (bitarray_is_set(handled, i))
+ putc('!', stdout);
+ else
+ putc('.', stdout);
+ }
+ puts("");
+ tor_mutex_release(&bitmap_mutex);
+#endif
+
+ if (n_sent - n_received < opt_n_lowwater) {
+ while (n_sent < n_received + opt_n_inflight && n_sent < opt_n_items) {
+ if (! add_work(tp)) {
+ puts("Couldn't add work.");
+ tor_event_base_loopexit(tor_libevent_get_base(), NULL);
+ }
+ }
+ }
+
+ if (n_received == n_sent && n_sent >= opt_n_items) {
+ tor_event_base_loopexit(tor_libevent_get_base(), NULL);
+ }
+}
+
+static void
+help(void)
+{
+ puts(
+ "Options:\n"
+ " -N <items> Run this many items of work\n"
+ " -T <threads> Use this many threads\n"
+ " -I <inflight> Have no more than this many requests queued at once\n"
+ " -L <lowwater> Add items whenever fewer than this many are pending.\n"
+ " -R <ratio> Make one out of this many items be a slow (RSA) one");
+}
+
+int
+main(int argc, char **argv)
+{
+ replyqueue_t *rq;
+ threadpool_t *tp;
+ int i;
+ tor_libevent_cfg evcfg;
+ struct event *ev;
+
+ for (i = 1; i < argc; ++i) {
+ if (!strcmp(argv[i], "-v")) {
+ opt_verbose = 1;
+ } else if (!strcmp(argv[i], "-T") && i+1<argc) {
+ opt_n_threads = atoi(argv[++i]);
+ } else if (!strcmp(argv[i], "-N") && i+1<argc) {
+ opt_n_items = atoi(argv[++i]);
+ } else if (!strcmp(argv[i], "-I") && i+1<argc) {
+ opt_n_inflight = atoi(argv[++i]);
+ } else if (!strcmp(argv[i], "-L") && i+1<argc) {
+ opt_n_lowwater = atoi(argv[++i]);
+ } else if (!strcmp(argv[i], "-R") && i+1<argc) {
+ opt_ratio_rsa = atoi(argv[++i]);
+ } else if (!strcmp(argv[i], "-h")) {
+ help();
+ return 0;
+ } else {
+ help();
+ return 1;
+ }
+ }
+ if (opt_n_threads < 1 ||
+ opt_n_items < 1 || opt_n_inflight < 1 || opt_n_lowwater < 0 ||
+ opt_ratio_rsa < 0) {
+ help();
+ return 1;
+ }
+
+ init_logging(1);
+ crypto_global_init(1, NULL, NULL);
+ crypto_seed_rng(1);
+
+ rq = replyqueue_new();
+ tor_assert(rq);
+ tp = threadpool_new(opt_n_threads,
+ rq, new_state, free_state, NULL);
+ tor_assert(tp);
+
+ crypto_seed_weak_rng(&weak_rng);
+
+ memset(&evcfg, 0, sizeof(evcfg));
+ tor_libevent_initialize(&evcfg);
+
+ ev = tor_event_new(tor_libevent_get_base(),
+ replyqueue_get_socket(rq), EV_READ|EV_PERSIST,
+ replysock_readable_cb, tp);
+
+ event_add(ev, NULL);
+
+#ifdef TRACK_RESPONSES
+ handled = bitarray_init_zero(opt_n_items);
+ received = bitarray_init_zero(opt_n_items);
+ tor_mutex_init(&bitmap_mutex);
+ handled_len = opt_n_items;
+#endif
+
+ for (i = 0; i < opt_n_inflight; ++i) {
+ if (! add_work(tp)) {
+ puts("Couldn't add work.");
+ return 1;
+ }
+ }
+
+ {
+ struct timeval limit = { 30, 0 };
+ tor_event_base_loopexit(tor_libevent_get_base(), &limit);
+ }
+
+ event_base_loop(tor_libevent_get_base(), 0);
+
+ if (n_sent != opt_n_items || n_received != n_sent) {
+ puts("FAIL");
+ return 1;
+ } else {
+ puts("OK");
+ return 0;
+ }
+}