diff options
Diffstat (limited to 'src/common')
-rw-r--r-- | src/common/address.c | 245 | ||||
-rw-r--r-- | src/common/address.h | 53 | ||||
-rw-r--r-- | src/common/compat.c | 396 | ||||
-rw-r--r-- | src/common/compat.h | 57 | ||||
-rw-r--r-- | src/common/compat_libevent.h | 1 | ||||
-rw-r--r-- | src/common/compat_pthreads.c | 287 | ||||
-rw-r--r-- | src/common/compat_threads.c | 302 | ||||
-rw-r--r-- | src/common/compat_threads.h | 115 | ||||
-rw-r--r-- | src/common/compat_winthreads.c | 196 | ||||
-rw-r--r-- | src/common/include.am | 15 | ||||
-rw-r--r-- | src/common/tortls.c | 22 | ||||
-rw-r--r-- | src/common/util.c | 68 | ||||
-rw-r--r-- | src/common/util.h | 5 | ||||
-rw-r--r-- | src/common/workqueue.c | 490 | ||||
-rw-r--r-- | src/common/workqueue.h | 48 |
15 files changed, 1769 insertions, 531 deletions
diff --git a/src/common/address.c b/src/common/address.c index a80926049a..3b4be1d601 100644 --- a/src/common/address.c +++ b/src/common/address.c @@ -8,24 +8,26 @@ * \brief Functions to use and manipulate the tor_addr_t structure. **/ -#include "orconfig.h" -#include "compat.h" -#include "util.h" -#include "address.h" -#include "torlog.h" -#include "container.h" -#include "sandbox.h" +#define ADDRESS_PRIVATE #ifdef _WIN32 -#include <process.h> -#include <windows.h> -#include <winsock2.h> /* For access to structs needed by GetAdaptersAddresses */ #undef _WIN32_WINNT #define _WIN32_WINNT 0x0501 +#include <process.h> +#include <winsock2.h> +#include <windows.h> #include <iphlpapi.h> #endif +#include "orconfig.h" +#include "compat.h" +#include "util.h" +#include "address.h" +#include "torlog.h" +#include "container.h" +#include "sandbox.h" + #ifdef HAVE_SYS_TIME_H #include <sys/time.h> #endif @@ -121,8 +123,17 @@ tor_addr_to_sockaddr(const tor_addr_t *a, } } +/** Set address <b>a</b> to zero. This address belongs to + * the AF_UNIX family. */ +static void +tor_addr_make_af_unix(tor_addr_t *a) +{ + memset(a, 0, sizeof(*a)); + a->family = AF_UNIX; +} + /** Set the tor_addr_t in <b>a</b> to contain the socket address contained in - * <b>sa</b>. */ + * <b>sa</b>. Return 0 on success and -1 on failure. */ int tor_addr_from_sockaddr(tor_addr_t *a, const struct sockaddr *sa, uint16_t *port_out) @@ -142,6 +153,9 @@ tor_addr_from_sockaddr(tor_addr_t *a, const struct sockaddr *sa, tor_addr_from_in6(a, &sin6->sin6_addr); if (port_out) *port_out = ntohs(sin6->sin6_port); + } else if (sa->sa_family == AF_UNIX) { + tor_addr_make_af_unix(a); + return 0; } else { tor_addr_make_unspec(a); return -1; @@ -421,6 +435,10 @@ tor_addr_to_str(char *dest, const tor_addr_t *addr, size_t len, int decorate) ptr = dest; } break; + case AF_UNIX: + tor_snprintf(dest, len, "AF_UNIX"); + ptr = dest; + break; default: return NULL; } @@ -816,6 +834,8 @@ tor_addr_is_null(const tor_addr_t *addr) } case AF_INET: return (tor_addr_to_ipv4n(addr) == 0); + case AF_UNIX: + return 1; case AF_UNSPEC: return 1; default: @@ -1207,26 +1227,17 @@ typedef ULONG (WINAPI *GetAdaptersAddresses_fn_t)( ULONG, ULONG, PVOID, PIP_ADAPTER_ADDRESSES, PULONG); #endif -/** Try to ask our network interfaces what addresses they are bound to. - * Return a new smartlist of tor_addr_t on success, and NULL on failure. - * (An empty smartlist indicates that we successfully learned that we have no - * addresses.) Log failure messages at <b>severity</b>. */ -static smartlist_t * -get_interface_addresses_raw(int severity) +#ifdef HAVE_IFADDRS_TO_SMARTLIST +/* + * Convert a linked list consisting of <b>ifaddrs</b> structures + * into smartlist of <b>tor_addr_t</b> structures. + */ +STATIC smartlist_t * +ifaddrs_to_smartlist(const struct ifaddrs *ifa) { -#if defined(HAVE_GETIFADDRS) - /* Most free Unixy systems provide getifaddrs, which gives us a linked list - * of struct ifaddrs. */ - struct ifaddrs *ifa = NULL; + smartlist_t *result = smartlist_new(); const struct ifaddrs *i; - smartlist_t *result; - if (getifaddrs(&ifa) < 0) { - log_fn(severity, LD_NET, "Unable to call getifaddrs(): %s", - strerror(errno)); - return NULL; - } - result = smartlist_new(); for (i = ifa; i; i = i->ifa_next) { tor_addr_t tmp; if ((i->ifa_flags & (IFF_UP | IFF_RUNNING)) != (IFF_UP | IFF_RUNNING)) @@ -1241,9 +1252,72 @@ get_interface_addresses_raw(int severity) smartlist_add(result, tor_memdup(&tmp, sizeof(tmp))); } + return result; +} + +/** Use getiffaddrs() function to get list of current machine + * network interface addresses. Represent the result by smartlist of + * <b>tor_addr_t</b> structures. + */ +STATIC smartlist_t * +get_interface_addresses_ifaddrs(int severity) +{ + + /* Most free Unixy systems provide getifaddrs, which gives us a linked list + * of struct ifaddrs. */ + struct ifaddrs *ifa = NULL; + smartlist_t *result; + if (getifaddrs(&ifa) < 0) { + log_fn(severity, LD_NET, "Unable to call getifaddrs(): %s", + strerror(errno)); + return NULL; + } + + result = ifaddrs_to_smartlist(ifa); + freeifaddrs(ifa); + return result; -#elif defined(_WIN32) +} +#endif + +#ifdef HAVE_IP_ADAPTER_TO_SMARTLIST + +/** Convert a Windows-specific <b>addresses</b> linked list into smartlist + * of <b>tor_addr_t</b> structures. + */ + +STATIC smartlist_t * +ip_adapter_addresses_to_smartlist(const IP_ADAPTER_ADDRESSES *addresses) +{ + smartlist_t *result = smartlist_new(); + const IP_ADAPTER_ADDRESSES *address; + + for (address = addresses; address; address = address->Next) { + const IP_ADAPTER_UNICAST_ADDRESS *a; + for (a = address->FirstUnicastAddress; a; a = a->Next) { + /* Yes, it's a linked list inside a linked list */ + const struct sockaddr *sa = a->Address.lpSockaddr; + tor_addr_t tmp; + if (sa->sa_family != AF_INET && sa->sa_family != AF_INET6) + continue; + if (tor_addr_from_sockaddr(&tmp, sa, NULL) < 0) + continue; + smartlist_add(result, tor_memdup(&tmp, sizeof(tmp))); + } + } + + return result; +} + +/** Windows only: use GetAdaptersInfo() function to retrieve network interface + * addresses of current machine and return them to caller as smartlist of + * <b>tor_addr_t</b> structures. + */ +STATIC smartlist_t * +get_interface_addresses_win32(int severity) +{ + /* Windows XP began to provide GetAdaptersAddresses. Windows 2000 had a "GetAdaptersInfo", but that's deprecated; let's just try GetAdaptersAddresses and fall back to connect+getsockname. @@ -1252,7 +1326,7 @@ get_interface_addresses_raw(int severity) smartlist_t *result = NULL; GetAdaptersAddresses_fn_t fn; ULONG size, res; - IP_ADAPTER_ADDRESSES *addresses = NULL, *address; + IP_ADAPTER_ADDRESSES *addresses = NULL; (void) severity; @@ -1287,63 +1361,106 @@ get_interface_addresses_raw(int severity) goto done; } - result = smartlist_new(); - for (address = addresses; address; address = address->Next) { - IP_ADAPTER_UNICAST_ADDRESS *a; - for (a = address->FirstUnicastAddress; a; a = a->Next) { - /* Yes, it's a linked list inside a linked list */ - struct sockaddr *sa = a->Address.lpSockaddr; - tor_addr_t tmp; - if (sa->sa_family != AF_INET && sa->sa_family != AF_INET6) - continue; - if (tor_addr_from_sockaddr(&tmp, sa, NULL) < 0) - continue; - smartlist_add(result, tor_memdup(&tmp, sizeof(tmp))); - } - } + result = ip_adapter_addresses_to_smartlist(addresses); done: if (lib) FreeLibrary(lib); tor_free(addresses); return result; -#elif defined(SIOCGIFCONF) && defined(HAVE_IOCTL) +} + +#endif + +#ifdef HAVE_IFCONF_TO_SMARTLIST + +/* This is defined on Mac OS X */ +#ifndef _SIZEOF_ADDR_IFREQ +#define _SIZEOF_ADDR_IFREQ sizeof +#endif + +/** Convert <b>*ifr</b>, an ifreq structure array of size <b>buflen</b> + * into smartlist of <b>tor_addr_t</b> structures. + */ +STATIC smartlist_t * +ifreq_to_smartlist(const struct ifreq *ifr, size_t buflen) +{ + smartlist_t *result = smartlist_new(); + + struct ifreq *r = (struct ifreq *)ifr; + + while ((char *)r < (char *)ifr+buflen) { + const struct sockaddr *sa = &r->ifr_addr; + tor_addr_t tmp; + int valid_sa_family = (sa->sa_family == AF_INET || + sa->sa_family == AF_INET6); + + int conversion_success = (tor_addr_from_sockaddr(&tmp, sa, NULL) == 0); + + if (valid_sa_family && conversion_success) + smartlist_add(result, tor_memdup(&tmp, sizeof(tmp))); + + r = (struct ifreq *)((char *)r + _SIZEOF_ADDR_IFREQ(*r)); + } + + return result; +} + +/** Use ioctl(.,SIOCGIFCONF,.) to get a list of current machine + * network interface addresses. Represent the result by smartlist of + * <b>tor_addr_t</b> structures. + */ +STATIC smartlist_t * +get_interface_addresses_ioctl(int severity) +{ /* Some older unixy systems make us use ioctl(SIOCGIFCONF) */ struct ifconf ifc; - int fd, i, sz, n; + int fd, sz; + void *databuf = NULL; smartlist_t *result = NULL; + /* This interface, AFAICT, only supports AF_INET addresses */ fd = socket(AF_INET, SOCK_DGRAM, 0); if (fd < 0) { tor_log(severity, LD_NET, "socket failed: %s", strerror(errno)); goto done; } + /* Guess how much space we need. */ - ifc.ifc_len = sz = 15*1024; - ifc.ifc_ifcu.ifcu_req = tor_malloc(sz); + ifc.ifc_len = sz = 4096; + databuf = tor_malloc_zero(sz); + ifc.ifc_buf = databuf; + if (ioctl(fd, SIOCGIFCONF, &ifc) < 0) { tor_log(severity, LD_NET, "ioctl failed: %s", strerror(errno)); close(fd); goto done; } - close(fd); - result = smartlist_new(); - if (ifc.ifc_len < sz) - sz = ifc.ifc_len; - n = sz / sizeof(struct ifreq); - for (i = 0; i < n ; ++i) { - struct ifreq *r = &ifc.ifc_ifcu.ifcu_req[i]; - struct sockaddr *sa = &r->ifr_addr; - tor_addr_t tmp; - if (sa->sa_family != AF_INET && sa->sa_family != AF_INET6) - continue; /* should be impossible */ - if (tor_addr_from_sockaddr(&tmp, sa, NULL) < 0) - continue; - smartlist_add(result, tor_memdup(&tmp, sizeof(tmp))); - } + + result = ifreq_to_smartlist(databuf, ifc.ifc_len); + done: - tor_free(ifc.ifc_ifcu.ifcu_req); + close(fd); + tor_free(databuf); return result; +} +#endif + +/** Try to ask our network interfaces what addresses they are bound to. + * Return a new smartlist of tor_addr_t on success, and NULL on failure. + * (An empty smartlist indicates that we successfully learned that we have no + * addresses.) Log failure messages at <b>severity</b>. */ +STATIC smartlist_t * +get_interface_addresses_raw(int severity) +{ +#if defined(HAVE_IFADDRS_TO_SMARTLIST) + return get_interface_addresses_ifaddrs(severity); +#endif +#if defined(HAVE_IP_ADAPTER_TO_SMARTLIST) + return get_interface_addresses_win32(severity); +#endif +#if defined(HAVE_IFCONF_TO_SMARTLIST) + return get_interface_addresses_ioctl(severity); #else (void) severity; return NULL; diff --git a/src/common/address.h b/src/common/address.h index d70bb9c508..8c6ee5abbb 100644 --- a/src/common/address.h +++ b/src/common/address.h @@ -11,10 +11,41 @@ #ifndef TOR_ADDRESS_H #define TOR_ADDRESS_H +//#include <sys/sockio.h> #include "orconfig.h" #include "torint.h" #include "compat.h" +#ifdef ADDRESS_PRIVATE + +#if defined(HAVE_SYS_IOCTL_H) +#include <sys/ioctl.h> +#endif + +#ifdef HAVE_GETIFADDRS +#define HAVE_IFADDRS_TO_SMARTLIST +#endif + +#ifdef _WIN32 +#define HAVE_IP_ADAPTER_TO_SMARTLIST +#endif + +#if defined(SIOCGIFCONF) && defined(HAVE_IOCTL) +#define HAVE_IFCONF_TO_SMARTLIST +#endif + +#if defined(HAVE_NET_IF_H) +#include <net/if.h> // for struct ifconf +#endif + +#if defined(HAVE_IFADDRS_TO_SMARTLIST) +#include <ifaddrs.h> +#endif + +// TODO win32 specific includes +#include "container.h" +#endif // ADDRESS_PRIVATE + /** The number of bits from an address to consider while doing a masked * comparison. */ typedef uint8_t maskbits_t; @@ -241,5 +272,27 @@ MOCK_DECL(int,get_interface_address,(int severity, uint32_t *addr)); tor_addr_port_t *tor_addr_port_new(const tor_addr_t *addr, uint16_t port); +#ifdef ADDRESS_PRIVATE +STATIC smartlist_t *get_interface_addresses_raw(int severity); + +#ifdef HAVE_IFADDRS_TO_SMARTLIST +STATIC smartlist_t *ifaddrs_to_smartlist(const struct ifaddrs *ifa); +STATIC smartlist_t *get_interface_addresses_ifaddrs(int severity); +#endif + +#ifdef HAVE_IP_ADAPTER_TO_SMARTLIST +STATIC smartlist_t *ip_adapter_addresses_to_smartlist( + const IP_ADAPTER_ADDRESSES *addresses); +STATIC smartlist_t *get_interface_addresses_win32(int severity); +#endif + +#ifdef HAVE_IFCONF_TO_SMARTLIST +STATIC smartlist_t *ifreq_to_smartlist(const struct ifreq *ifr, + size_t buflen); +STATIC smartlist_t *get_interface_addresses_ioctl(int severity); +#endif + +#endif // ADDRESS_PRIVATE + #endif diff --git a/src/common/compat.c b/src/common/compat.c index 11e2545709..5575316b2b 100644 --- a/src/common/compat.c +++ b/src/common/compat.c @@ -27,7 +27,6 @@ #include "compat.h" #ifdef _WIN32 -#include <process.h> #include <windows.h> #include <sys/locking.h> #endif @@ -823,6 +822,7 @@ replace_file(const char *from, const char *to) case FN_NOENT: break; case FN_FILE: + case FN_EMPTY: if (unlink(to)) return -1; break; case FN_ERROR: @@ -2543,109 +2543,6 @@ get_uname(void) * Process control */ -#if defined(USE_PTHREADS) -/** Wraps a void (*)(void*) function and its argument so we can - * invoke them in a way pthreads would expect. - */ -typedef struct tor_pthread_data_t { - void (*func)(void *); - void *data; -} tor_pthread_data_t; -/** Given a tor_pthread_data_t <b>_data</b>, call _data->func(d->data) - * and free _data. Used to make sure we can call functions the way pthread - * expects. */ -static void * -tor_pthread_helper_fn(void *_data) -{ - tor_pthread_data_t *data = _data; - void (*func)(void*); - void *arg; - /* mask signals to worker threads to avoid SIGPIPE, etc */ - sigset_t sigs; - /* We're in a subthread; don't handle any signals here. */ - sigfillset(&sigs); - pthread_sigmask(SIG_SETMASK, &sigs, NULL); - - func = data->func; - arg = data->data; - tor_free(_data); - func(arg); - return NULL; -} -/** - * A pthread attribute to make threads start detached. - */ -static pthread_attr_t attr_detached; -/** True iff we've called tor_threads_init() */ -static int threads_initialized = 0; -#endif - -/** Minimalist interface to run a void function in the background. On - * Unix calls fork, on win32 calls beginthread. Returns -1 on failure. - * func should not return, but rather should call spawn_exit. - * - * NOTE: if <b>data</b> is used, it should not be allocated on the stack, - * since in a multithreaded environment, there is no way to be sure that - * the caller's stack will still be around when the called function is - * running. - */ -int -spawn_func(void (*func)(void *), void *data) -{ -#if defined(USE_WIN32_THREADS) - int rv; - rv = (int)_beginthread(func, 0, data); - if (rv == (int)-1) - return -1; - return 0; -#elif defined(USE_PTHREADS) - pthread_t thread; - tor_pthread_data_t *d; - if (PREDICT_UNLIKELY(!threads_initialized)) - tor_threads_init(); - d = tor_malloc(sizeof(tor_pthread_data_t)); - d->data = data; - d->func = func; - if (pthread_create(&thread,&attr_detached,tor_pthread_helper_fn,d)) - return -1; - return 0; -#else - pid_t pid; - pid = fork(); - if (pid<0) - return -1; - if (pid==0) { - /* Child */ - func(data); - tor_assert(0); /* Should never reach here. */ - return 0; /* suppress "control-reaches-end-of-non-void" warning. */ - } else { - /* Parent */ - return 0; - } -#endif -} - -/** End the current thread/process. - */ -void -spawn_exit(void) -{ -#if defined(USE_WIN32_THREADS) - _endthread(); - //we should never get here. my compiler thinks that _endthread returns, this - //is an attempt to fool it. - tor_assert(0); - _exit(0); -#elif defined(USE_PTHREADS) - pthread_exit(NULL); -#else - /* http://www.erlenstar.demon.co.uk/unix/faq_2.html says we should - * call _exit, not exit, from child processes. */ - _exit(0); -#endif -} - /** Implementation logic for compute_num_cpus(). */ static int compute_num_cpus_impl(void) @@ -2934,280 +2831,6 @@ tor_gmtime_r(const time_t *timep, struct tm *result) } #endif -#if defined(USE_WIN32_THREADS) -void -tor_mutex_init(tor_mutex_t *m) -{ - InitializeCriticalSection(&m->mutex); -} -void -tor_mutex_uninit(tor_mutex_t *m) -{ - DeleteCriticalSection(&m->mutex); -} -void -tor_mutex_acquire(tor_mutex_t *m) -{ - tor_assert(m); - EnterCriticalSection(&m->mutex); -} -void -tor_mutex_release(tor_mutex_t *m) -{ - LeaveCriticalSection(&m->mutex); -} -unsigned long -tor_get_thread_id(void) -{ - return (unsigned long)GetCurrentThreadId(); -} -#elif defined(USE_PTHREADS) -/** A mutex attribute that we're going to use to tell pthreads that we want - * "reentrant" mutexes (i.e., once we can re-lock if we're already holding - * them.) */ -static pthread_mutexattr_t attr_reentrant; -/** Initialize <b>mutex</b> so it can be locked. Every mutex must be set - * up with tor_mutex_init() or tor_mutex_new(); not both. */ -void -tor_mutex_init(tor_mutex_t *mutex) -{ - int err; - if (PREDICT_UNLIKELY(!threads_initialized)) - tor_threads_init(); - err = pthread_mutex_init(&mutex->mutex, &attr_reentrant); - if (PREDICT_UNLIKELY(err)) { - log_err(LD_GENERAL, "Error %d creating a mutex.", err); - tor_fragile_assert(); - } -} -/** Wait until <b>m</b> is free, then acquire it. */ -void -tor_mutex_acquire(tor_mutex_t *m) -{ - int err; - tor_assert(m); - err = pthread_mutex_lock(&m->mutex); - if (PREDICT_UNLIKELY(err)) { - log_err(LD_GENERAL, "Error %d locking a mutex.", err); - tor_fragile_assert(); - } -} -/** Release the lock <b>m</b> so another thread can have it. */ -void -tor_mutex_release(tor_mutex_t *m) -{ - int err; - tor_assert(m); - err = pthread_mutex_unlock(&m->mutex); - if (PREDICT_UNLIKELY(err)) { - log_err(LD_GENERAL, "Error %d unlocking a mutex.", err); - tor_fragile_assert(); - } -} -/** Clean up the mutex <b>m</b> so that it no longer uses any system - * resources. Does not free <b>m</b>. This function must only be called on - * mutexes from tor_mutex_init(). */ -void -tor_mutex_uninit(tor_mutex_t *m) -{ - int err; - tor_assert(m); - err = pthread_mutex_destroy(&m->mutex); - if (PREDICT_UNLIKELY(err)) { - log_err(LD_GENERAL, "Error %d destroying a mutex.", err); - tor_fragile_assert(); - } -} -/** Return an integer representing this thread. */ -unsigned long -tor_get_thread_id(void) -{ - union { - pthread_t thr; - unsigned long id; - } r; - r.thr = pthread_self(); - return r.id; -} -#endif - -/** Return a newly allocated, ready-for-use mutex. */ -tor_mutex_t * -tor_mutex_new(void) -{ - tor_mutex_t *m = tor_malloc_zero(sizeof(tor_mutex_t)); - tor_mutex_init(m); - return m; -} -/** Release all storage and system resources held by <b>m</b>. */ -void -tor_mutex_free(tor_mutex_t *m) -{ - if (!m) - return; - tor_mutex_uninit(m); - tor_free(m); -} - -/* Conditions. */ -#ifdef USE_PTHREADS -#if 0 -/** Cross-platform condition implementation. */ -struct tor_cond_t { - pthread_cond_t cond; -}; -/** Return a newly allocated condition, with nobody waiting on it. */ -tor_cond_t * -tor_cond_new(void) -{ - tor_cond_t *cond = tor_malloc_zero(sizeof(tor_cond_t)); - if (pthread_cond_init(&cond->cond, NULL)) { - tor_free(cond); - return NULL; - } - return cond; -} -/** Release all resources held by <b>cond</b>. */ -void -tor_cond_free(tor_cond_t *cond) -{ - if (!cond) - return; - if (pthread_cond_destroy(&cond->cond)) { - log_warn(LD_GENERAL,"Error freeing condition: %s", strerror(errno)); - return; - } - tor_free(cond); -} -/** Wait until one of the tor_cond_signal functions is called on <b>cond</b>. - * All waiters on the condition must wait holding the same <b>mutex</b>. - * Returns 0 on success, negative on failure. */ -int -tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex) -{ - return pthread_cond_wait(&cond->cond, &mutex->mutex) ? -1 : 0; -} -/** Wake up one of the waiters on <b>cond</b>. */ -void -tor_cond_signal_one(tor_cond_t *cond) -{ - pthread_cond_signal(&cond->cond); -} -/** Wake up all of the waiters on <b>cond</b>. */ -void -tor_cond_signal_all(tor_cond_t *cond) -{ - pthread_cond_broadcast(&cond->cond); -} -#endif -/** Set up common structures for use by threading. */ -void -tor_threads_init(void) -{ - if (!threads_initialized) { - pthread_mutexattr_init(&attr_reentrant); - pthread_mutexattr_settype(&attr_reentrant, PTHREAD_MUTEX_RECURSIVE); - tor_assert(0==pthread_attr_init(&attr_detached)); - tor_assert(0==pthread_attr_setdetachstate(&attr_detached, 1)); - threads_initialized = 1; - set_main_thread(); - } -} -#elif defined(USE_WIN32_THREADS) -#if 0 -static DWORD cond_event_tls_index; -struct tor_cond_t { - CRITICAL_SECTION mutex; - smartlist_t *events; -}; -tor_cond_t * -tor_cond_new(void) -{ - tor_cond_t *cond = tor_malloc_zero(sizeof(tor_cond_t)); - InitializeCriticalSection(&cond->mutex); - cond->events = smartlist_new(); - return cond; -} -void -tor_cond_free(tor_cond_t *cond) -{ - if (!cond) - return; - DeleteCriticalSection(&cond->mutex); - /* XXXX notify? */ - smartlist_free(cond->events); - tor_free(cond); -} -int -tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex) -{ - HANDLE event; - int r; - tor_assert(cond); - tor_assert(mutex); - event = TlsGetValue(cond_event_tls_index); - if (!event) { - event = CreateEvent(0, FALSE, FALSE, NULL); - TlsSetValue(cond_event_tls_index, event); - } - EnterCriticalSection(&cond->mutex); - - tor_assert(WaitForSingleObject(event, 0) == WAIT_TIMEOUT); - tor_assert(!smartlist_contains(cond->events, event)); - smartlist_add(cond->events, event); - - LeaveCriticalSection(&cond->mutex); - - tor_mutex_release(mutex); - r = WaitForSingleObject(event, INFINITE); - tor_mutex_acquire(mutex); - - switch (r) { - case WAIT_OBJECT_0: /* we got the mutex normally. */ - break; - case WAIT_ABANDONED: /* holding thread exited. */ - case WAIT_TIMEOUT: /* Should never happen. */ - tor_assert(0); - break; - case WAIT_FAILED: - log_warn(LD_GENERAL, "Failed to acquire mutex: %d",(int) GetLastError()); - } - return 0; -} -void -tor_cond_signal_one(tor_cond_t *cond) -{ - HANDLE event; - tor_assert(cond); - - EnterCriticalSection(&cond->mutex); - - if ((event = smartlist_pop_last(cond->events))) - SetEvent(event); - - LeaveCriticalSection(&cond->mutex); -} -void -tor_cond_signal_all(tor_cond_t *cond) -{ - tor_assert(cond); - - EnterCriticalSection(&cond->mutex); - SMARTLIST_FOREACH(cond->events, HANDLE, event, SetEvent(event)); - smartlist_clear(cond->events); - LeaveCriticalSection(&cond->mutex); -} -#endif -void -tor_threads_init(void) -{ -#if 0 - cond_event_tls_index = TlsAlloc(); -#endif - set_main_thread(); -} -#endif - #if defined(HAVE_MLOCKALL) && HAVE_DECL_MLOCKALL && defined(RLIMIT_MEMLOCK) /** Attempt to raise the current and max rlimit to infinity for our process. * This only needs to be done once and can probably only be done when we have @@ -3291,23 +2914,6 @@ tor_mlockall(void) #endif } -/** Identity of the "main" thread */ -static unsigned long main_thread_id = -1; - -/** Start considering the current thread to be the 'main thread'. This has - * no effect on anything besides in_main_thread(). */ -void -set_main_thread(void) -{ - main_thread_id = tor_get_thread_id(); -} -/** Return true iff called from the main thread. */ -int -in_main_thread(void) -{ - return main_thread_id == tor_get_thread_id(); -} - /** * On Windows, WSAEWOULDBLOCK is not always correct: when you see it, * you need to ask the socket for its actual errno. Also, you need to diff --git a/src/common/compat.h b/src/common/compat.h index 04e8cb267c..23f8614196 100644 --- a/src/common/compat.h +++ b/src/common/compat.h @@ -36,9 +36,6 @@ #ifdef HAVE_STRING_H #include <string.h> #endif -#if defined(HAVE_PTHREAD_H) && !defined(_WIN32) -#include <pthread.h> -#endif #include <stdarg.h> #ifdef HAVE_SYS_RESOURCE_H #include <sys/resource.h> @@ -642,61 +639,10 @@ char **get_environment(void); int get_total_system_memory(size_t *mem_out); -int spawn_func(void (*func)(void *), void *data); -void spawn_exit(void) ATTR_NORETURN; - -#if defined(_WIN32) -#define USE_WIN32_THREADS -#elif defined(HAVE_PTHREAD_H) && defined(HAVE_PTHREAD_CREATE) -#define USE_PTHREADS -#else -#error "No threading system was found" -#endif - int compute_num_cpus(void); -/* Because we use threads instead of processes on most platforms (Windows, - * Linux, etc), we need locking for them. On platforms with poor thread - * support or broken gethostbyname_r, these functions are no-ops. */ - -/** A generic lock structure for multithreaded builds. */ -typedef struct tor_mutex_t { -#if defined(USE_WIN32_THREADS) - /** Windows-only: on windows, we implement locks with CRITICAL_SECTIONS. */ - CRITICAL_SECTION mutex; -#elif defined(USE_PTHREADS) - /** Pthreads-only: with pthreads, we implement locks with - * pthread_mutex_t. */ - pthread_mutex_t mutex; -#else - /** No-threads only: Dummy variable so that tor_mutex_t takes up space. */ - int _unused; -#endif -} tor_mutex_t; - int tor_mlockall(void); -tor_mutex_t *tor_mutex_new(void); -void tor_mutex_init(tor_mutex_t *m); -void tor_mutex_acquire(tor_mutex_t *m); -void tor_mutex_release(tor_mutex_t *m); -void tor_mutex_free(tor_mutex_t *m); -void tor_mutex_uninit(tor_mutex_t *m); -unsigned long tor_get_thread_id(void); -void tor_threads_init(void); - -void set_main_thread(void); -int in_main_thread(void); - -#if 0 -typedef struct tor_cond_t tor_cond_t; -tor_cond_t *tor_cond_new(void); -void tor_cond_free(tor_cond_t *cond); -int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex); -void tor_cond_signal_one(tor_cond_t *cond); -void tor_cond_signal_all(tor_cond_t *cond); -#endif - /** Macros for MIN/MAX. Never use these when the arguments could have * side-effects. * {With GCC extensions we could probably define a safer MIN/MAX. But @@ -742,5 +688,8 @@ STATIC int tor_ersatz_socketpair(int family, int type, int protocol, #endif #endif +/* This needs some of the declarations above so we include it here. */ +#include "compat_threads.h" + #endif diff --git a/src/common/compat_libevent.h b/src/common/compat_libevent.h index 69259e7ed6..6bbfae0056 100644 --- a/src/common/compat_libevent.h +++ b/src/common/compat_libevent.h @@ -5,6 +5,7 @@ #define TOR_COMPAT_LIBEVENT_H #include "orconfig.h" +#include "testsupport.h" struct event; struct event_base; diff --git a/src/common/compat_pthreads.c b/src/common/compat_pthreads.c new file mode 100644 index 0000000000..f4a6cad154 --- /dev/null +++ b/src/common/compat_pthreads.c @@ -0,0 +1,287 @@ +/* Copyright (c) 2003-2004, Roger Dingledine + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2015, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#define _GNU_SOURCE + +#include "orconfig.h" +#include <pthread.h> +#include <signal.h> +#include <time.h> + +#include "compat.h" +#include "torlog.h" +#include "util.h" + +/** Wraps a void (*)(void*) function and its argument so we can + * invoke them in a way pthreads would expect. + */ +typedef struct tor_pthread_data_t { + void (*func)(void *); + void *data; +} tor_pthread_data_t; +/** Given a tor_pthread_data_t <b>_data</b>, call _data->func(d->data) + * and free _data. Used to make sure we can call functions the way pthread + * expects. */ +static void * +tor_pthread_helper_fn(void *_data) +{ + tor_pthread_data_t *data = _data; + void (*func)(void*); + void *arg; + /* mask signals to worker threads to avoid SIGPIPE, etc */ + sigset_t sigs; + /* We're in a subthread; don't handle any signals here. */ + sigfillset(&sigs); + pthread_sigmask(SIG_SETMASK, &sigs, NULL); + + func = data->func; + arg = data->data; + tor_free(_data); + func(arg); + return NULL; +} +/** + * A pthread attribute to make threads start detached. + */ +static pthread_attr_t attr_detached; +/** True iff we've called tor_threads_init() */ +static int threads_initialized = 0; + +/** Minimalist interface to run a void function in the background. On + * Unix calls fork, on win32 calls beginthread. Returns -1 on failure. + * func should not return, but rather should call spawn_exit. + * + * NOTE: if <b>data</b> is used, it should not be allocated on the stack, + * since in a multithreaded environment, there is no way to be sure that + * the caller's stack will still be around when the called function is + * running. + */ +int +spawn_func(void (*func)(void *), void *data) +{ + pthread_t thread; + tor_pthread_data_t *d; + if (PREDICT_UNLIKELY(!threads_initialized)) + tor_threads_init(); + d = tor_malloc(sizeof(tor_pthread_data_t)); + d->data = data; + d->func = func; + if (pthread_create(&thread,&attr_detached,tor_pthread_helper_fn,d)) + return -1; + return 0; +} + +/** End the current thread/process. + */ +void +spawn_exit(void) +{ + pthread_exit(NULL); +} + +/** A mutex attribute that we're going to use to tell pthreads that we want + * "recursive" mutexes (i.e., once we can re-lock if we're already holding + * them.) */ +static pthread_mutexattr_t attr_recursive; + +/** Initialize <b>mutex</b> so it can be locked. Every mutex must be set + * up with tor_mutex_init() or tor_mutex_new(); not both. */ +void +tor_mutex_init(tor_mutex_t *mutex) +{ + int err; + if (PREDICT_UNLIKELY(!threads_initialized)) + tor_threads_init(); + err = pthread_mutex_init(&mutex->mutex, &attr_recursive); + if (PREDICT_UNLIKELY(err)) { + log_err(LD_GENERAL, "Error %d creating a mutex.", err); + tor_fragile_assert(); + } +} + +/** As tor_mutex_init, but initialize a mutex suitable that may be + * non-recursive, if the OS supports that. */ +void +tor_mutex_init_nonrecursive(tor_mutex_t *mutex) +{ + int err; + if (PREDICT_UNLIKELY(!threads_initialized)) + tor_threads_init(); + err = pthread_mutex_init(&mutex->mutex, NULL); + if (PREDICT_UNLIKELY(err)) { + log_err(LD_GENERAL, "Error %d creating a mutex.", err); + tor_fragile_assert(); + } +} + +/** Wait until <b>m</b> is free, then acquire it. */ +void +tor_mutex_acquire(tor_mutex_t *m) +{ + int err; + tor_assert(m); + err = pthread_mutex_lock(&m->mutex); + if (PREDICT_UNLIKELY(err)) { + log_err(LD_GENERAL, "Error %d locking a mutex.", err); + tor_fragile_assert(); + } +} +/** Release the lock <b>m</b> so another thread can have it. */ +void +tor_mutex_release(tor_mutex_t *m) +{ + int err; + tor_assert(m); + err = pthread_mutex_unlock(&m->mutex); + if (PREDICT_UNLIKELY(err)) { + log_err(LD_GENERAL, "Error %d unlocking a mutex.", err); + tor_fragile_assert(); + } +} +/** Clean up the mutex <b>m</b> so that it no longer uses any system + * resources. Does not free <b>m</b>. This function must only be called on + * mutexes from tor_mutex_init(). */ +void +tor_mutex_uninit(tor_mutex_t *m) +{ + int err; + tor_assert(m); + err = pthread_mutex_destroy(&m->mutex); + if (PREDICT_UNLIKELY(err)) { + log_err(LD_GENERAL, "Error %d destroying a mutex.", err); + tor_fragile_assert(); + } +} +/** Return an integer representing this thread. */ +unsigned long +tor_get_thread_id(void) +{ + union { + pthread_t thr; + unsigned long id; + } r; + r.thr = pthread_self(); + return r.id; +} + +/* Conditions. */ + +/** Initialize an already-allocated condition variable. */ +int +tor_cond_init(tor_cond_t *cond) +{ + pthread_condattr_t condattr; + + memset(cond, 0, sizeof(tor_cond_t)); + /* Default condition attribute. Might be used if clock monotonic is + * available else this won't affect anything. */ + if (pthread_condattr_init(&condattr)) { + return -1; + } + +#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) + /* Use monotonic time so when we timedwait() on it, any clock adjustment + * won't affect the timeout value. */ + if (pthread_condattr_setclock(&condattr, CLOCK_MONOTONIC)) { + return -1; + } +#endif + if (pthread_cond_init(&cond->cond, &condattr)) { + return -1; + } + return 0; +} + +/** Release all resources held by <b>cond</b>, but do not free <b>cond</b> + * itself. */ +void +tor_cond_uninit(tor_cond_t *cond) +{ + if (pthread_cond_destroy(&cond->cond)) { + log_warn(LD_GENERAL,"Error freeing condition: %s", strerror(errno)); + return; + } +} +/** Wait until one of the tor_cond_signal functions is called on <b>cond</b>. + * (If <b>tv</b> is set, and that amount of time passes with no signal to + * <b>cond</b>, return anyway. All waiters on the condition must wait holding + * the same <b>mutex</b>. All signallers should hold that mutex. The mutex + * needs to have been allocated with tor_mutex_init_for_cond(). + * + * Returns 0 on success, -1 on failure, 1 on timeout. */ +int +tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex, const struct timeval *tv) +{ + int r; + if (tv == NULL) { + while (1) { + r = pthread_cond_wait(&cond->cond, &mutex->mutex); + if (r == EINTR) { + /* EINTR should be impossible according to POSIX, but POSIX, like the + * Pirate's Code, is apparently treated "more like what you'd call + * guidelines than actual rules." */ + continue; + } + return r ? -1 : 0; + } + } else { + struct timeval tvnow, tvsum; + struct timespec ts; + while (1) { +#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) + if (clock_gettime(CLOCK_MONOTONIC, &ts) < 0) { + return -1; + } + tvnow.tv_sec = ts.tv_sec; + tvnow.tv_usec = ts.tv_nsec / 1000; + timeradd(tv, &tvnow, &tvsum); +#else + if (gettimeofday(&tvnow, NULL) < 0) + return -1; + timeradd(tv, &tvnow, &tvsum); +#endif /* HAVE_CLOCK_GETTIME, CLOCK_MONOTONIC */ + + ts.tv_sec = tvsum.tv_sec; + ts.tv_nsec = tvsum.tv_usec * 1000; + + r = pthread_cond_timedwait(&cond->cond, &mutex->mutex, &ts); + if (r == 0) + return 0; + else if (r == ETIMEDOUT) + return 1; + else if (r == EINTR) + continue; + else + return -1; + } + } +} +/** Wake up one of the waiters on <b>cond</b>. */ +void +tor_cond_signal_one(tor_cond_t *cond) +{ + pthread_cond_signal(&cond->cond); +} +/** Wake up all of the waiters on <b>cond</b>. */ +void +tor_cond_signal_all(tor_cond_t *cond) +{ + pthread_cond_broadcast(&cond->cond); +} + +/** Set up common structures for use by threading. */ +void +tor_threads_init(void) +{ + if (!threads_initialized) { + pthread_mutexattr_init(&attr_recursive); + pthread_mutexattr_settype(&attr_recursive, PTHREAD_MUTEX_RECURSIVE); + tor_assert(0==pthread_attr_init(&attr_detached)); + tor_assert(0==pthread_attr_setdetachstate(&attr_detached, 1)); + threads_initialized = 1; + set_main_thread(); + } +} + diff --git a/src/common/compat_threads.c b/src/common/compat_threads.c new file mode 100644 index 0000000000..d2d929e430 --- /dev/null +++ b/src/common/compat_threads.c @@ -0,0 +1,302 @@ +/* Copyright (c) 2003-2004, Roger Dingledine + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2015, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#define _GNU_SOURCE + +#include "orconfig.h" +#include <stdlib.h> +#include "compat.h" +#include "compat_threads.h" + +#include "util.h" +#include "torlog.h" + +#ifdef HAVE_SYS_EVENTFD_H +#include <sys/eventfd.h> +#endif +#ifdef HAVE_FCNTL_H +#include <fcntl.h> +#endif +#ifdef HAVE_UNISTD_H +#include <unistd.h> +#endif + +/** Return a newly allocated, ready-for-use mutex. */ +tor_mutex_t * +tor_mutex_new(void) +{ + tor_mutex_t *m = tor_malloc_zero(sizeof(tor_mutex_t)); + tor_mutex_init(m); + return m; +} +/** Return a newly allocated, ready-for-use mutex. This one might be + * non-recursive, if that's faster. */ +tor_mutex_t * +tor_mutex_new_nonrecursive(void) +{ + tor_mutex_t *m = tor_malloc_zero(sizeof(tor_mutex_t)); + tor_mutex_init_nonrecursive(m); + return m; +} +/** Release all storage and system resources held by <b>m</b>. */ +void +tor_mutex_free(tor_mutex_t *m) +{ + if (!m) + return; + tor_mutex_uninit(m); + tor_free(m); +} + +/** Allocate and return a new condition variable. */ +tor_cond_t * +tor_cond_new(void) +{ + tor_cond_t *cond = tor_malloc(sizeof(tor_cond_t)); + if (tor_cond_init(cond)<0) + tor_free(cond); + return cond; +} + +/** Free all storage held in <b>c</b>. */ +void +tor_cond_free(tor_cond_t *c) +{ + if (!c) + return; + tor_cond_uninit(c); + tor_free(c); +} + +/** Identity of the "main" thread */ +static unsigned long main_thread_id = -1; + +/** Start considering the current thread to be the 'main thread'. This has + * no effect on anything besides in_main_thread(). */ +void +set_main_thread(void) +{ + main_thread_id = tor_get_thread_id(); +} +/** Return true iff called from the main thread. */ +int +in_main_thread(void) +{ + return main_thread_id == tor_get_thread_id(); +} + +#if defined(HAVE_EVENTFD) || defined(HAVE_PIPE) +/* non-interruptable versions */ +static int +write_ni(int fd, const void *buf, size_t n) +{ + int r; + again: + r = (int) write(fd, buf, n); + if (r < 0 && errno == EINTR) + goto again; + return r; +} +static int +read_ni(int fd, void *buf, size_t n) +{ + int r; + again: + r = (int) read(fd, buf, n); + if (r < 0 && errno == EINTR) + goto again; + return r; +} +#endif + +/* non-interruptable versions */ +static int +send_ni(int fd, const void *buf, size_t n, int flags) +{ + int r; + again: + r = (int) send(fd, buf, n, flags); + if (r < 0 && errno == EINTR) + goto again; + return r; +} + +static int +recv_ni(int fd, void *buf, size_t n, int flags) +{ + int r; + again: + r = (int) recv(fd, buf, n, flags); + if (r < 0 && errno == EINTR) + goto again; + return r; +} + +#ifdef HAVE_EVENTFD +static int +eventfd_alert(int fd) +{ + uint64_t u = 1; + int r = write_ni(fd, (void*)&u, sizeof(u)); + if (r < 0 && errno != EAGAIN) + return -1; + return 0; +} + +static int +eventfd_drain(int fd) +{ + uint64_t u = 0; + int r = read_ni(fd, (void*)&u, sizeof(u)); + if (r < 0 && errno != EAGAIN) + return -1; + return 0; +} +#endif + +#ifdef HAVE_PIPE +static int +pipe_alert(int fd) +{ + ssize_t r = write_ni(fd, "x", 1); + if (r < 0 && errno != EAGAIN) + return -1; + return 0; +} + +static int +pipe_drain(int fd) +{ + char buf[32]; + ssize_t r; + while ((r = read_ni(fd, buf, sizeof(buf))) >= 0) + ; + if (r == 0 || errno != EAGAIN) + return -1; + return 0; +} +#endif + +static int +sock_alert(tor_socket_t fd) +{ + ssize_t r = send_ni(fd, "x", 1, 0); + if (r < 0 && !ERRNO_IS_EAGAIN(tor_socket_errno(fd))) + return -1; + return 0; +} + +static int +sock_drain(tor_socket_t fd) +{ + char buf[32]; + ssize_t r; + while ((r = recv_ni(fd, buf, sizeof(buf), 0)) >= 0) + ; + if (r == 0 || !ERRNO_IS_EAGAIN(tor_socket_errno(fd))) + return -1; + return 0; +} + +/** Allocate a new set of alert sockets, and set the appropriate function + * pointers, in <b>socks_out</b>. */ +int +alert_sockets_create(alert_sockets_t *socks_out, uint32_t flags) +{ + tor_socket_t socks[2] = { TOR_INVALID_SOCKET, TOR_INVALID_SOCKET }; + +#ifdef HAVE_EVENTFD + /* First, we try the Linux eventfd() syscall. This gives a 64-bit counter + * associated with a single file descriptor. */ +#if defined(EFD_CLOEXEC) && defined(EFD_NONBLOCK) + if (!(flags & ASOCKS_NOEVENTFD2)) + socks[0] = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK); +#endif + if (socks[0] < 0 && !(flags & ASOCKS_NOEVENTFD)) { + socks[0] = eventfd(0,0); + if (socks[0] >= 0) { + if (fcntl(socks[0], F_SETFD, FD_CLOEXEC) < 0 || + set_socket_nonblocking(socks[0]) < 0) { + close(socks[0]); + return -1; + } + } + } + if (socks[0] >= 0) { + socks_out->read_fd = socks_out->write_fd = socks[0]; + socks_out->alert_fn = eventfd_alert; + socks_out->drain_fn = eventfd_drain; + return 0; + } +#endif + +#ifdef HAVE_PIPE2 + /* Now we're going to try pipes. First type the pipe2() syscall, if we + * have it, so we can save some calls... */ + if (!(flags & ASOCKS_NOPIPE2) && + pipe2(socks, O_NONBLOCK|O_CLOEXEC) == 0) { + socks_out->read_fd = socks[0]; + socks_out->write_fd = socks[1]; + socks_out->alert_fn = pipe_alert; + socks_out->drain_fn = pipe_drain; + return 0; + } +#endif + +#ifdef HAVE_PIPE + /* Now try the regular pipe() syscall. Pipes have a bit lower overhead than + * socketpairs, fwict. */ + if (!(flags & ASOCKS_NOPIPE) && + pipe(socks) == 0) { + if (fcntl(socks[0], F_SETFD, FD_CLOEXEC) < 0 || + fcntl(socks[1], F_SETFD, FD_CLOEXEC) < 0 || + set_socket_nonblocking(socks[0]) < 0 || + set_socket_nonblocking(socks[1]) < 0) { + close(socks[0]); + close(socks[1]); + return -1; + } + socks_out->read_fd = socks[0]; + socks_out->write_fd = socks[1]; + socks_out->alert_fn = pipe_alert; + socks_out->drain_fn = pipe_drain; + return 0; + } +#endif + + /* If nothing else worked, fall back on socketpair(). */ + if (!(flags & ASOCKS_NOSOCKETPAIR) && + tor_socketpair(AF_UNIX, SOCK_STREAM, 0, socks) == 0) { + if (set_socket_nonblocking(socks[0]) < 0 || + set_socket_nonblocking(socks[1])) { + tor_close_socket(socks[0]); + tor_close_socket(socks[1]); + return -1; + } + socks_out->read_fd = socks[0]; + socks_out->write_fd = socks[1]; + socks_out->alert_fn = sock_alert; + socks_out->drain_fn = sock_drain; + return 0; + } + return -1; +} + +/** Close the sockets in <b>socks</b>. */ +void +alert_sockets_close(alert_sockets_t *socks) +{ + if (socks->alert_fn == sock_alert) { + /* they are sockets. */ + tor_close_socket(socks->read_fd); + tor_close_socket(socks->write_fd); + } else { + close(socks->read_fd); + if (socks->write_fd != socks->read_fd) + close(socks->write_fd); + } + socks->read_fd = socks->write_fd = -1; +} + diff --git a/src/common/compat_threads.h b/src/common/compat_threads.h new file mode 100644 index 0000000000..acf3083f37 --- /dev/null +++ b/src/common/compat_threads.h @@ -0,0 +1,115 @@ +/* Copyright (c) 2003-2004, Roger Dingledine + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2015, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#ifndef TOR_COMPAT_THREADS_H +#define TOR_COMPAT_THREADS_H + +#include "orconfig.h" +#include "torint.h" +#include "testsupport.h" + +#if defined(HAVE_PTHREAD_H) && !defined(_WIN32) +#include <pthread.h> +#endif + +#if defined(_WIN32) +#define USE_WIN32_THREADS +#elif defined(HAVE_PTHREAD_H) && defined(HAVE_PTHREAD_CREATE) +#define USE_PTHREADS +#else +#error "No threading system was found" +#endif + +int spawn_func(void (*func)(void *), void *data); +void spawn_exit(void) ATTR_NORETURN; + +/* Because we use threads instead of processes on most platforms (Windows, + * Linux, etc), we need locking for them. On platforms with poor thread + * support or broken gethostbyname_r, these functions are no-ops. */ + +/** A generic lock structure for multithreaded builds. */ +typedef struct tor_mutex_t { +#if defined(USE_WIN32_THREADS) + /** Windows-only: on windows, we implement locks with CRITICAL_SECTIONS. */ + CRITICAL_SECTION mutex; +#elif defined(USE_PTHREADS) + /** Pthreads-only: with pthreads, we implement locks with + * pthread_mutex_t. */ + pthread_mutex_t mutex; +#else + /** No-threads only: Dummy variable so that tor_mutex_t takes up space. */ + int _unused; +#endif +} tor_mutex_t; + +tor_mutex_t *tor_mutex_new(void); +tor_mutex_t *tor_mutex_new_nonrecursive(void); +void tor_mutex_init(tor_mutex_t *m); +void tor_mutex_init_nonrecursive(tor_mutex_t *m); +void tor_mutex_acquire(tor_mutex_t *m); +void tor_mutex_release(tor_mutex_t *m); +void tor_mutex_free(tor_mutex_t *m); +void tor_mutex_uninit(tor_mutex_t *m); +unsigned long tor_get_thread_id(void); +void tor_threads_init(void); + +/** Conditions need nonrecursive mutexes with pthreads. */ +#define tor_mutex_init_for_cond(m) tor_mutex_init_nonrecursive(m) + +void set_main_thread(void); +int in_main_thread(void); + +typedef struct tor_cond_t { +#ifdef USE_PTHREADS + pthread_cond_t cond; +#elif defined(USE_WIN32_THREADS) + HANDLE event; + + CRITICAL_SECTION lock; + int n_waiting; + int n_to_wake; + int generation; +#else +#error no known condition implementation. +#endif +} tor_cond_t; + +tor_cond_t *tor_cond_new(void); +void tor_cond_free(tor_cond_t *cond); +int tor_cond_init(tor_cond_t *cond); +void tor_cond_uninit(tor_cond_t *cond); +int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex, + const struct timeval *tv); +void tor_cond_signal_one(tor_cond_t *cond); +void tor_cond_signal_all(tor_cond_t *cond); + +/** Helper type used to manage waking up the main thread while it's in + * the libevent main loop. Used by the work queue code. */ +typedef struct alert_sockets_s { + /* XXXX This structure needs a better name. */ + /** Socket that the main thread should listen for EV_READ events on. + * Note that this socket may be a regular fd on a non-Windows platform. + */ + tor_socket_t read_fd; + /** Socket to use when alerting the main thread. */ + tor_socket_t write_fd; + /** Function to alert the main thread */ + int (*alert_fn)(tor_socket_t write_fd); + /** Function to make the main thread no longer alerted. */ + int (*drain_fn)(tor_socket_t read_fd); +} alert_sockets_t; + +/* Flags to disable one or more alert_sockets backends. */ +#define ASOCKS_NOEVENTFD2 (1u<<0) +#define ASOCKS_NOEVENTFD (1u<<1) +#define ASOCKS_NOPIPE2 (1u<<2) +#define ASOCKS_NOPIPE (1u<<3) +#define ASOCKS_NOSOCKETPAIR (1u<<4) + +int alert_sockets_create(alert_sockets_t *socks_out, uint32_t flags); +void alert_sockets_close(alert_sockets_t *socks); + +#endif + diff --git a/src/common/compat_winthreads.c b/src/common/compat_winthreads.c new file mode 100644 index 0000000000..71b994c4e4 --- /dev/null +++ b/src/common/compat_winthreads.c @@ -0,0 +1,196 @@ +/* Copyright (c) 2003-2004, Roger Dingledine + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2015, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#include "compat.h" +#include <windows.h> +#include <process.h> +#include "util.h" +#include "container.h" +#include "torlog.h" +#include <process.h> + +/* This value is more or less total cargo-cult */ +#define SPIN_COUNT 2000 + +/** Minimalist interface to run a void function in the background. On + * Unix calls fork, on win32 calls beginthread. Returns -1 on failure. + * func should not return, but rather should call spawn_exit. + * + * NOTE: if <b>data</b> is used, it should not be allocated on the stack, + * since in a multithreaded environment, there is no way to be sure that + * the caller's stack will still be around when the called function is + * running. + */ +int +spawn_func(void (*func)(void *), void *data) +{ + int rv; + rv = (int)_beginthread(func, 0, data); + if (rv == (int)-1) + return -1; + return 0; +} + +/** End the current thread/process. + */ +void +spawn_exit(void) +{ + _endthread(); + //we should never get here. my compiler thinks that _endthread returns, this + //is an attempt to fool it. + tor_assert(0); + _exit(0); +} + +void +tor_mutex_init(tor_mutex_t *m) +{ + InitializeCriticalSection(&m->mutex); +} +void +tor_mutex_init_nonrecursive(tor_mutex_t *m) +{ + InitializeCriticalSection(&m->mutex); +} + +void +tor_mutex_uninit(tor_mutex_t *m) +{ + DeleteCriticalSection(&m->mutex); +} +void +tor_mutex_acquire(tor_mutex_t *m) +{ + tor_assert(m); + EnterCriticalSection(&m->mutex); +} +void +tor_mutex_release(tor_mutex_t *m) +{ + LeaveCriticalSection(&m->mutex); +} +unsigned long +tor_get_thread_id(void) +{ + return (unsigned long)GetCurrentThreadId(); +} + +int +tor_cond_init(tor_cond_t *cond) +{ + memset(cond, 0, sizeof(tor_cond_t)); + if (InitializeCriticalSectionAndSpinCount(&cond->lock, SPIN_COUNT)==0) { + return -1; + } + if ((cond->event = CreateEvent(NULL,TRUE,FALSE,NULL)) == NULL) { + DeleteCriticalSection(&cond->lock); + return -1; + } + cond->n_waiting = cond->n_to_wake = cond->generation = 0; + return 0; +} +void +tor_cond_uninit(tor_cond_t *cond) +{ + DeleteCriticalSection(&cond->lock); + CloseHandle(cond->event); +} + +static void +tor_cond_signal_impl(tor_cond_t *cond, int broadcast) +{ + EnterCriticalSection(&cond->lock); + if (broadcast) + cond->n_to_wake = cond->n_waiting; + else + ++cond->n_to_wake; + cond->generation++; + SetEvent(cond->event); + LeaveCriticalSection(&cond->lock); +} +void +tor_cond_signal_one(tor_cond_t *cond) +{ + tor_cond_signal_impl(cond, 0); +} +void +tor_cond_signal_all(tor_cond_t *cond) +{ + tor_cond_signal_impl(cond, 1); +} + +int +tor_cond_wait(tor_cond_t *cond, tor_mutex_t *lock_, const struct timeval *tv) +{ + CRITICAL_SECTION *lock = &lock_->mutex; + int generation_at_start; + int waiting = 1; + int result = -1; + DWORD ms = INFINITE, ms_orig = INFINITE, startTime, endTime; + if (tv) + ms_orig = ms = tv->tv_sec*1000 + (tv->tv_usec+999)/1000; + + EnterCriticalSection(&cond->lock); + ++cond->n_waiting; + generation_at_start = cond->generation; + LeaveCriticalSection(&cond->lock); + + LeaveCriticalSection(lock); + + startTime = GetTickCount(); + do { + DWORD res; + res = WaitForSingleObject(cond->event, ms); + EnterCriticalSection(&cond->lock); + if (cond->n_to_wake && + cond->generation != generation_at_start) { + --cond->n_to_wake; + --cond->n_waiting; + result = 0; + waiting = 0; + goto out; + } else if (res != WAIT_OBJECT_0) { + result = (res==WAIT_TIMEOUT) ? 1 : -1; + --cond->n_waiting; + waiting = 0; + goto out; + } else if (ms != INFINITE) { + endTime = GetTickCount(); + if (startTime + ms_orig <= endTime) { + result = 1; /* Timeout */ + --cond->n_waiting; + waiting = 0; + goto out; + } else { + ms = startTime + ms_orig - endTime; + } + } + /* If we make it here, we are still waiting. */ + if (cond->n_to_wake == 0) { + /* There is nobody else who should wake up; reset + * the event. */ + ResetEvent(cond->event); + } + out: + LeaveCriticalSection(&cond->lock); + } while (waiting); + + EnterCriticalSection(lock); + + EnterCriticalSection(&cond->lock); + if (!cond->n_waiting) + ResetEvent(cond->event); + LeaveCriticalSection(&cond->lock); + + return result; +} + +void +tor_threads_init(void) +{ + set_main_thread(); +} + diff --git a/src/common/include.am b/src/common/include.am index 6441596199..14838ab555 100644 --- a/src/common/include.am +++ b/src/common/include.am @@ -54,10 +54,18 @@ endif LIBDONNA += $(LIBED25519_REF10) +if THREADS_PTHREADS +threads_impl_source=src/common/compat_pthreads.c +endif +if THREADS_WIN32 +threads_impl_source=src/common/compat_winthreads.c +endif + LIBOR_A_SOURCES = \ src/common/address.c \ src/common/backtrace.c \ src/common/compat.c \ + src/common/compat_threads.c \ src/common/container.c \ src/common/di_ops.c \ src/common/log.c \ @@ -66,10 +74,12 @@ LIBOR_A_SOURCES = \ src/common/util_codedigest.c \ src/common/util_process.c \ src/common/sandbox.c \ + src/common/workqueue.c \ src/ext/csiphash.c \ src/ext/trunnel/trunnel.c \ $(libor_extra_source) \ - $(libor_mempool_source) + $(libor_mempool_source) \ + $(threads_impl_source) LIBOR_CRYPTO_A_SOURCES = \ src/common/aes.c \ @@ -102,7 +112,6 @@ src_common_libor_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS) src_common_libor_crypto_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS) src_common_libor_event_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS) - COMMONHEADERS = \ src/common/address.h \ src/common/backtrace.h \ @@ -110,6 +119,7 @@ COMMONHEADERS = \ src/common/ciphers.inc \ src/common/compat.h \ src/common/compat_libevent.h \ + src/common/compat_threads.h \ src/common/container.h \ src/common/crypto.h \ src/common/crypto_curve25519.h \ @@ -128,6 +138,7 @@ COMMONHEADERS = \ src/common/tortls.h \ src/common/util.h \ src/common/util_process.h \ + src/common/workqueue.h \ $(libor_mempool_header) noinst_HEADERS+= $(COMMONHEADERS) diff --git a/src/common/tortls.c b/src/common/tortls.c index dd33b330dc..ca629135a6 100644 --- a/src/common/tortls.c +++ b/src/common/tortls.c @@ -29,6 +29,20 @@ #include <ws2tcpip.h> #endif #endif + +#ifdef __GNUC__ +#define GCC_VERSION (__GNUC__ * 100 + __GNUC_MINOR__) +#endif + +#if __GNUC__ && GCC_VERSION >= 402 +#if GCC_VERSION >= 406 +#pragma GCC diagnostic push +#endif +/* Some versions of OpenSSL declare SSL_get_selected_srtp_profile twice in + * srtp.h. Suppress the GCC warning so we can build with -Wredundant-decl. */ +#pragma GCC diagnostic ignored "-Wredundant-decls" +#endif + #include <openssl/ssl.h> #include <openssl/ssl3.h> #include <openssl/err.h> @@ -37,6 +51,14 @@ #include <openssl/bio.h> #include <openssl/opensslv.h> +#if __GNUC__ && GCC_VERSION >= 402 +#if GCC_VERSION >= 406 +#pragma GCC diagnostic pop +#else +#pragma GCC diagnostic warning "-Wredundant-decls" +#endif +#endif + #ifdef USE_BUFFEREVENTS #include <event2/bufferevent_ssl.h> #include <event2/buffer.h> diff --git a/src/common/util.c b/src/common/util.c index e5dec99707..442d57a2cf 100644 --- a/src/common/util.c +++ b/src/common/util.c @@ -527,15 +527,25 @@ round_int64_to_next_multiple_of(int64_t number, int64_t divisor) /** Transform a random value <b>p</b> from the uniform distribution in * [0.0, 1.0[ into a Laplace distributed value with location parameter - * <b>mu</b> and scale parameter <b>b</b> in [-Inf, Inf[. */ -double + * <b>mu</b> and scale parameter <b>b</b>. Truncate the final result + * to be an integer in [INT64_MIN, INT64_MAX]. */ +int64_t sample_laplace_distribution(double mu, double b, double p) { + double result; + tor_assert(p >= 0.0 && p < 1.0); /* This is the "inverse cumulative distribution function" from: * http://en.wikipedia.org/wiki/Laplace_distribution */ - return mu - b * (p > 0.5 ? 1.0 : -1.0) - * tor_mathlog(1.0 - 2.0 * fabs(p - 0.5)); + result = mu - b * (p > 0.5 ? 1.0 : -1.0) + * tor_mathlog(1.0 - 2.0 * fabs(p - 0.5)); + + if (result >= INT64_MAX) + return INT64_MAX; + else if (result <= INT64_MIN) + return INT64_MIN; + else + return (int64_t) result; } /** Add random noise between INT64_MIN and INT64_MAX coming from a @@ -546,10 +556,10 @@ int64_t add_laplace_noise(int64_t signal, double random, double delta_f, double epsilon) { - /* cast to int64_t intended */ int64_t noise = sample_laplace_distribution( 0.0, /* just add noise, no further signal */ delta_f / epsilon, random); + if (noise > 0 && INT64_MAX - noise < signal) return INT64_MAX; else if (noise < 0 && INT64_MIN - noise > signal) @@ -1371,6 +1381,20 @@ esc_for_log(const char *s) return result; } +/** Similar to esc_for_log. Allocate and return a new string representing + * the first n characters in <b>chars</b>, surround by quotes and using + * standard C escapes. If a NUL character is encountered in <b>chars</b>, + * the resulting string will be terminated there. + */ +char * +esc_for_log_len(const char *chars, size_t n) +{ + char *string = tor_strndup(chars, n); + char *string_escaped = esc_for_log(string); + tor_free(string); + return string_escaped; +} + /** Allocate and return a new string representing the contents of <b>s</b>, * surrounded by quotes and using standard C escapes. * @@ -2018,15 +2042,24 @@ clean_name_for_stat(char *name) #endif } -/** Return FN_ERROR if filename can't be read, FN_NOENT if it doesn't - * exist, FN_FILE if it is a regular file, or FN_DIR if it's a - * directory. On FN_ERROR, sets errno. */ +/** Return: + * FN_ERROR if filename can't be read, is NULL, or is zero-length, + * FN_NOENT if it doesn't exist, + * FN_FILE if it is a non-empty regular file, or a FIFO on unix-like systems, + * FN_EMPTY for zero-byte regular files, + * FN_DIR if it's a directory, and + * FN_ERROR for any other file type. + * On FN_ERROR and FN_NOENT, sets errno. (errno is not set when FN_ERROR + * is returned due to an unhandled file type.) */ file_status_t file_status(const char *fname) { struct stat st; char *f; int r; + if (!fname || strlen(fname) == 0) { + return FN_ERROR; + } f = tor_strdup(fname); clean_name_for_stat(f); log_debug(LD_FS, "stat()ing %s", f); @@ -2038,16 +2071,23 @@ file_status(const char *fname) } return FN_ERROR; } - if (st.st_mode & S_IFDIR) + if (st.st_mode & S_IFDIR) { return FN_DIR; - else if (st.st_mode & S_IFREG) - return FN_FILE; + } else if (st.st_mode & S_IFREG) { + if (st.st_size > 0) { + return FN_FILE; + } else if (st.st_size == 0) { + return FN_EMPTY; + } else { + return FN_ERROR; + } #ifndef _WIN32 - else if (st.st_mode & S_IFIFO) + } else if (st.st_mode & S_IFIFO) { return FN_FILE; #endif - else + } else { return FN_ERROR; + } } /** Check whether <b>dirname</b> exists and is private. If yes return 0. If @@ -2966,7 +3006,7 @@ expand_filename(const char *filename) tor_free(username); rest = slash ? (slash+1) : ""; #else - log_warn(LD_CONFIG, "Couldn't expend homedir on system without pwd.h"); + log_warn(LD_CONFIG, "Couldn't expand homedir on system without pwd.h"); return tor_strdup(filename); #endif } diff --git a/src/common/util.h b/src/common/util.h index ee40949b61..175a078c6b 100644 --- a/src/common/util.h +++ b/src/common/util.h @@ -173,7 +173,7 @@ unsigned round_to_next_multiple_of(unsigned number, unsigned divisor); uint32_t round_uint32_to_next_multiple_of(uint32_t number, uint32_t divisor); uint64_t round_uint64_to_next_multiple_of(uint64_t number, uint64_t divisor); int64_t round_int64_to_next_multiple_of(int64_t number, int64_t divisor); -double sample_laplace_distribution(double mu, double b, double p); +int64_t sample_laplace_distribution(double mu, double b, double p); int64_t add_laplace_noise(int64_t signal, double random, double delta_f, double epsilon); int n_bits_set_u8(uint8_t v); @@ -239,6 +239,7 @@ int tor_mem_is_zero(const char *mem, size_t len); int tor_digest_is_zero(const char *digest); int tor_digest256_is_zero(const char *digest); char *esc_for_log(const char *string) ATTR_MALLOC; +char *esc_for_log_len(const char *chars, size_t n) ATTR_MALLOC; const char *escaped(const char *string); char *tor_escape_str_for_pt_args(const char *string, @@ -342,7 +343,7 @@ enum stream_status get_string_from_pipe(FILE *stream, char *buf, size_t count); /** Return values from file_status(); see that function's documentation * for details. */ -typedef enum { FN_ERROR, FN_NOENT, FN_FILE, FN_DIR } file_status_t; +typedef enum { FN_ERROR, FN_NOENT, FN_FILE, FN_DIR, FN_EMPTY } file_status_t; file_status_t file_status(const char *filename); /** Possible behaviors for check_private_dir() on encountering a nonexistent diff --git a/src/common/workqueue.c b/src/common/workqueue.c new file mode 100644 index 0000000000..5da29d5ab9 --- /dev/null +++ b/src/common/workqueue.c @@ -0,0 +1,490 @@ +/* copyright (c) 2013-2015, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#include "orconfig.h" +#include "compat.h" +#include "compat_threads.h" +#include "util.h" +#include "workqueue.h" +#include "tor_queue.h" +#include "torlog.h" + +struct threadpool_s { + /** An array of pointers to workerthread_t: one for each running worker + * thread. */ + struct workerthread_s **threads; + + /** Condition variable that we wait on when we have no work, and which + * gets signaled when our queue becomes nonempty. */ + tor_cond_t condition; + /** Queue of pending work that we have to do. */ + TOR_TAILQ_HEAD(, workqueue_entry_s) work; + + /** The current 'update generation' of the threadpool. Any thread that is + * at an earlier generation needs to run the update function. */ + unsigned generation; + + /** Function that should be run for updates on each thread. */ + int (*update_fn)(void *, void *); + /** Function to free update arguments if they can't be run. */ + void (*free_update_arg_fn)(void *); + /** Array of n_threads update arguments. */ + void **update_args; + + /** Number of elements in threads. */ + int n_threads; + /** Mutex to protect all the above fields. */ + tor_mutex_t lock; + + /** A reply queue to use when constructing new threads. */ + replyqueue_t *reply_queue; + + /** Functions used to allocate and free thread state. */ + void *(*new_thread_state_fn)(void*); + void (*free_thread_state_fn)(void*); + void *new_thread_state_arg; +}; + +struct workqueue_entry_s { + /** The next workqueue_entry_t that's pending on the same thread or + * reply queue. */ + TOR_TAILQ_ENTRY(workqueue_entry_s) next_work; + /** The threadpool to which this workqueue_entry_t was assigned. This field + * is set when the workqueue_entry_t is created, and won't be cleared until + * after it's handled in the main thread. */ + struct threadpool_s *on_pool; + /** True iff this entry is waiting for a worker to start processing it. */ + uint8_t pending; + /** Function to run in the worker thread. */ + int (*fn)(void *state, void *arg); + /** Function to run while processing the reply queue. */ + void (*reply_fn)(void *arg); + /** Argument for the above functions. */ + void *arg; +}; + +struct replyqueue_s { + /** Mutex to protect the answers field */ + tor_mutex_t lock; + /** Doubly-linked list of answers that the reply queue needs to handle. */ + TOR_TAILQ_HEAD(, workqueue_entry_s) answers; + + /** Mechanism to wake up the main thread when it is receiving answers. */ + alert_sockets_t alert; +}; + +/** A worker thread represents a single thread in a thread pool. To avoid + * contention, each gets its own queue. This breaks the guarantee that that + * queued work will get executed strictly in order. */ +typedef struct workerthread_s { + /** Which thread it this? In range 0..in_pool->n_threads-1 */ + int index; + /** The pool this thread is a part of. */ + struct threadpool_s *in_pool; + /** User-supplied state field that we pass to the worker functions of each + * work item. */ + void *state; + /** Reply queue to which we pass our results. */ + replyqueue_t *reply_queue; + /** The current update generation of this thread */ + unsigned generation; +} workerthread_t; + +static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work); + +/** Allocate and return a new workqueue_entry_t, set up to run the function + * <b>fn</b> in the worker thread, and <b>reply_fn</b> in the main + * thread. See threadpool_queue_work() for full documentation. */ +static workqueue_entry_t * +workqueue_entry_new(int (*fn)(void*, void*), + void (*reply_fn)(void*), + void *arg) +{ + workqueue_entry_t *ent = tor_malloc_zero(sizeof(workqueue_entry_t)); + ent->fn = fn; + ent->reply_fn = reply_fn; + ent->arg = arg; + return ent; +} + +/** + * Release all storage held in <b>ent</b>. Call only when <b>ent</b> is not on + * any queue. + */ +static void +workqueue_entry_free(workqueue_entry_t *ent) +{ + if (!ent) + return; + memset(ent, 0xf0, sizeof(*ent)); + tor_free(ent); +} + +/** + * Cancel a workqueue_entry_t that has been returned from + * threadpool_queue_work. + * + * You must not call this function on any work whose reply function has been + * executed in the main thread; that will cause undefined behavior (probably, + * a crash). + * + * If the work is cancelled, this function return the argument passed to the + * work function. It is the caller's responsibility to free this storage. + * + * This function will have no effect if the worker thread has already executed + * or begun to execute the work item. In that case, it will return NULL. + */ +void * +workqueue_entry_cancel(workqueue_entry_t *ent) +{ + int cancelled = 0; + void *result = NULL; + tor_mutex_acquire(&ent->on_pool->lock); + if (ent->pending) { + TOR_TAILQ_REMOVE(&ent->on_pool->work, ent, next_work); + cancelled = 1; + result = ent->arg; + } + tor_mutex_release(&ent->on_pool->lock); + + if (cancelled) { + workqueue_entry_free(ent); + } + return result; +} + +/**DOCDOC + + must hold lock */ +static int +worker_thread_has_work(workerthread_t *thread) +{ + return !TOR_TAILQ_EMPTY(&thread->in_pool->work) || + thread->generation != thread->in_pool->generation; +} + +/** + * Main function for the worker thread. + */ +static void +worker_thread_main(void *thread_) +{ + workerthread_t *thread = thread_; + threadpool_t *pool = thread->in_pool; + workqueue_entry_t *work; + int result; + + tor_mutex_acquire(&pool->lock); + while (1) { + /* lock must be held at this point. */ + while (worker_thread_has_work(thread)) { + /* lock must be held at this point. */ + if (thread->in_pool->generation != thread->generation) { + void *arg = thread->in_pool->update_args[thread->index]; + thread->in_pool->update_args[thread->index] = NULL; + int (*update_fn)(void*,void*) = thread->in_pool->update_fn; + thread->generation = thread->in_pool->generation; + tor_mutex_release(&pool->lock); + + int r = update_fn(thread->state, arg); + + if (r < 0) { + return; + } + + tor_mutex_acquire(&pool->lock); + continue; + } + work = TOR_TAILQ_FIRST(&pool->work); + TOR_TAILQ_REMOVE(&pool->work, work, next_work); + work->pending = 0; + tor_mutex_release(&pool->lock); + + /* We run the work function without holding the thread lock. This + * is the main thread's first opportunity to give us more work. */ + result = work->fn(thread->state, work->arg); + + /* Queue the reply for the main thread. */ + queue_reply(thread->reply_queue, work); + + /* We may need to exit the thread. */ + if (result >= WQ_RPL_ERROR) { + return; + } + tor_mutex_acquire(&pool->lock); + } + /* At this point the lock is held, and there is no work in this thread's + * queue. */ + + /* TODO: support an idle-function */ + + /* Okay. Now, wait till somebody has work for us. */ + if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) { + log_warn(LD_GENERAL, "Fail tor_cond_wait."); + } + } +} + +/** Put a reply on the reply queue. The reply must not currently be on + * any thread's work queue. */ +static void +queue_reply(replyqueue_t *queue, workqueue_entry_t *work) +{ + int was_empty; + tor_mutex_acquire(&queue->lock); + was_empty = TOR_TAILQ_EMPTY(&queue->answers); + TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work); + tor_mutex_release(&queue->lock); + + if (was_empty) { + if (queue->alert.alert_fn(queue->alert.write_fd) < 0) { + /* XXXX complain! */ + } + } +} + +/** Allocate and start a new worker thread to use state object <b>state</b>, + * and send responses to <b>replyqueue</b>. */ +static workerthread_t * +workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue) +{ + workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t)); + thr->state = state; + thr->reply_queue = replyqueue; + thr->in_pool = pool; + + if (spawn_func(worker_thread_main, thr) < 0) { + log_err(LD_GENERAL, "Can't launch worker thread."); + return NULL; + } + + return thr; +} + +/** + * Queue an item of work for a thread in a thread pool. The function + * <b>fn</b> will be run in a worker thread, and will receive as arguments the + * thread's state object, and the provided object <b>arg</b>. It must return + * one of WQ_RPL_REPLY, WQ_RPL_ERROR, or WQ_RPL_SHUTDOWN. + * + * Regardless of its return value, the function <b>reply_fn</b> will later be + * run in the main thread when it invokes replyqueue_process(), and will + * receive as its argument the same <b>arg</b> object. It's the reply + * function's responsibility to free the work object. + * + * On success, return a workqueue_entry_t object that can be passed to + * workqueue_entry_cancel(). On failure, return NULL. + * + * Note that because each thread has its own work queue, work items may not + * be executed strictly in order. + */ +workqueue_entry_t * +threadpool_queue_work(threadpool_t *pool, + int (*fn)(void *, void *), + void (*reply_fn)(void *), + void *arg) +{ + workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg); + ent->on_pool = pool; + ent->pending = 1; + + tor_mutex_acquire(&pool->lock); + + TOR_TAILQ_INSERT_TAIL(&pool->work, ent, next_work); + + tor_mutex_release(&pool->lock); + + tor_cond_signal_one(&pool->condition); + + return ent; +} + +/** + * Queue a copy of a work item for every thread in a pool. This can be used, + * for example, to tell the threads to update some parameter in their states. + * + * Arguments are as for <b>threadpool_queue_work</b>, except that the + * <b>arg</b> value is passed to <b>dup_fn</b> once per each thread to + * make a copy of it. + * + * UPDATE FUNCTIONS MUST BE IDEMPOTENT. We do not guarantee that every update + * will be run. If a new update is scheduled before the old update finishes + * running, then the new will replace the old in any threads that haven't run + * it yet. + * + * Return 0 on success, -1 on failure. + */ +int +threadpool_queue_update(threadpool_t *pool, + void *(*dup_fn)(void *), + int (*fn)(void *, void *), + void (*free_fn)(void *), + void *arg) +{ + int i, n_threads; + void (*old_args_free_fn)(void *arg); + void **old_args; + void **new_args; + + tor_mutex_acquire(&pool->lock); + n_threads = pool->n_threads; + old_args = pool->update_args; + old_args_free_fn = pool->free_update_arg_fn; + + new_args = tor_calloc(n_threads, sizeof(void*)); + for (i = 0; i < n_threads; ++i) { + if (dup_fn) + new_args[i] = dup_fn(arg); + else + new_args[i] = arg; + } + + pool->update_args = new_args; + pool->free_update_arg_fn = free_fn; + pool->update_fn = fn; + ++pool->generation; + + tor_mutex_release(&pool->lock); + + tor_cond_signal_all(&pool->condition); + + if (old_args) { + for (i = 0; i < n_threads; ++i) { + if (old_args[i] && old_args_free_fn) + old_args_free_fn(old_args[i]); + } + tor_free(old_args); + } + + return 0; +} + +/** Launch threads until we have <b>n</b>. */ +static int +threadpool_start_threads(threadpool_t *pool, int n) +{ + tor_mutex_acquire(&pool->lock); + + if (pool->n_threads < n) + pool->threads = tor_realloc(pool->threads, sizeof(workerthread_t*)*n); + + while (pool->n_threads < n) { + void *state = pool->new_thread_state_fn(pool->new_thread_state_arg); + workerthread_t *thr = workerthread_new(state, pool, pool->reply_queue); + thr->index = pool->n_threads; + + if (!thr) { + tor_mutex_release(&pool->lock); + return -1; + } + pool->threads[pool->n_threads++] = thr; + } + tor_mutex_release(&pool->lock); + + return 0; +} + +/** + * Construct a new thread pool with <b>n</b> worker threads, configured to + * send their output to <b>replyqueue</b>. The threads' states will be + * constructed with the <b>new_thread_state_fn</b> call, receiving <b>arg</b> + * as its argument. When the threads close, they will call + * <b>free_thread_state_fn</b> on their states. + */ +threadpool_t * +threadpool_new(int n_threads, + replyqueue_t *replyqueue, + void *(*new_thread_state_fn)(void*), + void (*free_thread_state_fn)(void*), + void *arg) +{ + threadpool_t *pool; + pool = tor_malloc_zero(sizeof(threadpool_t)); + tor_mutex_init_nonrecursive(&pool->lock); + tor_cond_init(&pool->condition); + TOR_TAILQ_INIT(&pool->work); + + pool->new_thread_state_fn = new_thread_state_fn; + pool->new_thread_state_arg = arg; + pool->free_thread_state_fn = free_thread_state_fn; + pool->reply_queue = replyqueue; + + if (threadpool_start_threads(pool, n_threads) < 0) { + tor_mutex_uninit(&pool->lock); + tor_free(pool); + return NULL; + } + + return pool; +} + +/** Return the reply queue associated with a given thread pool. */ +replyqueue_t * +threadpool_get_replyqueue(threadpool_t *tp) +{ + return tp->reply_queue; +} + +/** Allocate a new reply queue. Reply queues are used to pass results from + * worker threads to the main thread. Since the main thread is running an + * IO-centric event loop, it needs to get woken up with means other than a + * condition variable. */ +replyqueue_t * +replyqueue_new(uint32_t alertsocks_flags) +{ + replyqueue_t *rq; + + rq = tor_malloc_zero(sizeof(replyqueue_t)); + if (alert_sockets_create(&rq->alert, alertsocks_flags) < 0) { + tor_free(rq); + return NULL; + } + + tor_mutex_init(&rq->lock); + TOR_TAILQ_INIT(&rq->answers); + + return rq; +} + +/** + * Return the "read socket" for a given reply queue. The main thread should + * listen for read events on this socket, and call replyqueue_process() every + * time it triggers. + */ +tor_socket_t +replyqueue_get_socket(replyqueue_t *rq) +{ + return rq->alert.read_fd; +} + +/** + * Process all pending replies on a reply queue. The main thread should call + * this function every time the socket returned by replyqueue_get_socket() is + * readable. + */ +void +replyqueue_process(replyqueue_t *queue) +{ + if (queue->alert.drain_fn(queue->alert.read_fd) < 0) { + static ratelim_t warn_limit = RATELIM_INIT(7200); + log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL, + "Failure from drain_fd"); + } + + tor_mutex_acquire(&queue->lock); + while (!TOR_TAILQ_EMPTY(&queue->answers)) { + /* lock must be held at this point.*/ + workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers); + TOR_TAILQ_REMOVE(&queue->answers, work, next_work); + tor_mutex_release(&queue->lock); + work->on_pool = NULL; + + work->reply_fn(work->arg); + workqueue_entry_free(work); + + tor_mutex_acquire(&queue->lock); + } + + tor_mutex_release(&queue->lock); +} + diff --git a/src/common/workqueue.h b/src/common/workqueue.h new file mode 100644 index 0000000000..92e82b8a48 --- /dev/null +++ b/src/common/workqueue.h @@ -0,0 +1,48 @@ +/* Copyright (c) 2013, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#ifndef TOR_WORKQUEUE_H +#define TOR_WORKQUEUE_H + +#include "compat.h" + +/** A replyqueue is used to tell the main thread about the outcome of + * work that we queued for the the workers. */ +typedef struct replyqueue_s replyqueue_t; +/** A thread-pool manages starting threads and passing work to them. */ +typedef struct threadpool_s threadpool_t; +/** A workqueue entry represents a request that has been passed to a thread + * pool. */ +typedef struct workqueue_entry_s workqueue_entry_t; + +/** Possible return value from a work function: indicates success. */ +#define WQ_RPL_REPLY 0 +/** Possible return value from a work function: indicates fatal error */ +#define WQ_RPL_ERROR 1 +/** Possible return value from a work function: indicates thread is shutting + * down. */ +#define WQ_RPL_SHUTDOWN 2 + +workqueue_entry_t *threadpool_queue_work(threadpool_t *pool, + int (*fn)(void *, void *), + void (*reply_fn)(void *), + void *arg); +int threadpool_queue_update(threadpool_t *pool, + void *(*dup_fn)(void *), + int (*fn)(void *, void *), + void (*free_fn)(void *), + void *arg); +void *workqueue_entry_cancel(workqueue_entry_t *pending_work); +threadpool_t *threadpool_new(int n_threads, + replyqueue_t *replyqueue, + void *(*new_thread_state_fn)(void*), + void (*free_thread_state_fn)(void*), + void *arg); +replyqueue_t *threadpool_get_replyqueue(threadpool_t *tp); + +replyqueue_t *replyqueue_new(uint32_t alertsocks_flags); +tor_socket_t replyqueue_get_socket(replyqueue_t *rq); +void replyqueue_process(replyqueue_t *queue); + +#endif + |