diff options
author | Nick Mathewson <nickm@torproject.org> | 2013-10-02 12:32:09 -0400 |
---|---|---|
committer | Nick Mathewson <nickm@torproject.org> | 2015-01-14 11:23:34 -0500 |
commit | 1e896214e7eb5ede65663486291252b171e9daea (patch) | |
tree | 3f8d6aeb91e844888f16a2295e2200e22979a181 /src/or/cpuworker.c | |
parent | cc6529e9bb7d7e01a25b5632d6d6c2424c6fc2b4 (diff) | |
download | tor-1e896214e7eb5ede65663486291252b171e9daea.tar.gz tor-1e896214e7eb5ede65663486291252b171e9daea.zip |
Refactor cpuworker to use workqueue/threadpool code.
Diffstat (limited to 'src/or/cpuworker.c')
-rw-r--r-- | src/or/cpuworker.c | 532 |
1 files changed, 197 insertions, 335 deletions
diff --git a/src/or/cpuworker.c b/src/or/cpuworker.c index 340fbec620..f3f275d099 100644 --- a/src/or/cpuworker.c +++ b/src/or/cpuworker.c @@ -5,84 +5,98 @@ /** * \file cpuworker.c - * \brief Implements a farm of 'CPU worker' processes to perform - * CPU-intensive tasks in another thread or process, to not - * interrupt the main thread. + * \brief Uses the workqueue/threadpool code to farm CPU-intensive activities + * out to subprocesses. * * Right now, we only use this for processing onionskins. **/ #include "or.h" -#include "buffers.h" #include "channel.h" -#include "channeltls.h" #include "circuitbuild.h" #include "circuitlist.h" -#include "config.h" -#include "connection.h" #include "connection_or.h" +#include "config.h" #include "cpuworker.h" #include "main.h" #include "onion.h" #include "rephist.h" #include "router.h" +#include "workqueue.h" -/** The maximum number of cpuworker processes we will keep around. */ -#define MAX_CPUWORKERS 16 -/** The minimum number of cpuworker processes we will keep around. */ -#define MIN_CPUWORKERS 1 - -/** The tag specifies which circuit this onionskin was from. */ -#define TAG_LEN 12 +#ifdef HAVE_EVENT2_EVENT_H +#include <event2/event.h> +#else +#include <event.h> +#endif -/** How many cpuworkers we have running right now. */ -static int num_cpuworkers=0; -/** How many of the running cpuworkers have an assigned task right now. */ -static int num_cpuworkers_busy=0; -/** We need to spawn new cpuworkers whenever we rotate the onion keys - * on platforms where execution contexts==processes. This variable stores - * the last time we got a key rotation event. */ -static time_t last_rotation_time=0; +static void queue_pending_tasks(void); -static void cpuworker_main(void *data) ATTR_NORETURN; -static int spawn_cpuworker(void); -static void spawn_enough_cpuworkers(void); -static void process_pending_task(connection_t *cpuworker); +typedef struct worker_state_s { + int generation; + server_onion_keys_t *onion_keys; +} worker_state_t; -/** Initialize the cpuworker subsystem. - */ -void -cpu_init(void) +static void * +worker_state_new(void *arg) { - cpuworkers_rotate(); + worker_state_t *ws; + (void)arg; + ws = tor_malloc_zero(sizeof(worker_state_t)); + ws->onion_keys = server_onion_keys_new(); + return ws; } - -/** Called when we're done sending a request to a cpuworker. */ -int -connection_cpu_finished_flushing(connection_t *conn) +static void +worker_state_free(void *arg) { - tor_assert(conn); - tor_assert(conn->type == CONN_TYPE_CPUWORKER); - return 0; + worker_state_t *ws = arg; + server_onion_keys_free(ws->onion_keys); + tor_free(ws); } -/** Pack global_id and circ_id; set *tag to the result. (See note on - * cpuworker_main for wire format.) */ +static replyqueue_t *replyqueue = NULL; +static threadpool_t *threadpool = NULL; +static struct event *reply_event = NULL; + +static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT; + +static int total_pending_tasks = 0; +static int max_pending_tasks = 128; + static void -tag_pack(uint8_t *tag, uint64_t chan_id, circid_t circ_id) +replyqueue_process_cb(evutil_socket_t sock, short events, void *arg) { - /*XXXX RETHINK THIS WHOLE MESS !!!! !NM NM NM NM*/ - /*XXXX DOUBLEPLUSTHIS!!!! AS AS AS AS*/ - set_uint64(tag, chan_id); - set_uint32(tag+8, circ_id); + replyqueue_t *rq = arg; + (void) sock; + (void) events; + replyqueue_process(rq); } -/** Unpack <b>tag</b> into addr, port, and circ_id. +/** Initialize the cpuworker subsystem. */ -static void -tag_unpack(const uint8_t *tag, uint64_t *chan_id, circid_t *circ_id) +void +cpu_init(void) { - *chan_id = get_uint64(tag); - *circ_id = get_uint32(tag+8); + if (!replyqueue) { + replyqueue = replyqueue_new(0); + } + if (!reply_event) { + reply_event = tor_event_new(tor_libevent_get_base(), + replyqueue_get_socket(replyqueue), + EV_READ|EV_PERSIST, + replyqueue_process_cb, + replyqueue); + event_add(reply_event, NULL); + } + if (!threadpool) { + threadpool = threadpool_new(get_num_cpus(get_options()), + replyqueue, + worker_state_new, + worker_state_free, + NULL); + } + /* Total voodoo. Can we make this more sensible? */ + max_pending_tasks = get_num_cpus(get_options()) * 64; + crypto_seed_weak_rng(&request_sample_rng); } /** Magic numbers to make sure our cpuworker_requests don't grow any @@ -94,10 +108,6 @@ tag_unpack(const uint8_t *tag, uint64_t *chan_id, circid_t *circ_id) typedef struct cpuworker_request_t { /** Magic number; must be CPUWORKER_REQUEST_MAGIC. */ uint32_t magic; - /** Opaque tag to identify the job */ - uint8_t tag[TAG_LEN]; - /** Task code. Must be one of CPUWORKER_TASK_* */ - uint8_t task; /** Flag: Are we timing this request? */ unsigned timed : 1; @@ -114,8 +124,7 @@ typedef struct cpuworker_request_t { typedef struct cpuworker_reply_t { /** Magic number; must be CPUWORKER_REPLY_MAGIC. */ uint32_t magic; - /** Opaque tag to identify the job; matches the request's tag.*/ - uint8_t tag[TAG_LEN]; + /** True iff we got a successful request. */ uint8_t success; @@ -142,42 +151,46 @@ typedef struct cpuworker_reply_t { uint8_t rend_auth_material[DIGEST_LEN]; } cpuworker_reply_t; +typedef struct cpuworker_job_u { + uint64_t chan_id; + uint32_t circ_id; + union { + cpuworker_request_t request; + cpuworker_reply_t reply; + } u; +} cpuworker_job_t; + +static int +update_state_threadfn(void *state_, void *work_) +{ + worker_state_t *state = state_; + worker_state_t *update = work_; + server_onion_keys_free(state->onion_keys); + state->onion_keys = update->onion_keys; + update->onion_keys = NULL; + ++state->generation; + return WQ_RPL_REPLY; +} +static void +update_state_replyfn(void *work_) +{ + tor_free(work_); +} + /** Called when the onion key has changed and we need to spawn new * cpuworkers. Close all currently idle cpuworkers, and mark the last * rotation time as now. */ void -cpuworkers_rotate(void) +cpuworkers_rotate_keyinfo(void) { - connection_t *cpuworker; - while ((cpuworker = connection_get_by_type_state(CONN_TYPE_CPUWORKER, - CPUWORKER_STATE_IDLE))) { - connection_mark_for_close(cpuworker); - --num_cpuworkers; + if (threadpool_queue_for_all(threadpool, + worker_state_new, + update_state_threadfn, + update_state_replyfn, + NULL)) { + log_warn(LD_OR, "Failed to queue key update for worker threads."); } - last_rotation_time = time(NULL); - if (server_mode(get_options())) - spawn_enough_cpuworkers(); -} - -/** If the cpuworker closes the connection, - * mark it as closed and spawn a new one as needed. */ -int -connection_cpu_reached_eof(connection_t *conn) -{ - log_warn(LD_GENERAL,"Read eof. CPU worker died unexpectedly."); - if (conn->state != CPUWORKER_STATE_IDLE) { - /* the circ associated with this cpuworker will have to wait until - * it gets culled in run_connection_housekeeping(), since we have - * no way to find out which circ it was. */ - log_warn(LD_GENERAL,"...and it left a circuit queued; abandoning circ."); - num_cpuworkers_busy--; - } - num_cpuworkers--; - spawn_enough_cpuworkers(); /* try to regrow. hope we don't end up - spinning. */ - connection_mark_for_close(conn); - return 0; } /** Indexed by handshake type: how many onionskins have we processed and @@ -197,8 +210,6 @@ static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1]; * time. (microseconds) */ #define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000) -static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT; - /** Return true iff we'd like to measure a handshake of type * <b>onionskin_type</b>. Call only from the main thread. */ static int @@ -286,31 +297,22 @@ cpuworker_log_onionskin_overhead(int severity, int onionskin_type, onionskin_type_name, (unsigned)overhead, relative_overhead*100); } -/** Called when we get data from a cpuworker. If the answer is not complete, - * wait for a complete answer. If the answer is complete, - * process it as appropriate. - */ -int -connection_cpu_process_inbuf(connection_t *conn) +/** */ +static void +cpuworker_onion_handshake_replyfn(void *work_) { + cpuworker_job_t *job = work_; + cpuworker_reply_t rpl; uint64_t chan_id; circid_t circ_id; channel_t *p_chan = NULL; - circuit_t *circ; - - tor_assert(conn); - tor_assert(conn->type == CONN_TYPE_CPUWORKER); + circuit_t *circ = NULL; - if (!connection_get_inbuf_len(conn)) - return 0; - - if (conn->state == CPUWORKER_STATE_BUSY_ONION) { - cpuworker_reply_t rpl; - if (connection_get_inbuf_len(conn) < sizeof(cpuworker_reply_t)) - return 0; /* not yet */ - tor_assert(connection_get_inbuf_len(conn) == sizeof(cpuworker_reply_t)); + --total_pending_tasks; - connection_fetch_from_buf((void*)&rpl,sizeof(cpuworker_reply_t),conn); + if (1) { + /* Could avoid this, but doesn't matter. */ + memcpy(&rpl, &job->u.reply, sizeof(rpl)); tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC); @@ -337,18 +339,21 @@ connection_cpu_process_inbuf(connection_t *conn) } } } - /* parse out the circ it was talking about */ - tag_unpack(rpl.tag, &chan_id, &circ_id); - circ = NULL; - log_debug(LD_OR, - "Unpacking cpuworker reply, chan_id is " U64_FORMAT - ", circ_id is %u", - U64_PRINTF_ARG(chan_id), (unsigned)circ_id); + /* Find the circ it was talking about */ + chan_id = job->chan_id; + circ_id = job->circ_id; + p_chan = channel_find_by_global_id(chan_id); if (p_chan) circ = circuit_get_by_circid_channel(circ_id, p_chan); + log_debug(LD_OR, + "Unpacking cpuworker reply %p, chan_id is " U64_FORMAT + ", circ_id is %u, p_chan=%p, circ=%p, success=%d", + job, U64_PRINTF_ARG(chan_id), (unsigned)circ_id, + p_chan, circ, rpl.success); + if (rpl.success == 0) { log_debug(LD_OR, "decoding onionskin failed. " @@ -367,6 +372,7 @@ connection_cpu_process_inbuf(connection_t *conn) goto done_processing; } tor_assert(! CIRCUIT_IS_ORIGIN(circ)); + TO_OR_CIRCUIT(circ)->workqueue_entry = NULL; if (onionskin_answer(TO_OR_CIRCUIT(circ), &rpl.created_cell, (const char*)rpl.keys, @@ -376,58 +382,33 @@ connection_cpu_process_inbuf(connection_t *conn) goto done_processing; } log_debug(LD_OR,"onionskin_answer succeeded. Yay."); - } else { - tor_assert(0); /* don't ask me to do handshakes yet */ } done_processing: - conn->state = CPUWORKER_STATE_IDLE; - num_cpuworkers_busy--; - if (conn->timestamp_created < last_rotation_time) { - connection_mark_for_close(conn); - num_cpuworkers--; - spawn_enough_cpuworkers(); - } else { - process_pending_task(conn); - } - return 0; + memwipe(&rpl, 0, sizeof(rpl)); + memwipe(job, 0, sizeof(*job)); + tor_free(job); + queue_pending_tasks(); } -/** Implement a cpuworker. 'data' is an fdarray as returned by socketpair. - * Read and writes from fdarray[1]. Reads requests, writes answers. - * - * Request format: - * cpuworker_request_t. - * Response format: - * cpuworker_reply_t - */ -static void -cpuworker_main(void *data) +/** Implementation function for onion handshake requests. */ +static int +cpuworker_onion_handshake_threadfn(void *state_, void *work_) { - /* For talking to the parent thread/process */ - tor_socket_t *fdarray = data; - tor_socket_t fd; + worker_state_t *state = state_; + cpuworker_job_t *job = work_; /* variables for onion processing */ - server_onion_keys_t onion_keys; + server_onion_keys_t *onion_keys = state->onion_keys; cpuworker_request_t req; cpuworker_reply_t rpl; - fd = fdarray[1]; /* this side is ours */ - tor_free(data); - - setup_server_onion_keys(&onion_keys); - - for (;;) { - if (read_all(fd, (void *)&req, sizeof(req), 1) != sizeof(req)) { - log_info(LD_OR, "read request failed. Exiting."); - goto end; - } - tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC); + memcpy(&req, &job->u.request, sizeof(req)); - memset(&rpl, 0, sizeof(rpl)); + tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC); + memset(&rpl, 0, sizeof(rpl)); - if (req.task == CPUWORKER_TASK_ONION) { + if (1) { const create_cell_t *cc = &req.create_cell; created_cell_t *cell_out = &rpl.created_cell; struct timeval tv_start = {0,0}, tv_end; @@ -439,7 +420,7 @@ cpuworker_main(void *data) tor_gettimeofday(&tv_start); n = onion_skin_server_handshake(cc->handshake_type, cc->onionskin, cc->handshake_len, - &onion_keys, + onion_keys, cell_out->reply, rpl.keys, CPATH_KEY_MATERIAL_LEN, rpl.rend_auth_material); @@ -447,12 +428,10 @@ cpuworker_main(void *data) /* failure */ log_debug(LD_OR,"onion_skin_server_handshake failed."); memset(&rpl, 0, sizeof(rpl)); - memcpy(rpl.tag, req.tag, TAG_LEN); rpl.success = 0; } else { /* success */ log_debug(LD_OR,"onion_skin_server_handshake succeeded."); - memcpy(rpl.tag, req.tag, TAG_LEN); cell_out->handshake_len = n; switch (cc->cell_type) { case CELL_CREATE: @@ -463,7 +442,7 @@ cpuworker_main(void *data) cell_out->cell_type = CELL_CREATED_FAST; break; default: tor_assert(0); - goto end; + return WQ_RPL_SHUTDOWN; } rpl.success = 1; } @@ -479,187 +458,55 @@ cpuworker_main(void *data) else rpl.n_usec = (uint32_t) usec; } - if (write_all(fd, (void*)&rpl, sizeof(rpl), 1) != sizeof(rpl)) { - log_err(LD_BUG,"writing response buf failed. Exiting."); - goto end; - } - log_debug(LD_OR,"finished writing response."); - } else if (req.task == CPUWORKER_TASK_SHUTDOWN) { - log_info(LD_OR,"Clean shutdown: exiting"); - goto end; - } - memwipe(&req, 0, sizeof(req)); - memwipe(&rpl, 0, sizeof(req)); - } - end: - memwipe(&req, 0, sizeof(req)); - memwipe(&rpl, 0, sizeof(req)); - release_server_onion_keys(&onion_keys); - tor_close_socket(fd); - crypto_thread_cleanup(); - spawn_exit(); -} - -/** Launch a new cpuworker. Return 0 if we're happy, -1 if we failed. - */ -static int -spawn_cpuworker(void) -{ - tor_socket_t *fdarray; - tor_socket_t fd; - connection_t *conn; - int err; - - fdarray = tor_calloc(2, sizeof(tor_socket_t)); - if ((err = tor_socketpair(AF_UNIX, SOCK_STREAM, 0, fdarray)) < 0) { - log_warn(LD_NET, "Couldn't construct socketpair for cpuworker: %s", - tor_socket_strerror(-err)); - tor_free(fdarray); - return -1; - } - - tor_assert(SOCKET_OK(fdarray[0])); - tor_assert(SOCKET_OK(fdarray[1])); - - fd = fdarray[0]; - if (spawn_func(cpuworker_main, (void*)fdarray) < 0) { - tor_close_socket(fdarray[0]); - tor_close_socket(fdarray[1]); - tor_free(fdarray); - return -1; } - log_debug(LD_OR,"just spawned a cpu worker."); - - conn = connection_new(CONN_TYPE_CPUWORKER, AF_UNIX); - - /* set up conn so it's got all the data we need to remember */ - conn->s = fd; - conn->address = tor_strdup("localhost"); - tor_addr_make_unspec(&conn->addr); - - if (set_socket_nonblocking(fd) == -1) { - connection_free(conn); /* this closes fd */ - return -1; - } - - if (connection_add(conn) < 0) { /* no space, forget it */ - log_warn(LD_NET,"connection_add for cpuworker failed. Giving up."); - connection_free(conn); /* this closes fd */ - return -1; - } - - conn->state = CPUWORKER_STATE_IDLE; - connection_start_reading(conn); - return 0; /* success */ -} - -/** If we have too few or too many active cpuworkers, try to spawn new ones - * or kill idle ones. - */ -static void -spawn_enough_cpuworkers(void) -{ - int num_cpuworkers_needed = get_num_cpus(get_options()); - int reseed = 0; - - if (num_cpuworkers_needed < MIN_CPUWORKERS) - num_cpuworkers_needed = MIN_CPUWORKERS; - if (num_cpuworkers_needed > MAX_CPUWORKERS) - num_cpuworkers_needed = MAX_CPUWORKERS; - - while (num_cpuworkers < num_cpuworkers_needed) { - if (spawn_cpuworker() < 0) { - log_warn(LD_GENERAL,"Cpuworker spawn failed. Will try again later."); - return; - } - num_cpuworkers++; - reseed++; - } + memcpy(&job->u.reply, &rpl, sizeof(rpl)); - if (reseed) - crypto_seed_weak_rng(&request_sample_rng); + memwipe(&req, 0, sizeof(req)); + memwipe(&rpl, 0, sizeof(req)); + return WQ_RPL_REPLY; } -/** Take a pending task from the queue and assign it to 'cpuworker'. */ +/** Take pending tasks from the queue and assign them to cpuworkers. */ static void -process_pending_task(connection_t *cpuworker) +queue_pending_tasks(void) { or_circuit_t *circ; create_cell_t *onionskin = NULL; - tor_assert(cpuworker); + while (total_pending_tasks < max_pending_tasks) { + circ = onion_next_task(&onionskin); - /* for now only process onion tasks */ - - circ = onion_next_task(&onionskin); - if (!circ) - return; - if (assign_onionskin_to_cpuworker(cpuworker, circ, onionskin)) - log_warn(LD_OR,"assign_to_cpuworker failed. Ignoring."); -} - -/** How long should we let a cpuworker stay busy before we give - * up on it and decide that we have a bug or infinite loop? - * This value is high because some servers with low memory/cpu - * sometimes spend an hour or more swapping, and Tor starves. */ -#define CPUWORKER_BUSY_TIMEOUT (60*60*12) + if (!circ) + return; -/** We have a bug that I can't find. Sometimes, very rarely, cpuworkers get - * stuck in the 'busy' state, even though the cpuworker process thinks of - * itself as idle. I don't know why. But here's a workaround to kill any - * cpuworker that's been busy for more than CPUWORKER_BUSY_TIMEOUT. - */ -static void -cull_wedged_cpuworkers(void) -{ - time_t now = time(NULL); - smartlist_t *conns = get_connection_array(); - SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) { - if (!conn->marked_for_close && - conn->type == CONN_TYPE_CPUWORKER && - conn->state == CPUWORKER_STATE_BUSY_ONION && - conn->timestamp_lastwritten + CPUWORKER_BUSY_TIMEOUT < now) { - log_notice(LD_BUG, - "closing wedged cpuworker. Can somebody find the bug?"); - num_cpuworkers_busy--; - num_cpuworkers--; - connection_mark_for_close(conn); - } - } SMARTLIST_FOREACH_END(conn); + if (assign_onionskin_to_cpuworker(circ, onionskin)) + log_warn(LD_OR,"assign_to_cpuworker failed. Ignoring."); + } } /** Try to tell a cpuworker to perform the public key operations necessary to * respond to <b>onionskin</b> for the circuit <b>circ</b>. * - * If <b>cpuworker</b> is defined, assert that he's idle, and use him. Else, - * look for an idle cpuworker and use him. If none idle, queue task onto the - * pending onion list and return. Return 0 if we successfully assign the - * task, or -1 on failure. + * Return 0 if we successfully assign the task, or -1 on failure. */ int -assign_onionskin_to_cpuworker(connection_t *cpuworker, - or_circuit_t *circ, +assign_onionskin_to_cpuworker(or_circuit_t *circ, create_cell_t *onionskin) { + workqueue_entry_t *queue_entry; + cpuworker_job_t *job; cpuworker_request_t req; - time_t now = approx_time(); - static time_t last_culled_cpuworkers = 0; int should_time; - /* Checking for wedged cpuworkers requires a linear search over all - * connections, so let's do it only once a minute. - */ -#define CULL_CPUWORKERS_INTERVAL 60 - - if (last_culled_cpuworkers + CULL_CPUWORKERS_INTERVAL <= now) { - cull_wedged_cpuworkers(); - spawn_enough_cpuworkers(); - last_culled_cpuworkers = now; - } - if (1) { - if (num_cpuworkers_busy == num_cpuworkers) { + if (!circ->p_chan) { + log_info(LD_OR,"circ->p_chan gone. Failing circ."); + tor_free(onionskin); + return -1; + } + + if (total_pending_tasks >= max_pending_tasks) { log_debug(LD_OR,"No idle cpuworkers. Queuing."); if (onion_pending_add(circ, onionskin) < 0) { tor_free(onionskin); @@ -668,36 +515,14 @@ assign_onionskin_to_cpuworker(connection_t *cpuworker, return 0; } - if (!cpuworker) - cpuworker = connection_get_by_type_state(CONN_TYPE_CPUWORKER, - CPUWORKER_STATE_IDLE); - - tor_assert(cpuworker); - - if (!circ->p_chan) { - log_info(LD_OR,"circ->p_chan gone. Failing circ."); - tor_free(onionskin); - return -1; - } - if (connection_or_digest_is_known_relay(circ->p_chan->identity_digest)) rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type); should_time = should_time_request(onionskin->handshake_type); memset(&req, 0, sizeof(req)); req.magic = CPUWORKER_REQUEST_MAGIC; - tag_pack(req.tag, circ->p_chan->global_identifier, - circ->p_circ_id); req.timed = should_time; - cpuworker->state = CPUWORKER_STATE_BUSY_ONION; - /* touch the lastwritten timestamp, since that's how we check to - * see how long it's been since we asked the question, and sometimes - * we check before the first call to connection_handle_write(). */ - cpuworker->timestamp_lastwritten = now; - num_cpuworkers_busy++; - - req.task = CPUWORKER_TASK_ONION; memcpy(&req.create_cell, onionskin, sizeof(create_cell_t)); tor_free(onionskin); @@ -705,9 +530,46 @@ assign_onionskin_to_cpuworker(connection_t *cpuworker, if (should_time) tor_gettimeofday(&req.started_at); - connection_write_to_buf((void*)&req, sizeof(req), cpuworker); + job = tor_malloc_zero(sizeof(cpuworker_job_t)); + job->chan_id = circ->p_chan->global_identifier; + job->circ_id = circ->p_circ_id; + memcpy(&job->u.request, &req, sizeof(req)); memwipe(&req, 0, sizeof(req)); + + ++total_pending_tasks; + queue_entry = threadpool_queue_work(threadpool, + cpuworker_onion_handshake_threadfn, + cpuworker_onion_handshake_replyfn, + job); + if (!queue_entry) { + log_warn(LD_BUG, "Couldn't queue work on threadpool"); + tor_free(job); + return -1; + } + log_debug(LD_OR, "Queued task %p (qe=%p, chanid="U64_FORMAT", circid=%u)", + job, queue_entry, U64_PRINTF_ARG(job->chan_id), job->circ_id); + + circ->workqueue_entry = queue_entry; } return 0; } +/** If <b>circ</b> has a pending handshake that hasn't been processed yet, + * remove it from the worker queue. */ +void +cpuworker_cancel_circ_handshake(or_circuit_t *circ) +{ + cpuworker_job_t *job; + if (circ->workqueue_entry == NULL) + return; + + job = workqueue_entry_cancel(circ->workqueue_entry); + if (job) { + /* It successfully cancelled. */ + memwipe(job, 0xe0, sizeof(*job)); + tor_free(job); + } + + circ->workqueue_entry = NULL; +} + |