summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Mathewson <nickm@torproject.org>2015-01-21 14:47:16 -0500
committerNick Mathewson <nickm@torproject.org>2015-01-21 14:47:16 -0500
commit23fc1691b6dff4b0a3ad173809915901eb47fcab (patch)
tree30e39346fb1fbd44ec4b5c71e4c8fbb53848c0b2
parentf0415c16001ebfd06aec7a16bdf8677c7a931b65 (diff)
parent84f5cb749d614deeb66f9032c54cd9885e300493 (diff)
downloadtor-23fc1691b6dff4b0a3ad173809915901eb47fcab.tar.gz
tor-23fc1691b6dff4b0a3ad173809915901eb47fcab.zip
Merge branch 'better_workqueue_v3_squashed'
-rw-r--r--.gitignore2
-rw-r--r--changes/better_workqueues10
-rw-r--r--configure.ac10
-rw-r--r--src/common/compat.c395
-rw-r--r--src/common/compat.h57
-rw-r--r--src/common/compat_pthreads.c285
-rw-r--r--src/common/compat_threads.c303
-rw-r--r--src/common/compat_threads.h115
-rw-r--r--src/common/compat_winthreads.c196
-rw-r--r--src/common/include.am15
-rw-r--r--src/common/workqueue.c490
-rw-r--r--src/common/workqueue.h48
-rw-r--r--src/or/circuitlist.c17
-rw-r--r--src/or/command.c2
-rw-r--r--src/or/config.c2
-rw-r--r--src/or/connection.c24
-rw-r--r--src/or/cpuworker.c746
-rw-r--r--src/or/cpuworker.h10
-rw-r--r--src/or/main.c6
-rw-r--r--src/or/onion.c19
-rw-r--r--src/or/onion.h4
-rw-r--r--src/or/or.h23
-rw-r--r--src/test/include.am17
-rw-r--r--src/test/test.c19
-rw-r--r--src/test/test.h2
-rw-r--r--src/test/test_crypto.c36
-rw-r--r--src/test/test_threads.c316
-rw-r--r--src/test/test_util.c158
-rw-r--r--src/test/test_workqueue.c409
29 files changed, 2591 insertions, 1145 deletions
diff --git a/.gitignore b/.gitignore
index 9ddd0c5385..e63576cfd4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -163,10 +163,12 @@ cscope.*
/src/test/test-bt-cl
/src/test/test-child
/src/test/test-ntor-cl
+/src/test/test_workqueue
/src/test/test.exe
/src/test/test-bt-cl.exe
/src/test/test-child.exe
/src/test/test-ntor-cl.exe
+/src/test/test_workqueue.exe
# /src/tools/
/src/tools/tor-checkkey
diff --git a/changes/better_workqueues b/changes/better_workqueues
new file mode 100644
index 0000000000..32c984cb71
--- /dev/null
+++ b/changes/better_workqueues
@@ -0,0 +1,10 @@
+ o Major features:
+ - Refactor the CPU worker implementation for better performance by
+ avoiding the kernel and lengthening pipelines. The original
+ implementation used sockets to transfer data from the main thread
+ to the worker threads, and didn't allow any thread to be assigned
+ more than a single piece of work at once. The new implementation
+ avoids communications overhead by making requests in shared
+ memory, avoiding kernel IO where possible, and keeping more
+ request in flight at once. Resolves issue #9682.
+
diff --git a/configure.ac b/configure.ac
index efeea0658e..9aac1e8c55 100644
--- a/configure.ac
+++ b/configure.ac
@@ -400,6 +400,10 @@ fi
AC_SEARCH_LIBS(pthread_create, [pthread])
AC_SEARCH_LIBS(pthread_detach, [pthread])
+AM_CONDITIONAL(THREADS_WIN32, test "$enable_threads" = "yes" && test "$bwin32" = "true")
+AM_CONDITIONAL(THREADS_PTHREADS, test "$enable_threads" = "yes" && test "$bwin32" = "false")
+AM_CONDITIONAL(THREADS_NONE, test "$enable_threads" != "yes")
+
dnl -------------------------------------------------------------------
dnl Check for functions before libevent, since libevent-1.2 apparently
dnl exports strlcpy without defining it in a header.
@@ -410,6 +414,7 @@ AC_CHECK_FUNCS(
backtrace \
backtrace_symbols_fd \
clock_gettime \
+ eventfd \
flock \
ftime \
getaddrinfo \
@@ -424,6 +429,8 @@ AC_CHECK_FUNCS(
localtime_r \
lround \
memmem \
+ pipe \
+ pipe2 \
prctl \
rint \
sigaction \
@@ -437,7 +444,7 @@ AC_CHECK_FUNCS(
sysconf \
sysctl \
uname \
- usleep \
+ usleep \
vasprintf \
_vscprintf
)
@@ -965,6 +972,7 @@ AC_CHECK_HEADERS(
netinet/in6.h \
pwd.h \
stdint.h \
+ sys/eventfd.h \
sys/file.h \
sys/ioctl.h \
sys/limits.h \
diff --git a/src/common/compat.c b/src/common/compat.c
index 6d36321193..5575316b2b 100644
--- a/src/common/compat.c
+++ b/src/common/compat.c
@@ -27,7 +27,6 @@
#include "compat.h"
#ifdef _WIN32
-#include <process.h>
#include <windows.h>
#include <sys/locking.h>
#endif
@@ -2544,109 +2543,6 @@ get_uname(void)
* Process control
*/
-#if defined(USE_PTHREADS)
-/** Wraps a void (*)(void*) function and its argument so we can
- * invoke them in a way pthreads would expect.
- */
-typedef struct tor_pthread_data_t {
- void (*func)(void *);
- void *data;
-} tor_pthread_data_t;
-/** Given a tor_pthread_data_t <b>_data</b>, call _data-&gt;func(d-&gt;data)
- * and free _data. Used to make sure we can call functions the way pthread
- * expects. */
-static void *
-tor_pthread_helper_fn(void *_data)
-{
- tor_pthread_data_t *data = _data;
- void (*func)(void*);
- void *arg;
- /* mask signals to worker threads to avoid SIGPIPE, etc */
- sigset_t sigs;
- /* We're in a subthread; don't handle any signals here. */
- sigfillset(&sigs);
- pthread_sigmask(SIG_SETMASK, &sigs, NULL);
-
- func = data->func;
- arg = data->data;
- tor_free(_data);
- func(arg);
- return NULL;
-}
-/**
- * A pthread attribute to make threads start detached.
- */
-static pthread_attr_t attr_detached;
-/** True iff we've called tor_threads_init() */
-static int threads_initialized = 0;
-#endif
-
-/** Minimalist interface to run a void function in the background. On
- * Unix calls fork, on win32 calls beginthread. Returns -1 on failure.
- * func should not return, but rather should call spawn_exit.
- *
- * NOTE: if <b>data</b> is used, it should not be allocated on the stack,
- * since in a multithreaded environment, there is no way to be sure that
- * the caller's stack will still be around when the called function is
- * running.
- */
-int
-spawn_func(void (*func)(void *), void *data)
-{
-#if defined(USE_WIN32_THREADS)
- int rv;
- rv = (int)_beginthread(func, 0, data);
- if (rv == (int)-1)
- return -1;
- return 0;
-#elif defined(USE_PTHREADS)
- pthread_t thread;
- tor_pthread_data_t *d;
- if (PREDICT_UNLIKELY(!threads_initialized))
- tor_threads_init();
- d = tor_malloc(sizeof(tor_pthread_data_t));
- d->data = data;
- d->func = func;
- if (pthread_create(&thread,&attr_detached,tor_pthread_helper_fn,d))
- return -1;
- return 0;
-#else
- pid_t pid;
- pid = fork();
- if (pid<0)
- return -1;
- if (pid==0) {
- /* Child */
- func(data);
- tor_assert(0); /* Should never reach here. */
- return 0; /* suppress "control-reaches-end-of-non-void" warning. */
- } else {
- /* Parent */
- return 0;
- }
-#endif
-}
-
-/** End the current thread/process.
- */
-void
-spawn_exit(void)
-{
-#if defined(USE_WIN32_THREADS)
- _endthread();
- //we should never get here. my compiler thinks that _endthread returns, this
- //is an attempt to fool it.
- tor_assert(0);
- _exit(0);
-#elif defined(USE_PTHREADS)
- pthread_exit(NULL);
-#else
- /* http://www.erlenstar.demon.co.uk/unix/faq_2.html says we should
- * call _exit, not exit, from child processes. */
- _exit(0);
-#endif
-}
-
/** Implementation logic for compute_num_cpus(). */
static int
compute_num_cpus_impl(void)
@@ -2935,280 +2831,6 @@ tor_gmtime_r(const time_t *timep, struct tm *result)
}
#endif
-#if defined(USE_WIN32_THREADS)
-void
-tor_mutex_init(tor_mutex_t *m)
-{
- InitializeCriticalSection(&m->mutex);
-}
-void
-tor_mutex_uninit(tor_mutex_t *m)
-{
- DeleteCriticalSection(&m->mutex);
-}
-void
-tor_mutex_acquire(tor_mutex_t *m)
-{
- tor_assert(m);
- EnterCriticalSection(&m->mutex);
-}
-void
-tor_mutex_release(tor_mutex_t *m)
-{
- LeaveCriticalSection(&m->mutex);
-}
-unsigned long
-tor_get_thread_id(void)
-{
- return (unsigned long)GetCurrentThreadId();
-}
-#elif defined(USE_PTHREADS)
-/** A mutex attribute that we're going to use to tell pthreads that we want
- * "reentrant" mutexes (i.e., once we can re-lock if we're already holding
- * them.) */
-static pthread_mutexattr_t attr_reentrant;
-/** Initialize <b>mutex</b> so it can be locked. Every mutex must be set
- * up with tor_mutex_init() or tor_mutex_new(); not both. */
-void
-tor_mutex_init(tor_mutex_t *mutex)
-{
- int err;
- if (PREDICT_UNLIKELY(!threads_initialized))
- tor_threads_init();
- err = pthread_mutex_init(&mutex->mutex, &attr_reentrant);
- if (PREDICT_UNLIKELY(err)) {
- log_err(LD_GENERAL, "Error %d creating a mutex.", err);
- tor_fragile_assert();
- }
-}
-/** Wait until <b>m</b> is free, then acquire it. */
-void
-tor_mutex_acquire(tor_mutex_t *m)
-{
- int err;
- tor_assert(m);
- err = pthread_mutex_lock(&m->mutex);
- if (PREDICT_UNLIKELY(err)) {
- log_err(LD_GENERAL, "Error %d locking a mutex.", err);
- tor_fragile_assert();
- }
-}
-/** Release the lock <b>m</b> so another thread can have it. */
-void
-tor_mutex_release(tor_mutex_t *m)
-{
- int err;
- tor_assert(m);
- err = pthread_mutex_unlock(&m->mutex);
- if (PREDICT_UNLIKELY(err)) {
- log_err(LD_GENERAL, "Error %d unlocking a mutex.", err);
- tor_fragile_assert();
- }
-}
-/** Clean up the mutex <b>m</b> so that it no longer uses any system
- * resources. Does not free <b>m</b>. This function must only be called on
- * mutexes from tor_mutex_init(). */
-void
-tor_mutex_uninit(tor_mutex_t *m)
-{
- int err;
- tor_assert(m);
- err = pthread_mutex_destroy(&m->mutex);
- if (PREDICT_UNLIKELY(err)) {
- log_err(LD_GENERAL, "Error %d destroying a mutex.", err);
- tor_fragile_assert();
- }
-}
-/** Return an integer representing this thread. */
-unsigned long
-tor_get_thread_id(void)
-{
- union {
- pthread_t thr;
- unsigned long id;
- } r;
- r.thr = pthread_self();
- return r.id;
-}
-#endif
-
-/** Return a newly allocated, ready-for-use mutex. */
-tor_mutex_t *
-tor_mutex_new(void)
-{
- tor_mutex_t *m = tor_malloc_zero(sizeof(tor_mutex_t));
- tor_mutex_init(m);
- return m;
-}
-/** Release all storage and system resources held by <b>m</b>. */
-void
-tor_mutex_free(tor_mutex_t *m)
-{
- if (!m)
- return;
- tor_mutex_uninit(m);
- tor_free(m);
-}
-
-/* Conditions. */
-#ifdef USE_PTHREADS
-#if 0
-/** Cross-platform condition implementation. */
-struct tor_cond_t {
- pthread_cond_t cond;
-};
-/** Return a newly allocated condition, with nobody waiting on it. */
-tor_cond_t *
-tor_cond_new(void)
-{
- tor_cond_t *cond = tor_malloc_zero(sizeof(tor_cond_t));
- if (pthread_cond_init(&cond->cond, NULL)) {
- tor_free(cond);
- return NULL;
- }
- return cond;
-}
-/** Release all resources held by <b>cond</b>. */
-void
-tor_cond_free(tor_cond_t *cond)
-{
- if (!cond)
- return;
- if (pthread_cond_destroy(&cond->cond)) {
- log_warn(LD_GENERAL,"Error freeing condition: %s", strerror(errno));
- return;
- }
- tor_free(cond);
-}
-/** Wait until one of the tor_cond_signal functions is called on <b>cond</b>.
- * All waiters on the condition must wait holding the same <b>mutex</b>.
- * Returns 0 on success, negative on failure. */
-int
-tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex)
-{
- return pthread_cond_wait(&cond->cond, &mutex->mutex) ? -1 : 0;
-}
-/** Wake up one of the waiters on <b>cond</b>. */
-void
-tor_cond_signal_one(tor_cond_t *cond)
-{
- pthread_cond_signal(&cond->cond);
-}
-/** Wake up all of the waiters on <b>cond</b>. */
-void
-tor_cond_signal_all(tor_cond_t *cond)
-{
- pthread_cond_broadcast(&cond->cond);
-}
-#endif
-/** Set up common structures for use by threading. */
-void
-tor_threads_init(void)
-{
- if (!threads_initialized) {
- pthread_mutexattr_init(&attr_reentrant);
- pthread_mutexattr_settype(&attr_reentrant, PTHREAD_MUTEX_RECURSIVE);
- tor_assert(0==pthread_attr_init(&attr_detached));
- tor_assert(0==pthread_attr_setdetachstate(&attr_detached, 1));
- threads_initialized = 1;
- set_main_thread();
- }
-}
-#elif defined(USE_WIN32_THREADS)
-#if 0
-static DWORD cond_event_tls_index;
-struct tor_cond_t {
- CRITICAL_SECTION mutex;
- smartlist_t *events;
-};
-tor_cond_t *
-tor_cond_new(void)
-{
- tor_cond_t *cond = tor_malloc_zero(sizeof(tor_cond_t));
- InitializeCriticalSection(&cond->mutex);
- cond->events = smartlist_new();
- return cond;
-}
-void
-tor_cond_free(tor_cond_t *cond)
-{
- if (!cond)
- return;
- DeleteCriticalSection(&cond->mutex);
- /* XXXX notify? */
- smartlist_free(cond->events);
- tor_free(cond);
-}
-int
-tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex)
-{
- HANDLE event;
- int r;
- tor_assert(cond);
- tor_assert(mutex);
- event = TlsGetValue(cond_event_tls_index);
- if (!event) {
- event = CreateEvent(0, FALSE, FALSE, NULL);
- TlsSetValue(cond_event_tls_index, event);
- }
- EnterCriticalSection(&cond->mutex);
-
- tor_assert(WaitForSingleObject(event, 0) == WAIT_TIMEOUT);
- tor_assert(!smartlist_contains(cond->events, event));
- smartlist_add(cond->events, event);
-
- LeaveCriticalSection(&cond->mutex);
-
- tor_mutex_release(mutex);
- r = WaitForSingleObject(event, INFINITE);
- tor_mutex_acquire(mutex);
-
- switch (r) {
- case WAIT_OBJECT_0: /* we got the mutex normally. */
- break;
- case WAIT_ABANDONED: /* holding thread exited. */
- case WAIT_TIMEOUT: /* Should never happen. */
- tor_assert(0);
- break;
- case WAIT_FAILED:
- log_warn(LD_GENERAL, "Failed to acquire mutex: %d",(int) GetLastError());
- }
- return 0;
-}
-void
-tor_cond_signal_one(tor_cond_t *cond)
-{
- HANDLE event;
- tor_assert(cond);
-
- EnterCriticalSection(&cond->mutex);
-
- if ((event = smartlist_pop_last(cond->events)))
- SetEvent(event);
-
- LeaveCriticalSection(&cond->mutex);
-}
-void
-tor_cond_signal_all(tor_cond_t *cond)
-{
- tor_assert(cond);
-
- EnterCriticalSection(&cond->mutex);
- SMARTLIST_FOREACH(cond->events, HANDLE, event, SetEvent(event));
- smartlist_clear(cond->events);
- LeaveCriticalSection(&cond->mutex);
-}
-#endif
-void
-tor_threads_init(void)
-{
-#if 0
- cond_event_tls_index = TlsAlloc();
-#endif
- set_main_thread();
-}
-#endif
-
#if defined(HAVE_MLOCKALL) && HAVE_DECL_MLOCKALL && defined(RLIMIT_MEMLOCK)
/** Attempt to raise the current and max rlimit to infinity for our process.
* This only needs to be done once and can probably only be done when we have
@@ -3292,23 +2914,6 @@ tor_mlockall(void)
#endif
}
-/** Identity of the "main" thread */
-static unsigned long main_thread_id = -1;
-
-/** Start considering the current thread to be the 'main thread'. This has
- * no effect on anything besides in_main_thread(). */
-void
-set_main_thread(void)
-{
- main_thread_id = tor_get_thread_id();
-}
-/** Return true iff called from the main thread. */
-int
-in_main_thread(void)
-{
- return main_thread_id == tor_get_thread_id();
-}
-
/**
* On Windows, WSAEWOULDBLOCK is not always correct: when you see it,
* you need to ask the socket for its actual errno. Also, you need to
diff --git a/src/common/compat.h b/src/common/compat.h
index 04e8cb267c..23f8614196 100644
--- a/src/common/compat.h
+++ b/src/common/compat.h
@@ -36,9 +36,6 @@
#ifdef HAVE_STRING_H
#include <string.h>
#endif
-#if defined(HAVE_PTHREAD_H) && !defined(_WIN32)
-#include <pthread.h>
-#endif
#include <stdarg.h>
#ifdef HAVE_SYS_RESOURCE_H
#include <sys/resource.h>
@@ -642,61 +639,10 @@ char **get_environment(void);
int get_total_system_memory(size_t *mem_out);
-int spawn_func(void (*func)(void *), void *data);
-void spawn_exit(void) ATTR_NORETURN;
-
-#if defined(_WIN32)
-#define USE_WIN32_THREADS
-#elif defined(HAVE_PTHREAD_H) && defined(HAVE_PTHREAD_CREATE)
-#define USE_PTHREADS
-#else
-#error "No threading system was found"
-#endif
-
int compute_num_cpus(void);
-/* Because we use threads instead of processes on most platforms (Windows,
- * Linux, etc), we need locking for them. On platforms with poor thread
- * support or broken gethostbyname_r, these functions are no-ops. */
-
-/** A generic lock structure for multithreaded builds. */
-typedef struct tor_mutex_t {
-#if defined(USE_WIN32_THREADS)
- /** Windows-only: on windows, we implement locks with CRITICAL_SECTIONS. */
- CRITICAL_SECTION mutex;
-#elif defined(USE_PTHREADS)
- /** Pthreads-only: with pthreads, we implement locks with
- * pthread_mutex_t. */
- pthread_mutex_t mutex;
-#else
- /** No-threads only: Dummy variable so that tor_mutex_t takes up space. */
- int _unused;
-#endif
-} tor_mutex_t;
-
int tor_mlockall(void);
-tor_mutex_t *tor_mutex_new(void);
-void tor_mutex_init(tor_mutex_t *m);
-void tor_mutex_acquire(tor_mutex_t *m);
-void tor_mutex_release(tor_mutex_t *m);
-void tor_mutex_free(tor_mutex_t *m);
-void tor_mutex_uninit(tor_mutex_t *m);
-unsigned long tor_get_thread_id(void);
-void tor_threads_init(void);
-
-void set_main_thread(void);
-int in_main_thread(void);
-
-#if 0
-typedef struct tor_cond_t tor_cond_t;
-tor_cond_t *tor_cond_new(void);
-void tor_cond_free(tor_cond_t *cond);
-int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex);
-void tor_cond_signal_one(tor_cond_t *cond);
-void tor_cond_signal_all(tor_cond_t *cond);
-#endif
-
/** Macros for MIN/MAX. Never use these when the arguments could have
* side-effects.
* {With GCC extensions we could probably define a safer MIN/MAX. But
@@ -742,5 +688,8 @@ STATIC int tor_ersatz_socketpair(int family, int type, int protocol,
#endif
#endif
+/* This needs some of the declarations above so we include it here. */
+#include "compat_threads.h"
+
#endif
diff --git a/src/common/compat_pthreads.c b/src/common/compat_pthreads.c
new file mode 100644
index 0000000000..e1d4b0e79b
--- /dev/null
+++ b/src/common/compat_pthreads.c
@@ -0,0 +1,285 @@
+/* Copyright (c) 2003-2004, Roger Dingledine
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2015, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#include "orconfig.h"
+#include <pthread.h>
+#include <signal.h>
+#include <time.h>
+
+#include "compat.h"
+#include "torlog.h"
+#include "util.h"
+
+/** Wraps a void (*)(void*) function and its argument so we can
+ * invoke them in a way pthreads would expect.
+ */
+typedef struct tor_pthread_data_t {
+ void (*func)(void *);
+ void *data;
+} tor_pthread_data_t;
+/** Given a tor_pthread_data_t <b>_data</b>, call _data-&gt;func(d-&gt;data)
+ * and free _data. Used to make sure we can call functions the way pthread
+ * expects. */
+static void *
+tor_pthread_helper_fn(void *_data)
+{
+ tor_pthread_data_t *data = _data;
+ void (*func)(void*);
+ void *arg;
+ /* mask signals to worker threads to avoid SIGPIPE, etc */
+ sigset_t sigs;
+ /* We're in a subthread; don't handle any signals here. */
+ sigfillset(&sigs);
+ pthread_sigmask(SIG_SETMASK, &sigs, NULL);
+
+ func = data->func;
+ arg = data->data;
+ tor_free(_data);
+ func(arg);
+ return NULL;
+}
+/**
+ * A pthread attribute to make threads start detached.
+ */
+static pthread_attr_t attr_detached;
+/** True iff we've called tor_threads_init() */
+static int threads_initialized = 0;
+
+/** Minimalist interface to run a void function in the background. On
+ * Unix calls fork, on win32 calls beginthread. Returns -1 on failure.
+ * func should not return, but rather should call spawn_exit.
+ *
+ * NOTE: if <b>data</b> is used, it should not be allocated on the stack,
+ * since in a multithreaded environment, there is no way to be sure that
+ * the caller's stack will still be around when the called function is
+ * running.
+ */
+int
+spawn_func(void (*func)(void *), void *data)
+{
+ pthread_t thread;
+ tor_pthread_data_t *d;
+ if (PREDICT_UNLIKELY(!threads_initialized))
+ tor_threads_init();
+ d = tor_malloc(sizeof(tor_pthread_data_t));
+ d->data = data;
+ d->func = func;
+ if (pthread_create(&thread,&attr_detached,tor_pthread_helper_fn,d))
+ return -1;
+ return 0;
+}
+
+/** End the current thread/process.
+ */
+void
+spawn_exit(void)
+{
+ pthread_exit(NULL);
+}
+
+/** A mutex attribute that we're going to use to tell pthreads that we want
+ * "recursive" mutexes (i.e., once we can re-lock if we're already holding
+ * them.) */
+static pthread_mutexattr_t attr_recursive;
+
+/** Initialize <b>mutex</b> so it can be locked. Every mutex must be set
+ * up with tor_mutex_init() or tor_mutex_new(); not both. */
+void
+tor_mutex_init(tor_mutex_t *mutex)
+{
+ int err;
+ if (PREDICT_UNLIKELY(!threads_initialized))
+ tor_threads_init();
+ err = pthread_mutex_init(&mutex->mutex, &attr_recursive);
+ if (PREDICT_UNLIKELY(err)) {
+ log_err(LD_GENERAL, "Error %d creating a mutex.", err);
+ tor_fragile_assert();
+ }
+}
+
+/** As tor_mutex_init, but initialize a mutex suitable that may be
+ * non-recursive, if the OS supports that. */
+void
+tor_mutex_init_nonrecursive(tor_mutex_t *mutex)
+{
+ int err;
+ if (PREDICT_UNLIKELY(!threads_initialized))
+ tor_threads_init();
+ err = pthread_mutex_init(&mutex->mutex, NULL);
+ if (PREDICT_UNLIKELY(err)) {
+ log_err(LD_GENERAL, "Error %d creating a mutex.", err);
+ tor_fragile_assert();
+ }
+}
+
+/** Wait until <b>m</b> is free, then acquire it. */
+void
+tor_mutex_acquire(tor_mutex_t *m)
+{
+ int err;
+ tor_assert(m);
+ err = pthread_mutex_lock(&m->mutex);
+ if (PREDICT_UNLIKELY(err)) {
+ log_err(LD_GENERAL, "Error %d locking a mutex.", err);
+ tor_fragile_assert();
+ }
+}
+/** Release the lock <b>m</b> so another thread can have it. */
+void
+tor_mutex_release(tor_mutex_t *m)
+{
+ int err;
+ tor_assert(m);
+ err = pthread_mutex_unlock(&m->mutex);
+ if (PREDICT_UNLIKELY(err)) {
+ log_err(LD_GENERAL, "Error %d unlocking a mutex.", err);
+ tor_fragile_assert();
+ }
+}
+/** Clean up the mutex <b>m</b> so that it no longer uses any system
+ * resources. Does not free <b>m</b>. This function must only be called on
+ * mutexes from tor_mutex_init(). */
+void
+tor_mutex_uninit(tor_mutex_t *m)
+{
+ int err;
+ tor_assert(m);
+ err = pthread_mutex_destroy(&m->mutex);
+ if (PREDICT_UNLIKELY(err)) {
+ log_err(LD_GENERAL, "Error %d destroying a mutex.", err);
+ tor_fragile_assert();
+ }
+}
+/** Return an integer representing this thread. */
+unsigned long
+tor_get_thread_id(void)
+{
+ union {
+ pthread_t thr;
+ unsigned long id;
+ } r;
+ r.thr = pthread_self();
+ return r.id;
+}
+
+/* Conditions. */
+
+/** Initialize an already-allocated condition variable. */
+int
+tor_cond_init(tor_cond_t *cond)
+{
+ pthread_condattr_t condattr;
+
+ memset(cond, 0, sizeof(tor_cond_t));
+ /* Default condition attribute. Might be used if clock monotonic is
+ * available else this won't affect anything. */
+ if (pthread_condattr_init(&condattr)) {
+ return -1;
+ }
+
+#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
+ /* Use monotonic time so when we timedwait() on it, any clock adjustment
+ * won't affect the timeout value. */
+ if (pthread_condattr_setclock(&condattr, CLOCK_MONOTONIC)) {
+ return -1;
+ }
+#endif
+ if (pthread_cond_init(&cond->cond, &condattr)) {
+ return -1;
+ }
+ return 0;
+}
+
+/** Release all resources held by <b>cond</b>, but do not free <b>cond</b>
+ * itself. */
+void
+tor_cond_uninit(tor_cond_t *cond)
+{
+ if (pthread_cond_destroy(&cond->cond)) {
+ log_warn(LD_GENERAL,"Error freeing condition: %s", strerror(errno));
+ return;
+ }
+}
+/** Wait until one of the tor_cond_signal functions is called on <b>cond</b>.
+ * (If <b>tv</b> is set, and that amount of time passes with no signal to
+ * <b>cond</b>, return anyway. All waiters on the condition must wait holding
+ * the same <b>mutex</b>. All signallers should hold that mutex. The mutex
+ * needs to have been allocated with tor_mutex_init_for_cond().
+ *
+ * Returns 0 on success, -1 on failure, 1 on timeout. */
+int
+tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex, const struct timeval *tv)
+{
+ int r;
+ if (tv == NULL) {
+ while (1) {
+ r = pthread_cond_wait(&cond->cond, &mutex->mutex);
+ if (r == EINTR) {
+ /* EINTR should be impossible according to POSIX, but POSIX, like the
+ * Pirate's Code, is apparently treated "more like what you'd call
+ * guidelines than actual rules." */
+ continue;
+ }
+ return r ? -1 : 0;
+ }
+ } else {
+ struct timeval tvnow, tvsum;
+ struct timespec ts;
+ while (1) {
+#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
+ if (clock_gettime(CLOCK_MONOTONIC, &ts) < 0) {
+ return -1;
+ }
+ tvnow.tv_sec = ts.tv_sec;
+ tvnow.tv_usec = ts.tv_nsec / 1000;
+ timeradd(tv, &tvnow, &tvsum);
+#else
+ if (gettimeofday(&tvnow, NULL) < 0)
+ return -1;
+ timeradd(tv, &tvnow, &tvsum);
+#endif /* HAVE_CLOCK_GETTIME, CLOCK_MONOTONIC */
+
+ ts.tv_sec = tvsum.tv_sec;
+ ts.tv_nsec = tvsum.tv_usec * 1000;
+
+ r = pthread_cond_timedwait(&cond->cond, &mutex->mutex, &ts);
+ if (r == 0)
+ return 0;
+ else if (r == ETIMEDOUT)
+ return 1;
+ else if (r == EINTR)
+ continue;
+ else
+ return -1;
+ }
+ }
+}
+/** Wake up one of the waiters on <b>cond</b>. */
+void
+tor_cond_signal_one(tor_cond_t *cond)
+{
+ pthread_cond_signal(&cond->cond);
+}
+/** Wake up all of the waiters on <b>cond</b>. */
+void
+tor_cond_signal_all(tor_cond_t *cond)
+{
+ pthread_cond_broadcast(&cond->cond);
+}
+
+/** Set up common structures for use by threading. */
+void
+tor_threads_init(void)
+{
+ if (!threads_initialized) {
+ pthread_mutexattr_init(&attr_recursive);
+ pthread_mutexattr_settype(&attr_recursive, PTHREAD_MUTEX_RECURSIVE);
+ tor_assert(0==pthread_attr_init(&attr_detached));
+ tor_assert(0==pthread_attr_setdetachstate(&attr_detached, 1));
+ threads_initialized = 1;
+ set_main_thread();
+ }
+}
+
diff --git a/src/common/compat_threads.c b/src/common/compat_threads.c
new file mode 100644
index 0000000000..3b79292cdb
--- /dev/null
+++ b/src/common/compat_threads.c
@@ -0,0 +1,303 @@
+/* Copyright (c) 2003-2004, Roger Dingledine
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2015, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#define _GNU_SOURCE
+
+#include "orconfig.h"
+#define _GNU_SOURCE
+#include <stdlib.h>
+#include "compat.h"
+#include "compat_threads.h"
+
+#include "util.h"
+#include "torlog.h"
+
+#ifdef HAVE_SYS_EVENTFD_H
+#include <sys/eventfd.h>
+#endif
+#ifdef HAVE_FCNTL_H
+#include <fcntl.h>
+#endif
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+
+/** Return a newly allocated, ready-for-use mutex. */
+tor_mutex_t *
+tor_mutex_new(void)
+{
+ tor_mutex_t *m = tor_malloc_zero(sizeof(tor_mutex_t));
+ tor_mutex_init(m);
+ return m;
+}
+/** Return a newly allocated, ready-for-use mutex. This one might be
+ * non-recursive, if that's faster. */
+tor_mutex_t *
+tor_mutex_new_nonrecursive(void)
+{
+ tor_mutex_t *m = tor_malloc_zero(sizeof(tor_mutex_t));
+ tor_mutex_init_nonrecursive(m);
+ return m;
+}
+/** Release all storage and system resources held by <b>m</b>. */
+void
+tor_mutex_free(tor_mutex_t *m)
+{
+ if (!m)
+ return;
+ tor_mutex_uninit(m);
+ tor_free(m);
+}
+
+/** Allocate and return a new condition variable. */
+tor_cond_t *
+tor_cond_new(void)
+{
+ tor_cond_t *cond = tor_malloc(sizeof(tor_cond_t));
+ if (tor_cond_init(cond)<0)
+ tor_free(cond);
+ return cond;
+}
+
+/** Free all storage held in <b>c</b>. */
+void
+tor_cond_free(tor_cond_t *c)
+{
+ if (!c)
+ return;
+ tor_cond_uninit(c);
+ tor_free(c);
+}
+
+/** Identity of the "main" thread */
+static unsigned long main_thread_id = -1;
+
+/** Start considering the current thread to be the 'main thread'. This has
+ * no effect on anything besides in_main_thread(). */
+void
+set_main_thread(void)
+{
+ main_thread_id = tor_get_thread_id();
+}
+/** Return true iff called from the main thread. */
+int
+in_main_thread(void)
+{
+ return main_thread_id == tor_get_thread_id();
+}
+
+#if defined(HAVE_EVENTFD) || defined(HAVE_PIPE)
+/* non-interruptable versions */
+static int
+write_ni(int fd, const void *buf, size_t n)
+{
+ int r;
+ again:
+ r = write(fd, buf, n);
+ if (r < 0 && errno == EINTR)
+ goto again;
+ return r;
+}
+static int
+read_ni(int fd, void *buf, size_t n)
+{
+ int r;
+ again:
+ r = read(fd, buf, n);
+ if (r < 0 && errno == EINTR)
+ goto again;
+ return r;
+}
+#endif
+
+/* non-interruptable versions */
+static int
+send_ni(int fd, const void *buf, size_t n, int flags)
+{
+ int r;
+ again:
+ r = send(fd, buf, n, flags);
+ if (r < 0 && errno == EINTR)
+ goto again;
+ return r;
+}
+
+static int
+recv_ni(int fd, void *buf, size_t n, int flags)
+{
+ int r;
+ again:
+ r = recv(fd, buf, n, flags);
+ if (r < 0 && errno == EINTR)
+ goto again;
+ return r;
+}
+
+#ifdef HAVE_EVENTFD
+static int
+eventfd_alert(int fd)
+{
+ uint64_t u = 1;
+ int r = write_ni(fd, (void*)&u, sizeof(u));
+ if (r < 0 && errno != EAGAIN)
+ return -1;
+ return 0;
+}
+
+static int
+eventfd_drain(int fd)
+{
+ uint64_t u = 0;
+ int r = read_ni(fd, (void*)&u, sizeof(u));
+ if (r < 0 && errno != EAGAIN)
+ return -1;
+ return 0;
+}
+#endif
+
+#ifdef HAVE_PIPE
+static int
+pipe_alert(int fd)
+{
+ ssize_t r = write(fd, "x", 1);
+ if (r < 0 && errno != EAGAIN)
+ return -1;
+ return 0;
+}
+
+static int
+pipe_drain(int fd)
+{
+ char buf[32];
+ ssize_t r;
+ while ((r = read(fd, buf, sizeof(buf))) >= 0)
+ ;
+ if (r == 0 || errno != EAGAIN)
+ return -1;
+ return 0;
+}
+#endif
+
+static int
+sock_alert(tor_socket_t fd)
+{
+ ssize_t r = send_ni(fd, "x", 1, 0);
+ if (r < 0 && !ERRNO_IS_EAGAIN(tor_socket_errno(fd)))
+ return -1;
+ return 0;
+}
+
+static int
+sock_drain(tor_socket_t fd)
+{
+ char buf[32];
+ ssize_t r;
+ while ((r = recv_ni(fd, buf, sizeof(buf), 0)) >= 0)
+ ;
+ if (r == 0 || !ERRNO_IS_EAGAIN(tor_socket_errno(fd)))
+ return -1;
+ return 0;
+}
+
+/** Allocate a new set of alert sockets, and set the appropriate function
+ * pointers, in <b>socks_out</b>. */
+int
+alert_sockets_create(alert_sockets_t *socks_out, uint32_t flags)
+{
+ tor_socket_t socks[2] = { TOR_INVALID_SOCKET, TOR_INVALID_SOCKET };
+
+#ifdef HAVE_EVENTFD
+ /* First, we try the Linux eventfd() syscall. This gives a 64-bit counter
+ * associated with a single file descriptor. */
+#if defined(EFD_CLOEXEC) && defined(EFD_NONBLOCK)
+ if (!(flags & ASOCKS_NOEVENTFD2))
+ socks[0] = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
+#endif
+ if (socks[0] < 0 && !(flags & ASOCKS_NOEVENTFD)) {
+ socks[0] = eventfd(0,0);
+ if (socks[0] >= 0) {
+ if (fcntl(socks[0], F_SETFD, FD_CLOEXEC) < 0 ||
+ set_socket_nonblocking(socks[0]) < 0) {
+ close(socks[0]);
+ return -1;
+ }
+ }
+ }
+ if (socks[0] >= 0) {
+ socks_out->read_fd = socks_out->write_fd = socks[0];
+ socks_out->alert_fn = eventfd_alert;
+ socks_out->drain_fn = eventfd_drain;
+ return 0;
+ }
+#endif
+
+#ifdef HAVE_PIPE2
+ /* Now we're going to try pipes. First type the pipe2() syscall, if we
+ * have it, so we can save some calls... */
+ if (!(flags & ASOCKS_NOPIPE2) &&
+ pipe2(socks, O_NONBLOCK|O_CLOEXEC) == 0) {
+ socks_out->read_fd = socks[0];
+ socks_out->write_fd = socks[1];
+ socks_out->alert_fn = pipe_alert;
+ socks_out->drain_fn = pipe_drain;
+ return 0;
+ }
+#endif
+
+#ifdef HAVE_PIPE
+ /* Now try the regular pipe() syscall. Pipes have a bit lower overhead than
+ * socketpairs, fwict. */
+ if (!(flags & ASOCKS_NOPIPE) &&
+ pipe(socks) == 0) {
+ if (fcntl(socks[0], F_SETFD, FD_CLOEXEC) < 0 ||
+ fcntl(socks[1], F_SETFD, FD_CLOEXEC) < 0 ||
+ set_socket_nonblocking(socks[0]) < 0 ||
+ set_socket_nonblocking(socks[1]) < 0) {
+ close(socks[0]);
+ close(socks[1]);
+ return -1;
+ }
+ socks_out->read_fd = socks[0];
+ socks_out->write_fd = socks[1];
+ socks_out->alert_fn = pipe_alert;
+ socks_out->drain_fn = pipe_drain;
+ return 0;
+ }
+#endif
+
+ /* If nothing else worked, fall back on socketpair(). */
+ if (!(flags & ASOCKS_NOSOCKETPAIR) &&
+ tor_socketpair(AF_UNIX, SOCK_STREAM, 0, socks) == 0) {
+ if (set_socket_nonblocking(socks[0]) < 0 ||
+ set_socket_nonblocking(socks[1])) {
+ tor_close_socket(socks[0]);
+ tor_close_socket(socks[1]);
+ return -1;
+ }
+ socks_out->read_fd = socks[0];
+ socks_out->write_fd = socks[1];
+ socks_out->alert_fn = sock_alert;
+ socks_out->drain_fn = sock_drain;
+ return 0;
+ }
+ return -1;
+}
+
+/** Close the sockets in <b>socks</b>. */
+void
+alert_sockets_close(alert_sockets_t *socks)
+{
+ if (socks->alert_fn == sock_alert) {
+ /* they are sockets. */
+ tor_close_socket(socks->read_fd);
+ tor_close_socket(socks->write_fd);
+ } else {
+ close(socks->read_fd);
+ if (socks->write_fd != socks->read_fd)
+ close(socks->write_fd);
+ }
+ socks->read_fd = socks->write_fd = -1;
+}
+
diff --git a/src/common/compat_threads.h b/src/common/compat_threads.h
new file mode 100644
index 0000000000..acf3083f37
--- /dev/null
+++ b/src/common/compat_threads.h
@@ -0,0 +1,115 @@
+/* Copyright (c) 2003-2004, Roger Dingledine
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2015, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#ifndef TOR_COMPAT_THREADS_H
+#define TOR_COMPAT_THREADS_H
+
+#include "orconfig.h"
+#include "torint.h"
+#include "testsupport.h"
+
+#if defined(HAVE_PTHREAD_H) && !defined(_WIN32)
+#include <pthread.h>
+#endif
+
+#if defined(_WIN32)
+#define USE_WIN32_THREADS
+#elif defined(HAVE_PTHREAD_H) && defined(HAVE_PTHREAD_CREATE)
+#define USE_PTHREADS
+#else
+#error "No threading system was found"
+#endif
+
+int spawn_func(void (*func)(void *), void *data);
+void spawn_exit(void) ATTR_NORETURN;
+
+/* Because we use threads instead of processes on most platforms (Windows,
+ * Linux, etc), we need locking for them. On platforms with poor thread
+ * support or broken gethostbyname_r, these functions are no-ops. */
+
+/** A generic lock structure for multithreaded builds. */
+typedef struct tor_mutex_t {
+#if defined(USE_WIN32_THREADS)
+ /** Windows-only: on windows, we implement locks with CRITICAL_SECTIONS. */
+ CRITICAL_SECTION mutex;
+#elif defined(USE_PTHREADS)
+ /** Pthreads-only: with pthreads, we implement locks with
+ * pthread_mutex_t. */
+ pthread_mutex_t mutex;
+#else
+ /** No-threads only: Dummy variable so that tor_mutex_t takes up space. */
+ int _unused;
+#endif
+} tor_mutex_t;
+
+tor_mutex_t *tor_mutex_new(void);
+tor_mutex_t *tor_mutex_new_nonrecursive(void);
+void tor_mutex_init(tor_mutex_t *m);
+void tor_mutex_init_nonrecursive(tor_mutex_t *m);
+void tor_mutex_acquire(tor_mutex_t *m);
+void tor_mutex_release(tor_mutex_t *m);
+void tor_mutex_free(tor_mutex_t *m);
+void tor_mutex_uninit(tor_mutex_t *m);
+unsigned long tor_get_thread_id(void);
+void tor_threads_init(void);
+
+/** Conditions need nonrecursive mutexes with pthreads. */
+#define tor_mutex_init_for_cond(m) tor_mutex_init_nonrecursive(m)
+
+void set_main_thread(void);
+int in_main_thread(void);
+
+typedef struct tor_cond_t {
+#ifdef USE_PTHREADS
+ pthread_cond_t cond;
+#elif defined(USE_WIN32_THREADS)
+ HANDLE event;
+
+ CRITICAL_SECTION lock;
+ int n_waiting;
+ int n_to_wake;
+ int generation;
+#else
+#error no known condition implementation.
+#endif
+} tor_cond_t;
+
+tor_cond_t *tor_cond_new(void);
+void tor_cond_free(tor_cond_t *cond);
+int tor_cond_init(tor_cond_t *cond);
+void tor_cond_uninit(tor_cond_t *cond);
+int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex,
+ const struct timeval *tv);
+void tor_cond_signal_one(tor_cond_t *cond);
+void tor_cond_signal_all(tor_cond_t *cond);
+
+/** Helper type used to manage waking up the main thread while it's in
+ * the libevent main loop. Used by the work queue code. */
+typedef struct alert_sockets_s {
+ /* XXXX This structure needs a better name. */
+ /** Socket that the main thread should listen for EV_READ events on.
+ * Note that this socket may be a regular fd on a non-Windows platform.
+ */
+ tor_socket_t read_fd;
+ /** Socket to use when alerting the main thread. */
+ tor_socket_t write_fd;
+ /** Function to alert the main thread */
+ int (*alert_fn)(tor_socket_t write_fd);
+ /** Function to make the main thread no longer alerted. */
+ int (*drain_fn)(tor_socket_t read_fd);
+} alert_sockets_t;
+
+/* Flags to disable one or more alert_sockets backends. */
+#define ASOCKS_NOEVENTFD2 (1u<<0)
+#define ASOCKS_NOEVENTFD (1u<<1)
+#define ASOCKS_NOPIPE2 (1u<<2)
+#define ASOCKS_NOPIPE (1u<<3)
+#define ASOCKS_NOSOCKETPAIR (1u<<4)
+
+int alert_sockets_create(alert_sockets_t *socks_out, uint32_t flags);
+void alert_sockets_close(alert_sockets_t *socks);
+
+#endif
+
diff --git a/src/common/compat_winthreads.c b/src/common/compat_winthreads.c
new file mode 100644
index 0000000000..71b994c4e4
--- /dev/null
+++ b/src/common/compat_winthreads.c
@@ -0,0 +1,196 @@
+/* Copyright (c) 2003-2004, Roger Dingledine
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2015, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#include "compat.h"
+#include <windows.h>
+#include <process.h>
+#include "util.h"
+#include "container.h"
+#include "torlog.h"
+#include <process.h>
+
+/* This value is more or less total cargo-cult */
+#define SPIN_COUNT 2000
+
+/** Minimalist interface to run a void function in the background. On
+ * Unix calls fork, on win32 calls beginthread. Returns -1 on failure.
+ * func should not return, but rather should call spawn_exit.
+ *
+ * NOTE: if <b>data</b> is used, it should not be allocated on the stack,
+ * since in a multithreaded environment, there is no way to be sure that
+ * the caller's stack will still be around when the called function is
+ * running.
+ */
+int
+spawn_func(void (*func)(void *), void *data)
+{
+ int rv;
+ rv = (int)_beginthread(func, 0, data);
+ if (rv == (int)-1)
+ return -1;
+ return 0;
+}
+
+/** End the current thread/process.
+ */
+void
+spawn_exit(void)
+{
+ _endthread();
+ //we should never get here. my compiler thinks that _endthread returns, this
+ //is an attempt to fool it.
+ tor_assert(0);
+ _exit(0);
+}
+
+void
+tor_mutex_init(tor_mutex_t *m)
+{
+ InitializeCriticalSection(&m->mutex);
+}
+void
+tor_mutex_init_nonrecursive(tor_mutex_t *m)
+{
+ InitializeCriticalSection(&m->mutex);
+}
+
+void
+tor_mutex_uninit(tor_mutex_t *m)
+{
+ DeleteCriticalSection(&m->mutex);
+}
+void
+tor_mutex_acquire(tor_mutex_t *m)
+{
+ tor_assert(m);
+ EnterCriticalSection(&m->mutex);
+}
+void
+tor_mutex_release(tor_mutex_t *m)
+{
+ LeaveCriticalSection(&m->mutex);
+}
+unsigned long
+tor_get_thread_id(void)
+{
+ return (unsigned long)GetCurrentThreadId();
+}
+
+int
+tor_cond_init(tor_cond_t *cond)
+{
+ memset(cond, 0, sizeof(tor_cond_t));
+ if (InitializeCriticalSectionAndSpinCount(&cond->lock, SPIN_COUNT)==0) {
+ return -1;
+ }
+ if ((cond->event = CreateEvent(NULL,TRUE,FALSE,NULL)) == NULL) {
+ DeleteCriticalSection(&cond->lock);
+ return -1;
+ }
+ cond->n_waiting = cond->n_to_wake = cond->generation = 0;
+ return 0;
+}
+void
+tor_cond_uninit(tor_cond_t *cond)
+{
+ DeleteCriticalSection(&cond->lock);
+ CloseHandle(cond->event);
+}
+
+static void
+tor_cond_signal_impl(tor_cond_t *cond, int broadcast)
+{
+ EnterCriticalSection(&cond->lock);
+ if (broadcast)
+ cond->n_to_wake = cond->n_waiting;
+ else
+ ++cond->n_to_wake;
+ cond->generation++;
+ SetEvent(cond->event);
+ LeaveCriticalSection(&cond->lock);
+}
+void
+tor_cond_signal_one(tor_cond_t *cond)
+{
+ tor_cond_signal_impl(cond, 0);
+}
+void
+tor_cond_signal_all(tor_cond_t *cond)
+{
+ tor_cond_signal_impl(cond, 1);
+}
+
+int
+tor_cond_wait(tor_cond_t *cond, tor_mutex_t *lock_, const struct timeval *tv)
+{
+ CRITICAL_SECTION *lock = &lock_->mutex;
+ int generation_at_start;
+ int waiting = 1;
+ int result = -1;
+ DWORD ms = INFINITE, ms_orig = INFINITE, startTime, endTime;
+ if (tv)
+ ms_orig = ms = tv->tv_sec*1000 + (tv->tv_usec+999)/1000;
+
+ EnterCriticalSection(&cond->lock);
+ ++cond->n_waiting;
+ generation_at_start = cond->generation;
+ LeaveCriticalSection(&cond->lock);
+
+ LeaveCriticalSection(lock);
+
+ startTime = GetTickCount();
+ do {
+ DWORD res;
+ res = WaitForSingleObject(cond->event, ms);
+ EnterCriticalSection(&cond->lock);
+ if (cond->n_to_wake &&
+ cond->generation != generation_at_start) {
+ --cond->n_to_wake;
+ --cond->n_waiting;
+ result = 0;
+ waiting = 0;
+ goto out;
+ } else if (res != WAIT_OBJECT_0) {
+ result = (res==WAIT_TIMEOUT) ? 1 : -1;
+ --cond->n_waiting;
+ waiting = 0;
+ goto out;
+ } else if (ms != INFINITE) {
+ endTime = GetTickCount();
+ if (startTime + ms_orig <= endTime) {
+ result = 1; /* Timeout */
+ --cond->n_waiting;
+ waiting = 0;
+ goto out;
+ } else {
+ ms = startTime + ms_orig - endTime;
+ }
+ }
+ /* If we make it here, we are still waiting. */
+ if (cond->n_to_wake == 0) {
+ /* There is nobody else who should wake up; reset
+ * the event. */
+ ResetEvent(cond->event);
+ }
+ out:
+ LeaveCriticalSection(&cond->lock);
+ } while (waiting);
+
+ EnterCriticalSection(lock);
+
+ EnterCriticalSection(&cond->lock);
+ if (!cond->n_waiting)
+ ResetEvent(cond->event);
+ LeaveCriticalSection(&cond->lock);
+
+ return result;
+}
+
+void
+tor_threads_init(void)
+{
+ set_main_thread();
+}
+
diff --git a/src/common/include.am b/src/common/include.am
index 6441596199..14838ab555 100644
--- a/src/common/include.am
+++ b/src/common/include.am
@@ -54,10 +54,18 @@ endif
LIBDONNA += $(LIBED25519_REF10)
+if THREADS_PTHREADS
+threads_impl_source=src/common/compat_pthreads.c
+endif
+if THREADS_WIN32
+threads_impl_source=src/common/compat_winthreads.c
+endif
+
LIBOR_A_SOURCES = \
src/common/address.c \
src/common/backtrace.c \
src/common/compat.c \
+ src/common/compat_threads.c \
src/common/container.c \
src/common/di_ops.c \
src/common/log.c \
@@ -66,10 +74,12 @@ LIBOR_A_SOURCES = \
src/common/util_codedigest.c \
src/common/util_process.c \
src/common/sandbox.c \
+ src/common/workqueue.c \
src/ext/csiphash.c \
src/ext/trunnel/trunnel.c \
$(libor_extra_source) \
- $(libor_mempool_source)
+ $(libor_mempool_source) \
+ $(threads_impl_source)
LIBOR_CRYPTO_A_SOURCES = \
src/common/aes.c \
@@ -102,7 +112,6 @@ src_common_libor_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS)
src_common_libor_crypto_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS)
src_common_libor_event_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS)
-
COMMONHEADERS = \
src/common/address.h \
src/common/backtrace.h \
@@ -110,6 +119,7 @@ COMMONHEADERS = \
src/common/ciphers.inc \
src/common/compat.h \
src/common/compat_libevent.h \
+ src/common/compat_threads.h \
src/common/container.h \
src/common/crypto.h \
src/common/crypto_curve25519.h \
@@ -128,6 +138,7 @@ COMMONHEADERS = \
src/common/tortls.h \
src/common/util.h \
src/common/util_process.h \
+ src/common/workqueue.h \
$(libor_mempool_header)
noinst_HEADERS+= $(COMMONHEADERS)
diff --git a/src/common/workqueue.c b/src/common/workqueue.c
new file mode 100644
index 0000000000..77a4fbc3f6
--- /dev/null
+++ b/src/common/workqueue.c
@@ -0,0 +1,490 @@
+/* copyright (c) 2013-2015, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#include "orconfig.h"
+#include "compat.h"
+#include "compat_threads.h"
+#include "util.h"
+#include "workqueue.h"
+#include "tor_queue.h"
+#include "torlog.h"
+
+struct threadpool_s {
+ /** An array of pointers to workerthread_t: one for each running worker
+ * thread. */
+ struct workerthread_s **threads;
+
+ /** Condition variable that we wait on when we have no work, and which
+ * gets signaled when our queue becomes nonempty. */
+ tor_cond_t condition;
+ /** Queue of pending work that we have to do. */
+ TOR_TAILQ_HEAD(, workqueue_entry_s) work;
+
+ /** The current 'update generation' of the threadpool. Any thread that is
+ * at an earlier generation needs to run the update function. */
+ unsigned generation;
+
+ /** Function that should be run for updates on each thread. */
+ int (*update_fn)(void *, void *);
+ /** Function to free update arguments if they can't be run. */
+ void (*free_update_arg_fn)(void *);
+ /** Array of n_threads update arguments. */
+ void **update_args;
+
+ /** Number of elements in threads. */
+ int n_threads;
+ /** Mutex to protect all the above fields. */
+ tor_mutex_t lock;
+
+ /** A reply queue to use when constructing new threads. */
+ replyqueue_t *reply_queue;
+
+ /** Functions used to allocate and free thread state. */
+ void *(*new_thread_state_fn)(void*);
+ void (*free_thread_state_fn)(void*);
+ void *new_thread_state_arg;
+};
+
+struct workqueue_entry_s {
+ /** The next workqueue_entry_t that's pending on the same thread or
+ * reply queue. */
+ TOR_TAILQ_ENTRY(workqueue_entry_s) next_work;
+ /** The threadpool to which this workqueue_entry_t was assigned. This field
+ * is set when the workqueue_entry_t is created, and won't be cleared until
+ * after it's handled in the main thread. */
+ struct threadpool_s *on_pool;
+ /** True iff this entry is waiting for a worker to start processing it. */
+ uint8_t pending;
+ /** Function to run in the worker thread. */
+ int (*fn)(void *state, void *arg);
+ /** Function to run while processing the reply queue. */
+ void (*reply_fn)(void *arg);
+ /** Argument for the above functions. */
+ void *arg;
+};
+
+struct replyqueue_s {
+ /** Mutex to protect the answers field */
+ tor_mutex_t lock;
+ /** Doubly-linked list of answers that the reply queue needs to handle. */
+ TOR_TAILQ_HEAD(, workqueue_entry_s) answers;
+
+ /** Mechanism to wake up the main thread when it is receiving answers. */
+ alert_sockets_t alert;
+};
+
+/** A worker thread represents a single thread in a thread pool. To avoid
+ * contention, each gets its own queue. This breaks the guarantee that that
+ * queued work will get executed strictly in order. */
+typedef struct workerthread_s {
+ /** Which thread it this? In range 0..in_pool->n_threads-1 */
+ int index;
+ /** The pool this thread is a part of. */
+ struct threadpool_s *in_pool;
+ /** User-supplied state field that we pass to the worker functions of each
+ * work item. */
+ void *state;
+ /** Reply queue to which we pass our results. */
+ replyqueue_t *reply_queue;
+ /** The current update generation of this thread */
+ unsigned generation;
+} workerthread_t;
+
+static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
+
+/** Allocate and return a new workqueue_entry_t, set up to run the function
+ * <b>fn</b> in the worker thread, and <b>reply_fn</b> in the main
+ * thread. See threadpool_queue_work() for full documentation. */
+static workqueue_entry_t *
+workqueue_entry_new(int (*fn)(void*, void*),
+ void (*reply_fn)(void*),
+ void *arg)
+{
+ workqueue_entry_t *ent = tor_malloc_zero(sizeof(workqueue_entry_t));
+ ent->fn = fn;
+ ent->reply_fn = reply_fn;
+ ent->arg = arg;
+ return ent;
+}
+
+/**
+ * Release all storage held in <b>ent</b>. Call only when <b>ent</b> is not on
+ * any queue.
+ */
+static void
+workqueue_entry_free(workqueue_entry_t *ent)
+{
+ if (!ent)
+ return;
+ memset(ent, 0xf0, sizeof(*ent));
+ tor_free(ent);
+}
+
+/**
+ * Cancel a workqueue_entry_t that has been returned from
+ * threadpool_queue_work.
+ *
+ * You must not call this function on any work whose reply function has been
+ * executed in the main thread; that will cause undefined behavior (probably,
+ * a crash).
+ *
+ * If the work is cancelled, this function return the argument passed to the
+ * work function. It is the caller's responsibility to free this storage.
+ *
+ * This function will have no effect if the worker thread has already executed
+ * or begun to execute the work item. In that case, it will return NULL.
+ */
+void *
+workqueue_entry_cancel(workqueue_entry_t *ent)
+{
+ int cancelled = 0;
+ void *result = NULL;
+ tor_mutex_acquire(&ent->on_pool->lock);
+ if (ent->pending) {
+ TOR_TAILQ_REMOVE(&ent->on_pool->work, ent, next_work);
+ cancelled = 1;
+ result = ent->arg;
+ }
+ tor_mutex_release(&ent->on_pool->lock);
+
+ if (cancelled) {
+ tor_free(ent);
+ }
+ return result;
+}
+
+/**DOCDOC
+
+ must hold lock */
+static int
+worker_thread_has_work(workerthread_t *thread)
+{
+ return !TOR_TAILQ_EMPTY(&thread->in_pool->work) ||
+ thread->generation != thread->in_pool->generation;
+}
+
+/**
+ * Main function for the worker thread.
+ */
+static void
+worker_thread_main(void *thread_)
+{
+ workerthread_t *thread = thread_;
+ threadpool_t *pool = thread->in_pool;
+ workqueue_entry_t *work;
+ int result;
+
+ tor_mutex_acquire(&pool->lock);
+ while (1) {
+ /* lock must be held at this point. */
+ while (worker_thread_has_work(thread)) {
+ /* lock must be held at this point. */
+ if (thread->in_pool->generation != thread->generation) {
+ void *arg = thread->in_pool->update_args[thread->index];
+ thread->in_pool->update_args[thread->index] = NULL;
+ int (*update_fn)(void*,void*) = thread->in_pool->update_fn;
+ thread->generation = thread->in_pool->generation;
+ tor_mutex_release(&pool->lock);
+
+ int r = update_fn(thread->state, arg);
+
+ if (r < 0) {
+ return;
+ }
+
+ tor_mutex_acquire(&pool->lock);
+ continue;
+ }
+ work = TOR_TAILQ_FIRST(&pool->work);
+ TOR_TAILQ_REMOVE(&pool->work, work, next_work);
+ work->pending = 0;
+ tor_mutex_release(&pool->lock);
+
+ /* We run the work function without holding the thread lock. This
+ * is the main thread's first opportunity to give us more work. */
+ result = work->fn(thread->state, work->arg);
+
+ /* Queue the reply for the main thread. */
+ queue_reply(thread->reply_queue, work);
+
+ /* We may need to exit the thread. */
+ if (result >= WQ_RPL_ERROR) {
+ return;
+ }
+ tor_mutex_acquire(&pool->lock);
+ }
+ /* At this point the lock is held, and there is no work in this thread's
+ * queue. */
+
+ /* TODO: support an idle-function */
+
+ /* Okay. Now, wait till somebody has work for us. */
+ if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) {
+ log_warn(LD_GENERAL, "Fail tor_cond_wait.");
+ }
+ }
+}
+
+/** Put a reply on the reply queue. The reply must not currently be on
+ * any thread's work queue. */
+static void
+queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
+{
+ int was_empty;
+ tor_mutex_acquire(&queue->lock);
+ was_empty = TOR_TAILQ_EMPTY(&queue->answers);
+ TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work);
+ tor_mutex_release(&queue->lock);
+
+ if (was_empty) {
+ if (queue->alert.alert_fn(queue->alert.write_fd) < 0) {
+ /* XXXX complain! */
+ }
+ }
+}
+
+/** Allocate and start a new worker thread to use state object <b>state</b>,
+ * and send responses to <b>replyqueue</b>. */
+static workerthread_t *
+workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue)
+{
+ workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t));
+ thr->state = state;
+ thr->reply_queue = replyqueue;
+ thr->in_pool = pool;
+
+ if (spawn_func(worker_thread_main, thr) < 0) {
+ log_err(LD_GENERAL, "Can't launch worker thread.");
+ return NULL;
+ }
+
+ return thr;
+}
+
+/**
+ * Queue an item of work for a thread in a thread pool. The function
+ * <b>fn</b> will be run in a worker thread, and will receive as arguments the
+ * thread's state object, and the provided object <b>arg</b>. It must return
+ * one of WQ_RPL_REPLY, WQ_RPL_ERROR, or WQ_RPL_SHUTDOWN.
+ *
+ * Regardless of its return value, the function <b>reply_fn</b> will later be
+ * run in the main thread when it invokes replyqueue_process(), and will
+ * receive as its argument the same <b>arg</b> object. It's the reply
+ * function's responsibility to free the work object.
+ *
+ * On success, return a workqueue_entry_t object that can be passed to
+ * workqueue_entry_cancel(). On failure, return NULL.
+ *
+ * Note that because each thread has its own work queue, work items may not
+ * be executed strictly in order.
+ */
+workqueue_entry_t *
+threadpool_queue_work(threadpool_t *pool,
+ int (*fn)(void *, void *),
+ void (*reply_fn)(void *),
+ void *arg)
+{
+ workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg);
+ ent->on_pool = pool;
+ ent->pending = 1;
+
+ tor_mutex_acquire(&pool->lock);
+
+ TOR_TAILQ_INSERT_TAIL(&pool->work, ent, next_work);
+
+ tor_mutex_release(&pool->lock);
+
+ tor_cond_signal_one(&pool->condition);
+
+ return ent;
+}
+
+/**
+ * Queue a copy of a work item for every thread in a pool. This can be used,
+ * for example, to tell the threads to update some parameter in their states.
+ *
+ * Arguments are as for <b>threadpool_queue_work</b>, except that the
+ * <b>arg</b> value is passed to <b>dup_fn</b> once per each thread to
+ * make a copy of it.
+ *
+ * UPDATE FUNCTIONS MUST BE IDEMPOTENT. We do not guarantee that every update
+ * will be run. If a new update is scheduled before the old update finishes
+ * running, then the new will replace the old in any threads that haven't run
+ * it yet.
+ *
+ * Return 0 on success, -1 on failure.
+ */
+int
+threadpool_queue_update(threadpool_t *pool,
+ void *(*dup_fn)(void *),
+ int (*fn)(void *, void *),
+ void (*free_fn)(void *),
+ void *arg)
+{
+ int i, n_threads;
+ void (*old_args_free_fn)(void *arg);
+ void **old_args;
+ void **new_args;
+
+ tor_mutex_acquire(&pool->lock);
+ n_threads = pool->n_threads;
+ old_args = pool->update_args;
+ old_args_free_fn = pool->free_update_arg_fn;
+
+ new_args = tor_calloc(n_threads, sizeof(void*));
+ for (i = 0; i < n_threads; ++i) {
+ if (dup_fn)
+ new_args[i] = dup_fn(arg);
+ else
+ new_args[i] = arg;
+ }
+
+ pool->update_args = new_args;
+ pool->free_update_arg_fn = free_fn;
+ pool->update_fn = fn;
+ ++pool->generation;
+
+ tor_mutex_release(&pool->lock);
+
+ tor_cond_signal_all(&pool->condition);
+
+ if (old_args) {
+ for (i = 0; i < n_threads; ++i) {
+ if (old_args[i] && old_args_free_fn)
+ old_args_free_fn(old_args[i]);
+ }
+ tor_free(old_args);
+ }
+
+ return 0;
+}
+
+/** Launch threads until we have <b>n</b>. */
+static int
+threadpool_start_threads(threadpool_t *pool, int n)
+{
+ tor_mutex_acquire(&pool->lock);
+
+ if (pool->n_threads < n)
+ pool->threads = tor_realloc(pool->threads, sizeof(workerthread_t*)*n);
+
+ while (pool->n_threads < n) {
+ void *state = pool->new_thread_state_fn(pool->new_thread_state_arg);
+ workerthread_t *thr = workerthread_new(state, pool, pool->reply_queue);
+ thr->index = pool->n_threads;
+
+ if (!thr) {
+ tor_mutex_release(&pool->lock);
+ return -1;
+ }
+ pool->threads[pool->n_threads++] = thr;
+ }
+ tor_mutex_release(&pool->lock);
+
+ return 0;
+}
+
+/**
+ * Construct a new thread pool with <b>n</b> worker threads, configured to
+ * send their output to <b>replyqueue</b>. The threads' states will be
+ * constructed with the <b>new_thread_state_fn</b> call, receiving <b>arg</b>
+ * as its argument. When the threads close, they will call
+ * <b>free_thread_state_fn</b> on their states.
+ */
+threadpool_t *
+threadpool_new(int n_threads,
+ replyqueue_t *replyqueue,
+ void *(*new_thread_state_fn)(void*),
+ void (*free_thread_state_fn)(void*),
+ void *arg)
+{
+ threadpool_t *pool;
+ pool = tor_malloc_zero(sizeof(threadpool_t));
+ tor_mutex_init_nonrecursive(&pool->lock);
+ tor_cond_init(&pool->condition);
+ TOR_TAILQ_INIT(&pool->work);
+
+ pool->new_thread_state_fn = new_thread_state_fn;
+ pool->new_thread_state_arg = arg;
+ pool->free_thread_state_fn = free_thread_state_fn;
+ pool->reply_queue = replyqueue;
+
+ if (threadpool_start_threads(pool, n_threads) < 0) {
+ tor_mutex_uninit(&pool->lock);
+ tor_free(pool);
+ return NULL;
+ }
+
+ return pool;
+}
+
+/** Return the reply queue associated with a given thread pool. */
+replyqueue_t *
+threadpool_get_replyqueue(threadpool_t *tp)
+{
+ return tp->reply_queue;
+}
+
+/** Allocate a new reply queue. Reply queues are used to pass results from
+ * worker threads to the main thread. Since the main thread is running an
+ * IO-centric event loop, it needs to get woken up with means other than a
+ * condition variable. */
+replyqueue_t *
+replyqueue_new(uint32_t alertsocks_flags)
+{
+ replyqueue_t *rq;
+
+ rq = tor_malloc_zero(sizeof(replyqueue_t));
+ if (alert_sockets_create(&rq->alert, alertsocks_flags) < 0) {
+ tor_free(rq);
+ return NULL;
+ }
+
+ tor_mutex_init(&rq->lock);
+ TOR_TAILQ_INIT(&rq->answers);
+
+ return rq;
+}
+
+/**
+ * Return the "read socket" for a given reply queue. The main thread should
+ * listen for read events on this socket, and call replyqueue_process() every
+ * time it triggers.
+ */
+tor_socket_t
+replyqueue_get_socket(replyqueue_t *rq)
+{
+ return rq->alert.read_fd;
+}
+
+/**
+ * Process all pending replies on a reply queue. The main thread should call
+ * this function every time the socket returned by replyqueue_get_socket() is
+ * readable.
+ */
+void
+replyqueue_process(replyqueue_t *queue)
+{
+ if (queue->alert.drain_fn(queue->alert.read_fd) < 0) {
+ static ratelim_t warn_limit = RATELIM_INIT(7200);
+ log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL,
+ "Failure from drain_fd");
+ }
+
+ tor_mutex_acquire(&queue->lock);
+ while (!TOR_TAILQ_EMPTY(&queue->answers)) {
+ /* lock must be held at this point.*/
+ workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers);
+ TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
+ tor_mutex_release(&queue->lock);
+ work->on_pool = NULL;
+
+ work->reply_fn(work->arg);
+ workqueue_entry_free(work);
+
+ tor_mutex_acquire(&queue->lock);
+ }
+
+ tor_mutex_release(&queue->lock);
+}
+
diff --git a/src/common/workqueue.h b/src/common/workqueue.h
new file mode 100644
index 0000000000..92e82b8a48
--- /dev/null
+++ b/src/common/workqueue.h
@@ -0,0 +1,48 @@
+/* Copyright (c) 2013, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#ifndef TOR_WORKQUEUE_H
+#define TOR_WORKQUEUE_H
+
+#include "compat.h"
+
+/** A replyqueue is used to tell the main thread about the outcome of
+ * work that we queued for the the workers. */
+typedef struct replyqueue_s replyqueue_t;
+/** A thread-pool manages starting threads and passing work to them. */
+typedef struct threadpool_s threadpool_t;
+/** A workqueue entry represents a request that has been passed to a thread
+ * pool. */
+typedef struct workqueue_entry_s workqueue_entry_t;
+
+/** Possible return value from a work function: indicates success. */
+#define WQ_RPL_REPLY 0
+/** Possible return value from a work function: indicates fatal error */
+#define WQ_RPL_ERROR 1
+/** Possible return value from a work function: indicates thread is shutting
+ * down. */
+#define WQ_RPL_SHUTDOWN 2
+
+workqueue_entry_t *threadpool_queue_work(threadpool_t *pool,
+ int (*fn)(void *, void *),
+ void (*reply_fn)(void *),
+ void *arg);
+int threadpool_queue_update(threadpool_t *pool,
+ void *(*dup_fn)(void *),
+ int (*fn)(void *, void *),
+ void (*free_fn)(void *),
+ void *arg);
+void *workqueue_entry_cancel(workqueue_entry_t *pending_work);
+threadpool_t *threadpool_new(int n_threads,
+ replyqueue_t *replyqueue,
+ void *(*new_thread_state_fn)(void*),
+ void (*free_thread_state_fn)(void*),
+ void *arg);
+replyqueue_t *threadpool_get_replyqueue(threadpool_t *tp);
+
+replyqueue_t *replyqueue_new(uint32_t alertsocks_flags);
+tor_socket_t replyqueue_get_socket(replyqueue_t *rq);
+void replyqueue_process(replyqueue_t *queue);
+
+#endif
+
diff --git a/src/or/circuitlist.c b/src/or/circuitlist.c
index 36ba3bffb7..d964e66922 100644
--- a/src/or/circuitlist.c
+++ b/src/or/circuitlist.c
@@ -745,6 +745,7 @@ circuit_free(circuit_t *circ)
{
void *mem;
size_t memlen;
+ int should_free = 1;
if (!circ)
return;
@@ -784,6 +785,8 @@ circuit_free(circuit_t *circ)
memlen = sizeof(or_circuit_t);
tor_assert(circ->magic == OR_CIRCUIT_MAGIC);
+ should_free = (ocirc->workqueue_entry == NULL);
+
crypto_cipher_free(ocirc->p_crypto);
crypto_digest_free(ocirc->p_digest);
crypto_cipher_free(ocirc->n_crypto);
@@ -826,8 +829,18 @@ circuit_free(circuit_t *circ)
* "active" checks will be violated. */
cell_queue_clear(&circ->n_chan_cells);
- memwipe(mem, 0xAA, memlen); /* poison memory */
- tor_free(mem);
+ if (should_free) {
+ memwipe(mem, 0xAA, memlen); /* poison memory */
+ tor_free(mem);
+ } else {
+ /* If we made it here, this is an or_circuit_t that still has a pending
+ * cpuworker request which we weren't able to cancel. Instead, set up
+ * the magic value so that when the reply comes back, we'll know to discard
+ * the reply and free this structure.
+ */
+ memwipe(mem, 0xAA, memlen);
+ circ->magic = DEAD_CIRCUIT_MAGIC;
+ }
}
/** Deallocate the linked list circ-><b>cpath</b>, and remove the cpath from
diff --git a/src/or/command.c b/src/or/command.c
index 6dde2a9b7e..c4a0f9baeb 100644
--- a/src/or/command.c
+++ b/src/or/command.c
@@ -310,7 +310,7 @@ command_process_create_cell(cell_t *cell, channel_t *chan)
/* hand it off to the cpuworkers, and then return. */
if (connection_or_digest_is_known_relay(chan->identity_digest))
rep_hist_note_circuit_handshake_requested(create_cell->handshake_type);
- if (assign_onionskin_to_cpuworker(NULL, circ, create_cell) < 0) {
+ if (assign_onionskin_to_cpuworker(circ, create_cell) < 0) {
log_debug(LD_GENERAL,"Failed to hand off onionskin. Closing.");
circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_RESOURCELIMIT);
return;
diff --git a/src/or/config.c b/src/or/config.c
index baff4198ee..91fbe970d9 100644
--- a/src/or/config.c
+++ b/src/or/config.c
@@ -1730,7 +1730,7 @@ options_act(const or_options_t *old_options)
if (have_completed_a_circuit() || !any_predicted_circuits(time(NULL)))
inform_testing_reachability();
}
- cpuworkers_rotate();
+ cpuworkers_rotate_keyinfo();
if (dns_reset())
return -1;
} else {
diff --git a/src/or/connection.c b/src/or/connection.c
index ccd823131d..97fdee732e 100644
--- a/src/or/connection.c
+++ b/src/or/connection.c
@@ -29,7 +29,6 @@
#include "connection_edge.h"
#include "connection_or.h"
#include "control.h"
-#include "cpuworker.h"
#include "directory.h"
#include "dirserv.h"
#include "dns.h"
@@ -130,7 +129,6 @@ conn_type_to_string(int type)
case CONN_TYPE_AP: return "Socks";
case CONN_TYPE_DIR_LISTENER: return "Directory listener";
case CONN_TYPE_DIR: return "Directory";
- case CONN_TYPE_CPUWORKER: return "CPU worker";
case CONN_TYPE_CONTROL_LISTENER: return "Control listener";
case CONN_TYPE_CONTROL: return "Control";
case CONN_TYPE_EXT_OR: return "Extended OR";
@@ -213,12 +211,6 @@ conn_state_to_string(int type, int state)
case DIR_CONN_STATE_SERVER_WRITING: return "writing";
}
break;
- case CONN_TYPE_CPUWORKER:
- switch (state) {
- case CPUWORKER_STATE_IDLE: return "idle";
- case CPUWORKER_STATE_BUSY_ONION: return "busy with onion";
- }
- break;
case CONN_TYPE_CONTROL:
switch (state) {
case CONTROL_CONN_STATE_OPEN: return "open (protocol v1)";
@@ -248,7 +240,6 @@ connection_type_uses_bufferevent(connection_t *conn)
case CONN_TYPE_CONTROL:
case CONN_TYPE_OR:
case CONN_TYPE_EXT_OR:
- case CONN_TYPE_CPUWORKER:
return 1;
default:
return 0;
@@ -2417,7 +2408,6 @@ connection_mark_all_noncontrol_connections(void)
if (conn->marked_for_close)
continue;
switch (conn->type) {
- case CONN_TYPE_CPUWORKER:
case CONN_TYPE_CONTROL_LISTENER:
case CONN_TYPE_CONTROL:
break;
@@ -4511,8 +4501,6 @@ connection_process_inbuf(connection_t *conn, int package_partial)
package_partial);
case CONN_TYPE_DIR:
return connection_dir_process_inbuf(TO_DIR_CONN(conn));
- case CONN_TYPE_CPUWORKER:
- return connection_cpu_process_inbuf(conn);
case CONN_TYPE_CONTROL:
return connection_control_process_inbuf(TO_CONTROL_CONN(conn));
default:
@@ -4572,8 +4560,6 @@ connection_finished_flushing(connection_t *conn)
return connection_edge_finished_flushing(TO_EDGE_CONN(conn));
case CONN_TYPE_DIR:
return connection_dir_finished_flushing(TO_DIR_CONN(conn));
- case CONN_TYPE_CPUWORKER:
- return connection_cpu_finished_flushing(conn);
case CONN_TYPE_CONTROL:
return connection_control_finished_flushing(TO_CONTROL_CONN(conn));
default:
@@ -4629,8 +4615,6 @@ connection_reached_eof(connection_t *conn)
return connection_edge_reached_eof(TO_EDGE_CONN(conn));
case CONN_TYPE_DIR:
return connection_dir_reached_eof(TO_DIR_CONN(conn));
- case CONN_TYPE_CPUWORKER:
- return connection_cpu_reached_eof(conn);
case CONN_TYPE_CONTROL:
return connection_control_reached_eof(TO_CONTROL_CONN(conn));
default:
@@ -4836,10 +4820,6 @@ assert_connection_ok(connection_t *conn, time_t now)
tor_assert(conn->purpose >= DIR_PURPOSE_MIN_);
tor_assert(conn->purpose <= DIR_PURPOSE_MAX_);
break;
- case CONN_TYPE_CPUWORKER:
- tor_assert(conn->state >= CPUWORKER_STATE_MIN_);
- tor_assert(conn->state <= CPUWORKER_STATE_MAX_);
- break;
case CONN_TYPE_CONTROL:
tor_assert(conn->state >= CONTROL_CONN_STATE_MIN_);
tor_assert(conn->state <= CONTROL_CONN_STATE_MAX_);
@@ -4940,9 +4920,7 @@ proxy_type_to_string(int proxy_type)
}
/** Call connection_free_() on every connection in our array, and release all
- * storage held by connection.c. This is used by cpuworkers and dnsworkers
- * when they fork, so they don't keep resources held open (especially
- * sockets).
+ * storage held by connection.c.
*
* Don't do the checks in connection_free(), because they will
* fail.
diff --git a/src/or/cpuworker.c b/src/or/cpuworker.c
index 340fbec620..39d2079994 100644
--- a/src/or/cpuworker.c
+++ b/src/or/cpuworker.c
@@ -5,84 +5,98 @@
/**
* \file cpuworker.c
- * \brief Implements a farm of 'CPU worker' processes to perform
- * CPU-intensive tasks in another thread or process, to not
- * interrupt the main thread.
+ * \brief Uses the workqueue/threadpool code to farm CPU-intensive activities
+ * out to subprocesses.
*
* Right now, we only use this for processing onionskins.
**/
#include "or.h"
-#include "buffers.h"
#include "channel.h"
-#include "channeltls.h"
#include "circuitbuild.h"
#include "circuitlist.h"
-#include "config.h"
-#include "connection.h"
#include "connection_or.h"
+#include "config.h"
#include "cpuworker.h"
#include "main.h"
#include "onion.h"
#include "rephist.h"
#include "router.h"
+#include "workqueue.h"
-/** The maximum number of cpuworker processes we will keep around. */
-#define MAX_CPUWORKERS 16
-/** The minimum number of cpuworker processes we will keep around. */
-#define MIN_CPUWORKERS 1
-
-/** The tag specifies which circuit this onionskin was from. */
-#define TAG_LEN 12
+#ifdef HAVE_EVENT2_EVENT_H
+#include <event2/event.h>
+#else
+#include <event.h>
+#endif
-/** How many cpuworkers we have running right now. */
-static int num_cpuworkers=0;
-/** How many of the running cpuworkers have an assigned task right now. */
-static int num_cpuworkers_busy=0;
-/** We need to spawn new cpuworkers whenever we rotate the onion keys
- * on platforms where execution contexts==processes. This variable stores
- * the last time we got a key rotation event. */
-static time_t last_rotation_time=0;
+static void queue_pending_tasks(void);
-static void cpuworker_main(void *data) ATTR_NORETURN;
-static int spawn_cpuworker(void);
-static void spawn_enough_cpuworkers(void);
-static void process_pending_task(connection_t *cpuworker);
+typedef struct worker_state_s {
+ int generation;
+ server_onion_keys_t *onion_keys;
+} worker_state_t;
-/** Initialize the cpuworker subsystem.
- */
-void
-cpu_init(void)
+static void *
+worker_state_new(void *arg)
{
- cpuworkers_rotate();
+ worker_state_t *ws;
+ (void)arg;
+ ws = tor_malloc_zero(sizeof(worker_state_t));
+ ws->onion_keys = server_onion_keys_new();
+ return ws;
}
-
-/** Called when we're done sending a request to a cpuworker. */
-int
-connection_cpu_finished_flushing(connection_t *conn)
+static void
+worker_state_free(void *arg)
{
- tor_assert(conn);
- tor_assert(conn->type == CONN_TYPE_CPUWORKER);
- return 0;
+ worker_state_t *ws = arg;
+ server_onion_keys_free(ws->onion_keys);
+ tor_free(ws);
}
-/** Pack global_id and circ_id; set *tag to the result. (See note on
- * cpuworker_main for wire format.) */
+static replyqueue_t *replyqueue = NULL;
+static threadpool_t *threadpool = NULL;
+static struct event *reply_event = NULL;
+
+static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT;
+
+static int total_pending_tasks = 0;
+static int max_pending_tasks = 128;
+
static void
-tag_pack(uint8_t *tag, uint64_t chan_id, circid_t circ_id)
+replyqueue_process_cb(evutil_socket_t sock, short events, void *arg)
{
- /*XXXX RETHINK THIS WHOLE MESS !!!! !NM NM NM NM*/
- /*XXXX DOUBLEPLUSTHIS!!!! AS AS AS AS*/
- set_uint64(tag, chan_id);
- set_uint32(tag+8, circ_id);
+ replyqueue_t *rq = arg;
+ (void) sock;
+ (void) events;
+ replyqueue_process(rq);
}
-/** Unpack <b>tag</b> into addr, port, and circ_id.
+/** Initialize the cpuworker subsystem.
*/
-static void
-tag_unpack(const uint8_t *tag, uint64_t *chan_id, circid_t *circ_id)
+void
+cpu_init(void)
{
- *chan_id = get_uint64(tag);
- *circ_id = get_uint32(tag+8);
+ if (!replyqueue) {
+ replyqueue = replyqueue_new(0);
+ }
+ if (!reply_event) {
+ reply_event = tor_event_new(tor_libevent_get_base(),
+ replyqueue_get_socket(replyqueue),
+ EV_READ|EV_PERSIST,
+ replyqueue_process_cb,
+ replyqueue);
+ event_add(reply_event, NULL);
+ }
+ if (!threadpool) {
+ threadpool = threadpool_new(get_num_cpus(get_options()),
+ replyqueue,
+ worker_state_new,
+ worker_state_free,
+ NULL);
+ }
+ /* Total voodoo. Can we make this more sensible? */
+ max_pending_tasks = get_num_cpus(get_options()) * 64;
+ crypto_seed_weak_rng(&request_sample_rng);
}
/** Magic numbers to make sure our cpuworker_requests don't grow any
@@ -94,10 +108,6 @@ tag_unpack(const uint8_t *tag, uint64_t *chan_id, circid_t *circ_id)
typedef struct cpuworker_request_t {
/** Magic number; must be CPUWORKER_REQUEST_MAGIC. */
uint32_t magic;
- /** Opaque tag to identify the job */
- uint8_t tag[TAG_LEN];
- /** Task code. Must be one of CPUWORKER_TASK_* */
- uint8_t task;
/** Flag: Are we timing this request? */
unsigned timed : 1;
@@ -114,8 +124,7 @@ typedef struct cpuworker_request_t {
typedef struct cpuworker_reply_t {
/** Magic number; must be CPUWORKER_REPLY_MAGIC. */
uint32_t magic;
- /** Opaque tag to identify the job; matches the request's tag.*/
- uint8_t tag[TAG_LEN];
+
/** True iff we got a successful request. */
uint8_t success;
@@ -142,42 +151,39 @@ typedef struct cpuworker_reply_t {
uint8_t rend_auth_material[DIGEST_LEN];
} cpuworker_reply_t;
-/** Called when the onion key has changed and we need to spawn new
- * cpuworkers. Close all currently idle cpuworkers, and mark the last
- * rotation time as now.
- */
-void
-cpuworkers_rotate(void)
+typedef struct cpuworker_job_u {
+ or_circuit_t *circ;
+ union {
+ cpuworker_request_t request;
+ cpuworker_reply_t reply;
+ } u;
+} cpuworker_job_t;
+
+static int
+update_state_threadfn(void *state_, void *work_)
{
- connection_t *cpuworker;
- while ((cpuworker = connection_get_by_type_state(CONN_TYPE_CPUWORKER,
- CPUWORKER_STATE_IDLE))) {
- connection_mark_for_close(cpuworker);
- --num_cpuworkers;
- }
- last_rotation_time = time(NULL);
- if (server_mode(get_options()))
- spawn_enough_cpuworkers();
+ worker_state_t *state = state_;
+ worker_state_t *update = work_;
+ server_onion_keys_free(state->onion_keys);
+ state->onion_keys = update->onion_keys;
+ update->onion_keys = NULL;
+ ++state->generation;
+ return WQ_RPL_REPLY;
}
-/** If the cpuworker closes the connection,
- * mark it as closed and spawn a new one as needed. */
-int
-connection_cpu_reached_eof(connection_t *conn)
+/** Called when the onion key has changed so update all CPU worker(s) with
+ * new function pointers with which a new state will be generated.
+ */
+void
+cpuworkers_rotate_keyinfo(void)
{
- log_warn(LD_GENERAL,"Read eof. CPU worker died unexpectedly.");
- if (conn->state != CPUWORKER_STATE_IDLE) {
- /* the circ associated with this cpuworker will have to wait until
- * it gets culled in run_connection_housekeeping(), since we have
- * no way to find out which circ it was. */
- log_warn(LD_GENERAL,"...and it left a circuit queued; abandoning circ.");
- num_cpuworkers_busy--;
+ if (threadpool_queue_update(threadpool,
+ worker_state_new,
+ update_state_threadfn,
+ worker_state_free,
+ NULL)) {
+ log_warn(LD_OR, "Failed to queue key update for worker threads.");
}
- num_cpuworkers--;
- spawn_enough_cpuworkers(); /* try to regrow. hope we don't end up
- spinning. */
- connection_mark_for_close(conn);
- return 0;
}
/** Indexed by handshake type: how many onionskins have we processed and
@@ -197,8 +203,6 @@ static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1];
* time. (microseconds) */
#define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000)
-static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT;
-
/** Return true iff we'd like to measure a handshake of type
* <b>onionskin_type</b>. Call only from the main thread. */
static int
@@ -286,428 +290,266 @@ cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
onionskin_type_name, (unsigned)overhead, relative_overhead*100);
}
-/** Called when we get data from a cpuworker. If the answer is not complete,
- * wait for a complete answer. If the answer is complete,
- * process it as appropriate.
- */
-int
-connection_cpu_process_inbuf(connection_t *conn)
-{
- uint64_t chan_id;
- circid_t circ_id;
- channel_t *p_chan = NULL;
- circuit_t *circ;
-
- tor_assert(conn);
- tor_assert(conn->type == CONN_TYPE_CPUWORKER);
-
- if (!connection_get_inbuf_len(conn))
- return 0;
-
- if (conn->state == CPUWORKER_STATE_BUSY_ONION) {
- cpuworker_reply_t rpl;
- if (connection_get_inbuf_len(conn) < sizeof(cpuworker_reply_t))
- return 0; /* not yet */
- tor_assert(connection_get_inbuf_len(conn) == sizeof(cpuworker_reply_t));
-
- connection_fetch_from_buf((void*)&rpl,sizeof(cpuworker_reply_t),conn);
-
- tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC);
-
- if (rpl.timed && rpl.success &&
- rpl.handshake_type <= MAX_ONION_HANDSHAKE_TYPE) {
- /* Time how long this request took. The handshake_type check should be
- needless, but let's leave it in to be safe. */
- struct timeval tv_end, tv_diff;
- int64_t usec_roundtrip;
- tor_gettimeofday(&tv_end);
- timersub(&tv_end, &rpl.started_at, &tv_diff);
- usec_roundtrip = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
- if (usec_roundtrip >= 0 &&
- usec_roundtrip < MAX_BELIEVABLE_ONIONSKIN_DELAY) {
- ++onionskins_n_processed[rpl.handshake_type];
- onionskins_usec_internal[rpl.handshake_type] += rpl.n_usec;
- onionskins_usec_roundtrip[rpl.handshake_type] += usec_roundtrip;
- if (onionskins_n_processed[rpl.handshake_type] >= 500000) {
- /* Scale down every 500000 handshakes. On a busy server, that's
- * less impressive than it sounds. */
- onionskins_n_processed[rpl.handshake_type] /= 2;
- onionskins_usec_internal[rpl.handshake_type] /= 2;
- onionskins_usec_roundtrip[rpl.handshake_type] /= 2;
- }
- }
- }
- /* parse out the circ it was talking about */
- tag_unpack(rpl.tag, &chan_id, &circ_id);
- circ = NULL;
- log_debug(LD_OR,
- "Unpacking cpuworker reply, chan_id is " U64_FORMAT
- ", circ_id is %u",
- U64_PRINTF_ARG(chan_id), (unsigned)circ_id);
- p_chan = channel_find_by_global_id(chan_id);
-
- if (p_chan)
- circ = circuit_get_by_circid_channel(circ_id, p_chan);
-
- if (rpl.success == 0) {
- log_debug(LD_OR,
- "decoding onionskin failed. "
- "(Old key or bad software.) Closing.");
- if (circ)
- circuit_mark_for_close(circ, END_CIRC_REASON_TORPROTOCOL);
- goto done_processing;
- }
- if (!circ) {
- /* This happens because somebody sends us a destroy cell and the
- * circuit goes away, while the cpuworker is working. This is also
- * why our tag doesn't include a pointer to the circ, because we'd
- * never know if it's still valid.
- */
- log_debug(LD_OR,"processed onion for a circ that's gone. Dropping.");
- goto done_processing;
- }
- tor_assert(! CIRCUIT_IS_ORIGIN(circ));
- if (onionskin_answer(TO_OR_CIRCUIT(circ),
- &rpl.created_cell,
- (const char*)rpl.keys,
- rpl.rend_auth_material) < 0) {
- log_warn(LD_OR,"onionskin_answer failed. Closing.");
- circuit_mark_for_close(circ, END_CIRC_REASON_INTERNAL);
- goto done_processing;
- }
- log_debug(LD_OR,"onionskin_answer succeeded. Yay.");
- } else {
- tor_assert(0); /* don't ask me to do handshakes yet */
- }
-
- done_processing:
- conn->state = CPUWORKER_STATE_IDLE;
- num_cpuworkers_busy--;
- if (conn->timestamp_created < last_rotation_time) {
- connection_mark_for_close(conn);
- num_cpuworkers--;
- spawn_enough_cpuworkers();
- } else {
- process_pending_task(conn);
- }
- return 0;
-}
-
-/** Implement a cpuworker. 'data' is an fdarray as returned by socketpair.
- * Read and writes from fdarray[1]. Reads requests, writes answers.
- *
- * Request format:
- * cpuworker_request_t.
- * Response format:
- * cpuworker_reply_t
- */
+/** Handle a reply from the worker threads. */
static void
-cpuworker_main(void *data)
+cpuworker_onion_handshake_replyfn(void *work_)
{
- /* For talking to the parent thread/process */
- tor_socket_t *fdarray = data;
- tor_socket_t fd;
-
- /* variables for onion processing */
- server_onion_keys_t onion_keys;
- cpuworker_request_t req;
+ cpuworker_job_t *job = work_;
cpuworker_reply_t rpl;
-
- fd = fdarray[1]; /* this side is ours */
- tor_free(data);
-
- setup_server_onion_keys(&onion_keys);
-
- for (;;) {
- if (read_all(fd, (void *)&req, sizeof(req), 1) != sizeof(req)) {
- log_info(LD_OR, "read request failed. Exiting.");
- goto end;
- }
- tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
-
- memset(&rpl, 0, sizeof(rpl));
-
- if (req.task == CPUWORKER_TASK_ONION) {
- const create_cell_t *cc = &req.create_cell;
- created_cell_t *cell_out = &rpl.created_cell;
- struct timeval tv_start = {0,0}, tv_end;
- int n;
- rpl.timed = req.timed;
- rpl.started_at = req.started_at;
- rpl.handshake_type = cc->handshake_type;
- if (req.timed)
- tor_gettimeofday(&tv_start);
- n = onion_skin_server_handshake(cc->handshake_type,
- cc->onionskin, cc->handshake_len,
- &onion_keys,
- cell_out->reply,
- rpl.keys, CPATH_KEY_MATERIAL_LEN,
- rpl.rend_auth_material);
- if (n < 0) {
- /* failure */
- log_debug(LD_OR,"onion_skin_server_handshake failed.");
- memset(&rpl, 0, sizeof(rpl));
- memcpy(rpl.tag, req.tag, TAG_LEN);
- rpl.success = 0;
- } else {
- /* success */
- log_debug(LD_OR,"onion_skin_server_handshake succeeded.");
- memcpy(rpl.tag, req.tag, TAG_LEN);
- cell_out->handshake_len = n;
- switch (cc->cell_type) {
- case CELL_CREATE:
- cell_out->cell_type = CELL_CREATED; break;
- case CELL_CREATE2:
- cell_out->cell_type = CELL_CREATED2; break;
- case CELL_CREATE_FAST:
- cell_out->cell_type = CELL_CREATED_FAST; break;
- default:
- tor_assert(0);
- goto end;
- }
- rpl.success = 1;
- }
- rpl.magic = CPUWORKER_REPLY_MAGIC;
- if (req.timed) {
- struct timeval tv_diff;
- int64_t usec;
- tor_gettimeofday(&tv_end);
- timersub(&tv_end, &tv_start, &tv_diff);
- usec = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
- if (usec < 0 || usec > MAX_BELIEVABLE_ONIONSKIN_DELAY)
- rpl.n_usec = MAX_BELIEVABLE_ONIONSKIN_DELAY;
- else
- rpl.n_usec = (uint32_t) usec;
+ or_circuit_t *circ = NULL;
+
+ --total_pending_tasks;
+
+ /* Could avoid this, but doesn't matter. */
+ memcpy(&rpl, &job->u.reply, sizeof(rpl));
+
+ tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC);
+
+ if (rpl.timed && rpl.success &&
+ rpl.handshake_type <= MAX_ONION_HANDSHAKE_TYPE) {
+ /* Time how long this request took. The handshake_type check should be
+ needless, but let's leave it in to be safe. */
+ struct timeval tv_end, tv_diff;
+ int64_t usec_roundtrip;
+ tor_gettimeofday(&tv_end);
+ timersub(&tv_end, &rpl.started_at, &tv_diff);
+ usec_roundtrip = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
+ if (usec_roundtrip >= 0 &&
+ usec_roundtrip < MAX_BELIEVABLE_ONIONSKIN_DELAY) {
+ ++onionskins_n_processed[rpl.handshake_type];
+ onionskins_usec_internal[rpl.handshake_type] += rpl.n_usec;
+ onionskins_usec_roundtrip[rpl.handshake_type] += usec_roundtrip;
+ if (onionskins_n_processed[rpl.handshake_type] >= 500000) {
+ /* Scale down every 500000 handshakes. On a busy server, that's
+ * less impressive than it sounds. */
+ onionskins_n_processed[rpl.handshake_type] /= 2;
+ onionskins_usec_internal[rpl.handshake_type] /= 2;
+ onionskins_usec_roundtrip[rpl.handshake_type] /= 2;
}
- if (write_all(fd, (void*)&rpl, sizeof(rpl), 1) != sizeof(rpl)) {
- log_err(LD_BUG,"writing response buf failed. Exiting.");
- goto end;
- }
- log_debug(LD_OR,"finished writing response.");
- } else if (req.task == CPUWORKER_TASK_SHUTDOWN) {
- log_info(LD_OR,"Clean shutdown: exiting");
- goto end;
}
- memwipe(&req, 0, sizeof(req));
- memwipe(&rpl, 0, sizeof(req));
}
- end:
- memwipe(&req, 0, sizeof(req));
- memwipe(&rpl, 0, sizeof(req));
- release_server_onion_keys(&onion_keys);
- tor_close_socket(fd);
- crypto_thread_cleanup();
- spawn_exit();
-}
-/** Launch a new cpuworker. Return 0 if we're happy, -1 if we failed.
- */
-static int
-spawn_cpuworker(void)
-{
- tor_socket_t *fdarray;
- tor_socket_t fd;
- connection_t *conn;
- int err;
-
- fdarray = tor_calloc(2, sizeof(tor_socket_t));
- if ((err = tor_socketpair(AF_UNIX, SOCK_STREAM, 0, fdarray)) < 0) {
- log_warn(LD_NET, "Couldn't construct socketpair for cpuworker: %s",
- tor_socket_strerror(-err));
- tor_free(fdarray);
- return -1;
- }
+ circ = job->circ;
- tor_assert(SOCKET_OK(fdarray[0]));
- tor_assert(SOCKET_OK(fdarray[1]));
+ log_debug(LD_OR,
+ "Unpacking cpuworker reply %p, circ=%p, success=%d",
+ job, circ, rpl.success);
- fd = fdarray[0];
- if (spawn_func(cpuworker_main, (void*)fdarray) < 0) {
- tor_close_socket(fdarray[0]);
- tor_close_socket(fdarray[1]);
- tor_free(fdarray);
- return -1;
+ if (circ->base_.magic == DEAD_CIRCUIT_MAGIC) {
+ /* The circuit was supposed to get freed while the reply was
+ * pending. Instead, it got left for us to free so that we wouldn't freak
+ * out when the job->circ field wound up pointing to nothing. */
+ log_debug(LD_OR, "Circuit died while reply was pending. Freeing memory.");
+ circ->base_.magic = 0;
+ tor_free(circ);
+ goto done_processing;
}
- log_debug(LD_OR,"just spawned a cpu worker.");
- conn = connection_new(CONN_TYPE_CPUWORKER, AF_UNIX);
+ circ->workqueue_entry = NULL;
- /* set up conn so it's got all the data we need to remember */
- conn->s = fd;
- conn->address = tor_strdup("localhost");
- tor_addr_make_unspec(&conn->addr);
-
- if (set_socket_nonblocking(fd) == -1) {
- connection_free(conn); /* this closes fd */
- return -1;
+ if (rpl.success == 0) {
+ log_debug(LD_OR,
+ "decoding onionskin failed. "
+ "(Old key or bad software.) Closing.");
+ if (circ)
+ circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_TORPROTOCOL);
+ goto done_processing;
}
- if (connection_add(conn) < 0) { /* no space, forget it */
- log_warn(LD_NET,"connection_add for cpuworker failed. Giving up.");
- connection_free(conn); /* this closes fd */
- return -1;
+ if (onionskin_answer(circ,
+ &rpl.created_cell,
+ (const char*)rpl.keys,
+ rpl.rend_auth_material) < 0) {
+ log_warn(LD_OR,"onionskin_answer failed. Closing.");
+ circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_INTERNAL);
+ goto done_processing;
}
+ log_debug(LD_OR,"onionskin_answer succeeded. Yay.");
- conn->state = CPUWORKER_STATE_IDLE;
- connection_start_reading(conn);
- return 0; /* success */
+ done_processing:
+ memwipe(&rpl, 0, sizeof(rpl));
+ memwipe(job, 0, sizeof(*job));
+ tor_free(job);
+ queue_pending_tasks();
}
-/** If we have too few or too many active cpuworkers, try to spawn new ones
- * or kill idle ones.
- */
-static void
-spawn_enough_cpuworkers(void)
+/** Implementation function for onion handshake requests. */
+static int
+cpuworker_onion_handshake_threadfn(void *state_, void *work_)
{
- int num_cpuworkers_needed = get_num_cpus(get_options());
- int reseed = 0;
+ worker_state_t *state = state_;
+ cpuworker_job_t *job = work_;
- if (num_cpuworkers_needed < MIN_CPUWORKERS)
- num_cpuworkers_needed = MIN_CPUWORKERS;
- if (num_cpuworkers_needed > MAX_CPUWORKERS)
- num_cpuworkers_needed = MAX_CPUWORKERS;
+ /* variables for onion processing */
+ server_onion_keys_t *onion_keys = state->onion_keys;
+ cpuworker_request_t req;
+ cpuworker_reply_t rpl;
- while (num_cpuworkers < num_cpuworkers_needed) {
- if (spawn_cpuworker() < 0) {
- log_warn(LD_GENERAL,"Cpuworker spawn failed. Will try again later.");
- return;
+ memcpy(&req, &job->u.request, sizeof(req));
+
+ tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
+ memset(&rpl, 0, sizeof(rpl));
+
+ const create_cell_t *cc = &req.create_cell;
+ created_cell_t *cell_out = &rpl.created_cell;
+ struct timeval tv_start = {0,0}, tv_end;
+ int n;
+ rpl.timed = req.timed;
+ rpl.started_at = req.started_at;
+ rpl.handshake_type = cc->handshake_type;
+ if (req.timed)
+ tor_gettimeofday(&tv_start);
+ n = onion_skin_server_handshake(cc->handshake_type,
+ cc->onionskin, cc->handshake_len,
+ onion_keys,
+ cell_out->reply,
+ rpl.keys, CPATH_KEY_MATERIAL_LEN,
+ rpl.rend_auth_material);
+ if (n < 0) {
+ /* failure */
+ log_debug(LD_OR,"onion_skin_server_handshake failed.");
+ memset(&rpl, 0, sizeof(rpl));
+ rpl.success = 0;
+ } else {
+ /* success */
+ log_debug(LD_OR,"onion_skin_server_handshake succeeded.");
+ cell_out->handshake_len = n;
+ switch (cc->cell_type) {
+ case CELL_CREATE:
+ cell_out->cell_type = CELL_CREATED; break;
+ case CELL_CREATE2:
+ cell_out->cell_type = CELL_CREATED2; break;
+ case CELL_CREATE_FAST:
+ cell_out->cell_type = CELL_CREATED_FAST; break;
+ default:
+ tor_assert(0);
+ return WQ_RPL_SHUTDOWN;
}
- num_cpuworkers++;
- reseed++;
+ rpl.success = 1;
}
+ rpl.magic = CPUWORKER_REPLY_MAGIC;
+ if (req.timed) {
+ struct timeval tv_diff;
+ int64_t usec;
+ tor_gettimeofday(&tv_end);
+ timersub(&tv_end, &tv_start, &tv_diff);
+ usec = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
+ if (usec < 0 || usec > MAX_BELIEVABLE_ONIONSKIN_DELAY)
+ rpl.n_usec = MAX_BELIEVABLE_ONIONSKIN_DELAY;
+ else
+ rpl.n_usec = (uint32_t) usec;
+ }
+
+ memcpy(&job->u.reply, &rpl, sizeof(rpl));
- if (reseed)
- crypto_seed_weak_rng(&request_sample_rng);
+ memwipe(&req, 0, sizeof(req));
+ memwipe(&rpl, 0, sizeof(req));
+ return WQ_RPL_REPLY;
}
-/** Take a pending task from the queue and assign it to 'cpuworker'. */
+/** Take pending tasks from the queue and assign them to cpuworkers. */
static void
-process_pending_task(connection_t *cpuworker)
+queue_pending_tasks(void)
{
or_circuit_t *circ;
create_cell_t *onionskin = NULL;
- tor_assert(cpuworker);
-
- /* for now only process onion tasks */
-
- circ = onion_next_task(&onionskin);
- if (!circ)
- return;
- if (assign_onionskin_to_cpuworker(cpuworker, circ, onionskin))
- log_warn(LD_OR,"assign_to_cpuworker failed. Ignoring.");
-}
+ while (total_pending_tasks < max_pending_tasks) {
+ circ = onion_next_task(&onionskin);
-/** How long should we let a cpuworker stay busy before we give
- * up on it and decide that we have a bug or infinite loop?
- * This value is high because some servers with low memory/cpu
- * sometimes spend an hour or more swapping, and Tor starves. */
-#define CPUWORKER_BUSY_TIMEOUT (60*60*12)
+ if (!circ)
+ return;
-/** We have a bug that I can't find. Sometimes, very rarely, cpuworkers get
- * stuck in the 'busy' state, even though the cpuworker process thinks of
- * itself as idle. I don't know why. But here's a workaround to kill any
- * cpuworker that's been busy for more than CPUWORKER_BUSY_TIMEOUT.
- */
-static void
-cull_wedged_cpuworkers(void)
-{
- time_t now = time(NULL);
- smartlist_t *conns = get_connection_array();
- SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) {
- if (!conn->marked_for_close &&
- conn->type == CONN_TYPE_CPUWORKER &&
- conn->state == CPUWORKER_STATE_BUSY_ONION &&
- conn->timestamp_lastwritten + CPUWORKER_BUSY_TIMEOUT < now) {
- log_notice(LD_BUG,
- "closing wedged cpuworker. Can somebody find the bug?");
- num_cpuworkers_busy--;
- num_cpuworkers--;
- connection_mark_for_close(conn);
- }
- } SMARTLIST_FOREACH_END(conn);
+ if (assign_onionskin_to_cpuworker(circ, onionskin))
+ log_warn(LD_OR,"assign_to_cpuworker failed. Ignoring.");
+ }
}
/** Try to tell a cpuworker to perform the public key operations necessary to
* respond to <b>onionskin</b> for the circuit <b>circ</b>.
*
- * If <b>cpuworker</b> is defined, assert that he's idle, and use him. Else,
- * look for an idle cpuworker and use him. If none idle, queue task onto the
- * pending onion list and return. Return 0 if we successfully assign the
- * task, or -1 on failure.
+ * Return 0 if we successfully assign the task, or -1 on failure.
*/
int
-assign_onionskin_to_cpuworker(connection_t *cpuworker,
- or_circuit_t *circ,
+assign_onionskin_to_cpuworker(or_circuit_t *circ,
create_cell_t *onionskin)
{
+ workqueue_entry_t *queue_entry;
+ cpuworker_job_t *job;
cpuworker_request_t req;
- time_t now = approx_time();
- static time_t last_culled_cpuworkers = 0;
int should_time;
- /* Checking for wedged cpuworkers requires a linear search over all
- * connections, so let's do it only once a minute.
- */
-#define CULL_CPUWORKERS_INTERVAL 60
-
- if (last_culled_cpuworkers + CULL_CPUWORKERS_INTERVAL <= now) {
- cull_wedged_cpuworkers();
- spawn_enough_cpuworkers();
- last_culled_cpuworkers = now;
+ if (!circ->p_chan) {
+ log_info(LD_OR,"circ->p_chan gone. Failing circ.");
+ tor_free(onionskin);
+ return -1;
}
- if (1) {
- if (num_cpuworkers_busy == num_cpuworkers) {
- log_debug(LD_OR,"No idle cpuworkers. Queuing.");
- if (onion_pending_add(circ, onionskin) < 0) {
- tor_free(onionskin);
- return -1;
- }
- return 0;
- }
-
- if (!cpuworker)
- cpuworker = connection_get_by_type_state(CONN_TYPE_CPUWORKER,
- CPUWORKER_STATE_IDLE);
-
- tor_assert(cpuworker);
-
- if (!circ->p_chan) {
- log_info(LD_OR,"circ->p_chan gone. Failing circ.");
+ if (total_pending_tasks >= max_pending_tasks) {
+ log_debug(LD_OR,"No idle cpuworkers. Queuing.");
+ if (onion_pending_add(circ, onionskin) < 0) {
tor_free(onionskin);
return -1;
}
+ return 0;
+ }
- if (connection_or_digest_is_known_relay(circ->p_chan->identity_digest))
- rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type);
+ if (connection_or_digest_is_known_relay(circ->p_chan->identity_digest))
+ rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type);
- should_time = should_time_request(onionskin->handshake_type);
- memset(&req, 0, sizeof(req));
- req.magic = CPUWORKER_REQUEST_MAGIC;
- tag_pack(req.tag, circ->p_chan->global_identifier,
- circ->p_circ_id);
- req.timed = should_time;
+ should_time = should_time_request(onionskin->handshake_type);
+ memset(&req, 0, sizeof(req));
+ req.magic = CPUWORKER_REQUEST_MAGIC;
+ req.timed = should_time;
- cpuworker->state = CPUWORKER_STATE_BUSY_ONION;
- /* touch the lastwritten timestamp, since that's how we check to
- * see how long it's been since we asked the question, and sometimes
- * we check before the first call to connection_handle_write(). */
- cpuworker->timestamp_lastwritten = now;
- num_cpuworkers_busy++;
+ memcpy(&req.create_cell, onionskin, sizeof(create_cell_t));
- req.task = CPUWORKER_TASK_ONION;
- memcpy(&req.create_cell, onionskin, sizeof(create_cell_t));
+ tor_free(onionskin);
- tor_free(onionskin);
+ if (should_time)
+ tor_gettimeofday(&req.started_at);
- if (should_time)
- tor_gettimeofday(&req.started_at);
+ job = tor_malloc_zero(sizeof(cpuworker_job_t));
+ job->circ = circ;
+ memcpy(&job->u.request, &req, sizeof(req));
+ memwipe(&req, 0, sizeof(req));
- connection_write_to_buf((void*)&req, sizeof(req), cpuworker);
- memwipe(&req, 0, sizeof(req));
+ ++total_pending_tasks;
+ queue_entry = threadpool_queue_work(threadpool,
+ cpuworker_onion_handshake_threadfn,
+ cpuworker_onion_handshake_replyfn,
+ job);
+ if (!queue_entry) {
+ log_warn(LD_BUG, "Couldn't queue work on threadpool");
+ tor_free(job);
+ return -1;
}
+
+ log_debug(LD_OR, "Queued task %p (qe=%p, circ=%p)",
+ job, queue_entry, job->circ);
+
+ circ->workqueue_entry = queue_entry;
+
return 0;
}
+/** If <b>circ</b> has a pending handshake that hasn't been processed yet,
+ * remove it from the worker queue. */
+void
+cpuworker_cancel_circ_handshake(or_circuit_t *circ)
+{
+ cpuworker_job_t *job;
+ if (circ->workqueue_entry == NULL)
+ return;
+
+ job = workqueue_entry_cancel(circ->workqueue_entry);
+ if (job) {
+ /* It successfully cancelled. */
+ memwipe(job, 0xe0, sizeof(*job));
+ tor_free(job);
+ }
+
+ circ->workqueue_entry = NULL;
+}
+
diff --git a/src/or/cpuworker.h b/src/or/cpuworker.h
index 2a2b37a975..70a595e472 100644
--- a/src/or/cpuworker.h
+++ b/src/or/cpuworker.h
@@ -13,19 +13,17 @@
#define TOR_CPUWORKER_H
void cpu_init(void);
-void cpuworkers_rotate(void);
-int connection_cpu_finished_flushing(connection_t *conn);
-int connection_cpu_reached_eof(connection_t *conn);
-int connection_cpu_process_inbuf(connection_t *conn);
+void cpuworkers_rotate_keyinfo(void);
+
struct create_cell_t;
-int assign_onionskin_to_cpuworker(connection_t *cpuworker,
- or_circuit_t *circ,
+int assign_onionskin_to_cpuworker(or_circuit_t *circ,
struct create_cell_t *onionskin);
uint64_t estimated_usec_for_onionskins(uint32_t n_requests,
uint16_t onionskin_type);
void cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
const char *onionskin_type_name);
+void cpuworker_cancel_circ_handshake(or_circuit_t *circ);
#endif
diff --git a/src/or/main.c b/src/or/main.c
index abf3230c4c..136043c117 100644
--- a/src/or/main.c
+++ b/src/or/main.c
@@ -1271,7 +1271,7 @@ run_scheduled_events(time_t now)
get_onion_key_set_at()+MIN_ONION_KEY_LIFETIME < now) {
log_info(LD_GENERAL,"Rotating onion key.");
rotate_onion_key();
- cpuworkers_rotate();
+ cpuworkers_rotate_keyinfo();
if (router_rebuild_descriptor(1)<0) {
log_info(LD_CONFIG, "Couldn't rebuild router descriptor");
}
@@ -1960,9 +1960,9 @@ do_hup(void)
* force a retry there. */
if (server_mode(options)) {
- /* Restart cpuworker and dnsworker processes, so they get up-to-date
+ /* Update cpuworker and dnsworker processes, so they get up-to-date
* configuration options. */
- cpuworkers_rotate();
+ cpuworkers_rotate_keyinfo();
dns_reset();
}
return 0;
diff --git a/src/or/onion.c b/src/or/onion.c
index 3723a3e11e..43fb63c832 100644
--- a/src/or/onion.c
+++ b/src/or/onion.c
@@ -295,6 +295,8 @@ onion_pending_remove(or_circuit_t *circ)
victim = circ->onionqueue_entry;
if (victim)
onion_queue_entry_remove(victim);
+
+ cpuworker_cancel_circ_handshake(circ);
}
/** Remove a queue entry <b>victim</b> from the queue, unlinking it from
@@ -339,25 +341,25 @@ clear_pending_onions(void)
/* ============================================================ */
-/** Fill in a server_onion_keys_t object at <b>keys</b> with all of the keys
+/** Return a new server_onion_keys_t object with all of the keys
* and other info we might need to do onion handshakes. (We make a copy of
* our keys for each cpuworker to avoid race conditions with the main thread,
* and to avoid locking) */
-void
-setup_server_onion_keys(server_onion_keys_t *keys)
+server_onion_keys_t *
+server_onion_keys_new(void)
{
- memset(keys, 0, sizeof(server_onion_keys_t));
+ server_onion_keys_t *keys = tor_malloc_zero(sizeof(server_onion_keys_t));
memcpy(keys->my_identity, router_get_my_id_digest(), DIGEST_LEN);
dup_onion_keys(&keys->onion_key, &keys->last_onion_key);
keys->curve25519_key_map = construct_ntor_key_map();
keys->junk_keypair = tor_malloc_zero(sizeof(curve25519_keypair_t));
curve25519_keypair_generate(keys->junk_keypair, 0);
+ return keys;
}
-/** Release all storage held in <b>keys</b>, but do not free <b>keys</b>
- * itself (as it's likely to be stack-allocated.) */
+/** Release all storage held in <b>keys</b>. */
void
-release_server_onion_keys(server_onion_keys_t *keys)
+server_onion_keys_free(server_onion_keys_t *keys)
{
if (! keys)
return;
@@ -366,7 +368,8 @@ release_server_onion_keys(server_onion_keys_t *keys)
crypto_pk_free(keys->last_onion_key);
ntor_key_map_free(keys->curve25519_key_map);
tor_free(keys->junk_keypair);
- memset(keys, 0, sizeof(server_onion_keys_t));
+ memwipe(keys, 0, sizeof(server_onion_keys_t));
+ tor_free(keys);
}
/** Release whatever storage is held in <b>state</b>, depending on its
diff --git a/src/or/onion.h b/src/or/onion.h
index 35619879e4..96050083f8 100644
--- a/src/or/onion.h
+++ b/src/or/onion.h
@@ -30,8 +30,8 @@ typedef struct server_onion_keys_t {
#define MAX_ONIONSKIN_CHALLENGE_LEN 255
#define MAX_ONIONSKIN_REPLY_LEN 255
-void setup_server_onion_keys(server_onion_keys_t *keys);
-void release_server_onion_keys(server_onion_keys_t *keys);
+server_onion_keys_t *server_onion_keys_new(void);
+void server_onion_keys_free(server_onion_keys_t *keys);
void onion_handshake_state_release(onion_handshake_state_t *state);
diff --git a/src/or/or.h b/src/or/or.h
index f821d0b405..ef217fbca8 100644
--- a/src/or/or.h
+++ b/src/or/or.h
@@ -213,8 +213,7 @@ typedef enum {
#define CONN_TYPE_DIR_LISTENER 8
/** Type for HTTP connections to the directory server. */
#define CONN_TYPE_DIR 9
-/** Connection from the main process to a CPU worker process. */
-#define CONN_TYPE_CPUWORKER 10
+/* Type 10 is unused. */
/** Type for listening for connections from user interface process. */
#define CONN_TYPE_CONTROL_LISTENER 11
/** Type for connections from user interface process. */
@@ -276,17 +275,6 @@ typedef enum {
/** State for any listener connection. */
#define LISTENER_STATE_READY 0
-#define CPUWORKER_STATE_MIN_ 1
-/** State for a connection to a cpuworker process that's idle. */
-#define CPUWORKER_STATE_IDLE 1
-/** State for a connection to a cpuworker process that's processing a
- * handshake. */
-#define CPUWORKER_STATE_BUSY_ONION 2
-#define CPUWORKER_STATE_MAX_ 2
-
-#define CPUWORKER_TASK_ONION CPUWORKER_STATE_BUSY_ONION
-#define CPUWORKER_TASK_SHUTDOWN 255
-
#define OR_CONN_STATE_MIN_ 1
/** State for a connection to an OR: waiting for connect() to finish. */
#define OR_CONN_STATE_CONNECTING 1
@@ -2707,8 +2695,14 @@ typedef struct {
time_t expiry_time;
} cpath_build_state_t;
+/** "magic" value for an origin_circuit_t */
#define ORIGIN_CIRCUIT_MAGIC 0x35315243u
+/** "magic" value for an or_circuit_t */
#define OR_CIRCUIT_MAGIC 0x98ABC04Fu
+/** "magic" value for a circuit that would have been freed by circuit_free,
+ * but which we're keeping around until a cpuworker reply arrives. See
+ * circuit_free() for more documentation. */
+#define DEAD_CIRCUIT_MAGIC 0xdeadc14c
struct create_cell_t;
@@ -3128,6 +3122,9 @@ typedef struct or_circuit_t {
/** Pointer to an entry on the onion queue, if this circuit is waiting for a
* chance to give an onionskin to a cpuworker. Used only in onion.c */
struct onion_queue_t *onionqueue_entry;
+ /** Pointer to a workqueue entry, if this circuit has given an onionskin to
+ * a cpuworker and is waiting for a response. Used only in cpuworker.c */
+ struct workqueue_entry_s *workqueue_entry;
/** The circuit_id used in the previous (backward) hop of this circuit. */
circid_t p_circ_id;
diff --git a/src/test/include.am b/src/test/include.am
index 134c2ff56c..3c59a8b3c7 100644
--- a/src/test/include.am
+++ b/src/test/include.am
@@ -2,7 +2,7 @@ TESTS += src/test/test
noinst_PROGRAMS+= src/test/bench
if UNITTESTS_ENABLED
-noinst_PROGRAMS+= src/test/test src/test/test-child
+noinst_PROGRAMS+= src/test/test src/test/test-child src/test/test_workqueue
endif
src_test_AM_CPPFLAGS = -DSHARE_DATADIR="\"$(datadir)\"" \
@@ -47,6 +47,7 @@ src_test_test_SOURCES = \
src/test/test_routerkeys.c \
src/test/test_scheduler.c \
src/test/test_socks.c \
+ src/test/test_threads.c \
src/test/test_util.c \
src/test/test_config.c \
src/test/test_hs.c \
@@ -63,6 +64,11 @@ src_test_test_CPPFLAGS= $(src_test_AM_CPPFLAGS)
src_test_bench_SOURCES = \
src/test/bench.c
+src_test_test_workqueue_SOURCES = \
+ src/test/test_workqueue.c
+src_test_test_workqueue_CPPFLAGS= $(src_test_AM_CPPFLAGS)
+src_test_test_workqueue_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS)
+
src_test_test_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \
@TOR_LDFLAGS_libevent@
src_test_test_LDADD = src/or/libtor-testing.a src/common/libor-testing.a \
@@ -81,6 +87,15 @@ src_test_bench_LDADD = src/or/libtor.a src/common/libor.a \
@TOR_OPENSSL_LIBS@ @TOR_LIB_WS32@ @TOR_LIB_GDI@ @CURVE25519_LIBS@ \
@TOR_SYSTEMD_LIBS@
+src_test_test_workqueue_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \
+ @TOR_LDFLAGS_libevent@
+src_test_test_workqueue_LDADD = src/or/libtor-testing.a \
+ src/common/libor-testing.a \
+ src/common/libor-crypto-testing.a $(LIBDONNA) \
+ src/common/libor-event-testing.a \
+ @TOR_ZLIB_LIBS@ @TOR_LIB_MATH@ @TOR_LIBEVENT_LIBS@ \
+ @TOR_OPENSSL_LIBS@ @TOR_LIB_WS32@ @TOR_LIB_GDI@ @CURVE25519_LIBS@
+
noinst_HEADERS+= \
src/test/fakechans.h \
src/test/test.h \
diff --git a/src/test/test.c b/src/test/test.c
index 65d15f12bd..fc5290f0b9 100644
--- a/src/test/test.c
+++ b/src/test/test.c
@@ -1258,6 +1258,23 @@ test_stats(void *arg)
tor_free(s);
}
+static void *
+passthrough_test_setup(const struct testcase_t *testcase)
+{
+ return testcase->setup_data;
+}
+static int
+passthrough_test_cleanup(const struct testcase_t *testcase, void *ptr)
+{
+ (void)testcase;
+ (void)ptr;
+ return 1;
+}
+
+const struct testcase_setup_t passthrough_setup = {
+ passthrough_test_setup, passthrough_test_cleanup
+};
+
#define ENT(name) \
{ #name, test_ ## name , 0, NULL, NULL }
#define FORK(name) \
@@ -1297,6 +1314,7 @@ extern struct testcase_t cell_queue_tests[];
extern struct testcase_t options_tests[];
extern struct testcase_t socks_tests[];
extern struct testcase_t entrynodes_tests[];
+extern struct testcase_t thread_tests[];
extern struct testcase_t extorport_tests[];
extern struct testcase_t controller_event_tests[];
extern struct testcase_t logging_tests[];
@@ -1324,6 +1342,7 @@ static struct testgroup_t testgroups[] = {
{ "container/", container_tests },
{ "util/", util_tests },
{ "util/logging/", logging_tests },
+ { "util/thread/", thread_tests },
{ "cellfmt/", cell_format_tests },
{ "cellqueue/", cell_queue_tests },
{ "dir/", dir_tests },
diff --git a/src/test/test.h b/src/test/test.h
index 48037a5ba3..b8057c59bf 100644
--- a/src/test/test.h
+++ b/src/test/test.h
@@ -158,5 +158,7 @@ crypto_pk_t *pk_generate(int idx);
#define NS_MOCK(name) MOCK(name, NS(name))
#define NS_UNMOCK(name) UNMOCK(name)
+extern const struct testcase_setup_t passthrough_setup;
+
#endif
diff --git a/src/test/test_crypto.c b/src/test/test_crypto.c
index 4a5a12c50a..8426c715a4 100644
--- a/src/test/test_crypto.c
+++ b/src/test/test_crypto.c
@@ -1975,30 +1975,14 @@ test_crypto_siphash(void *arg)
;
}
-static void *
-pass_data_setup_fn(const struct testcase_t *testcase)
-{
- return testcase->setup_data;
-}
-static int
-pass_data_cleanup_fn(const struct testcase_t *testcase, void *ptr)
-{
- (void)ptr;
- (void)testcase;
- return 1;
-}
-static const struct testcase_setup_t pass_data = {
- pass_data_setup_fn, pass_data_cleanup_fn
-};
-
#define CRYPTO_LEGACY(name) \
{ #name, test_crypto_ ## name , 0, NULL, NULL }
struct testcase_t crypto_tests[] = {
CRYPTO_LEGACY(formats),
CRYPTO_LEGACY(rng),
- { "aes_AES", test_crypto_aes, TT_FORK, &pass_data, (void*)"aes" },
- { "aes_EVP", test_crypto_aes, TT_FORK, &pass_data, (void*)"evp" },
+ { "aes_AES", test_crypto_aes, TT_FORK, &passthrough_setup, (void*)"aes" },
+ { "aes_EVP", test_crypto_aes, TT_FORK, &passthrough_setup, (void*)"evp" },
CRYPTO_LEGACY(sha),
CRYPTO_LEGACY(pk),
{ "pk_fingerprints", test_crypto_pk_fingerprints, TT_FORK, NULL, NULL },
@@ -2006,23 +1990,25 @@ struct testcase_t crypto_tests[] = {
CRYPTO_LEGACY(dh),
CRYPTO_LEGACY(s2k_rfc2440),
#ifdef HAVE_LIBSCRYPT_H
- { "s2k_scrypt", test_crypto_s2k_general, 0, &pass_data,
+ { "s2k_scrypt", test_crypto_s2k_general, 0, &passthrough_setup,
(void*)"scrypt" },
- { "s2k_scrypt_low", test_crypto_s2k_general, 0, &pass_data,
+ { "s2k_scrypt_low", test_crypto_s2k_general, 0, &passthrough_setup,
(void*)"scrypt-low" },
#endif
- { "s2k_pbkdf2", test_crypto_s2k_general, 0, &pass_data,
+ { "s2k_pbkdf2", test_crypto_s2k_general, 0, &passthrough_setup,
(void*)"pbkdf2" },
- { "s2k_rfc2440_general", test_crypto_s2k_general, 0, &pass_data,
+ { "s2k_rfc2440_general", test_crypto_s2k_general, 0, &passthrough_setup,
(void*)"rfc2440" },
- { "s2k_rfc2440_legacy", test_crypto_s2k_general, 0, &pass_data,
+ { "s2k_rfc2440_legacy", test_crypto_s2k_general, 0, &passthrough_setup,
(void*)"rfc2440-legacy" },
{ "s2k_errors", test_crypto_s2k_errors, 0, NULL, NULL },
{ "scrypt_vectors", test_crypto_scrypt_vectors, 0, NULL, NULL },
{ "pbkdf2_vectors", test_crypto_pbkdf2_vectors, 0, NULL, NULL },
{ "pwbox", test_crypto_pwbox, 0, NULL, NULL },
- { "aes_iv_AES", test_crypto_aes_iv, TT_FORK, &pass_data, (void*)"aes" },
- { "aes_iv_EVP", test_crypto_aes_iv, TT_FORK, &pass_data, (void*)"evp" },
+ { "aes_iv_AES", test_crypto_aes_iv, TT_FORK, &passthrough_setup,
+ (void*)"aes" },
+ { "aes_iv_EVP", test_crypto_aes_iv, TT_FORK, &passthrough_setup,
+ (void*)"evp" },
CRYPTO_LEGACY(base32_decode),
{ "kdf_TAP", test_crypto_kdf_TAP, 0, NULL, NULL },
{ "hkdf_sha256", test_crypto_hkdf_sha256, 0, NULL, NULL },
diff --git a/src/test/test_threads.c b/src/test/test_threads.c
new file mode 100644
index 0000000000..2ac08d4d28
--- /dev/null
+++ b/src/test/test_threads.c
@@ -0,0 +1,316 @@
+/* Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2015, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#include "orconfig.h"
+#include "or.h"
+#include "compat_threads.h"
+#include "test.h"
+
+/** mutex for thread test to stop the threads hitting data at the same time. */
+static tor_mutex_t *thread_test_mutex_ = NULL;
+/** mutexes for the thread test to make sure that the threads have to
+ * interleave somewhat. */
+static tor_mutex_t *thread_test_start1_ = NULL,
+ *thread_test_start2_ = NULL;
+/** Shared strmap for the thread test. */
+static strmap_t *thread_test_strmap_ = NULL;
+/** The name of thread1 for the thread test */
+static char *thread1_name_ = NULL;
+/** The name of thread2 for the thread test */
+static char *thread2_name_ = NULL;
+
+static int thread_fns_failed = 0;
+
+static unsigned long thread_fn_tid1, thread_fn_tid2;
+
+static void thread_test_func_(void* _s) ATTR_NORETURN;
+
+/** How many iterations have the threads in the unit test run? */
+static int t1_count = 0, t2_count = 0;
+
+/** Helper function for threading unit tests: This function runs in a
+ * subthread. It grabs its own mutex (start1 or start2) to make sure that it
+ * should start, then it repeatedly alters _test_thread_strmap protected by
+ * thread_test_mutex_. */
+static void
+thread_test_func_(void* _s)
+{
+ char *s = _s;
+ int i, *count;
+ tor_mutex_t *m;
+ char buf[64];
+ char **cp;
+ if (!strcmp(s, "thread 1")) {
+ m = thread_test_start1_;
+ cp = &thread1_name_;
+ count = &t1_count;
+ thread_fn_tid1 = tor_get_thread_id();
+ } else {
+ m = thread_test_start2_;
+ cp = &thread2_name_;
+ count = &t2_count;
+ thread_fn_tid2 = tor_get_thread_id();
+ }
+
+ tor_snprintf(buf, sizeof(buf), "%lu", tor_get_thread_id());
+ *cp = tor_strdup(buf);
+
+ tor_mutex_acquire(m);
+
+ for (i=0; i<10000; ++i) {
+ tor_mutex_acquire(thread_test_mutex_);
+ strmap_set(thread_test_strmap_, "last to run", *cp);
+ ++*count;
+ tor_mutex_release(thread_test_mutex_);
+ }
+ tor_mutex_acquire(thread_test_mutex_);
+ strmap_set(thread_test_strmap_, s, *cp);
+ if (in_main_thread())
+ ++thread_fns_failed;
+ tor_mutex_release(thread_test_mutex_);
+
+ tor_mutex_release(m);
+
+ spawn_exit();
+}
+
+/** Run unit tests for threading logic. */
+static void
+test_threads_basic(void *arg)
+{
+ char *s1 = NULL, *s2 = NULL;
+ int done = 0, timedout = 0;
+ time_t started;
+#ifndef _WIN32
+ struct timeval tv;
+ tv.tv_sec=0;
+ tv.tv_usec=100*1000;
+#endif
+ (void) arg;
+
+ set_main_thread();
+
+ thread_test_mutex_ = tor_mutex_new();
+ thread_test_start1_ = tor_mutex_new();
+ thread_test_start2_ = tor_mutex_new();
+ thread_test_strmap_ = strmap_new();
+ s1 = tor_strdup("thread 1");
+ s2 = tor_strdup("thread 2");
+ tor_mutex_acquire(thread_test_start1_);
+ tor_mutex_acquire(thread_test_start2_);
+ spawn_func(thread_test_func_, s1);
+ spawn_func(thread_test_func_, s2);
+ tor_mutex_release(thread_test_start2_);
+ tor_mutex_release(thread_test_start1_);
+ started = time(NULL);
+ while (!done) {
+ tor_mutex_acquire(thread_test_mutex_);
+ strmap_assert_ok(thread_test_strmap_);
+ if (strmap_get(thread_test_strmap_, "thread 1") &&
+ strmap_get(thread_test_strmap_, "thread 2")) {
+ done = 1;
+ } else if (time(NULL) > started + 150) {
+ timedout = done = 1;
+ }
+ tor_mutex_release(thread_test_mutex_);
+#ifndef _WIN32
+ /* Prevent the main thread from starving the worker threads. */
+ select(0, NULL, NULL, NULL, &tv);
+#endif
+ }
+ tor_mutex_acquire(thread_test_start1_);
+ tor_mutex_release(thread_test_start1_);
+ tor_mutex_acquire(thread_test_start2_);
+ tor_mutex_release(thread_test_start2_);
+
+ tor_mutex_free(thread_test_mutex_);
+
+ if (timedout) {
+ printf("\nTimed out: %d %d", t1_count, t2_count);
+ tt_assert(strmap_get(thread_test_strmap_, "thread 1"));
+ tt_assert(strmap_get(thread_test_strmap_, "thread 2"));
+ tt_assert(!timedout);
+ }
+
+ /* different thread IDs. */
+ tt_assert(strcmp(strmap_get(thread_test_strmap_, "thread 1"),
+ strmap_get(thread_test_strmap_, "thread 2")));
+ tt_assert(!strcmp(strmap_get(thread_test_strmap_, "thread 1"),
+ strmap_get(thread_test_strmap_, "last to run")) ||
+ !strcmp(strmap_get(thread_test_strmap_, "thread 2"),
+ strmap_get(thread_test_strmap_, "last to run")));
+
+ tt_int_op(thread_fns_failed, ==, 0);
+ tt_int_op(thread_fn_tid1, !=, thread_fn_tid2);
+
+ done:
+ tor_free(s1);
+ tor_free(s2);
+ tor_free(thread1_name_);
+ tor_free(thread2_name_);
+ if (thread_test_strmap_)
+ strmap_free(thread_test_strmap_, NULL);
+ if (thread_test_start1_)
+ tor_mutex_free(thread_test_start1_);
+ if (thread_test_start2_)
+ tor_mutex_free(thread_test_start2_);
+}
+
+typedef struct cv_testinfo_s {
+ tor_cond_t *cond;
+ tor_mutex_t *mutex;
+ int value;
+ int addend;
+ int shutdown;
+ int n_shutdown;
+ int n_wakeups;
+ int n_timeouts;
+ int n_threads;
+ const struct timeval *tv;
+} cv_testinfo_t;
+
+static cv_testinfo_t *
+cv_testinfo_new(void)
+{
+ cv_testinfo_t *i = tor_malloc_zero(sizeof(*i));
+ i->cond = tor_cond_new();
+ i->mutex = tor_mutex_new_nonrecursive();
+ return i;
+}
+
+static void
+cv_testinfo_free(cv_testinfo_t *i)
+{
+ if (!i)
+ return;
+ tor_cond_free(i->cond);
+ tor_mutex_free(i->mutex);
+ tor_free(i);
+}
+
+static void cv_test_thr_fn_(void *arg) ATTR_NORETURN;
+
+static void
+cv_test_thr_fn_(void *arg)
+{
+ cv_testinfo_t *i = arg;
+ int tid, r;
+
+ tor_mutex_acquire(i->mutex);
+ tid = i->n_threads++;
+ tor_mutex_release(i->mutex);
+ (void) tid;
+
+ tor_mutex_acquire(i->mutex);
+ while (1) {
+ if (i->addend) {
+ i->value += i->addend;
+ i->addend = 0;
+ }
+
+ if (i->shutdown) {
+ ++i->n_shutdown;
+ i->shutdown = 0;
+ tor_mutex_release(i->mutex);
+ spawn_exit();
+ }
+ r = tor_cond_wait(i->cond, i->mutex, i->tv);
+ ++i->n_wakeups;
+ if (r == 1) {
+ ++i->n_timeouts;
+ tor_mutex_release(i->mutex);
+ spawn_exit();
+ }
+ }
+}
+
+static void
+test_threads_conditionvar(void *arg)
+{
+ cv_testinfo_t *ti=NULL;
+ const struct timeval msec100 = { 0, 100*1000 };
+ const int timeout = !strcmp(arg, "tv");
+
+ ti = cv_testinfo_new();
+ if (timeout) {
+ ti->tv = &msec100;
+ }
+ spawn_func(cv_test_thr_fn_, ti);
+ spawn_func(cv_test_thr_fn_, ti);
+ spawn_func(cv_test_thr_fn_, ti);
+ spawn_func(cv_test_thr_fn_, ti);
+
+ tor_mutex_acquire(ti->mutex);
+ ti->addend = 7;
+ ti->shutdown = 1;
+ tor_cond_signal_one(ti->cond);
+ tor_mutex_release(ti->mutex);
+
+#define SPIN() \
+ while (1) { \
+ tor_mutex_acquire(ti->mutex); \
+ if (ti->addend == 0) { \
+ break; \
+ } \
+ tor_mutex_release(ti->mutex); \
+ }
+
+ SPIN();
+
+ ti->addend = 30;
+ ti->shutdown = 1;
+ tor_cond_signal_all(ti->cond);
+ tor_mutex_release(ti->mutex);
+ SPIN();
+
+ ti->addend = 1000;
+ if (! timeout) ti->shutdown = 1;
+ tor_cond_signal_one(ti->cond);
+ tor_mutex_release(ti->mutex);
+ SPIN();
+ ti->addend = 300;
+ if (! timeout) ti->shutdown = 1;
+ tor_cond_signal_all(ti->cond);
+ tor_mutex_release(ti->mutex);
+
+ SPIN();
+ tor_mutex_release(ti->mutex);
+
+ tt_int_op(ti->value, ==, 1337);
+ if (!timeout) {
+ tt_int_op(ti->n_shutdown, ==, 4);
+ } else {
+#ifdef _WIN32
+ Sleep(500); /* msec */
+#elif defined(HAVE_USLEEP)
+ usleep(500*1000); /* usec */
+#else
+ {
+ struct tv = { 0, 500*1000 };
+ select(0, NULL, NULL, NULL, &tv);
+ }
+#endif
+ tor_mutex_acquire(ti->mutex);
+ tt_int_op(ti->n_shutdown, ==, 2);
+ tt_int_op(ti->n_timeouts, ==, 2);
+ tor_mutex_release(ti->mutex);
+ }
+
+ done:
+ cv_testinfo_free(ti);
+}
+
+#define THREAD_TEST(name) \
+ { #name, test_threads_##name, TT_FORK, NULL, NULL }
+
+struct testcase_t thread_tests[] = {
+ THREAD_TEST(basic),
+ { "conditionvar", test_threads_conditionvar, TT_FORK,
+ &passthrough_setup, (void*)"no-tv" },
+ { "conditionvar_timeout", test_threads_conditionvar, TT_FORK,
+ &passthrough_setup, (void*)"tv" },
+ END_OF_TESTCASES
+};
+
diff --git a/src/test/test_util.c b/src/test/test_util.c
index 2ee1d6dfc1..b53c8fc7a3 100644
--- a/src/test/test_util.c
+++ b/src/test/test_util.c
@@ -1607,142 +1607,6 @@ test_util_pow2(void *arg)
;
}
-/** mutex for thread test to stop the threads hitting data at the same time. */
-static tor_mutex_t *thread_test_mutex_ = NULL;
-/** mutexes for the thread test to make sure that the threads have to
- * interleave somewhat. */
-static tor_mutex_t *thread_test_start1_ = NULL,
- *thread_test_start2_ = NULL;
-/** Shared strmap for the thread test. */
-static strmap_t *thread_test_strmap_ = NULL;
-/** The name of thread1 for the thread test */
-static char *thread1_name_ = NULL;
-/** The name of thread2 for the thread test */
-static char *thread2_name_ = NULL;
-
-static void thread_test_func_(void* _s) ATTR_NORETURN;
-
-/** How many iterations have the threads in the unit test run? */
-static int t1_count = 0, t2_count = 0;
-
-/** Helper function for threading unit tests: This function runs in a
- * subthread. It grabs its own mutex (start1 or start2) to make sure that it
- * should start, then it repeatedly alters _test_thread_strmap protected by
- * thread_test_mutex_. */
-static void
-thread_test_func_(void* _s)
-{
- char *s = _s;
- int i, *count;
- tor_mutex_t *m;
- char buf[64];
- char **cp;
- if (!strcmp(s, "thread 1")) {
- m = thread_test_start1_;
- cp = &thread1_name_;
- count = &t1_count;
- } else {
- m = thread_test_start2_;
- cp = &thread2_name_;
- count = &t2_count;
- }
-
- tor_snprintf(buf, sizeof(buf), "%lu", tor_get_thread_id());
- *cp = tor_strdup(buf);
-
- tor_mutex_acquire(m);
-
- for (i=0; i<10000; ++i) {
- tor_mutex_acquire(thread_test_mutex_);
- strmap_set(thread_test_strmap_, "last to run", *cp);
- ++*count;
- tor_mutex_release(thread_test_mutex_);
- }
- tor_mutex_acquire(thread_test_mutex_);
- strmap_set(thread_test_strmap_, s, *cp);
- tor_mutex_release(thread_test_mutex_);
-
- tor_mutex_release(m);
-
- spawn_exit();
-}
-
-/** Run unit tests for threading logic. */
-static void
-test_util_threads(void *arg)
-{
- char *s1 = NULL, *s2 = NULL;
- int done = 0, timedout = 0;
- time_t started;
-#ifndef _WIN32
- struct timeval tv;
- tv.tv_sec=0;
- tv.tv_usec=100*1000;
-#endif
- (void)arg;
- thread_test_mutex_ = tor_mutex_new();
- thread_test_start1_ = tor_mutex_new();
- thread_test_start2_ = tor_mutex_new();
- thread_test_strmap_ = strmap_new();
- s1 = tor_strdup("thread 1");
- s2 = tor_strdup("thread 2");
- tor_mutex_acquire(thread_test_start1_);
- tor_mutex_acquire(thread_test_start2_);
- spawn_func(thread_test_func_, s1);
- spawn_func(thread_test_func_, s2);
- tor_mutex_release(thread_test_start2_);
- tor_mutex_release(thread_test_start1_);
- started = time(NULL);
- while (!done) {
- tor_mutex_acquire(thread_test_mutex_);
- strmap_assert_ok(thread_test_strmap_);
- if (strmap_get(thread_test_strmap_, "thread 1") &&
- strmap_get(thread_test_strmap_, "thread 2")) {
- done = 1;
- } else if (time(NULL) > started + 150) {
- timedout = done = 1;
- }
- tor_mutex_release(thread_test_mutex_);
-#ifndef _WIN32
- /* Prevent the main thread from starving the worker threads. */
- select(0, NULL, NULL, NULL, &tv);
-#endif
- }
- tor_mutex_acquire(thread_test_start1_);
- tor_mutex_release(thread_test_start1_);
- tor_mutex_acquire(thread_test_start2_);
- tor_mutex_release(thread_test_start2_);
-
- tor_mutex_free(thread_test_mutex_);
-
- if (timedout) {
- printf("\nTimed out: %d %d", t1_count, t2_count);
- tt_assert(strmap_get(thread_test_strmap_, "thread 1"));
- tt_assert(strmap_get(thread_test_strmap_, "thread 2"));
- tt_assert(!timedout);
- }
-
- /* different thread IDs. */
- tt_assert(strcmp(strmap_get(thread_test_strmap_, "thread 1"),
- strmap_get(thread_test_strmap_, "thread 2")));
- tt_assert(!strcmp(strmap_get(thread_test_strmap_, "thread 1"),
- strmap_get(thread_test_strmap_, "last to run")) ||
- !strcmp(strmap_get(thread_test_strmap_, "thread 2"),
- strmap_get(thread_test_strmap_, "last to run")));
-
- done:
- tor_free(s1);
- tor_free(s2);
- tor_free(thread1_name_);
- tor_free(thread2_name_);
- if (thread_test_strmap_)
- strmap_free(thread_test_strmap_, NULL);
- if (thread_test_start1_)
- tor_mutex_free(thread_test_start1_);
- if (thread_test_start2_)
- tor_mutex_free(thread_test_start2_);
-}
-
/** Run unit tests for compression functions */
static void
test_util_gzip(void *arg)
@@ -4783,23 +4647,6 @@ test_util_socket(void *arg)
tor_close_socket(fd4);
}
-static void *
-socketpair_test_setup(const struct testcase_t *testcase)
-{
- return testcase->setup_data;
-}
-static int
-socketpair_test_cleanup(const struct testcase_t *testcase, void *ptr)
-{
- (void)testcase;
- (void)ptr;
- return 1;
-}
-
-static const struct testcase_setup_t socketpair_setup = {
- socketpair_test_setup, socketpair_test_cleanup
-};
-
/* Test for socketpair and ersatz_socketpair(). We test them both, since
* the latter is a tolerably good way to exersize tor_accept_socket(). */
static void
@@ -4928,7 +4775,6 @@ struct testcase_t util_tests[] = {
UTIL_LEGACY(memarea),
UTIL_LEGACY(control_formats),
UTIL_LEGACY(mmap),
- UTIL_LEGACY(threads),
UTIL_LEGACY(sscanf),
UTIL_LEGACY(format_time_interval),
UTIL_LEGACY(path_is_relative),
@@ -4975,10 +4821,10 @@ struct testcase_t util_tests[] = {
UTIL_TEST(mathlog, 0),
UTIL_TEST(weak_random, 0),
UTIL_TEST(socket, TT_FORK),
- { "socketpair", test_util_socketpair, TT_FORK, &socketpair_setup,
+ { "socketpair", test_util_socketpair, TT_FORK, &passthrough_setup,
(void*)"0" },
{ "socketpair_ersatz", test_util_socketpair, TT_FORK,
- &socketpair_setup, (void*)"1" },
+ &passthrough_setup, (void*)"1" },
UTIL_TEST(max_mem, 0),
UTIL_TEST(hostname_validation, 0),
UTIL_TEST(ipv4_validation, 0),
diff --git a/src/test/test_workqueue.c b/src/test/test_workqueue.c
new file mode 100644
index 0000000000..aaff5069be
--- /dev/null
+++ b/src/test/test_workqueue.c
@@ -0,0 +1,409 @@
+/* Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2015, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#include "or.h"
+#include "compat_threads.h"
+#include "onion.h"
+#include "workqueue.h"
+#include "crypto.h"
+#include "crypto_curve25519.h"
+#include "compat_libevent.h"
+
+#include <stdio.h>
+#ifdef HAVE_EVENT2_EVENT_H
+#include <event2/event.h>
+#else
+#include <event.h>
+#endif
+
+static int opt_verbose = 0;
+static int opt_n_threads = 8;
+static int opt_n_items = 10000;
+static int opt_n_inflight = 1000;
+static int opt_n_lowwater = 250;
+static int opt_n_cancel = 0;
+static int opt_ratio_rsa = 5;
+
+#ifdef TRACK_RESPONSES
+tor_mutex_t bitmap_mutex;
+int handled_len;
+bitarray_t *handled;
+#endif
+
+typedef struct state_s {
+ int magic;
+ int n_handled;
+ crypto_pk_t *rsa;
+ curve25519_secret_key_t ecdh;
+ int is_shutdown;
+} state_t;
+
+typedef struct rsa_work_s {
+ int serial;
+ uint8_t msg[128];
+ uint8_t msglen;
+} rsa_work_t;
+
+typedef struct ecdh_work_s {
+ int serial;
+ union {
+ curve25519_public_key_t pk;
+ uint8_t msg[32];
+ } u;
+} ecdh_work_t;
+
+static void
+mark_handled(int serial)
+{
+#ifdef TRACK_RESPONSES
+ tor_mutex_acquire(&bitmap_mutex);
+ tor_assert(serial < handled_len);
+ tor_assert(! bitarray_is_set(handled, serial));
+ bitarray_set(handled, serial);
+ tor_mutex_release(&bitmap_mutex);
+#else
+ (void)serial;
+#endif
+}
+
+static int
+workqueue_do_rsa(void *state, void *work)
+{
+ rsa_work_t *rw = work;
+ state_t *st = state;
+ crypto_pk_t *rsa = st->rsa;
+ uint8_t sig[256];
+ int len;
+
+ tor_assert(st->magic == 13371337);
+
+ len = crypto_pk_private_sign(rsa, (char*)sig, 256,
+ (char*)rw->msg, rw->msglen);
+ if (len < 0) {
+ rw->msglen = 0;
+ return WQ_RPL_ERROR;
+ }
+
+ memset(rw->msg, 0, sizeof(rw->msg));
+ rw->msglen = len;
+ memcpy(rw->msg, sig, len);
+ ++st->n_handled;
+
+ mark_handled(rw->serial);
+
+ return WQ_RPL_REPLY;
+}
+
+static int
+workqueue_do_shutdown(void *state, void *work)
+{
+ (void)state;
+ (void)work;
+ crypto_pk_free(((state_t*)state)->rsa);
+ tor_free(state);
+ return WQ_RPL_SHUTDOWN;
+}
+
+static int
+workqueue_do_ecdh(void *state, void *work)
+{
+ ecdh_work_t *ew = work;
+ uint8_t output[CURVE25519_OUTPUT_LEN];
+ state_t *st = state;
+
+ tor_assert(st->magic == 13371337);
+
+ curve25519_handshake(output, &st->ecdh, &ew->u.pk);
+ memcpy(ew->u.msg, output, CURVE25519_OUTPUT_LEN);
+ ++st->n_handled;
+ mark_handled(ew->serial);
+ return WQ_RPL_REPLY;
+}
+
+static void *
+new_state(void *arg)
+{
+ state_t *st;
+ (void)arg;
+
+ st = tor_malloc(sizeof(*st));
+ /* Every thread gets its own keys. not a problem for benchmarking */
+ st->rsa = crypto_pk_new();
+ if (crypto_pk_generate_key_with_bits(st->rsa, 1024) < 0) {
+ crypto_pk_free(st->rsa);
+ tor_free(st);
+ return NULL;
+ }
+ curve25519_secret_key_generate(&st->ecdh, 0);
+ st->magic = 13371337;
+ return st;
+}
+
+static void
+free_state(void *arg)
+{
+ state_t *st = arg;
+ crypto_pk_free(st->rsa);
+ tor_free(st);
+}
+
+static tor_weak_rng_t weak_rng;
+static int n_sent = 0;
+static int rsa_sent = 0;
+static int ecdh_sent = 0;
+static int n_received = 0;
+
+#ifdef TRACK_RESPONSES
+bitarray_t *received;
+#endif
+
+static void
+handle_reply(void *arg)
+{
+#ifdef TRACK_RESPONSES
+ rsa_work_t *rw = arg; /* Naughty cast, but only looking at serial. */
+ tor_assert(! bitarray_is_set(received, rw->serial));
+ bitarray_set(received,rw->serial);
+#endif
+
+ tor_free(arg);
+ ++n_received;
+}
+
+static workqueue_entry_t *
+add_work(threadpool_t *tp)
+{
+ int add_rsa =
+ opt_ratio_rsa == 0 ||
+ tor_weak_random_range(&weak_rng, opt_ratio_rsa) == 0;
+
+ if (add_rsa) {
+ rsa_work_t *w = tor_malloc_zero(sizeof(*w));
+ w->serial = n_sent++;
+ crypto_rand((char*)w->msg, 20);
+ w->msglen = 20;
+ ++rsa_sent;
+ return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w);
+ } else {
+ ecdh_work_t *w = tor_malloc_zero(sizeof(*w));
+ w->serial = n_sent++;
+ /* Not strictly right, but this is just for benchmarks. */
+ crypto_rand((char*)w->u.pk.public_key, 32);
+ ++ecdh_sent;
+ return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w);
+ }
+}
+
+static int n_failed_cancel = 0;
+static int n_successful_cancel = 0;
+
+static int
+add_n_work_items(threadpool_t *tp, int n)
+{
+ int n_queued = 0;
+ int n_try_cancel = 0, i;
+ workqueue_entry_t **to_cancel;
+ workqueue_entry_t *ent;
+
+ to_cancel = tor_malloc(sizeof(workqueue_entry_t*) * opt_n_cancel);
+
+ while (n_queued++ < n) {
+ ent = add_work(tp);
+ if (! ent) {
+ tor_event_base_loopexit(tor_libevent_get_base(), NULL);
+ return -1;
+ }
+ if (n_try_cancel < opt_n_cancel &&
+ tor_weak_random_range(&weak_rng, n) < opt_n_cancel) {
+ to_cancel[n_try_cancel++] = ent;
+ }
+ }
+
+ for (i = 0; i < n_try_cancel; ++i) {
+ void *work = workqueue_entry_cancel(to_cancel[i]);
+ if (! work) {
+ n_failed_cancel++;
+ } else {
+ n_successful_cancel++;
+ tor_free(work);
+ }
+ }
+
+ tor_free(to_cancel);
+ return 0;
+}
+
+static int shutting_down = 0;
+
+static void
+replysock_readable_cb(tor_socket_t sock, short what, void *arg)
+{
+ threadpool_t *tp = arg;
+ replyqueue_t *rq = threadpool_get_replyqueue(tp);
+
+ int old_r = n_received;
+ (void) sock;
+ (void) what;
+
+ replyqueue_process(rq);
+ if (old_r == n_received)
+ return;
+
+ if (opt_verbose) {
+ printf("%d / %d", n_received, n_sent);
+ if (opt_n_cancel)
+ printf(" (%d cancelled, %d uncancellable)",
+ n_successful_cancel, n_failed_cancel);
+ puts("");
+ }
+#ifdef TRACK_RESPONSES
+ tor_mutex_acquire(&bitmap_mutex);
+ for (i = 0; i < opt_n_items; ++i) {
+ if (bitarray_is_set(received, i))
+ putc('o', stdout);
+ else if (bitarray_is_set(handled, i))
+ putc('!', stdout);
+ else
+ putc('.', stdout);
+ }
+ puts("");
+ tor_mutex_release(&bitmap_mutex);
+#endif
+
+ if (n_sent - (n_received+n_successful_cancel) < opt_n_lowwater) {
+ int n_to_send = n_received + opt_n_inflight - n_sent;
+ if (n_to_send > opt_n_items - n_sent)
+ n_to_send = opt_n_items - n_sent;
+ add_n_work_items(tp, n_to_send);
+ }
+
+ if (shutting_down == 0 &&
+ n_received+n_successful_cancel == n_sent &&
+ n_sent >= opt_n_items) {
+ shutting_down = 1;
+ threadpool_queue_update(tp, NULL,
+ workqueue_do_shutdown, NULL, NULL);
+ }
+}
+
+static void
+help(void)
+{
+ puts(
+ "Options:\n"
+ " -N <items> Run this many items of work\n"
+ " -T <threads> Use this many threads\n"
+ " -I <inflight> Have no more than this many requests queued at once\n"
+ " -L <lowwater> Add items whenever fewer than this many are pending\n"
+ " -C <cancel> Try to cancel N items of every batch that we add\n"
+ " -R <ratio> Make one out of this many items be a slow (RSA) one\n"
+ " --no-{eventfd2,eventfd,pipe2,pipe,socketpair}\n"
+ " Disable one of the alert_socket backends.");
+}
+
+int
+main(int argc, char **argv)
+{
+ replyqueue_t *rq;
+ threadpool_t *tp;
+ int i;
+ tor_libevent_cfg evcfg;
+ struct event *ev;
+ uint32_t as_flags = 0;
+
+ for (i = 1; i < argc; ++i) {
+ if (!strcmp(argv[i], "-v")) {
+ opt_verbose = 1;
+ } else if (!strcmp(argv[i], "-T") && i+1<argc) {
+ opt_n_threads = atoi(argv[++i]);
+ } else if (!strcmp(argv[i], "-N") && i+1<argc) {
+ opt_n_items = atoi(argv[++i]);
+ } else if (!strcmp(argv[i], "-I") && i+1<argc) {
+ opt_n_inflight = atoi(argv[++i]);
+ } else if (!strcmp(argv[i], "-L") && i+1<argc) {
+ opt_n_lowwater = atoi(argv[++i]);
+ } else if (!strcmp(argv[i], "-R") && i+1<argc) {
+ opt_ratio_rsa = atoi(argv[++i]);
+ } else if (!strcmp(argv[i], "-C") && i+1<argc) {
+ opt_n_cancel = atoi(argv[++i]);
+ } else if (!strcmp(argv[i], "--no-eventfd2")) {
+ as_flags |= ASOCKS_NOEVENTFD2;
+ } else if (!strcmp(argv[i], "--no-eventfd")) {
+ as_flags |= ASOCKS_NOEVENTFD;
+ } else if (!strcmp(argv[i], "--no-pipe2")) {
+ as_flags |= ASOCKS_NOPIPE2;
+ } else if (!strcmp(argv[i], "--no-pipe")) {
+ as_flags |= ASOCKS_NOPIPE;
+ } else if (!strcmp(argv[i], "--no-socketpair")) {
+ as_flags |= ASOCKS_NOSOCKETPAIR;
+ } else if (!strcmp(argv[i], "-h")) {
+ help();
+ return 0;
+ } else {
+ help();
+ return 1;
+ }
+ }
+ if (opt_n_threads < 1 ||
+ opt_n_items < 1 || opt_n_inflight < 1 || opt_n_lowwater < 0 ||
+ opt_n_cancel > opt_n_inflight ||
+ opt_ratio_rsa < 0) {
+ help();
+ return 1;
+ }
+
+ init_logging(1);
+ crypto_global_init(1, NULL, NULL);
+ crypto_seed_rng(1);
+
+ rq = replyqueue_new(as_flags);
+ tor_assert(rq);
+ tp = threadpool_new(opt_n_threads,
+ rq, new_state, free_state, NULL);
+ tor_assert(tp);
+
+ crypto_seed_weak_rng(&weak_rng);
+
+ memset(&evcfg, 0, sizeof(evcfg));
+ tor_libevent_initialize(&evcfg);
+
+ ev = tor_event_new(tor_libevent_get_base(),
+ replyqueue_get_socket(rq), EV_READ|EV_PERSIST,
+ replysock_readable_cb, tp);
+
+ event_add(ev, NULL);
+
+#ifdef TRACK_RESPONSES
+ handled = bitarray_init_zero(opt_n_items);
+ received = bitarray_init_zero(opt_n_items);
+ tor_mutex_init(&bitmap_mutex);
+ handled_len = opt_n_items;
+#endif
+
+ for (i = 0; i < opt_n_inflight; ++i) {
+ if (! add_work(tp)) {
+ puts("Couldn't add work.");
+ return 1;
+ }
+ }
+
+ {
+ struct timeval limit = { 30, 0 };
+ tor_event_base_loopexit(tor_libevent_get_base(), &limit);
+ }
+
+ event_base_loop(tor_libevent_get_base(), 0);
+
+ if (n_sent != opt_n_items || n_received+n_successful_cancel != n_sent) {
+ printf("%d vs %d\n", n_sent, opt_n_items);
+ printf("%d+%d vs %d\n", n_received, n_successful_cancel, n_sent);
+ puts("FAIL");
+ return 1;
+ } else {
+ puts("OK");
+ return 0;
+ }
+}
+