summaryrefslogtreecommitdiff
path: root/src/common/workqueue.c
blob: 7b030052cfaeb483ccab9d80b55c274f4d549ce2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
/* copyright (c) 2013-2015, The Tor Project, Inc. */
/* See LICENSE for licensing information */

/**
 * \file workqueue.c
 *
 * \brief Implements worker threads, queues of work for them, and mechanisms
 * for them to send answers back to the main thread.
 *
 * The main structure here is a threadpool_t : it manages a set of worker
 * threads, a queue of pending work, and a reply queue.  Every piece of work
 * is a workqueue_entry_t, containing data to process and a function to
 * process it with.
 *
 * The main thread informs the worker threads of pending work by using a
 * condition variable.  The workers inform the main process of completed work
 * by using an alert_sockets_t object, as implemented in compat_threads.c.
 *
 * The main thread can also queue an "update" that will be handled by all the
 * workers.  This is useful for updating state that all the workers share.
 *
 * In Tor today, there is currently only one thread pool, used in cpuworker.c.
 */

#include "orconfig.h"
#include "common/compat.h"
#include "common/compat_libevent.h"
#include "lib/thread/threads.h"
#include "lib/crypt_ops/crypto_rand.h"
#include "common/util.h"
#include "common/workqueue.h"
#include "tor_queue.h"
#include "lib/net/alertsock.h"
#include "lib/log/torlog.h"
#include "lib/intmath/weakrng.h"

#include <event2/event.h>
#include <string.h>

#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)

TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_s);
typedef struct work_tailq_t work_tailq_t;

struct threadpool_s {
  /** An array of pointers to workerthread_t: one for each running worker
   * thread. */
  struct workerthread_s **threads;

  /** Condition variable that we wait on when we have no work, and which
   * gets signaled when our queue becomes nonempty. */
  tor_cond_t condition;
  /** Queues of pending work that we have to do. The queue with priority
   * <b>p</b> is work[p]. */
  work_tailq_t work[WORKQUEUE_N_PRIORITIES];

  /** Weak RNG, used to decide when to ignore priority. */
  tor_weak_rng_t weak_rng;

  /** The current 'update generation' of the threadpool.  Any thread that is
   * at an earlier generation needs to run the update function. */
  unsigned generation;

  /** Function that should be run for updates on each thread. */
  workqueue_reply_t (*update_fn)(void *, void *);
  /** Function to free update arguments if they can't be run. */
  void (*free_update_arg_fn)(void *);
  /** Array of n_threads update arguments. */
  void **update_args;
  /** Event to notice when another thread has sent a reply. */
  struct event *reply_event;
  void (*reply_cb)(threadpool_t *);

  /** Number of elements in threads. */
  int n_threads;
  /** Mutex to protect all the above fields. */
  tor_mutex_t lock;

  /** A reply queue to use when constructing new threads. */
  replyqueue_t *reply_queue;

  /** Functions used to allocate and free thread state. */
  void *(*new_thread_state_fn)(void*);
  void (*free_thread_state_fn)(void*);
  void *new_thread_state_arg;
};

/** Used to put a workqueue_priority_t value into a bitfield. */
#define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t)
/** Number of bits needed to hold all legal values of workqueue_priority_t */
#define WORKQUEUE_PRIORITY_BITS 2

struct workqueue_entry_s {
  /** The next workqueue_entry_t that's pending on the same thread or
   * reply queue. */
  TOR_TAILQ_ENTRY(workqueue_entry_s) next_work;
  /** The threadpool to which this workqueue_entry_t was assigned. This field
   * is set when the workqueue_entry_t is created, and won't be cleared until
   * after it's handled in the main thread. */
  struct threadpool_s *on_pool;
  /** True iff this entry is waiting for a worker to start processing it. */
  uint8_t pending;
  /** Priority of this entry. */
  workqueue_priority_bitfield_t priority : WORKQUEUE_PRIORITY_BITS;
  /** Function to run in the worker thread. */
  workqueue_reply_t (*fn)(void *state, void *arg);
  /** Function to run while processing the reply queue. */
  void (*reply_fn)(void *arg);
  /** Argument for the above functions. */
  void *arg;
};

struct replyqueue_s {
  /** Mutex to protect the answers field */
  tor_mutex_t lock;
  /** Doubly-linked list of answers that the reply queue needs to handle. */
  TOR_TAILQ_HEAD(, workqueue_entry_s) answers;

  /** Mechanism to wake up the main thread when it is receiving answers. */
  alert_sockets_t alert;
};

