diff options
author | Nick Mathewson <nickm@torproject.org> | 2013-10-02 12:32:09 -0400 |
---|---|---|
committer | Nick Mathewson <nickm@torproject.org> | 2015-01-14 11:23:34 -0500 |
commit | 1e896214e7eb5ede65663486291252b171e9daea (patch) | |
tree | 3f8d6aeb91e844888f16a2295e2200e22979a181 | |
parent | cc6529e9bb7d7e01a25b5632d6d6c2424c6fc2b4 (diff) | |
download | tor-1e896214e7eb5ede65663486291252b171e9daea.tar.gz tor-1e896214e7eb5ede65663486291252b171e9daea.zip |
Refactor cpuworker to use workqueue/threadpool code.
-rw-r--r-- | changes/better_workqueues | 10 | ||||
-rw-r--r-- | src/common/workqueue.c | 4 | ||||
-rw-r--r-- | src/common/workqueue.h | 2 | ||||
-rw-r--r-- | src/or/command.c | 2 | ||||
-rw-r--r-- | src/or/config.c | 2 | ||||
-rw-r--r-- | src/or/connection.c | 24 | ||||
-rw-r--r-- | src/or/cpuworker.c | 532 | ||||
-rw-r--r-- | src/or/cpuworker.h | 10 | ||||
-rw-r--r-- | src/or/main.c | 6 | ||||
-rw-r--r-- | src/or/onion.c | 19 | ||||
-rw-r--r-- | src/or/onion.h | 4 | ||||
-rw-r--r-- | src/or/or.h | 17 |
12 files changed, 238 insertions, 394 deletions
diff --git a/changes/better_workqueues b/changes/better_workqueues new file mode 100644 index 0000000000..32c984cb71 --- /dev/null +++ b/changes/better_workqueues @@ -0,0 +1,10 @@ + o Major features: + - Refactor the CPU worker implementation for better performance by + avoiding the kernel and lengthening pipelines. The original + implementation used sockets to transfer data from the main thread + to the worker threads, and didn't allow any thread to be assigned + more than a single piece of work at once. The new implementation + avoids communications overhead by making requests in shared + memory, avoiding kernel IO where possible, and keeping more + request in flight at once. Resolves issue #9682. + diff --git a/src/common/workqueue.c b/src/common/workqueue.c index 44cf98d0dc..f3ef67891d 100644 --- a/src/common/workqueue.c +++ b/src/common/workqueue.c @@ -108,6 +108,7 @@ workqueue_entry_free(workqueue_entry_t *ent) { if (!ent) return; + memset(ent, 0xf0, sizeof(*ent)); tor_free(ent); } @@ -310,7 +311,7 @@ threadpool_queue_work(threadpool_t *pool, */ int threadpool_queue_for_all(threadpool_t *pool, - void *(*dup_fn)(const void *), + void *(*dup_fn)(void *), int (*fn)(void *, void *), void (*reply_fn)(void *), void *arg) @@ -444,6 +445,7 @@ replyqueue_process(replyqueue_t *queue) workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers); TOR_TAILQ_REMOVE(&queue->answers, work, next_work); tor_mutex_release(&queue->lock); + work->on_thread = NULL; work->reply_fn(work->arg); workqueue_entry_free(work); diff --git a/src/common/workqueue.h b/src/common/workqueue.h index aa8620ddb7..aa1bcc518a 100644 --- a/src/common/workqueue.h +++ b/src/common/workqueue.h @@ -28,7 +28,7 @@ workqueue_entry_t *threadpool_queue_work(threadpool_t *pool, void (*reply_fn)(void *), void *arg); int threadpool_queue_for_all(threadpool_t *pool, - void *(*dup_fn)(const void *), + void *(*dup_fn)(void *), int (*fn)(void *, void *), void (*reply_fn)(void *), void *arg); diff --git a/src/or/command.c b/src/or/command.c index 6dde2a9b7e..c4a0f9baeb 100644 --- a/src/or/command.c +++ b/src/or/command.c @@ -310,7 +310,7 @@ command_process_create_cell(cell_t *cell, channel_t *chan) /* hand it off to the cpuworkers, and then return. */ if (connection_or_digest_is_known_relay(chan->identity_digest)) rep_hist_note_circuit_handshake_requested(create_cell->handshake_type); - if (assign_onionskin_to_cpuworker(NULL, circ, create_cell) < 0) { + if (assign_onionskin_to_cpuworker(circ, create_cell) < 0) { log_debug(LD_GENERAL,"Failed to hand off onionskin. Closing."); circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_RESOURCELIMIT); return; diff --git a/src/or/config.c b/src/or/config.c index 5db065f000..5b8560b9e4 100644 --- a/src/or/config.c +++ b/src/or/config.c @@ -1729,7 +1729,7 @@ options_act(const or_options_t *old_options) if (have_completed_a_circuit() || !any_predicted_circuits(time(NULL))) inform_testing_reachability(); } - cpuworkers_rotate(); + cpuworkers_rotate_keyinfo(); if (dns_reset()) return -1; } else { diff --git a/src/or/connection.c b/src/or/connection.c index 11ff224e67..1b7426b588 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -29,7 +29,6 @@ #include "connection_edge.h" #include "connection_or.h" #include "control.h" -#include "cpuworker.h" #include "directory.h" #include "dirserv.h" #include "dns.h" @@ -130,7 +129,6 @@ conn_type_to_string(int type) case CONN_TYPE_AP: return "Socks"; case CONN_TYPE_DIR_LISTENER: return "Directory listener"; case CONN_TYPE_DIR: return "Directory"; - case CONN_TYPE_CPUWORKER: return "CPU worker"; case CONN_TYPE_CONTROL_LISTENER: return "Control listener"; case CONN_TYPE_CONTROL: return "Control"; case CONN_TYPE_EXT_OR: return "Extended OR"; @@ -213,12 +211,6 @@ conn_state_to_string(int type, int state) case DIR_CONN_STATE_SERVER_WRITING: return "writing"; } break; - case CONN_TYPE_CPUWORKER: - switch (state) { - case CPUWORKER_STATE_IDLE: return "idle"; - case CPUWORKER_STATE_BUSY_ONION: return "busy with onion"; - } - break; case CONN_TYPE_CONTROL: switch (state) { case CONTROL_CONN_STATE_OPEN: return "open (protocol v1)"; @@ -248,7 +240,6 @@ connection_type_uses_bufferevent(connection_t *conn) case CONN_TYPE_CONTROL: case CONN_TYPE_OR: case CONN_TYPE_EXT_OR: - case CONN_TYPE_CPUWORKER: return 1; default: return 0; @@ -2436,7 +2427,6 @@ connection_mark_all_noncontrol_connections(void) if (conn->marked_for_close) continue; switch (conn->type) { - case CONN_TYPE_CPUWORKER: case CONN_TYPE_CONTROL_LISTENER: case CONN_TYPE_CONTROL: break; @@ -4530,8 +4520,6 @@ connection_process_inbuf(connection_t *conn, int package_partial) package_partial); case CONN_TYPE_DIR: return connection_dir_process_inbuf(TO_DIR_CONN(conn)); - case CONN_TYPE_CPUWORKER: - return connection_cpu_process_inbuf(conn); case CONN_TYPE_CONTROL: return connection_control_process_inbuf(TO_CONTROL_CONN(conn)); default: @@ -4591,8 +4579,6 @@ connection_finished_flushing(connection_t *conn) return connection_edge_finished_flushing(TO_EDGE_CONN(conn)); case CONN_TYPE_DIR: return connection_dir_finished_flushing(TO_DIR_CONN(conn)); - case CONN_TYPE_CPUWORKER: - return connection_cpu_finished_flushing(conn); case CONN_TYPE_CONTROL: return connection_control_finished_flushing(TO_CONTROL_CONN(conn)); default: @@ -4648,8 +4634,6 @@ connection_reached_eof(connection_t *conn) return connection_edge_reached_eof(TO_EDGE_CONN(conn)); case CONN_TYPE_DIR: return connection_dir_reached_eof(TO_DIR_CONN(conn)); - case CONN_TYPE_CPUWORKER: - return connection_cpu_reached_eof(conn); case CONN_TYPE_CONTROL: return connection_control_reached_eof(TO_CONTROL_CONN(conn)); default: @@ -4855,10 +4839,6 @@ assert_connection_ok(connection_t *conn, time_t now) tor_assert(conn->purpose >= DIR_PURPOSE_MIN_); tor_assert(conn->purpose <= DIR_PURPOSE_MAX_); break; - case CONN_TYPE_CPUWORKER: - tor_assert(conn->state >= CPUWORKER_STATE_MIN_); - tor_assert(conn->state <= CPUWORKER_STATE_MAX_); - break; case CONN_TYPE_CONTROL: tor_assert(conn->state >= CONTROL_CONN_STATE_MIN_); tor_assert(conn->state <= CONTROL_CONN_STATE_MAX_); @@ -4959,9 +4939,7 @@ proxy_type_to_string(int proxy_type) } /** Call connection_free_() on every connection in our array, and release all - * storage held by connection.c. This is used by cpuworkers and dnsworkers - * when they fork, so they don't keep resources held open (especially - * sockets). + * storage held by connection.c. * * Don't do the checks in connection_free(), because they will * fail. diff --git a/src/or/cpuworker.c b/src/or/cpuworker.c index 340fbec620..f3f275d099 100644 --- a/src/or/cpuworker.c +++ b/src/or/cpuworker.c @@ -5,84 +5,98 @@ /** * \file cpuworker.c - * \brief Implements a farm of 'CPU worker' processes to perform - * CPU-intensive tasks in another thread or process, to not - * interrupt the main thread. + * \brief Uses the workqueue/threadpool code to farm CPU-intensive activities + * out to subprocesses. * * Right now, we only use this for processing onionskins. **/ #include "or.h" -#include "buffers.h" #include "channel.h" -#include "channeltls.h" #include "circuitbuild.h" #include "circuitlist.h" -#include "config.h" -#include "connection.h" #include "connection_or.h" +#include "config.h" #include "cpuworker.h" #include "main.h" #include "onion.h" #include "rephist.h" #include "router.h" +#include "workqueue.h" -/** The maximum number of cpuworker processes we will keep around. */ -#define MAX_CPUWORKERS 16 -/** The minimum number of cpuworker processes we will keep around. */ -#define MIN_CPUWORKERS 1 - -/** The tag specifies which circuit this onionskin was from. */ -#define TAG_LEN 12 +#ifdef HAVE_EVENT2_EVENT_H +#include <event2/event.h> +#else +#include <event.h> +#endif -/** How many cpuworkers we have running right now. */ -static int num_cpuworkers=0; -/** How many of the running cpuworkers have an assigned task right now. */ -static int num_cpuworkers_busy=0; -/** We need to spawn new cpuworkers whenever we rotate the onion keys - * on platforms where execution contexts==processes. This variable stores - * the last time we got a key rotation event. */ -static time_t last_rotation_time=0; +static void queue_pending_tasks(void); -static void cpuworker_main(void *data) ATTR_NORETURN; -static int spawn_cpuworker(void); -static void spawn_enough_cpuworkers(void); -static void process_pending_task(connection_t *cpuworker); +typedef struct worker_state_s { + int generation; + server_onion_keys_t *onion_keys; +} worker_state_t; -/** Initialize the cpuworker subsystem. - */ -void -cpu_init(void) +static void * +worker_state_new(void *arg) { - cpuworkers_rotate(); + worker_state_t *ws; + (void)arg; + ws = tor_malloc_zero(sizeof(worker_state_t)); + ws->onion_keys = server_onion_keys_new(); + return ws; } - -/** Called when we're done sending a request to a cpuworker. */ -int -connection_cpu_finished_flushing(connection_t *conn) +static void +worker_state_free(void *arg) { - tor_assert(conn); - tor_assert(conn->type == CONN_TYPE_CPUWORKER); - return 0; + worker_state_t *ws = arg; + server_onion_keys_free(ws->onion_keys); + tor_free(ws); } -/** Pack global_id and circ_id; set *tag to the result. (See note on - * cpuworker_main for wire format.) */ +static replyqueue_t *replyqueue = NULL; +static threadpool_t *threadpool = NULL; +static struct event *reply_event = NULL; + +static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT; + +static int total_pending_tasks = 0; +static int max_pending_tasks = 128; + static void -tag_pack(uint8_t *tag, uint64_t chan_id, circid_t circ_id) +replyqueue_process_cb(evutil_socket_t sock, short events, void *arg) { - /*XXXX RETHINK THIS WHOLE MESS !!!! !NM NM NM NM*/ - /*XXXX DOUBLEPLUSTHIS!!!! AS AS AS AS*/ - set_uint64(tag, chan_id); - set_uint32(tag+8, circ_id); + replyqueue_t *rq = arg; + (void) sock; + (void) events; + replyqueue_process(rq); } -/** Unpack <b>tag</b> into addr, port, and circ_id. +/** Initialize the cpuworker subsystem. */ -static void -tag_unpack(const uint8_t *tag, uint64_t *chan_id, circid_t *circ_id) +void +cpu_init(void) { - *chan_id = get_uint64(tag); - *circ_id = get_uint32(tag+8); + if (!replyqueue) { + replyqueue = replyqueue_new(0); + } + if (!reply_event) { + reply_event = tor_event_new(tor_libevent_get_base(), + replyqueue_get_socket(replyqueue), + EV_READ|EV_PERSIST, + replyqueue_process_cb, + replyqueue); + event_add(reply_event, NULL); + } + if (!threadpool) { + threadpool = threadpool_new(get_num_cpus(get_options()), + replyqueue, + worker_state_new, + worker_state_free, + NULL); + } + /* Total voodoo. Can we make this more sensible? */ + max_pending_tasks = get_num_cpus(get_options()) * 64; + crypto_seed_weak_rng(&request_sample_rng); } /** Magic numbers to make sure our cpuworker_requests don't grow any @@ -94,10 +108,6 @@ tag_unpack(const uint8_t *tag, uint64_t *chan_id, circid_t *circ_id) typedef struct cpuworker_request_t { /** Magic number; must be CPUWORKER_REQUEST_MAGIC. */ uint32_t magic; - /** Opaque tag to identify the job */ - uint8_t tag[TAG_LEN]; - /** Task code. Must be one of CPUWORKER_TASK_* */ - uint8_t task; /** Flag: Are we timing this request? */ unsigned timed : 1; @@ -114,8 +124,7 @@ typedef struct cpuworker_request_t { typedef struct cpuworker_reply_t { /** Magic number; must be CPUWORKER_REPLY_MAGIC. */ uint32_t magic; - /** Opaque tag to identify the job; matches the request's tag.*/ - uint8_t tag[TAG_LEN]; + /** True iff we got a successful request. */ uint8_t success; @@ -142,42 +151,46 @@ typedef struct cpuworker_reply_t { uint8_t rend_auth_material[DIGEST_LEN]; } cpuworker_reply_t; +typedef struct cpuworker_job_u { + uint64_t chan_id; + uint32_t circ_id; + union { + cpuworker_request_t request; + cpuworker_reply_t reply; + } u; +} cpuworker_job_t; + +static int +update_state_threadfn(void *state_, void *work_) +{ + worker_state_t *state = state_; + worker_state_t *update = work_; + server_onion_keys_free(state->onion_keys); + state->onion_keys = update->onion_keys; + update->onion_keys = NULL; + ++state->generation; + return WQ_RPL_REPLY; +} +static void +update_state_replyfn(void *work_) +{ + tor_free(work_); +} + /** Called when the onion key has changed and we need to spawn new * cpuworkers. Close all currently idle cpuworkers, and mark the last * rotation time as now. */ void -cpuworkers_rotate(void) +cpuworkers_rotate_keyinfo(void) { - connection_t *cpuworker; - while ((cpuworker = connection_get_by_type_state(CONN_TYPE_CPUWORKER, - CPUWORKER_STATE_IDLE))) { - connection_mark_for_close(cpuworker); - --num_cpuworkers; + if (threadpool_queue_for_all(threadpool, + worker_state_new, + update_state_threadfn, + update_state_replyfn, + NULL)) { + log_warn(LD_OR, "Failed to queue key update for worker threads."); } - last_rotation_time = time(NULL); - if (server_mode(get_options())) - spawn_enough_cpuworkers(); -} - -/** If the cpuworker closes the connection, - * mark it as closed and spawn a new one as needed. */ -int -connection_cpu_reached_eof(connection_t *conn) -{ - log_warn(LD_GENERAL,"Read eof. CPU worker died unexpectedly."); - if (conn->state != CPUWORKER_STATE_IDLE) { - /* the circ associated with this cpuworker will have to wait until - * it gets culled in run_connection_housekeeping(), since we have - * no way to find out which circ it was. */ - log_warn(LD_GENERAL,"...and it left a circuit queued; abandoning circ."); - num_cpuworkers_busy--; - } - num_cpuworkers--; - spawn_enough_cpuworkers(); /* try to regrow. hope we don't end up - spinning. */ - connection_mark_for_close(conn); - return 0; } /** Indexed by handshake type: how many onionskins have we processed and @@ -197,8 +210,6 @@ static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1]; * time. (microseconds) */ #define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000) -static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT; - /** Return true iff we'd like to measure a handshake of type * <b>onionskin_type</b>. Call only from the main thread. */ static int @@ -286,31 +297,22 @@ cpuworker_log_onionskin_overhead(int severity, int onionskin_type, onionskin_type_name, (unsigned)overhead, relative_overhead*100); } -/** Called when we get data from a cpuworker. If the answer is not complete, - * wait for a complete answer. If the answer is complete, - * process it as appropriate. - */ -int -connection_cpu_process_inbuf(connection_t *conn) +/** */ +static void +cpuworker_onion_handshake_replyfn(void *work_) { + cpuworker_job_t *job = work_; + cpuworker_reply_t rpl; uint64_t chan_id; circid_t circ_id; channel_t *p_chan = NULL; - circuit_t *circ; - - tor_assert(conn); - tor_assert(conn->type == CONN_TYPE_CPUWORKER); + circuit_t *circ = NULL; - if (!connection_get_inbuf_len(conn)) - return 0; - - if (conn->state == CPUWORKER_STATE_BUSY_ONION) { - cpuworker_reply_t rpl; - if (connection_get_inbuf_len(conn) < sizeof(cpuworker_reply_t)) - return 0; /* not yet */ - tor_assert(connection_get_inbuf_len(conn) == sizeof(cpuworker_reply_t)); + --total_pending_tasks; - connection_fetch_from_buf((void*)&rpl,sizeof(cpuworker_reply_t),conn); + if (1) { + /* Could avoid this, but doesn't matter. */ + memcpy(&rpl, &job->u.reply, sizeof(rpl)); tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC); @@ -337,18 +339,21 @@ connection_cpu_process_inbuf(connection_t *conn) } } } - /* parse out the circ it was talking about */ - tag_unpack(rpl.tag, &chan_id, &circ_id); - circ = NULL; - log_debug(LD_OR, - "Unpacking cpuworker reply, chan_id is " U64_FORMAT - ", circ_id is %u", - U64_PRINTF_ARG(chan_id), (unsigned)circ_id); + /* Find the circ it was talking about */ + chan_id = job->chan_id; + circ_id = job->circ_id; + p_chan = channel_find_by_global_id(chan_id); if (p_chan) circ = circuit_get_by_circid_channel(circ_id, p_chan); + log_debug(LD_OR, + "Unpacking cpuworker reply %p, chan_id is " U64_FORMAT + ", circ_id is %u, p_chan=%p, circ=%p, success=%d", + job, U64_PRINTF_ARG(chan_id), (unsigned)circ_id, + p_chan, circ, rpl.success); + if (rpl.success == 0) { log_debug(LD_OR, "decoding onionskin failed. " @@ -367,6 +372,7 @@ connection_cpu_process_inbuf(connection_t *conn) goto done_processing; } tor_assert(! CIRCUIT_IS_ORIGIN(circ)); + TO_OR_CIRCUIT(circ)->workqueue_entry = NULL; if (onionskin_answer(TO_OR_CIRCUIT(circ), &rpl.created_cell, (const char*)rpl.keys, @@ -376,58 +382,33 @@ connection_cpu_process_inbuf(connection_t *conn) goto done_processing; } log_debug(LD_OR,"onionskin_answer succeeded. Yay."); - } else { - tor_assert(0); /* don't ask me to do handshakes yet */ } done_processing: - conn->state = CPUWORKER_STATE_IDLE; - num_cpuworkers_busy--; - if (conn->timestamp_created < last_rotation_time) { - connection_mark_for_close(conn); - num_cpuworkers--; - spawn_enough_cpuworkers(); - } else { - process_pending_task(conn); - } - return 0; + memwipe(&rpl, 0, sizeof(rpl)); + memwipe(job, 0, sizeof(*job)); + tor_free(job); + queue_pending_tasks(); } -/** Implement a cpuworker. 'data' is an fdarray as returned by socketpair. - * Read and writes from fdarray[1]. Reads requests, writes answers. - * - * Request format: - * cpuworker_request_t. - * Response format: - * cpuworker_reply_t - */ -static void -cpuworker_main(void *data) +/** Implementation function for onion handshake requests. */ +static int +cpuworker_onion_handshake_threadfn(void *state_, void *work_) { - /* For talking to the parent thread/process */ - tor_socket_t *fdarray = data; - tor_socket_t fd; + worker_state_t *state = state_; + cpuworker_job_t *job = work_; /* variables for onion processing */ - server_onion_keys_t onion_keys; + server_onion_keys_t *onion_keys = state->onion_keys; cpuworker_request_t req; cpuworker_reply_t rpl; - fd = fdarray[1]; /* this side is ours */ - tor_free(data); - - setup_server_onion_keys(&onion_keys); - - for (;;) { - if (read_all(fd, (void *)&req, sizeof(req), 1) != sizeof(req)) { - log_info(LD_OR, "read request failed. Exiting."); - goto end; - } - tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC); + memcpy(&req, &job->u.request, sizeof(req)); - memset(&rpl, 0, sizeof(rpl)); + tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC); + memset(&rpl, 0, sizeof(rpl)); - if (req.task == CPUWORKER_TASK_ONION) { + if (1) { const create_cell_t *cc = &req.create_cell; created_cell_t *cell_out = &rpl.created_cell; struct timeval tv_start = {0,0}, tv_end; @@ -439,7 +420,7 @@ cpuworker_main(void *data) tor_gettimeofday(&tv_start); n = onion_skin_server_handshake(cc->handshake_type, cc->onionskin, cc->handshake_len, - &onion_keys, + onion_keys, cell_out->reply, rpl.keys, CPATH_KEY_MATERIAL_LEN, rpl.rend_auth_material); @@ -447,12 +428,10 @@ cpuworker_main(void *data) /* failure */ log_debug(LD_OR,"onion_skin_server_handshake failed."); memset(&rpl, 0, sizeof(rpl)); - memcpy(rpl.tag, req.tag, TAG_LEN); rpl.success = 0; } else { /* success */ log_debug(LD_OR,"onion_skin_server_handshake succeeded."); - memcpy(rpl.tag, req.tag, TAG_LEN); cell_out->handshake_len = n; switch (cc->cell_type) { case CELL_CREATE: @@ -463,7 +442,7 @@ cpuworker_main(void *data) cell_out->cell_type = CELL_CREATED_FAST; break; default: tor_assert(0); - goto end; + return WQ_RPL_SHUTDOWN; } rpl.success = 1; } @@ -479,187 +458,55 @@ cpuworker_main(void *data) else rpl.n_usec = (uint32_t) usec; } - if (write_all(fd, (void*)&rpl, sizeof(rpl), 1) != sizeof(rpl)) { - log_err(LD_BUG,"writing response buf failed. Exiting."); - goto end; - } - log_debug(LD_OR,"finished writing response."); - } else if (req.task == CPUWORKER_TASK_SHUTDOWN) { - log_info(LD_OR,"Clean shutdown: exiting"); - goto end; - } - memwipe(&req, 0, sizeof(req)); - memwipe(&rpl, 0, sizeof(req)); - } - end: - memwipe(&req, 0, sizeof(req)); - memwipe(&rpl, 0, sizeof(req)); - release_server_onion_keys(&onion_keys); - tor_close_socket(fd); - crypto_thread_cleanup(); - spawn_exit(); -} - -/** Launch a new cpuworker. Return 0 if we're happy, -1 if we failed. - */ -static int -spawn_cpuworker(void) -{ - tor_socket_t *fdarray; - tor_socket_t fd; - connection_t *conn; - int err; - - fdarray = tor_calloc(2, sizeof(tor_socket_t)); - if ((err = tor_socketpair(AF_UNIX, SOCK_STREAM, 0, fdarray)) < 0) { - log_warn(LD_NET, "Couldn't construct socketpair for cpuworker: %s", - tor_socket_strerror(-err)); - tor_free(fdarray); - return -1; - } - - tor_assert(SOCKET_OK(fdarray[0])); - tor_assert(SOCKET_OK(fdarray[1])); - - fd = fdarray[0]; - if (spawn_func(cpuworker_main, (void*)fdarray) < 0) { - tor_close_socket(fdarray[0]); - tor_close_socket(fdarray[1]); - tor_free(fdarray); - return -1; } - log_debug(LD_OR,"just spawned a cpu worker."); - - conn = connection_new(CONN_TYPE_CPUWORKER, AF_UNIX); - - /* set up conn so it's got all the data we need to remember */ - conn->s = fd; - conn->address = tor_strdup("localhost"); - tor_addr_make_unspec(&conn->addr); - - if (set_socket_nonblocking(fd) == -1) { - connection_free(conn); /* this closes fd */ - return -1; - } - - if (connection_add(conn) < 0) { /* no space, forget it */ - log_warn(LD_NET,"connection_add for cpuworker failed. Giving up."); - connection_free(conn); /* this closes fd */ - return -1; - } - - conn->state = CPUWORKER_STATE_IDLE; - connection_start_reading(conn); - return 0; /* success */ -} - -/** If we have too few or too many active cpuworkers, try to spawn new ones - * or kill idle ones. - */ -static void -spawn_enough_cpuworkers(void) -{ - int num_cpuworkers_needed = get_num_cpus(get_options()); - int reseed = 0; - - if (num_cpuworkers_needed < MIN_CPUWORKERS) - num_cpuworkers_needed = MIN_CPUWORKERS; - if (num_cpuworkers_needed > MAX_CPUWORKERS) - num_cpuworkers_needed = MAX_CPUWORKERS; - - while (num_cpuworkers < num_cpuworkers_needed) { - if (spawn_cpuworker() < 0) { - log_warn(LD_GENERAL,"Cpuworker spawn failed. Will try again later."); - return; - } - num_cpuworkers++; - reseed++; - } + memcpy(&job->u.reply, &rpl, sizeof(rpl)); - if (reseed) - crypto_seed_weak_rng(&request_sample_rng); + memwipe(&req, 0, sizeof(req)); + memwipe(&rpl, 0, sizeof(req)); + return WQ_RPL_REPLY; } -/** Take a pending task from the queue and assign it to 'cpuworker'. */ +/** Take pending tasks from the queue and assign them to cpuworkers. */ static void -process_pending_task(connection_t *cpuworker) +queue_pending_tasks(void) { or_circuit_t *circ; create_cell_t *onionskin = NULL; - tor_assert(cpuworker); + while (total_pending_tasks < max_pending_tasks) { + circ = onion_next_task(&onionskin); - /* for now only process onion tasks */ - - circ = onion_next_task(&onionskin); - if (!circ) - return; - if (assign_onionskin_to_cpuworker(cpuworker, circ, onionskin)) - log_warn(LD_OR,"assign_to_cpuworker failed. Ignoring."); -} - -/** How long should we let a cpuworker stay busy before we give - * up on it and decide that we have a bug or infinite loop? - * This value is high because some servers with low memory/cpu - * sometimes spend an hour or more swapping, and Tor starves. */ -#define CPUWORKER_BUSY_TIMEOUT (60*60*12) + if (!circ) + return; -/** We have a bug that I can't find. Sometimes, very rarely, cpuworkers get - * stuck in the 'busy' state, even though the cpuworker process thinks of - * itself as idle. I don't know why. But here's a workaround to kill any - * cpuworker that's been busy for more than CPUWORKER_BUSY_TIMEOUT. - */ -static void -cull_wedged_cpuworkers(void) -{ - time_t now = time(NULL); - smartlist_t *conns = get_connection_array(); - SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) { - if (!conn->marked_for_close && - conn->type == CONN_TYPE_CPUWORKER && - conn->state == CPUWORKER_STATE_BUSY_ONION && - conn->timestamp_lastwritten + CPUWORKER_BUSY_TIMEOUT < now) { - log_notice(LD_BUG, - "closing wedged cpuworker. Can somebody find the bug?"); - num_cpuworkers_busy--; - num_cpuworkers--; - connection_mark_for_close(conn); - } - } SMARTLIST_FOREACH_END(conn); + if (assign_onionskin_to_cpuworker(circ, onionskin)) + log_warn(LD_OR,"assign_to_cpuworker failed. Ignoring."); + } } /** Try to tell a cpuworker to perform the public key operations necessary to * respond to <b>onionskin</b> for the circuit <b>circ</b>. * - * If <b>cpuworker</b> is defined, assert that he's idle, and use him. Else, - * look for an idle cpuworker and use him. If none idle, queue task onto the - * pending onion list and return. Return 0 if we successfully assign the - * task, or -1 on failure. + * Return 0 if we successfully assign the task, or -1 on failure. */ int -assign_onionskin_to_cpuworker(connection_t *cpuworker, - or_circuit_t *circ, +assign_onionskin_to_cpuworker(or_circuit_t *circ, create_cell_t *onionskin) { + workqueue_entry_t *queue_entry; + cpuworker_job_t *job; cpuworker_request_t req; - time_t now = approx_time(); - static time_t last_culled_cpuworkers = 0; int should_time; - /* Checking for wedged cpuworkers requires a linear search over all - * connections, so let's do it only once a minute. - */ -#define CULL_CPUWORKERS_INTERVAL 60 - - if (last_culled_cpuworkers + CULL_CPUWORKERS_INTERVAL <= now) { - cull_wedged_cpuworkers(); - spawn_enough_cpuworkers(); - last_culled_cpuworkers = now; - } - if (1) { - if (num_cpuworkers_busy == num_cpuworkers) { + if (!circ->p_chan) { + log_info(LD_OR,"circ->p_chan gone. Failing circ."); + tor_free(onionskin); + return -1; + } + + if (total_pending_tasks >= max_pending_tasks) { log_debug(LD_OR,"No idle cpuworkers. Queuing."); if (onion_pending_add(circ, onionskin) < 0) { tor_free(onionskin); @@ -668,36 +515,14 @@ assign_onionskin_to_cpuworker(connection_t *cpuworker, return 0; } - if (!cpuworker) - cpuworker = connection_get_by_type_state(CONN_TYPE_CPUWORKER, - CPUWORKER_STATE_IDLE); - - tor_assert(cpuworker); - - if (!circ->p_chan) { - log_info(LD_OR,"circ->p_chan gone. Failing circ."); - tor_free(onionskin); - return -1; - } - if (connection_or_digest_is_known_relay(circ->p_chan->identity_digest)) rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type); should_time = should_time_request(onionskin->handshake_type); memset(&req, 0, sizeof(req)); req.magic = CPUWORKER_REQUEST_MAGIC; - tag_pack(req.tag, circ->p_chan->global_identifier, - circ->p_circ_id); req.timed = should_time; - cpuworker->state = CPUWORKER_STATE_BUSY_ONION; - /* touch the lastwritten timestamp, since that's how we check to - * see how long it's been since we asked the question, and sometimes - * we check before the first call to connection_handle_write(). */ - cpuworker->timestamp_lastwritten = now; - num_cpuworkers_busy++; - - req.task = CPUWORKER_TASK_ONION; memcpy(&req.create_cell, onionskin, sizeof(create_cell_t)); tor_free(onionskin); @@ -705,9 +530,46 @@ assign_onionskin_to_cpuworker(connection_t *cpuworker, if (should_time) tor_gettimeofday(&req.started_at); - connection_write_to_buf((void*)&req, sizeof(req), cpuworker); + job = tor_malloc_zero(sizeof(cpuworker_job_t)); + job->chan_id = circ->p_chan->global_identifier; + job->circ_id = circ->p_circ_id; + memcpy(&job->u.request, &req, sizeof(req)); memwipe(&req, 0, sizeof(req)); + + ++total_pending_tasks; + queue_entry = threadpool_queue_work(threadpool, + cpuworker_onion_handshake_threadfn, + cpuworker_onion_handshake_replyfn, + job); + if (!queue_entry) { + log_warn(LD_BUG, "Couldn't queue work on threadpool"); + tor_free(job); + return -1; + } + log_debug(LD_OR, "Queued task %p (qe=%p, chanid="U64_FORMAT", circid=%u)", + job, queue_entry, U64_PRINTF_ARG(job->chan_id), job->circ_id); + + circ->workqueue_entry = queue_entry; } return 0; } +/** If <b>circ</b> has a pending handshake that hasn't been processed yet, + * remove it from the worker queue. */ +void +cpuworker_cancel_circ_handshake(or_circuit_t *circ) +{ + cpuworker_job_t *job; + if (circ->workqueue_entry == NULL) + return; + + job = workqueue_entry_cancel(circ->workqueue_entry); + if (job) { + /* It successfully cancelled. */ + memwipe(job, 0xe0, sizeof(*job)); + tor_free(job); + } + + circ->workqueue_entry = NULL; +} + diff --git a/src/or/cpuworker.h b/src/or/cpuworker.h index 2a2b37a975..70a595e472 100644 --- a/src/or/cpuworker.h +++ b/src/or/cpuworker.h @@ -13,19 +13,17 @@ #define TOR_CPUWORKER_H void cpu_init(void); -void cpuworkers_rotate(void); -int connection_cpu_finished_flushing(connection_t *conn); -int connection_cpu_reached_eof(connection_t *conn); -int connection_cpu_process_inbuf(connection_t *conn); +void cpuworkers_rotate_keyinfo(void); + struct create_cell_t; -int assign_onionskin_to_cpuworker(connection_t *cpuworker, - or_circuit_t *circ, +int assign_onionskin_to_cpuworker(or_circuit_t *circ, struct create_cell_t *onionskin); uint64_t estimated_usec_for_onionskins(uint32_t n_requests, uint16_t onionskin_type); void cpuworker_log_onionskin_overhead(int severity, int onionskin_type, const char *onionskin_type_name); +void cpuworker_cancel_circ_handshake(or_circuit_t *circ); #endif diff --git a/src/or/main.c b/src/or/main.c index abf3230c4c..136043c117 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -1271,7 +1271,7 @@ run_scheduled_events(time_t now) get_onion_key_set_at()+MIN_ONION_KEY_LIFETIME < now) { log_info(LD_GENERAL,"Rotating onion key."); rotate_onion_key(); - cpuworkers_rotate(); + cpuworkers_rotate_keyinfo(); if (router_rebuild_descriptor(1)<0) { log_info(LD_CONFIG, "Couldn't rebuild router descriptor"); } @@ -1960,9 +1960,9 @@ do_hup(void) * force a retry there. */ if (server_mode(options)) { - /* Restart cpuworker and dnsworker processes, so they get up-to-date + /* Update cpuworker and dnsworker processes, so they get up-to-date * configuration options. */ - cpuworkers_rotate(); + cpuworkers_rotate_keyinfo(); dns_reset(); } return 0; diff --git a/src/or/onion.c b/src/or/onion.c index 3723a3e11e..43fb63c832 100644 --- a/src/or/onion.c +++ b/src/or/onion.c @@ -295,6 +295,8 @@ onion_pending_remove(or_circuit_t *circ) victim = circ->onionqueue_entry; if (victim) onion_queue_entry_remove(victim); + + cpuworker_cancel_circ_handshake(circ); } /** Remove a queue entry <b>victim</b> from the queue, unlinking it from @@ -339,25 +341,25 @@ clear_pending_onions(void) /* ============================================================ */ -/** Fill in a server_onion_keys_t object at <b>keys</b> with all of the keys +/** Return a new server_onion_keys_t object with all of the keys * and other info we might need to do onion handshakes. (We make a copy of * our keys for each cpuworker to avoid race conditions with the main thread, * and to avoid locking) */ -void -setup_server_onion_keys(server_onion_keys_t *keys) +server_onion_keys_t * +server_onion_keys_new(void) { - memset(keys, 0, sizeof(server_onion_keys_t)); + server_onion_keys_t *keys = tor_malloc_zero(sizeof(server_onion_keys_t)); memcpy(keys->my_identity, router_get_my_id_digest(), DIGEST_LEN); dup_onion_keys(&keys->onion_key, &keys->last_onion_key); keys->curve25519_key_map = construct_ntor_key_map(); keys->junk_keypair = tor_malloc_zero(sizeof(curve25519_keypair_t)); curve25519_keypair_generate(keys->junk_keypair, 0); + return keys; } -/** Release all storage held in <b>keys</b>, but do not free <b>keys</b> - * itself (as it's likely to be stack-allocated.) */ +/** Release all storage held in <b>keys</b>. */ void -release_server_onion_keys(server_onion_keys_t *keys) +server_onion_keys_free(server_onion_keys_t *keys) { if (! keys) return; @@ -366,7 +368,8 @@ release_server_onion_keys(server_onion_keys_t *keys) crypto_pk_free(keys->last_onion_key); ntor_key_map_free(keys->curve25519_key_map); tor_free(keys->junk_keypair); - memset(keys, 0, sizeof(server_onion_keys_t)); + memwipe(keys, 0, sizeof(server_onion_keys_t)); + tor_free(keys); } /** Release whatever storage is held in <b>state</b>, depending on its diff --git a/src/or/onion.h b/src/or/onion.h index 35619879e4..96050083f8 100644 --- a/src/or/onion.h +++ b/src/or/onion.h @@ -30,8 +30,8 @@ typedef struct server_onion_keys_t { #define MAX_ONIONSKIN_CHALLENGE_LEN 255 #define MAX_ONIONSKIN_REPLY_LEN 255 -void setup_server_onion_keys(server_onion_keys_t *keys); -void release_server_onion_keys(server_onion_keys_t *keys); +server_onion_keys_t *server_onion_keys_new(void); +void server_onion_keys_free(server_onion_keys_t *keys); void onion_handshake_state_release(onion_handshake_state_t *state); diff --git a/src/or/or.h b/src/or/or.h index 8a15529336..4ff3555845 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -213,8 +213,7 @@ typedef enum { #define CONN_TYPE_DIR_LISTENER 8 /** Type for HTTP connections to the directory server. */ #define CONN_TYPE_DIR 9 -/** Connection from the main process to a CPU worker process. */ -#define CONN_TYPE_CPUWORKER 10 +/* Type 10 is unused. */ /** Type for listening for connections from user interface process. */ #define CONN_TYPE_CONTROL_LISTENER 11 /** Type for connections from user interface process. */ @@ -276,17 +275,6 @@ typedef enum { /** State for any listener connection. */ #define LISTENER_STATE_READY 0 -#define CPUWORKER_STATE_MIN_ 1 -/** State for a connection to a cpuworker process that's idle. */ -#define CPUWORKER_STATE_IDLE 1 -/** State for a connection to a cpuworker process that's processing a - * handshake. */ -#define CPUWORKER_STATE_BUSY_ONION 2 -#define CPUWORKER_STATE_MAX_ 2 - -#define CPUWORKER_TASK_ONION CPUWORKER_STATE_BUSY_ONION -#define CPUWORKER_TASK_SHUTDOWN 255 - #define OR_CONN_STATE_MIN_ 1 /** State for a connection to an OR: waiting for connect() to finish. */ #define OR_CONN_STATE_CONNECTING 1 @@ -3158,6 +3146,9 @@ typedef struct or_circuit_t { /** Pointer to an entry on the onion queue, if this circuit is waiting for a * chance to give an onionskin to a cpuworker. Used only in onion.c */ struct onion_queue_t *onionqueue_entry; + /** Pointer to a workqueue entry, if this circuit has given an onionskin to + * a cpuworker and is waiting for a response. Used only in cpuworker.c */ + struct workqueue_entry_s *workqueue_entry; /** The circuit_id used in the previous (backward) hop of this circuit. */ circid_t p_circ_id; |