diff options
Diffstat (limited to 'src/common')
-rw-r--r-- | src/common/compat_threads.c | 150 | ||||
-rw-r--r-- | src/common/compat_threads.h | 11 | ||||
-rw-r--r-- | src/common/workqueue.c | 68 |
3 files changed, 173 insertions, 56 deletions
diff --git a/src/common/compat_threads.c b/src/common/compat_threads.c index e0cbf5c1d8..98bdbbcf5e 100644 --- a/src/common/compat_threads.c +++ b/src/common/compat_threads.c @@ -3,8 +3,24 @@ * Copyright (c) 2007-2015, The Tor Project, Inc. */ /* See LICENSE for licensing information */ +#include "orconfig.h" +#define _GNU_SOURCE +#include <stdlib.h> #include "compat.h" +#include "compat_threads.h" + #include "util.h" +#include "torlog.h" + +#ifdef HAVE_SYS_EVENTFD_H +#include <sys/eventfd.h> +#endif +#ifdef HAVE_UNISTD_H +#include <unistd.h> +#endif +#ifdef HAVE_FCNTL_H +#include <fcntl.h> +#endif /** Return a newly allocated, ready-for-use mutex. */ tor_mutex_t * @@ -57,3 +73,137 @@ in_main_thread(void) { return main_thread_id == tor_get_thread_id(); } + +#ifdef HAVE_EVENTFD +static int +eventfd_alert(int fd) +{ + uint64_t u = 1; + int r = write(fd, (void*)&u, sizeof(u)); + if (r < 0 && errno != EAGAIN) + return -1; + return 0; +} + +static int +eventfd_drain(int fd) +{ + uint64_t u = 0; + int r = read(fd, (void*)&u, sizeof(u)); + if (r < 0 && errno != EAGAIN) + return -1; + return 0; +} +#endif + +#ifdef HAVE_PIPE +static int +pipe_alert(int fd) +{ + ssize_t r = write(fd, "x", 1); + if (r < 0 && errno != EAGAIN) + return -1; + return 0; +} + +static int +pipe_drain(int fd) +{ + char buf[32]; + ssize_t r; + while ((r = read(fd, buf, sizeof(buf))) >= 0) + ; + if (r == 0 || errno != EAGAIN) + return -1; + return 0; +} +#endif + +static int +sock_alert(tor_socket_t fd) +{ + ssize_t r = send(fd, "x", 1, 0); + if (r < 0 && !ERRNO_IS_EAGAIN(tor_socket_errno(fd))) + return -1; + return 0; +} + +static int +sock_drain(tor_socket_t fd) +{ + char buf[32]; + ssize_t r; + while ((r = recv(fd, buf, sizeof(buf), 0)) >= 0) + ; + if (r == 0 || !ERRNO_IS_EAGAIN(tor_socket_errno(fd))) + return -1; + return 0; +} + +/** Allocate a new set of alert sockets. DOCDOC */ +int +alert_sockets_create(alert_sockets_t *socks_out) +{ + tor_socket_t socks[2]; + +#ifdef HAVE_EVENTFD +#if defined(EFD_CLOEXEC) && defined(EFD_NONBLOCK) + socks[0] = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK); +#else + socks[0] = -1; +#endif + if (socks[0] < 0) { + socks[0] = eventfd(0,0); + if (socks[0] >= 0) { + if (fcntl(socks[0], F_SETFD, FD_CLOEXEC) < 0 || + set_socket_nonblocking(socks[0]) < 0) { + close(socks[0]); + return -1; + } + } + } + if (socks[0] >= 0) { + socks_out->read_fd = socks_out->write_fd = socks[0]; + socks_out->alert_fn = eventfd_alert; + socks_out->drain_fn = eventfd_drain; + return 0; + } +#endif + +#ifdef HAVE_PIPE2 + if (pipe2(socks, O_NONBLOCK|O_CLOEXEC) == 0) { + socks_out->read_fd = socks[0]; + socks_out->write_fd = socks[1]; + socks_out->alert_fn = pipe_alert; + socks_out->drain_fn = pipe_drain; + return 0; + } +#endif + +#ifdef HAVE_PIPE + if (pipe(socks) == 0) { + if (fcntl(socks[0], F_SETFD, FD_CLOEXEC) < 0 || + fcntl(socks[1], F_SETFD, FD_CLOEXEC) < 0 || + set_socket_nonblocking(socks[0]) < 0 || + set_socket_nonblocking(socks[1]) < 0) { + close(socks[0]); + close(socks[1]); + return -1; + } + socks_out->read_fd = socks[0]; + socks_out->write_fd = socks[1]; + socks_out->alert_fn = pipe_alert; + socks_out->drain_fn = pipe_drain; + return 0; + } +#endif + + if (tor_socketpair(AF_UNIX, SOCK_STREAM, 0, socks) == 0) { + set_socket_nonblocking(socks[0]); + set_socket_nonblocking(socks[1]); + socks_out->alert_fn = sock_alert; + socks_out->drain_fn = sock_drain; + return 0; + } + return -1; +} diff --git a/src/common/compat_threads.h b/src/common/compat_threads.h index 581d8dd7b9..b053136c15 100644 --- a/src/common/compat_threads.h +++ b/src/common/compat_threads.h @@ -82,4 +82,15 @@ int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex, void tor_cond_signal_one(tor_cond_t *cond); void tor_cond_signal_all(tor_cond_t *cond); +/** DOCDOC */ +typedef struct alert_sockets_s { + /*XXX needs a better name */ + tor_socket_t read_fd; + tor_socket_t write_fd; + int (*alert_fn)(tor_socket_t write_fd); + int (*drain_fn)(tor_socket_t read_fd); +} alert_sockets_t; + +int alert_sockets_create(alert_sockets_t *socks_out); + #endif diff --git a/src/common/workqueue.c b/src/common/workqueue.c index 80e061dfb5..ed70d5e5fc 100644 --- a/src/common/workqueue.c +++ b/src/common/workqueue.c @@ -9,11 +9,6 @@ #include "tor_queue.h" #include "torlog.h" -#ifdef HAVE_UNISTD_H -// XXXX move wherever we move the write/send stuff -#include <unistd.h> -#endif - /* design: @@ -44,9 +39,7 @@ struct replyqueue_s { tor_mutex_t lock; TOR_TAILQ_HEAD(, workqueue_entry_s) answers; - void (*alert_fn)(struct replyqueue_s *); // lock not held on this, next 2. - tor_socket_t write_sock; - tor_socket_t read_sock; + alert_sockets_t alert; // lock not held on this. }; typedef struct workerthread_s { @@ -169,22 +162,12 @@ queue_reply(replyqueue_t *queue, workqueue_entry_t *work) tor_mutex_release(&queue->lock); if (was_empty) { - queue->alert_fn(queue); + if (queue->alert.alert_fn(queue->alert.write_fd) < 0) { + /* XXXX complain! */ + } } } - -static void -alert_by_fd(replyqueue_t *queue) -{ - /* XXX extract this into new function */ -#ifndef _WIN32 - (void) send(queue->write_sock, "x", 1, 0); -#else - (void) write(queue->write_sock, "x", 1); -#endif -} - static workerthread_t * workerthread_new(void *state, replyqueue_t *replyqueue) { @@ -293,59 +276,32 @@ threadpool_get_replyqueue(threadpool_t *tp) replyqueue_t * replyqueue_new(void) { - tor_socket_t pair[2]; replyqueue_t *rq; - int r; - - /* XXX extract this into new function */ -#ifdef _WIN32 - r = tor_socketpair(AF_UNIX, SOCK_STREAM, 0, pair); -#else - r = pipe(pair); -#endif - if (r < 0) - return NULL; - - set_socket_nonblocking(pair[0]); /* the read-size should be nonblocking. */ -#if defined(FD_CLOEXEC) - fcntl(pair[0], F_SETFD, FD_CLOEXEC); - fcntl(pair[1], F_SETFD, FD_CLOEXEC); -#endif rq = tor_malloc_zero(sizeof(replyqueue_t)); + if (alert_sockets_create(&rq->alert) < 0) { + tor_free(rq); + return NULL; + } tor_mutex_init(&rq->lock); TOR_TAILQ_INIT(&rq->answers); - rq->read_sock = pair[0]; - rq->write_sock = pair[1]; - rq->alert_fn = alert_by_fd; - return rq; } tor_socket_t replyqueue_get_socket(replyqueue_t *rq) { - return rq->read_sock; + return rq->alert.read_fd; } void replyqueue_process(replyqueue_t *queue) { - ssize_t r; - - /* XXX extract this into new function */ - do { - char buf[64]; -#ifdef _WIN32 - r = recv(queue->read_sock, buf, sizeof(buf), 0); -#else - r = read(queue->read_sock, buf, sizeof(buf)); -#endif - } while (r > 0); - - /* XXXX freak out on r == 0, or r == "error, not retryable". */ + if (queue->alert.drain_fn(queue->alert.read_fd) < 0) { + /* XXXX complain! */ + } tor_mutex_acquire(&queue->lock); while (!TOR_TAILQ_EMPTY(&queue->answers)) { |