/** A worker thread represents a single thread in a thread pool. */
typedef struct workerthread_s {
  /** Which thread it this?  In range 0..in_pool->n_threads-1 */
  int index;
  /** The pool this thread is a part of. */
  struct threadpool_s *in_pool;
  /** User-supplied state field that we pass to the worker functions of each
   * work item. */
  void *state;
  /** Reply queue to which we pass our results. */
  replyqueue_t *reply_queue;
  /** The current update generation of this thread */
  unsigned generation;
  /** One over the probability of taking work from a lower-priority queue. */
  int32_t lower_priority_chance;
} workerthread_t;

static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);

/** Allocate and return a new workqueue_entry_t, set up to run the function
 * <b>fn</b> in the worker thread, and <b>reply_fn</b> in the main
 * thread. See threadpool_queue_work() for full documentation. */
static workqueue_entry_t *
workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*),
                    void (*reply_fn)(void*),
                    void *arg)
{
  workqueue_entry_t *ent = tor_malloc_zero(sizeof(workqueue_entry_t));
  ent->fn = fn;
  ent->reply_fn = reply_fn;
  ent->arg = arg;
  ent->priority = WQ_PRI_HIGH;
  return ent;
}

#define workqueue_entry_free(ent) \
  FREE_AND_NULL(workqueue_entry_t, workqueue_entry_free_, (ent))

/**
 * Release all storage held in <b>ent</b>. Call only when <b>ent</b> is not on
 * any queue.
 */
static void
workqueue_entry_free_(workqueue_entry_t *ent)
{
  if (!ent)
    return;
  memset(ent, 0xf0, sizeof(*ent));
  tor_free(ent);
}

/**
 * Cancel a workqueue_entry_t that has been returned from
 * threadpool_queue_work.
 *
 * You must not call this function on any work whose reply function has been
 * executed in the main thread; that will cause undefined behavior (probably,
 * a crash).
 *
 * If the work is cancelled, this function return the argument passed to the
 * work function. It is the caller's responsibility to free this storage.
 *
 * This function will have no effect if the worker thread has already executed
 * or begun to execute the work item.  In that case, it will return NULL.
 */
void *
workqueue_entry_cancel(workqueue_entry_t *ent)
{
  int cancelled = 0;
  void *result = NULL;
  tor_mutex_acquire(&ent->on_pool->lock);
  workqueue_priority_t prio = ent->priority;
  if (ent->pending) {
    TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work);
    cancelled = 1;
    result = ent->arg;
  }
  tor_mutex_release(&ent->on_pool->lock);

  if (cancelled) {
    workqueue_entry_free(ent);
  }
  return result;
}

/**DOCDOC

   must hold lock */
static int
worker_thread_has_work(workerthread_t *thread)
{
  unsigned i;
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    if (!TOR_TAILQ_EMPTY(&thread->in_pool->work[i]))
        return 1;
  }
  return thread->generation != thread->in_pool->generation;
}

/** Extract the next workqueue_entry_t from the the thread's pool, removing
 * it from the relevant queues and marking it as non-pending.
 *
 * The caller must hold the lock. */
static workqueue_entry_t *
worker_thread_extract_next_work(workerthread_t *thread)
{
  threadpool_t *pool = thread->in_pool;
  work_tailq_t *queue = NULL, *this_queue;
  unsigned i;
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    this_queue = &pool->work[i];
    if (!TOR_TAILQ_EMPTY(this_queue)) {
      queue = this_queue;
      if (! tor_weak_random_one_in_n(&pool->weak_rng,
                                     thread->lower_priority_chance)) {
        /* Usually we'll just break now, so that we can get out of the loop
         * and use the queue where we found work. But with a small
         * probability, we'll keep looking for lower priority work, so that
         * we don't ignore our low-priority queues entirely. */
        break;
      }
    }
  }

  if (queue == NULL)
    return NULL;

  workqueue_entry_t *work = TOR_TAILQ_FIRST(queue);
  TOR_TAILQ_REMOVE(queue, work, next_work);
  work->pending = 0;
  return work;
}

/**
 * Main function for the worker thread.
 */
