aboutsummaryrefslogtreecommitdiff
path: root/src/common/workqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/common/workqueue.c')
-rw-r--r--src/common/workqueue.c68
1 files changed, 12 insertions, 56 deletions
diff --git a/src/common/workqueue.c b/src/common/workqueue.c
index 80e061dfb5..ed70d5e5fc 100644
--- a/src/common/workqueue.c
+++ b/src/common/workqueue.c
@@ -9,11 +9,6 @@
#include "tor_queue.h"
#include "torlog.h"
-#ifdef HAVE_UNISTD_H
-// XXXX move wherever we move the write/send stuff
-#include <unistd.h>
-#endif
-
/*
design:
@@ -44,9 +39,7 @@ struct replyqueue_s {
tor_mutex_t lock;
TOR_TAILQ_HEAD(, workqueue_entry_s) answers;
- void (*alert_fn)(struct replyqueue_s *); // lock not held on this, next 2.
- tor_socket_t write_sock;
- tor_socket_t read_sock;
+ alert_sockets_t alert; // lock not held on this.
};
typedef struct workerthread_s {
@@ -169,22 +162,12 @@ queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
tor_mutex_release(&queue->lock);
if (was_empty) {
- queue->alert_fn(queue);
+ if (queue->alert.alert_fn(queue->alert.write_fd) < 0) {
+ /* XXXX complain! */
+ }
}
}
-
-static void
-alert_by_fd(replyqueue_t *queue)
-{
- /* XXX extract this into new function */
-#ifndef _WIN32
- (void) send(queue->write_sock, "x", 1, 0);
-#else
- (void) write(queue->write_sock, "x", 1);
-#endif
-}
-
static workerthread_t *
workerthread_new(void *state, replyqueue_t *replyqueue)
{
@@ -293,59 +276,32 @@ threadpool_get_replyqueue(threadpool_t *tp)
replyqueue_t *
replyqueue_new(void)
{
- tor_socket_t pair[2];
replyqueue_t *rq;
- int r;
-
- /* XXX extract this into new function */
-#ifdef _WIN32
- r = tor_socketpair(AF_UNIX, SOCK_STREAM, 0, pair);
-#else
- r = pipe(pair);
-#endif
- if (r < 0)
- return NULL;
-
- set_socket_nonblocking(pair[0]); /* the read-size should be nonblocking. */
-#if defined(FD_CLOEXEC)
- fcntl(pair[0], F_SETFD, FD_CLOEXEC);
- fcntl(pair[1], F_SETFD, FD_CLOEXEC);
-#endif
rq = tor_malloc_zero(sizeof(replyqueue_t));
+ if (alert_sockets_create(&rq->alert) < 0) {
+ tor_free(rq);
+ return NULL;
+ }
tor_mutex_init(&rq->lock);
TOR_TAILQ_INIT(&rq->answers);
- rq->read_sock = pair[0];
- rq->write_sock = pair[1];
- rq->alert_fn = alert_by_fd;
-
return rq;
}
tor_socket_t
replyqueue_get_socket(replyqueue_t *rq)
{
- return rq->read_sock;
+ return rq->alert.read_fd;
}
void
replyqueue_process(replyqueue_t *queue)
{
- ssize_t r;
-
- /* XXX extract this into new function */
- do {
- char buf[64];
-#ifdef _WIN32
- r = recv(queue->read_sock, buf, sizeof(buf), 0);
-#else
- r = read(queue->read_sock, buf, sizeof(buf));
-#endif
- } while (r > 0);
-
- /* XXXX freak out on r == 0, or r == "error, not retryable". */
+ if (queue->alert.drain_fn(queue->alert.read_fd) < 0) {
+ /* XXXX complain! */
+ }
tor_mutex_acquire(&queue->lock);
while (!TOR_TAILQ_EMPTY(&queue->answers)) {