/* Copyright (c) 2003-2004, Roger Dingledine.
* Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
* Copyright (c) 2007-2021, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* \file cpuworker.c
* \brief Uses the workqueue/threadpool code to farm CPU-intensive activities
* out to subprocesses.
*
* The multithreading backend for this module is in workqueue.c; this module
* specializes workqueue.c.
*
* Right now, we use this infrastructure
*
- for processing onionskins in onion.c
*
- for compressing consensuses in consdiffmgr.c,
*
- and for calculating diffs and compressing them in consdiffmgr.c.
*
**/
#include "core/or/or.h"
#include "core/or/channel.h"
#include "core/or/circuitlist.h"
#include "core/or/connection_or.h"
#include "core/or/congestion_control_common.h"
#include "core/or/congestion_control_flow.h"
#include "app/config/config.h"
#include "core/mainloop/cpuworker.h"
#include "lib/crypt_ops/crypto_rand.h"
#include "lib/crypt_ops/crypto_util.h"
#include "core/or/onion.h"
#include "feature/relay/circuitbuild_relay.h"
#include "feature/relay/onion_queue.h"
#include "feature/stats/rephist.h"
#include "feature/relay/router.h"
#include "lib/evloop/workqueue.h"
#include "core/crypto/onion_crypto.h"
#include "core/or/or_circuit_st.h"
static void queue_pending_tasks(void);
typedef struct worker_state_t {
int generation;
server_onion_keys_t *onion_keys;
} worker_state_t;
static void *
worker_state_new(void *arg)
{
worker_state_t *ws;
(void)arg;
ws = tor_malloc_zero(sizeof(worker_state_t));
ws->onion_keys = server_onion_keys_new();
return ws;
}
#define worker_state_free(ws) \
FREE_AND_NULL(worker_state_t, worker_state_free_, (ws))
static void
worker_state_free_(worker_state_t *ws)
{
if (!ws)
return;
server_onion_keys_free(ws->onion_keys);
tor_free(ws);
}
static void
worker_state_free_void(void *arg)
{
worker_state_free_(arg);
}
static replyqueue_t *replyqueue = NULL;
static threadpool_t *threadpool = NULL;
static int total_pending_tasks = 0;
static int max_pending_tasks = 128;
/** Initialize the cpuworker subsystem. It is OK to call this more than once
* during Tor's lifetime.
*/
void
cpu_init(void)
{
if (!replyqueue) {
replyqueue = replyqueue_new(0);
}
if (!threadpool) {
/*
In our threadpool implementation, half the threads are permissive and
half are strict (when it comes to running lower-priority tasks). So we
always make sure we have at least two threads, so that there will be at
least one thread of each kind.
*/
const int n_threads = get_num_cpus(get_options()) + 1;
threadpool = threadpool_new(n_threads,
replyqueue,
worker_state_new,
worker_state_free_void,
NULL);
int r = threadpool_register_reply_event(threadpool, NULL);
tor_assert(r == 0);
}
/* Total voodoo. Can we make this more sensible? */
max_pending_tasks = get_num_cpus(get_options()) * 64;
}
/** Magic numbers to make sure our cpuworker_requests don't grow any
* mis-framing bugs. */
#define CPUWORKER_REQUEST_MAGIC 0xda4afeed
#define CPUWORKER_REPLY_MAGIC 0x5eedf00d
/** A request sent to a cpuworker. */
typedef struct cpuworker_request_t {
/** Magic number; must be CPUWORKER_REQUEST_MAGIC. */
uint32_t magic;
/** Flag: Are we timing this request? */
unsigned timed : 1;
/** If we're timing this request, when was it sent to the cpuworker? */
struct timeval started_at;
/** A create cell for the cpuworker to process. */
create_cell_t create_cell;
/**
* A copy of this relay's consensus params that are relevant to
* the circuit, for use in negotiation. */
circuit_params_t circ_ns_params;
/* Turn the above into a tagged union if needed. */
} cpuworker_request_t;
/** A reply sent by a cpuworker. */
typedef struct cpuworker_reply_t {
/** Magic number; must be CPUWORKER_REPLY_MAGIC. */
uint32_t magic;
/** True iff we got a successful request. */
uint8_t success;
/** Are we timing this request? */
unsigned int timed : 1;
/** What handshake type was the request? (Used for timing) */
uint16_t handshake_type;
/** When did we send the request to the cpuworker? */
struct timeval started_at;
/** Once the cpuworker received the request, how many microseconds did it
* take? (This shouldn't overflow; 4 billion micoseconds is over an hour,
* and we'll never have an onion handshake that takes so long.) */
uint32_t n_usec;
/** Output of processing a create cell
*
* @{
*/
/** The created cell to send back. */
created_cell_t created_cell;
/** The keys to use on this circuit. */
uint8_t keys[CPATH_KEY_MATERIAL_LEN];
/** Input to use for authenticating introduce1 cells. */
uint8_t rend_auth_material[DIGEST_LEN];
/** Negotiated circuit parameters. */
circuit_params_t circ_params;
} cpuworker_reply_t;
typedef struct cpuworker_job_u_t {
or_circuit_t *circ;
union {
cpuworker_request_t request;
cpuworker_reply_t reply;
} u;
} cpuworker_job_t;
static workqueue_reply_t
update_state_threadfn(void *state_, void *work_)
{
worker_state_t *state = state_;
worker_state_t *update = work_;
server_onion_keys_free(state->onion_keys);
state->onion_keys = update->onion_keys;
update->onion_keys = NULL;
worker_state_free(update);
++state->generation;
return WQ_RPL_REPLY;
}
/** Called when the onion key has changed so update all CPU worker(s) with
* new function pointers with which a new state will be generated.
*/
void
cpuworkers_rotate_keyinfo(void)
{
if (!threadpool) {
/* If we're a client, then we won't have cpuworkers, and we won't need
* to tell them to rotate their state.
*/
return;
}
if (threadpool_queue_update(threadpool,
worker_state_new,
update_state_threadfn,
worker_state_free_void,
NULL)) {
log_warn(LD_OR, "Failed to queue key update for worker threads.");
}
}
/** Indexed by handshake type: how many onionskins have we processed and
* counted of that type? */
static uint64_t onionskins_n_processed[MAX_ONION_HANDSHAKE_TYPE+1];
/** Indexed by handshake type, corresponding to the onionskins counted in
* onionskins_n_processed: how many microseconds have we spent in cpuworkers
* processing that kind of onionskin? */
static uint64_t onionskins_usec_internal[MAX_ONION_HANDSHAKE_TYPE+1];
/** Indexed by handshake type, corresponding to onionskins counted in
* onionskins_n_processed: how many microseconds have we spent waiting for
* cpuworkers to give us answers for that kind of onionskin?
*/
static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1];
/** If any onionskin takes longer than this, we clip them to this
* time. (microseconds) */
#define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000)
/** Return true iff we'd like to measure a handshake of type
* onionskin_type. Call only from the main thread. */
static int
should_time_request(uint16_t onionskin_type)
{
/* If we've never heard of this type, we shouldn't even be here. */
if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE)
return 0;
/* Measure the first N handshakes of each type, to ensure we have a
* sample */
if (onionskins_n_processed[onionskin_type] < 4096)
return 1;
/** Otherwise, measure with P=1/128. We avoid doing this for every
* handshake, since the measurement itself can take a little time. */
return crypto_fast_rng_one_in_n(get_thread_fast_rng(), 128);
}
/** Return an estimate of how many microseconds we will need for a single
* cpuworker to process n_requests onionskins of type
* onionskin_type. */
uint64_t
estimated_usec_for_onionskins(uint32_t n_requests, uint16_t onionskin_type)
{
if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
return 1000 * (uint64_t)n_requests;
if (PREDICT_UNLIKELY(onionskins_n_processed[onionskin_type] < 100)) {
/* Until we have 100 data points, just assume everything takes 1 msec. */
return 1000 * (uint64_t)n_requests;
} else {
/* This can't overflow: we'll never have more than 500000 onionskins
* measured in onionskin_usec_internal, and they won't take anything near
* 1 sec each, and we won't have anything like 1 million queued
* onionskins. But that's 5e5 * 1e6 * 1e6, which is still less than
* UINT64_MAX. */
return (onionskins_usec_internal[onionskin_type] * n_requests) /
onionskins_n_processed[onionskin_type];
}
}
/** Compute the absolute and relative overhead of using the cpuworker
* framework for onionskins of type onionskin_type.*/
static int
get_overhead_for_onionskins(uint32_t *usec_out, double *frac_out,
uint16_t onionskin_type)
{
uint64_t overhead;
*usec_out = 0;
*frac_out = 0.0;
if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
return -1;
if (onionskins_n_processed[onionskin_type] == 0 ||
onionskins_usec_internal[onionskin_type] == 0 ||
onionskins_usec_roundtrip[onionskin_type] == 0)
return -1;
overhead = onionskins_usec_roundtrip[onionskin_type] -
onionskins_usec_internal[onionskin_type];
*usec_out = (uint32_t)(overhead / onionskins_n_processed[onionskin_type]);
*frac_out = ((double)overhead) / onionskins_usec_internal[onionskin_type];
return 0;
}
/** If we've measured overhead for onionskins of type onionskin_type,
* log it. */
void
cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
const char *onionskin_type_name)
{
uint32_t overhead;
double relative_overhead;
int r;
r = get_overhead_for_onionskins(&overhead, &relative_overhead,
onionskin_type);
if (!overhead || r<0)
return;
log_fn(severity, LD_OR,
"%s onionskins have averaged %u usec overhead (%.2f%%) in "
"cpuworker code ",
onionskin_type_name, (unsigned)overhead, relative_overhead*100);
}
/** Handle a reply from the worker threads. */
static void
cpuworker_onion_handshake_replyfn(void *work_)
{
cpuworker_job_t *job = work_;
cpuworker_reply_t rpl;
or_circuit_t *circ = NULL;
tor_assert(total_pending_tasks > 0);
--total_pending_tasks;
/* Could avoid this, but doesn't matter. */
memcpy(&rpl, &job->u.reply, sizeof(rpl));
tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC);
if (rpl.timed && rpl.success &&
rpl.handshake_type <= MAX_ONION_HANDSHAKE_TYPE) {
/* Time how long this request took. The handshake_type check should be
needless, but let's leave it in to be safe. */
struct timeval tv_end, tv_diff;
int64_t usec_roundtrip;
tor_gettimeofday(&tv_end);
timersub(&tv_end, &rpl.started_at, &tv_diff);
usec_roundtrip = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
if (usec_roundtrip >= 0 &&
usec_roundtrip < MAX_BELIEVABLE_ONIONSKIN_DELAY) {
++onionskins_n_processed[rpl.handshake_type];
onionskins_usec_internal[rpl.handshake_type] += rpl.n_usec;
onionskins_usec_roundtrip[rpl.handshake_type] += usec_roundtrip;
if (onionskins_n_processed[rpl.handshake_type] >= 500000) {
/* Scale down every 500000 handshakes. On a busy server, that's
* less impressive than it sounds. */
onionskins_n_processed[rpl.handshake_type] /= 2;
onionskins_usec_internal[rpl.handshake_type] /= 2;
onionskins_usec_roundtrip[rpl.handshake_type] /= 2;
}
}
}
circ = job->circ;
log_debug(LD_OR,
"Unpacking cpuworker reply %p, circ=%p, success=%d",
job, circ, rpl.success);
if (circ->base_.magic == DEAD_CIRCUIT_MAGIC) {
/* The circuit was supposed to get freed while the reply was
* pending. Instead, it got left for us to free so that we wouldn't freak
* out when the job->circ field wound up pointing to nothing. */
log_debug(LD_OR, "Circuit died while reply was pending. Freeing memory.");
circ->base_.magic = 0;
tor_free(circ);
goto done_processing;
}
circ->workqueue_entry = NULL;
if (TO_CIRCUIT(circ)->marked_for_close) {
/* We already marked this circuit; we can't call it open. */
log_debug(LD_OR,"circuit is already marked.");
goto done_processing;
}
if (rpl.success == 0) {
log_debug(LD_OR,
"decoding onionskin failed. "
"(Old key or bad software.) Closing.");
circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_TORPROTOCOL);
goto done_processing;
}
/* If the client asked for congestion control, if our consensus parameter
* allowed it to negotiate as enabled, allocate a congestion control obj. */
if (rpl.circ_params.cc_enabled) {
if (get_options()->SbwsExit) {
TO_CIRCUIT(circ)->ccontrol = congestion_control_new(&rpl.circ_params,
CC_PATH_SBWS);
} else {
TO_CIRCUIT(circ)->ccontrol = congestion_control_new(&rpl.circ_params,
CC_PATH_EXIT);
}
}
if (onionskin_answer(circ,
&rpl.created_cell,
(const char*)rpl.keys, sizeof(rpl.keys),
rpl.rend_auth_material) < 0) {
log_warn(LD_OR,"onionskin_answer failed. Closing.");
circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_INTERNAL);
goto done_processing;
}
log_debug(LD_OR,"onionskin_answer succeeded. Yay.");
done_processing:
memwipe(&rpl, 0, sizeof(rpl));
memwipe(job, 0, sizeof(*job));
tor_free(job);
queue_pending_tasks();
}
/** Implementation function for onion handshake requests. */
static workqueue_reply_t
cpuworker_onion_handshake_threadfn(void *state_, void *work_)
{
worker_state_t *state = state_;
cpuworker_job_t *job = work_;
/* variables for onion processing */
server_onion_keys_t *onion_keys = state->onion_keys;
cpuworker_request_t req;
cpuworker_reply_t rpl;
memcpy(&req, &job->u.request, sizeof(req));
tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
memset(&rpl, 0, sizeof(rpl));
const create_cell_t *cc = &req.create_cell;
created_cell_t *cell_out = &rpl.created_cell;
struct timeval tv_start = {0,0}, tv_end;
int n;
rpl.timed = req.timed;
rpl.started_at = req.started_at;
rpl.handshake_type = cc->handshake_type;
if (req.timed)
tor_gettimeofday(&tv_start);
n = onion_skin_server_handshake(cc->handshake_type,
cc->onionskin, cc->handshake_len,
onion_keys,
&req.circ_ns_params,
cell_out->reply,
sizeof(cell_out->reply),
rpl.keys, CPATH_KEY_MATERIAL_LEN,
rpl.rend_auth_material,
&rpl.circ_params);
if (n < 0) {
/* failure */
log_debug(LD_OR,"onion_skin_server_handshake failed.");
memset(&rpl, 0, sizeof(rpl));
rpl.success = 0;
} else {
/* success */
log_debug(LD_OR,"onion_skin_server_handshake succeeded.");
cell_out->handshake_len = n;
switch (cc->cell_type) {
case CELL_CREATE:
cell_out->cell_type = CELL_CREATED; break;
case CELL_CREATE2:
cell_out->cell_type = CELL_CREATED2; break;
case CELL_CREATE_FAST:
cell_out->cell_type = CELL_CREATED_FAST; break;
default:
tor_assert(0);
return WQ_RPL_SHUTDOWN;
}
rpl.success = 1;
}
rpl.magic = CPUWORKER_REPLY_MAGIC;
if (req.timed) {
struct timeval tv_diff;
int64_t usec;
tor_gettimeofday(&tv_end);
timersub(&tv_end, &tv_start, &tv_diff);
usec = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
if (usec < 0 || usec > MAX_BELIEVABLE_ONIONSKIN_DELAY)
rpl.n_usec = MAX_BELIEVABLE_ONIONSKIN_DELAY;
else
rpl.n_usec = (uint32_t) usec;
}
memcpy(&job->u.reply, &rpl, sizeof(rpl));
memwipe(&req, 0, sizeof(req));
memwipe(&rpl, 0, sizeof(req));
return WQ_RPL_REPLY;
}
/** Take pending tasks from the queue and assign them to cpuworkers. */
static void
queue_pending_tasks(void)
{
or_circuit_t *circ;
create_cell_t *onionskin = NULL;
while (total_pending_tasks < max_pending_tasks) {
circ = onion_next_task(&onionskin);
if (!circ)
return;
if (assign_onionskin_to_cpuworker(circ, onionskin) < 0)
log_info(LD_OR,"assign_to_cpuworker failed. Ignoring.");
}
}
/** DOCDOC */
MOCK_IMPL(workqueue_entry_t *,
cpuworker_queue_work,(workqueue_priority_t priority,
workqueue_reply_t (*fn)(void *, void *),
void (*reply_fn)(void *),
void *arg))
{
tor_assert(threadpool);
return threadpool_queue_work_priority(threadpool,
priority,
fn,
reply_fn,
arg);
}
/** Try to tell a cpuworker to perform the public key operations necessary to
* respond to onionskin for the circuit circ.
*
* Return 0 if we successfully assign the task, or -1 on failure.
*/
int
assign_onionskin_to_cpuworker(or_circuit_t *circ,
create_cell_t *onionskin)
{
workqueue_entry_t *queue_entry;
cpuworker_job_t *job;
cpuworker_request_t req;
int should_time;
tor_assert(threadpool);
if (!circ->p_chan) {
log_info(LD_OR,"circ->p_chan gone. Failing circ.");
tor_free(onionskin);
return -1;
}
if (total_pending_tasks >= max_pending_tasks) {
log_debug(LD_OR,"No idle cpuworkers. Queuing.");
if (onion_pending_add(circ, onionskin) < 0) {
tor_free(onionskin);
return -1;
}
return 0;
}
if (!channel_is_client(circ->p_chan))
rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type);
should_time = should_time_request(onionskin->handshake_type);
memset(&req, 0, sizeof(req));
req.magic = CPUWORKER_REQUEST_MAGIC;
req.timed = should_time;
memcpy(&req.create_cell, onionskin, sizeof(create_cell_t));
tor_free(onionskin);
if (should_time)
tor_gettimeofday(&req.started_at);
/* Copy the current cached consensus params relevant to
* circuit negotiation into the CPU worker context */
req.circ_ns_params.cc_enabled = congestion_control_enabled();
req.circ_ns_params.sendme_inc_cells = congestion_control_sendme_inc();
job = tor_malloc_zero(sizeof(cpuworker_job_t));
job->circ = circ;
memcpy(&job->u.request, &req, sizeof(req));
memwipe(&req, 0, sizeof(req));
++total_pending_tasks;
queue_entry = threadpool_queue_work_priority(threadpool,
WQ_PRI_HIGH,
cpuworker_onion_handshake_threadfn,
cpuworker_onion_handshake_replyfn,
job);
if (!queue_entry) {
log_warn(LD_BUG, "Couldn't queue work on threadpool");
tor_free(job);
return -1;
}
log_debug(LD_OR, "Queued task %p (qe=%p, circ=%p)",
job, queue_entry, job->circ);
circ->workqueue_entry = queue_entry;
return 0;
}
/** If circ has a pending handshake that hasn't been processed yet,
* remove it from the worker queue. */
void
cpuworker_cancel_circ_handshake(or_circuit_t *circ)
{
cpuworker_job_t *job;
if (circ->workqueue_entry == NULL)
return;
job = workqueue_entry_cancel(circ->workqueue_entry);
if (job) {
/* It successfully cancelled. */
memwipe(job, 0xe0, sizeof(*job));
tor_free(job);
tor_assert(total_pending_tasks > 0);
--total_pending_tasks;
/* if (!job), this is done in cpuworker_onion_handshake_replyfn. */
circ->workqueue_entry = NULL;
}
}