static void
worker_thread_main(void *thread_)
{
  workerthread_t *thread = thread_;
  threadpool_t *pool = thread->in_pool;
  workqueue_entry_t *work;
  workqueue_reply_t result;

  tor_mutex_acquire(&pool->lock);
  while (1) {
    /* lock must be held at this point. */
    while (worker_thread_has_work(thread)) {
      /* lock must be held at this point. */
      if (thread->in_pool->generation != thread->generation) {
        void *arg = thread->in_pool->update_args[thread->index];
        thread->in_pool->update_args[thread->index] = NULL;
        workqueue_reply_t (*update_fn)(void*,void*) =
            thread->in_pool->update_fn;
        thread->generation = thread->in_pool->generation;
        tor_mutex_release(&pool->lock);

        workqueue_reply_t r = update_fn(thread->state, arg);

        if (r != WQ_RPL_REPLY) {
          return;
        }

        tor_mutex_acquire(&pool->lock);
        continue;
      }
      work = worker_thread_extract_next_work(thread);
      if (BUG(work == NULL))
        break;
      tor_mutex_release(&pool->lock);

      /* We run the work function without holding the thread lock. This
       * is the main thread's first opportunity to give us more work. */
      result = work->fn(thread->state, work->arg);

      /* Queue the reply for the main thread. */
      queue_reply(thread->reply_queue, work);

      /* We may need to exit the thread. */
      if (result != WQ_RPL_REPLY) {
        return;
      }
      tor_mutex_acquire(&pool->lock);
    }
    /* At this point the lock is held, and there is no work in this thread's
     * queue. */

    /* TODO: support an idle-function */

    /* Okay. Now, wait till somebody has work for us. */
    if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) {
      log_warn(LD_GENERAL, "Fail tor_cond_wait.");
    }
  }
}

/** Put a reply on the reply queue.  The reply must not currently be on
 * any thread's work queue. */
static void
queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
{
  int was_empty;
  tor_mutex_acquire(&queue->lock);
  was_empty = TOR_TAILQ_EMPTY(&queue->answers);
  TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work);
  tor_mutex_release(&queue->lock);

  if (was_empty) {
    if (queue->alert.alert_fn(queue->alert.write_fd) < 0) {
      /* XXXX complain! */
    }
  }
}

/** Allocate and start a new worker thread to use state object <b>state</b>,
 * and send responses to <b>replyqueue</b>. */
static workerthread_t *
workerthread_new(int32_t lower_priority_chance,
                 void *state, threadpool_t *pool, replyqueue_t *replyqueue)
{
  workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t));
  thr->state = state;
  thr->reply_queue = replyqueue;
  thr->in_pool = pool;
  thr->lower_priority_chance = lower_priority_chance;

  if (spawn_func(worker_thread_main, thr) < 0) {
    //LCOV_EXCL_START
    tor_assert_nonfatal_unreached();
    log_err(LD_GENERAL, "Can't launch worker thread.");
    tor_free(thr);
    return NULL;
    //LCOV_EXCL_STOP
  }

  return thr;
}

/**
 * Queue an item of work for a thread in a thread pool.  The function
 * <b>fn</b> will be run in a worker thread, and will receive as arguments the
 * thread's state object, and the provided object <b>arg</b>. It must return
 * one of WQ_RPL_REPLY, WQ_RPL_ERROR, or WQ_RPL_SHUTDOWN.
 *
 * Regardless of its return value, the function <b>reply_fn</b> will later be
 * run in the main thread when it invokes replyqueue_process(), and will
 * receive as its argument the same <b>arg</b> object.  It's the reply
 * function's responsibility to free the work object.
 *
 * On success, return a workqueue_entry_t object that can be passed to
 * workqueue_entry_cancel(). On failure, return NULL.  (Failure is not
 * currently possible, but callers should check anyway.)
 *
 * Items are executed in a loose priority order -- each thread will usually
 * take from the queued work with the highest prioirity, but will occasionally
 * visit lower-priority queues to keep them from starving completely.
 *
 * Note that because of priorities and thread behavior, work items may not
 * be executed strictly in order.
 */
workqueue_entry_t *
threadpool_queue_work_priority(threadpool_t *pool,
                               workqueue_priority_t prio,
                               workqueue_reply_t (*fn)(void *, void *),
                               void (*reply_fn)(void *),
                               void *arg)
{
  tor_assert(((int)prio) >= WORKQUEUE_PRIORITY_FIRST &&
             ((int)prio) <= WORKQUEUE_PRIORITY_LAST);

  workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg);
  ent->on_pool = pool;
  ent->pending = 1;
  ent->priority = prio;

  tor_mutex_acquire(&pool->lock);

  TOR_TAILQ_INSERT_TAIL(&pool->work[prio], ent, next_work);

  tor_cond_signal_one(&pool->condition);

  tor_mutex_release(&pool->lock);

  return ent;
}

