summaryrefslogtreecommitdiff
path: root/src/or/cpuworker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/or/cpuworker.c')
-rw-r--r--src/or/cpuworker.c599
1 files changed, 0 insertions, 599 deletions
diff --git a/src/or/cpuworker.c b/src/or/cpuworker.c
deleted file mode 100644
index 8b58e4c68c..0000000000
--- a/src/or/cpuworker.c
+++ /dev/null
@@ -1,599 +0,0 @@
-/* Copyright (c) 2003-2004, Roger Dingledine.
- * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
- * Copyright (c) 2007-2018, The Tor Project, Inc. */
-/* See LICENSE for licensing information */
-
-/**
- * \file cpuworker.c
- * \brief Uses the workqueue/threadpool code to farm CPU-intensive activities
- * out to subprocesses.
- *
- * The multithreading backend for this module is in workqueue.c; this module
- * specializes workqueue.c.
- *
- * Right now, we use this infrastructure
- * <ul><li>for processing onionskins in onion.c
- * <li>for compressing consensuses in consdiffmgr.c,
- * <li>and for calculating diffs and compressing them in consdiffmgr.c.
- * </ul>
- **/
-#include "or/or.h"
-#include "or/channel.h"
-#include "or/circuitbuild.h"
-#include "or/circuitlist.h"
-#include "or/connection_or.h"
-#include "or/config.h"
-#include "or/cpuworker.h"
-#include "lib/crypt_ops/crypto_rand.h"
-#include "lib/crypt_ops/crypto_util.h"
-#include "or/main.h"
-#include "or/onion.h"
-#include "or/rephist.h"
-#include "or/router.h"
-#include "lib/evloop/workqueue.h"
-
-#include "or/or_circuit_st.h"
-#include "lib/intmath/weakrng.h"
-
-static void queue_pending_tasks(void);
-
-typedef struct worker_state_s {
- int generation;
- server_onion_keys_t *onion_keys;
-} worker_state_t;
-
-static void *
-worker_state_new(void *arg)
-{
- worker_state_t *ws;
- (void)arg;
- ws = tor_malloc_zero(sizeof(worker_state_t));
- ws->onion_keys = server_onion_keys_new();
- return ws;
-}
-
-#define worker_state_free(ws) \
- FREE_AND_NULL(worker_state_t, worker_state_free_, (ws))
-
-static void
-worker_state_free_(worker_state_t *ws)
-{
- if (!ws)
- return;
- server_onion_keys_free(ws->onion_keys);
- tor_free(ws);
-}
-
-static void
-worker_state_free_void(void *arg)
-{
- worker_state_free_(arg);
-}
-
-static replyqueue_t *replyqueue = NULL;
-static threadpool_t *threadpool = NULL;
-
-static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT;
-
-static int total_pending_tasks = 0;
-static int max_pending_tasks = 128;
-
-/** Initialize the cpuworker subsystem. It is OK to call this more than once
- * during Tor's lifetime.
- */
-void
-cpu_init(void)
-{
- if (!replyqueue) {
- replyqueue = replyqueue_new(0);
- }
- if (!threadpool) {
- /*
- In our threadpool implementation, half the threads are permissive and
- half are strict (when it comes to running lower-priority tasks). So we
- always make sure we have at least two threads, so that there will be at
- least one thread of each kind.
- */
- const int n_threads = get_num_cpus(get_options()) + 1;
- threadpool = threadpool_new(n_threads,
- replyqueue,
- worker_state_new,
- worker_state_free_void,
- NULL);
-
- int r = threadpool_register_reply_event(threadpool, NULL);
-
- tor_assert(r == 0);
- }
-
- /* Total voodoo. Can we make this more sensible? */
- max_pending_tasks = get_num_cpus(get_options()) * 64;
- crypto_seed_weak_rng(&request_sample_rng);
-}
-
-/** Magic numbers to make sure our cpuworker_requests don't grow any
- * mis-framing bugs. */
-#define CPUWORKER_REQUEST_MAGIC 0xda4afeed
-#define CPUWORKER_REPLY_MAGIC 0x5eedf00d
-
-/** A request sent to a cpuworker. */
-typedef struct cpuworker_request_t {
- /** Magic number; must be CPUWORKER_REQUEST_MAGIC. */
- uint32_t magic;
-
- /** Flag: Are we timing this request? */
- unsigned timed : 1;
- /** If we're timing this request, when was it sent to the cpuworker? */
- struct timeval started_at;
-
- /** A create cell for the cpuworker to process. */
- create_cell_t create_cell;
-
- /* Turn the above into a tagged union if needed. */
-} cpuworker_request_t;
-
-/** A reply sent by a cpuworker. */
-typedef struct cpuworker_reply_t {
- /** Magic number; must be CPUWORKER_REPLY_MAGIC. */
- uint32_t magic;
-
- /** True iff we got a successful request. */
- uint8_t success;
-
- /** Are we timing this request? */
- unsigned int timed : 1;
- /** What handshake type was the request? (Used for timing) */
- uint16_t handshake_type;
- /** When did we send the request to the cpuworker? */
- struct timeval started_at;
- /** Once the cpuworker received the request, how many microseconds did it
- * take? (This shouldn't overflow; 4 billion micoseconds is over an hour,
- * and we'll never have an onion handshake that takes so long.) */
- uint32_t n_usec;
-
- /** Output of processing a create cell
- *
- * @{
- */
- /** The created cell to send back. */
- created_cell_t created_cell;
- /** The keys to use on this circuit. */
- uint8_t keys[CPATH_KEY_MATERIAL_LEN];
- /** Input to use for authenticating introduce1 cells. */
- uint8_t rend_auth_material[DIGEST_LEN];
-} cpuworker_reply_t;
-
-typedef struct cpuworker_job_u {
- or_circuit_t *circ;
- union {
- cpuworker_request_t request;
- cpuworker_reply_t reply;
- } u;
-} cpuworker_job_t;
-
-static workqueue_reply_t
-update_state_threadfn(void *state_, void *work_)
-{
- worker_state_t *state = state_;
- worker_state_t *update = work_;
- server_onion_keys_free(state->onion_keys);
- state->onion_keys = update->onion_keys;
- update->onion_keys = NULL;
- worker_state_free(update);
- ++state->generation;
- return WQ_RPL_REPLY;
-}
-
-/** Called when the onion key has changed so update all CPU worker(s) with
- * new function pointers with which a new state will be generated.
- */
-void
-cpuworkers_rotate_keyinfo(void)
-{
- if (!threadpool) {
- /* If we're a client, then we won't have cpuworkers, and we won't need
- * to tell them to rotate their state.
- */
- return;
- }
- if (threadpool_queue_update(threadpool,
- worker_state_new,
- update_state_threadfn,
- worker_state_free_void,
- NULL)) {
- log_warn(LD_OR, "Failed to queue key update for worker threads.");
- }
-}
-
-/** Indexed by handshake type: how many onionskins have we processed and
- * counted of that type? */
-static uint64_t onionskins_n_processed[MAX_ONION_HANDSHAKE_TYPE+1];
-/** Indexed by handshake type, corresponding to the onionskins counted in
- * onionskins_n_processed: how many microseconds have we spent in cpuworkers
- * processing that kind of onionskin? */
-static uint64_t onionskins_usec_internal[MAX_ONION_HANDSHAKE_TYPE+1];
-/** Indexed by handshake type, corresponding to onionskins counted in
- * onionskins_n_processed: how many microseconds have we spent waiting for
- * cpuworkers to give us answers for that kind of onionskin?
- */
-static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1];
-
-/** If any onionskin takes longer than this, we clip them to this
- * time. (microseconds) */
-#define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000)
-
-/** Return true iff we'd like to measure a handshake of type
- * <b>onionskin_type</b>. Call only from the main thread. */
-static int
-should_time_request(uint16_t onionskin_type)
-{
- /* If we've never heard of this type, we shouldn't even be here. */
- if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE)
- return 0;
- /* Measure the first N handshakes of each type, to ensure we have a
- * sample */
- if (onionskins_n_processed[onionskin_type] < 4096)
- return 1;
- /** Otherwise, measure with P=1/128. We avoid doing this for every
- * handshake, since the measurement itself can take a little time. */
- return tor_weak_random_one_in_n(&request_sample_rng, 128);
-}
-
-/** Return an estimate of how many microseconds we will need for a single
- * cpuworker to process <b>n_requests</b> onionskins of type
- * <b>onionskin_type</b>. */
-uint64_t
-estimated_usec_for_onionskins(uint32_t n_requests, uint16_t onionskin_type)
-{
- if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
- return 1000 * (uint64_t)n_requests;
- if (PREDICT_UNLIKELY(onionskins_n_processed[onionskin_type] < 100)) {
- /* Until we have 100 data points, just asssume everything takes 1 msec. */
- return 1000 * (uint64_t)n_requests;
- } else {
- /* This can't overflow: we'll never have more than 500000 onionskins
- * measured in onionskin_usec_internal, and they won't take anything near
- * 1 sec each, and we won't have anything like 1 million queued
- * onionskins. But that's 5e5 * 1e6 * 1e6, which is still less than
- * UINT64_MAX. */
- return (onionskins_usec_internal[onionskin_type] * n_requests) /
- onionskins_n_processed[onionskin_type];
- }
-}
-
-/** Compute the absolute and relative overhead of using the cpuworker
- * framework for onionskins of type <b>onionskin_type</b>.*/
-static int
-get_overhead_for_onionskins(uint32_t *usec_out, double *frac_out,
- uint16_t onionskin_type)
-{
- uint64_t overhead;
-
- *usec_out = 0;
- *frac_out = 0.0;
-
- if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
- return -1;
- if (onionskins_n_processed[onionskin_type] == 0 ||
- onionskins_usec_internal[onionskin_type] == 0 ||
- onionskins_usec_roundtrip[onionskin_type] == 0)
- return -1;
-
- overhead = onionskins_usec_roundtrip[onionskin_type] -
- onionskins_usec_internal[onionskin_type];
-
- *usec_out = (uint32_t)(overhead / onionskins_n_processed[onionskin_type]);
- *frac_out = ((double)overhead) / onionskins_usec_internal[onionskin_type];
-
- return 0;
-}
-
-/** If we've measured overhead for onionskins of type <b>onionskin_type</b>,
- * log it. */
-void
-cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
- const char *onionskin_type_name)
-{
- uint32_t overhead;
- double relative_overhead;
- int r;
-
- r = get_overhead_for_onionskins(&overhead, &relative_overhead,
- onionskin_type);
- if (!overhead || r<0)
- return;
-
- log_fn(severity, LD_OR,
- "%s onionskins have averaged %u usec overhead (%.2f%%) in "
- "cpuworker code ",
- onionskin_type_name, (unsigned)overhead, relative_overhead*100);
-}
-
-/** Handle a reply from the worker threads. */
-static void
-cpuworker_onion_handshake_replyfn(void *work_)
-{
- cpuworker_job_t *job = work_;
- cpuworker_reply_t rpl;
- or_circuit_t *circ = NULL;
-
- tor_assert(total_pending_tasks > 0);
- --total_pending_tasks;
-
- /* Could avoid this, but doesn't matter. */
- memcpy(&rpl, &job->u.reply, sizeof(rpl));
-
- tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC);
-
- if (rpl.timed && rpl.success &&
- rpl.handshake_type <= MAX_ONION_HANDSHAKE_TYPE) {
- /* Time how long this request took. The handshake_type check should be
- needless, but let's leave it in to be safe. */
- struct timeval tv_end, tv_diff;
- int64_t usec_roundtrip;
- tor_gettimeofday(&tv_end);
- timersub(&tv_end, &rpl.started_at, &tv_diff);
- usec_roundtrip = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
- if (usec_roundtrip >= 0 &&
- usec_roundtrip < MAX_BELIEVABLE_ONIONSKIN_DELAY) {
- ++onionskins_n_processed[rpl.handshake_type];
- onionskins_usec_internal[rpl.handshake_type] += rpl.n_usec;
- onionskins_usec_roundtrip[rpl.handshake_type] += usec_roundtrip;
- if (onionskins_n_processed[rpl.handshake_type] >= 500000) {
- /* Scale down every 500000 handshakes. On a busy server, that's
- * less impressive than it sounds. */
- onionskins_n_processed[rpl.handshake_type] /= 2;
- onionskins_usec_internal[rpl.handshake_type] /= 2;
- onionskins_usec_roundtrip[rpl.handshake_type] /= 2;
- }
- }
- }
-
- circ = job->circ;
-
- log_debug(LD_OR,
- "Unpacking cpuworker reply %p, circ=%p, success=%d",
- job, circ, rpl.success);
-
- if (circ->base_.magic == DEAD_CIRCUIT_MAGIC) {
- /* The circuit was supposed to get freed while the reply was
- * pending. Instead, it got left for us to free so that we wouldn't freak
- * out when the job->circ field wound up pointing to nothing. */
- log_debug(LD_OR, "Circuit died while reply was pending. Freeing memory.");
- circ->base_.magic = 0;
- tor_free(circ);
- goto done_processing;
- }
-
- circ->workqueue_entry = NULL;
-
- if (TO_CIRCUIT(circ)->marked_for_close) {
- /* We already marked this circuit; we can't call it open. */
- log_debug(LD_OR,"circuit is already marked.");
- goto done_processing;
- }
-
- if (rpl.success == 0) {
- log_debug(LD_OR,
- "decoding onionskin failed. "
- "(Old key or bad software.) Closing.");
- circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_TORPROTOCOL);
- goto done_processing;
- }
-
- if (onionskin_answer(circ,
- &rpl.created_cell,
- (const char*)rpl.keys, sizeof(rpl.keys),
- rpl.rend_auth_material) < 0) {
- log_warn(LD_OR,"onionskin_answer failed. Closing.");
- circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_INTERNAL);
- goto done_processing;
- }
- log_debug(LD_OR,"onionskin_answer succeeded. Yay.");
-
- done_processing:
- memwipe(&rpl, 0, sizeof(rpl));
- memwipe(job, 0, sizeof(*job));
- tor_free(job);
- queue_pending_tasks();
-}
-
-/** Implementation function for onion handshake requests. */
-static workqueue_reply_t
-cpuworker_onion_handshake_threadfn(void *state_, void *work_)
-{
- worker_state_t *state = state_;
- cpuworker_job_t *job = work_;
-
- /* variables for onion processing */
- server_onion_keys_t *onion_keys = state->onion_keys;
- cpuworker_request_t req;
- cpuworker_reply_t rpl;
-
- memcpy(&req, &job->u.request, sizeof(req));
-
- tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
- memset(&rpl, 0, sizeof(rpl));
-
- const create_cell_t *cc = &req.create_cell;
- created_cell_t *cell_out = &rpl.created_cell;
- struct timeval tv_start = {0,0}, tv_end;
- int n;
- rpl.timed = req.timed;
- rpl.started_at = req.started_at;
- rpl.handshake_type = cc->handshake_type;
- if (req.timed)
- tor_gettimeofday(&tv_start);
- n = onion_skin_server_handshake(cc->handshake_type,
- cc->onionskin, cc->handshake_len,
- onion_keys,
- cell_out->reply,
- rpl.keys, CPATH_KEY_MATERIAL_LEN,
- rpl.rend_auth_material);
- if (n < 0) {
- /* failure */
- log_debug(LD_OR,"onion_skin_server_handshake failed.");
- memset(&rpl, 0, sizeof(rpl));
- rpl.success = 0;
- } else {
- /* success */
- log_debug(LD_OR,"onion_skin_server_handshake succeeded.");
- cell_out->handshake_len = n;
- switch (cc->cell_type) {
- case CELL_CREATE:
- cell_out->cell_type = CELL_CREATED; break;
- case CELL_CREATE2:
- cell_out->cell_type = CELL_CREATED2; break;
- case CELL_CREATE_FAST:
- cell_out->cell_type = CELL_CREATED_FAST; break;
- default:
- tor_assert(0);
- return WQ_RPL_SHUTDOWN;
- }
- rpl.success = 1;
- }
- rpl.magic = CPUWORKER_REPLY_MAGIC;
- if (req.timed) {
- struct timeval tv_diff;
- int64_t usec;
- tor_gettimeofday(&tv_end);
- timersub(&tv_end, &tv_start, &tv_diff);
- usec = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
- if (usec < 0 || usec > MAX_BELIEVABLE_ONIONSKIN_DELAY)
- rpl.n_usec = MAX_BELIEVABLE_ONIONSKIN_DELAY;
- else
- rpl.n_usec = (uint32_t) usec;
- }
-
- memcpy(&job->u.reply, &rpl, sizeof(rpl));
-
- memwipe(&req, 0, sizeof(req));
- memwipe(&rpl, 0, sizeof(req));
- return WQ_RPL_REPLY;
-}
-
-/** Take pending tasks from the queue and assign them to cpuworkers. */
-static void
-queue_pending_tasks(void)
-{
- or_circuit_t *circ;
- create_cell_t *onionskin = NULL;
-
- while (total_pending_tasks < max_pending_tasks) {
- circ = onion_next_task(&onionskin);
-
- if (!circ)
- return;
-
- if (assign_onionskin_to_cpuworker(circ, onionskin) < 0)
- log_info(LD_OR,"assign_to_cpuworker failed. Ignoring.");
- }
-}
-
-/** DOCDOC */
-MOCK_IMPL(workqueue_entry_t *,
-cpuworker_queue_work,(workqueue_priority_t priority,
- workqueue_reply_t (*fn)(void *, void *),
- void (*reply_fn)(void *),
- void *arg))
-{
- tor_assert(threadpool);
-
- return threadpool_queue_work_priority(threadpool,
- priority,
- fn,
- reply_fn,
- arg);
-}
-
-/** Try to tell a cpuworker to perform the public key operations necessary to
- * respond to <b>onionskin</b> for the circuit <b>circ</b>.
- *
- * Return 0 if we successfully assign the task, or -1 on failure.
- */
-int
-assign_onionskin_to_cpuworker(or_circuit_t *circ,
- create_cell_t *onionskin)
-{
- workqueue_entry_t *queue_entry;
- cpuworker_job_t *job;
- cpuworker_request_t req;
- int should_time;
-
- tor_assert(threadpool);
-
- if (!circ->p_chan) {
- log_info(LD_OR,"circ->p_chan gone. Failing circ.");
- tor_free(onionskin);
- return -1;
- }
-
- if (total_pending_tasks >= max_pending_tasks) {
- log_debug(LD_OR,"No idle cpuworkers. Queuing.");
- if (onion_pending_add(circ, onionskin) < 0) {
- tor_free(onionskin);
- return -1;
- }
- return 0;
- }
-
- if (!channel_is_client(circ->p_chan))
- rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type);
-
- should_time = should_time_request(onionskin->handshake_type);
- memset(&req, 0, sizeof(req));
- req.magic = CPUWORKER_REQUEST_MAGIC;
- req.timed = should_time;
-
- memcpy(&req.create_cell, onionskin, sizeof(create_cell_t));
-
- tor_free(onionskin);
-
- if (should_time)
- tor_gettimeofday(&req.started_at);
-
- job = tor_malloc_zero(sizeof(cpuworker_job_t));
- job->circ = circ;
- memcpy(&job->u.request, &req, sizeof(req));
- memwipe(&req, 0, sizeof(req));
-
- ++total_pending_tasks;
- queue_entry = threadpool_queue_work_priority(threadpool,
- WQ_PRI_HIGH,
- cpuworker_onion_handshake_threadfn,
- cpuworker_onion_handshake_replyfn,
- job);
- if (!queue_entry) {
- log_warn(LD_BUG, "Couldn't queue work on threadpool");
- tor_free(job);
- return -1;
- }
-
- log_debug(LD_OR, "Queued task %p (qe=%p, circ=%p)",
- job, queue_entry, job->circ);
-
- circ->workqueue_entry = queue_entry;
-
- return 0;
-}
-
-/** If <b>circ</b> has a pending handshake that hasn't been processed yet,
- * remove it from the worker queue. */
-void
-cpuworker_cancel_circ_handshake(or_circuit_t *circ)
-{
- cpuworker_job_t *job;
- if (circ->workqueue_entry == NULL)
- return;
-
- job = workqueue_entry_cancel(circ->workqueue_entry);
- if (job) {
- /* It successfully cancelled. */
- memwipe(job, 0xe0, sizeof(*job));
- tor_free(job);
- tor_assert(total_pending_tasks > 0);
- --total_pending_tasks;
- /* if (!job), this is done in cpuworker_onion_handshake_replyfn. */
- circ->workqueue_entry = NULL;
- }
-}