summaryrefslogtreecommitdiff
path: root/src/test/test_workqueue.c
diff options
context:
space:
mode:
authorNick Mathewson <nickm@torproject.org>2013-09-28 00:33:10 -0400
committerNick Mathewson <nickm@torproject.org>2015-01-14 11:17:46 -0500
commite5f8c772f4c468a20da8b9176c2b276ac76bbe78 (patch)
tree22b763ad1eaf3903ce95b5b061cdac5bcac6047d /src/test/test_workqueue.c
parentebbc177005eaf9bd949daba657b2c703a7bd1769 (diff)
downloadtor-e5f8c772f4c468a20da8b9176c2b276ac76bbe78.tar.gz
tor-e5f8c772f4c468a20da8b9176c2b276ac76bbe78.zip
Test and fix workqueue_entry_cancel().
Diffstat (limited to 'src/test/test_workqueue.c')
-rw-r--r--src/test/test_workqueue.c77
1 files changed, 63 insertions, 14 deletions
diff --git a/src/test/test_workqueue.c b/src/test/test_workqueue.c
index cbf9d81950..6de6f03c33 100644
--- a/src/test/test_workqueue.c
+++ b/src/test/test_workqueue.c
@@ -23,6 +23,7 @@ static int opt_n_threads = 8;
static int opt_n_items = 10000;
static int opt_n_inflight = 1000;
static int opt_n_lowwater = 250;
+static int opt_n_cancel = 0;
static int opt_ratio_rsa = 5;
#ifdef TRACK_RESPONSES
@@ -172,27 +173,68 @@ handle_reply(void *arg)
++n_received;
}
-static int
+static workqueue_entry_t *
add_work(threadpool_t *tp)
{
int add_rsa =
opt_ratio_rsa == 0 ||
tor_weak_random_range(&weak_rng, opt_ratio_rsa) == 0;
+
if (add_rsa) {
rsa_work_t *w = tor_malloc_zero(sizeof(*w));
w->serial = n_sent++;
crypto_rand((char*)w->msg, 20);
w->msglen = 20;
++rsa_sent;
- return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w) != NULL;
+ return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w);
} else {
ecdh_work_t *w = tor_malloc_zero(sizeof(*w));
w->serial = n_sent++;
/* Not strictly right, but this is just for benchmarks. */
crypto_rand((char*)w->u.pk.public_key, 32);
++ecdh_sent;
- return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w) != NULL;
+ return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w);
+ }
+}
+
+static int n_failed_cancel = 0;
+static int n_successful_cancel = 0;
+
+static int
+add_n_work_items(threadpool_t *tp, int n)
+{
+ int n_queued = 0;
+ int n_try_cancel = 0, i;
+ workqueue_entry_t **to_cancel;
+ workqueue_entry_t *ent;
+
+ to_cancel = tor_malloc(sizeof(workqueue_entry_t*) * opt_n_cancel);
+
+ while (n_queued++ < n) {
+ ent = add_work(tp);
+ if (! ent) {
+ puts("Couldn't add work.");
+ tor_event_base_loopexit(tor_libevent_get_base(), NULL);
+ return -1;
+ }
+ if (n_try_cancel < opt_n_cancel &&
+ tor_weak_random_range(&weak_rng, n) < opt_n_cancel) {
+ to_cancel[n_try_cancel++] = ent;
+ }
+ }
+
+ for (i = 0; i < n_try_cancel; ++i) {
+ void *work = workqueue_entry_cancel(to_cancel[i]);
+ if (! work) {
+ n_failed_cancel++;
+ } else {
+ n_successful_cancel++;
+ tor_free(work);
+ }
}
+
+ tor_free(to_cancel);
+ return 0;
}
static int shutting_down = 0;
@@ -223,8 +265,13 @@ replysock_readable_cb(tor_socket_t sock, short what, void *arg)
if (old_r == n_received)
return;
- if (opt_verbose)
- printf("%d / %d\n", n_received, n_sent);
+ if (opt_verbose) {
+ printf("%d / %d", n_received, n_sent);
+ if (opt_n_cancel)
+ printf(" (%d cancelled, %d uncancellable)",
+ n_successful_cancel, n_failed_cancel);
+ puts("");
+ }
#ifdef TRACK_RESPONSES
tor_mutex_acquire(&bitmap_mutex);
for (i = 0; i < opt_n_items; ++i) {
@@ -239,16 +286,14 @@ replysock_readable_cb(tor_socket_t sock, short what, void *arg)
tor_mutex_release(&bitmap_mutex);
#endif
- if (n_sent - n_received < opt_n_lowwater) {
- while (n_sent < n_received + opt_n_inflight && n_sent < opt_n_items) {
- if (! add_work(tp)) {
- puts("Couldn't add work.");
- tor_event_base_loopexit(tor_libevent_get_base(), NULL);
- }
- }
+ if (n_sent - (n_received+n_successful_cancel) < opt_n_lowwater) {
+ int n_to_send = n_received + opt_n_inflight - n_sent;
+ if (n_to_send > opt_n_items - n_sent)
+ n_to_send = opt_n_items - n_sent;
+ add_n_work_items(tp, n_to_send);
}
- if (shutting_down == 0 && n_received == n_sent && n_sent >= opt_n_items) {
+ if (shutting_down == 0 && n_received+n_successful_cancel == n_sent && n_sent >= opt_n_items) {
shutting_down = 1;
threadpool_queue_for_all(tp, NULL, workqueue_do_shutdown, shutdown_reply, NULL);
}
@@ -263,6 +308,7 @@ help(void)
" -T <threads> Use this many threads\n"
" -I <inflight> Have no more than this many requests queued at once\n"
" -L <lowwater> Add items whenever fewer than this many are pending\n"
+ " -C <cancel> Try to cancel N items of every batch that we add\n"
" -R <ratio> Make one out of this many items be a slow (RSA) one\n"
" --no-{eventfd2,eventfd,pipe2,pipe,socketpair}\n"
" Disable one of the alert_socket backends.");
@@ -291,6 +337,8 @@ main(int argc, char **argv)
opt_n_lowwater = atoi(argv[++i]);
} else if (!strcmp(argv[i], "-R") && i+1<argc) {
opt_ratio_rsa = atoi(argv[++i]);
+ } else if (!strcmp(argv[i], "-C") && i+1<argc) {
+ opt_n_cancel = atoi(argv[++i]);
} else if (!strcmp(argv[i], "--no-eventfd2")) {
as_flags |= ASOCKS_NOEVENTFD2;
} else if (!strcmp(argv[i], "--no-eventfd")) {
@@ -311,6 +359,7 @@ main(int argc, char **argv)
}
if (opt_n_threads < 1 ||
opt_n_items < 1 || opt_n_inflight < 1 || opt_n_lowwater < 0 ||
+ opt_n_cancel > opt_n_inflight ||
opt_ratio_rsa < 0) {
help();
return 1;
@@ -358,7 +407,7 @@ main(int argc, char **argv)
event_base_loop(tor_libevent_get_base(), 0);
- if (n_sent != opt_n_items || n_received != n_sent ||
+ if (n_sent != opt_n_items || n_received+n_successful_cancel != n_sent ||
n_shutdowns_done != opt_n_threads) {
puts("FAIL");
return 1;