/** As threadpool_queue_work_priority(), but assumes WQ_PRI_HIGH */
workqueue_entry_t *
threadpool_queue_work(threadpool_t *pool,
                      workqueue_reply_t (*fn)(void *, void *),
                      void (*reply_fn)(void *),
                      void *arg)
{
  return threadpool_queue_work_priority(pool, WQ_PRI_HIGH, fn, reply_fn, arg);
}

/**
 * Queue a copy of a work item for every thread in a pool.  This can be used,
 * for example, to tell the threads to update some parameter in their states.
 *
 * Arguments are as for <b>threadpool_queue_work</b>, except that the
 * <b>arg</b> value is passed to <b>dup_fn</b> once per each thread to
 * make a copy of it.
 *
 * UPDATE FUNCTIONS MUST BE IDEMPOTENT.  We do not guarantee that every update
 * will be run.  If a new update is scheduled before the old update finishes
 * running, then the new will replace the old in any threads that haven't run
 * it yet.
 *
 * Return 0 on success, -1 on failure.
 */
int
threadpool_queue_update(threadpool_t *pool,
                         void *(*dup_fn)(void *),
                         workqueue_reply_t (*fn)(void *, void *),
                         void (*free_fn)(void *),
                         void *arg)
{
  int i, n_threads;
  void (*old_args_free_fn)(void *arg);
  void **old_args;
  void **new_args;

  tor_mutex_acquire(&pool->lock);
  n_threads = pool->n_threads;
  old_args = pool->update_args;
  old_args_free_fn = pool->free_update_arg_fn;

  new_args = tor_calloc(n_threads, sizeof(void*));
  for (i = 0; i < n_threads; ++i) {
    if (dup_fn)
      new_args[i] = dup_fn(arg);
    else
      new_args[i] = arg;
  }

  pool->update_args = new_args;
  pool->free_update_arg_fn = free_fn;
  pool->update_fn = fn;
  ++pool->generation;

  tor_cond_signal_all(&pool->condition);

  tor_mutex_release(&pool->lock);

  if (old_args) {
    for (i = 0; i < n_threads; ++i) {
      if (old_args[i] && old_args_free_fn)
        old_args_free_fn(old_args[i]);
    }
    tor_free(old_args);
  }

  return 0;
}

/** Don't have more than this many threads per pool. */
#define MAX_THREADS 1024

/** For half of our threads, choose lower priority queues with probability
 * 1/N for each of these values. Both are chosen somewhat arbitrarily.  If
 * CHANCE_PERMISSIVE is too low, then we have a risk of low-priority tasks
 * stalling forever.  If it's too high, we have a risk of low-priority tasks
 * grabbing half of the threads. */
#define CHANCE_PERMISSIVE 37
#define CHANCE_STRICT INT32_MAX

/** Launch threads until we have <b>n</b>. */
static int
threadpool_start_threads(threadpool_t *pool, int n)
{
  if (BUG(n < 0))
    return -1; // LCOV_EXCL_LINE
  if (n > MAX_THREADS)
    n = MAX_THREADS;

  tor_mutex_acquire(&pool->lock);

  if (pool->n_threads < n)
    pool->threads = tor_reallocarray(pool->threads,
                                     sizeof(workerthread_t*), n);

  while (pool->n_threads < n) {
    /* For half of our threads, we'll choose lower priorities permissively;
     * for the other half, we'll stick more strictly to higher priorities.
     * This keeps slow low-priority tasks from taking over completely. */
    int32_t chance = (pool->n_threads & 1) ? CHANCE_STRICT : CHANCE_PERMISSIVE;

    void *state = pool->new_thread_state_fn(pool->new_thread_state_arg);
    workerthread_t *thr = workerthread_new(chance,
                                           state, pool, pool->reply_queue);

    if (!thr) {
      //LCOV_EXCL_START
      tor_assert_nonfatal_unreached();
      pool->free_thread_state_fn(state);
      tor_mutex_release(&pool->lock);
      return -1;
      //LCOV_EXCL_STOP
    }
    thr->index = pool->n_threads;
    pool->threads[pool->n_threads++] = thr;
  }
  tor_mutex_release(&pool->lock);

  return 0;
}

/**
 * Construct a new thread pool with <b>n</b> worker threads, configured to
 * send their output to <b>replyqueue</b>.  The threads' states will be
 * constructed with the <b>new_thread_state_fn</b> call, receiving <b>arg</b>
 * as its argument.  When the threads close, they will call
 * <b>free_thread_state_fn</b> on their states.
 */
threadpool_t *
threadpool_new(int n_threads,
               replyqueue_t *replyqueue,
               void *(*new_thread_state_fn)(void*),
               void (*free_thread_state_fn)(void*),
               void *arg)
{
  threadpool_t *pool;
  pool = tor_malloc_zero(sizeof(threadpool_t));
  tor_mutex_init_nonrecursive(&pool->lock);
  tor_cond_init(&pool->condition);
  unsigned i;
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    TOR_TAILQ_INIT(&pool->work[i]);
  }
  {
    unsigned seed;
    crypto_rand((void*)&seed, sizeof(seed));
    tor_init_weak_random(&pool->weak_rng, seed);
  }

  pool->new_thread_state_fn = new_thread_state_fn;
  pool->new_thread_state_arg = arg;
  pool->free_thread_state_fn = free_thread_state_fn;
  pool->reply_queue = replyqueue;

  if (threadpool_start_threads(pool, n_threads) < 0) {
    //LCOV_EXCL_START
    tor_assert_nonfatal_unreached();
    tor_cond_uninit(&pool->condition);
    tor_mutex_uninit(&pool->lock);
    tor_free(pool);
    return NULL;
    //LCOV_EXCL_STOP
  }

  return pool;
}

/** Return the reply queue associated with a given thread pool. */
replyqueue_t *
threadpool_get_replyqueue(threadpool_t *tp)
{
  return tp->reply_queue;
}

/** Allocate a new reply queue.  Reply queues are used to pass results from
 * worker threads to the main thread.  Since the main thread is running an
 * IO-centric event loop, it needs to get woken up with means other than a
 * condition variable. */
replyqueue_t *
replyqueue_new(uint32_t alertsocks_flags)
{
  replyqueue_t *rq;

  rq = tor_malloc_zero(sizeof(replyqueue_t));
  if (alert_sockets_create(&rq->alert, alertsocks_flags) < 0) {
    //LCOV_EXCL_START
    tor_free(rq);
    return NULL;
    //LCOV_EXCL_STOP
  }

  tor_mutex_init(&rq->lock);
  TOR_TAILQ_INIT(&rq->answers);

  return rq;
}

/** Internal: Run from the libevent mainloop when there is work to handle in
 * the reply queue handler. */
static void
reply_event_cb(evutil_socket_t sock, short events, void *arg)
{
  threadpool_t *tp = arg;
  (void) sock;
  (void) events;
  replyqueue_process(tp->reply_queue);
  if (tp->reply_cb)
    tp->reply_cb(tp);
}

/** Register the threadpool <b>tp</b>'s reply queue with the libevent
 * mainloop of <b>base</b>. If <b>tp</b> is provided, it is run after
 * each time there is work to process from the reply queue. Return 0 on
 * success, -1 on failure.
 */
int
threadpool_register_reply_event(threadpool_t *tp,
                                void (*cb)(threadpool_t *tp))
{
  struct event_base *base = tor_libevent_get_base();

  if (tp->reply_event) {
    tor_event_free(tp->reply_event);
  }
  tp->reply_event = tor_event_new(base,
                                  tp->reply_queue->alert.read_fd,
                                  EV_READ|EV_PERSIST,
                                  reply_event_cb,
                                  tp);
  tor_assert(tp->reply_event);
  tp->reply_cb = cb;
  return event_add(tp->reply_event, NULL);
}

/**
 * Process all pending replies on a reply queue. The main thread should call
 * this function every time the socket returned by replyqueue_get_socket() is
 * readable.
 */
void
replyqueue_process(replyqueue_t *queue)
{
  int r = queue->alert.drain_fn(queue->alert.read_fd);
  if (r < 0) {
    //LCOV_EXCL_START
    static ratelim_t warn_limit = RATELIM_INIT(7200);
    log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL,
                 "Failure from drain_fd: %s",
                   tor_socket_strerror(-r));
    //LCOV_EXCL_STOP
  }

  tor_mutex_acquire(&queue->lock);
  while (!TOR_TAILQ_EMPTY(&queue->answers)) {
    /* lock must be held at this point.*/
    workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers);
    TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
    tor_mutex_release(&queue->lock);
    work->on_pool = NULL;

    work->reply_fn(work->arg);
    workqueue_entry_free(work);

    tor_mutex_acquire(&queue->lock);
  }

  tor_mutex_release(&queue->lock);
}