aboutsummaryrefslogtreecommitdiff
path: root/src/or/cpuworker.c
diff options
context:
space:
mode:
authorNick Mathewson <nickm@torproject.org>2012-12-05 22:34:49 -0500
committerNick Mathewson <nickm@torproject.org>2013-01-03 11:29:46 -0500
commit2802ccaeb6b95e693af7736e58e91434d28ac6a2 (patch)
treee11e0d9753d2b6bd40761c3b03f491468a1c8d58 /src/or/cpuworker.c
parent5d15d597a9059d0f87ced081e187db622caa7978 (diff)
downloadtor-2802ccaeb6b95e693af7736e58e91434d28ac6a2.tar.gz
tor-2802ccaeb6b95e693af7736e58e91434d28ac6a2.zip
Teach cpuworker and others about create_cell_t and friends
The unit of work sent to a cpuworker is now a create_cell_t; its response is now a created_cell_t. Several of the things that call or get called by this chain of logic now take create_cell_t or created_cell_t too. Since all cpuworkers are forked or spawned by Tor, they don't need a stable wire protocol, so we can just send structs. This saves us some insanity, and helps p
Diffstat (limited to 'src/or/cpuworker.c')
-rw-r--r--src/or/cpuworker.c189
1 files changed, 103 insertions, 86 deletions
diff --git a/src/or/cpuworker.c b/src/or/cpuworker.c
index e8087c2b8c..a8ec0275a5 100644
--- a/src/or/cpuworker.c
+++ b/src/or/cpuworker.c
@@ -23,7 +23,6 @@
#include "cpuworker.h"
#include "main.h"
#include "onion.h"
-#include "onion_tap.h"
#include "router.h"
/** The maximum number of cpuworker processes we will keep around. */
@@ -33,9 +32,6 @@
/** The tag specifies which circuit this onionskin was from. */
#define TAG_LEN 10
-/** How many bytes are sent from the cpuworker back to tor? */
-#define LEN_ONION_RESPONSE \
- (1+TAG_LEN+TAP_ONIONSKIN_REPLY_LEN+CPATH_KEY_MATERIAL_LEN)
/** How many cpuworkers we have running right now. */
static int num_cpuworkers=0;
@@ -71,7 +67,7 @@ connection_cpu_finished_flushing(connection_t *conn)
/** Pack global_id and circ_id; set *tag to the result. (See note on
* cpuworker_main for wire format.) */
static void
-tag_pack(char *tag, uint64_t chan_id, circid_t circ_id)
+tag_pack(uint8_t *tag, uint64_t chan_id, circid_t circ_id)
{
/*XXXX RETHINK THIS WHOLE MESS !!!! !NM NM NM NM*/
/*XXXX DOUBLEPLUSTHIS!!!! AS AS AS AS*/
@@ -82,12 +78,38 @@ tag_pack(char *tag, uint64_t chan_id, circid_t circ_id)
/** Unpack <b>tag</b> into addr, port, and circ_id.
*/
static void
-tag_unpack(const char *tag, uint64_t *chan_id, circid_t *circ_id)
+tag_unpack(const uint8_t *tag, uint64_t *chan_id, circid_t *circ_id)
{
*chan_id = get_uint64(tag);
*circ_id = get_uint16(tag+8);
}
+/** DOCDOC */
+#define CPUWORKER_REQUEST_MAGIC 0xda4afeed
+#define CPUWORKER_REPLY_MAGIC 0x5eedf00d
+
+/**DOCDOC*/
+typedef struct cpuworker_request_t {
+ uint32_t magic;
+ /** Opaque tag to identify the job */
+ uint8_t tag[TAG_LEN];
+ uint8_t task;
+
+ create_cell_t create_cell;
+ /* Turn the above into a tagged union if needed. */
+} cpuworker_request_t;
+
+/**DOCDOC*/
+typedef struct cpuworker_reply_t {
+ uint32_t magic;
+ uint8_t tag[TAG_LEN];
+ uint8_t success;
+
+ created_cell_t created_cell;
+ uint8_t keys[CPATH_KEY_MATERIAL_LEN];
+ uint8_t rend_auth_material[DIGEST_LEN];
+} cpuworker_reply_t;
+
/** Called when the onion key has changed and we need to spawn new
* cpuworkers. Close all currently idle cpuworkers, and mark the last
* rotation time as now.
@@ -133,8 +155,6 @@ connection_cpu_reached_eof(connection_t *conn)
int
connection_cpu_process_inbuf(connection_t *conn)
{
- char success;
- char buf[LEN_ONION_RESPONSE];
uint64_t chan_id;
circid_t circ_id;
channel_t *p_chan = NULL;
@@ -147,15 +167,16 @@ connection_cpu_process_inbuf(connection_t *conn)
return 0;
if (conn->state == CPUWORKER_STATE_BUSY_ONION) {
- if (connection_get_inbuf_len(conn) < LEN_ONION_RESPONSE)
+ cpuworker_reply_t rpl;
+ if (connection_get_inbuf_len(conn) < sizeof(cpuworker_reply_t))
return 0; /* not yet */
- tor_assert(connection_get_inbuf_len(conn) == LEN_ONION_RESPONSE);
+ tor_assert(connection_get_inbuf_len(conn) == sizeof(cpuworker_reply_t));
- connection_fetch_from_buf(&success,1,conn);
- connection_fetch_from_buf(buf,LEN_ONION_RESPONSE-1,conn);
+ connection_fetch_from_buf((void*)&rpl,sizeof(cpuworker_reply_t),conn);
+ tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC);
/* parse out the circ it was talking about */
- tag_unpack(buf, &chan_id, &circ_id);
+ tag_unpack(rpl.tag, &chan_id, &circ_id);
circ = NULL;
log_debug(LD_OR,
"Unpacking cpuworker reply, chan_id is " U64_FORMAT
@@ -166,7 +187,7 @@ connection_cpu_process_inbuf(connection_t *conn)
if (p_chan)
circ = circuit_get_by_circid_channel(circ_id, p_chan);
- if (success == 0) {
+ if (rpl.success == 0) {
log_debug(LD_OR,
"decoding onionskin failed. "
"(Old key or bad software.) Closing.");
@@ -184,9 +205,12 @@ connection_cpu_process_inbuf(connection_t *conn)
goto done_processing;
}
tor_assert(! CIRCUIT_IS_ORIGIN(circ));
- if (onionskin_answer(TO_OR_CIRCUIT(circ), CELL_CREATED, buf+TAG_LEN,
- TAP_ONIONSKIN_REPLY_LEN,
- buf+TAG_LEN+TAP_ONIONSKIN_REPLY_LEN) < 0) {
+ if (onionskin_answer(TO_OR_CIRCUIT(circ),
+ rpl.created_cell.cell_type,
+ (const char*)rpl.created_cell.reply,
+ rpl.created_cell.handshake_len,
+ (const char*)rpl.keys,
+ rpl.rend_auth_material) < 0) {
log_warn(LD_OR,"onionskin_answer failed. Closing.");
circuit_mark_for_close(circ, END_CIRC_REASON_INTERNAL);
goto done_processing;
@@ -213,32 +237,21 @@ connection_cpu_process_inbuf(connection_t *conn)
* Read and writes from fdarray[1]. Reads requests, writes answers.
*
* Request format:
- * Task type [1 byte, always CPUWORKER_TASK_ONION]
- * Opaque tag TAG_LEN
- * Onionskin challenge TAP_ONIONSKIN_CHALLENGE_LEN
+ * cpuworker_request_t.
* Response format:
- * Success/failure [1 byte, boolean.]
- * Opaque tag TAG_LEN
- * Onionskin challenge TAP_ONIONSKIN_REPLY_LEN
- * Negotiated keys KEY_LEN*2+DIGEST_LEN*2
- *
- * (Note: this _should_ be by addr/port, since we're concerned with specific
- * connections, not with routers (where we'd use identity).)
+ * cpuworker_reply_t
*/
static void
cpuworker_main(void *data)
{
- char question[TAP_ONIONSKIN_CHALLENGE_LEN];
- uint8_t question_type;
+ /* For talking to the parent thread/process */
tor_socket_t *fdarray = data;
tor_socket_t fd;
/* variables for onion processing */
- char keys[CPATH_KEY_MATERIAL_LEN];
- char reply_to_proxy[MAX_ONIONSKIN_REPLY_LEN];
- char buf[LEN_ONION_RESPONSE];
- char tag[TAG_LEN];
server_onion_keys_t onion_keys;
+ cpuworker_request_t req;
+ cpuworker_reply_t rpl;
fd = fdarray[1]; /* this side is ours */
#ifndef TOR_IS_MULTITHREADED
@@ -252,65 +265,64 @@ cpuworker_main(void *data)
setup_server_onion_keys(&onion_keys);
for (;;) {
- ssize_t r;
-
- if ((r = recv(fd, (void *)&question_type, 1, 0)) != 1) {
-// log_fn(LOG_ERR,"read type failed. Exiting.");
- if (r == 0) {
- log_info(LD_OR,
- "CPU worker exiting because Tor process closed connection "
- "(either rotated keys or died).");
- } else {
- log_info(LD_OR,
- "CPU worker exiting because of error on connection to Tor "
- "process.");
- log_info(LD_OR,"(Error on %d was %s)",
- fd, tor_socket_strerror(tor_socket_errno(fd)));
- }
+ if (read_all(fd, (void *)&req, sizeof(req), 1) != sizeof(req)) {
+ log_info(LD_OR, "read request failed. Exiting.");
goto end;
}
- tor_assert(question_type == CPUWORKER_TASK_ONION);
-
- if (read_all(fd, tag, TAG_LEN, 1) != TAG_LEN) {
- log_err(LD_BUG,"read tag failed. Exiting.");
- goto end;
- }
-
- if (read_all(fd, question, TAP_ONIONSKIN_CHALLENGE_LEN, 1) !=
- TAP_ONIONSKIN_CHALLENGE_LEN) {
- log_err(LD_BUG,"read question failed. Exiting.");
- goto end;
- }
-
- if (question_type == CPUWORKER_TASK_ONION) {
- if (onion_skin_server_handshake(ONION_HANDSHAKE_TYPE_TAP,
- (const uint8_t*)question,
- &onion_keys,
- (uint8_t*)reply_to_proxy,
- (uint8_t*)keys, CPATH_KEY_MATERIAL_LEN) < 0) {
+ tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
+
+ memset(&rpl, 0, sizeof(rpl));
+
+ if (req.task == CPUWORKER_TASK_ONION) {
+ const create_cell_t *cc = &req.create_cell;
+ created_cell_t *cell_out = &rpl.created_cell;
+ int n;
+ n = onion_skin_server_handshake(cc->handshake_type,
+ cc->onionskin, cc->handshake_len,
+ &onion_keys,
+ cell_out->reply,
+ rpl.keys, CPATH_KEY_MATERIAL_LEN,
+ rpl.rend_auth_material);
+ if (n < 0) {
/* failure */
log_debug(LD_OR,"onion_skin_server_handshake failed.");
- *buf = 0; /* indicate failure in first byte */
- memcpy(buf+1,tag,TAG_LEN);
- /* send all zeros as answer */
- memset(buf+1+TAG_LEN, 0, LEN_ONION_RESPONSE-(1+TAG_LEN));
+ memset(&rpl, 0, sizeof(rpl));
+ memcpy(rpl.tag, req.tag, TAG_LEN);
+ rpl.success = 0;
} else {
/* success */
log_debug(LD_OR,"onion_skin_server_handshake succeeded.");
- buf[0] = 1; /* 1 means success */
- memcpy(buf+1,tag,TAG_LEN);
- memcpy(buf+1+TAG_LEN,reply_to_proxy,TAP_ONIONSKIN_REPLY_LEN);
- memcpy(buf+1+TAG_LEN+TAP_ONIONSKIN_REPLY_LEN,keys,
- CPATH_KEY_MATERIAL_LEN);
+ memcpy(rpl.tag, req.tag, TAG_LEN);
+ cell_out->handshake_len = n;
+ switch (cc->cell_type) {
+ case CELL_CREATE:
+ cell_out->cell_type = CELL_CREATED; break;
+ case CELL_CREATE2:
+ cell_out->cell_type = CELL_CREATED2; break;
+ case CELL_CREATE_FAST:
+ cell_out->cell_type = CELL_CREATED_FAST; break;
+ default:
+ tor_assert(0);
+ goto end;
+ }
+ rpl.success = 1;
}
- if (write_all(fd, buf, LEN_ONION_RESPONSE, 1) != LEN_ONION_RESPONSE) {
+ rpl.magic = CPUWORKER_REPLY_MAGIC;
+ if (write_all(fd, (void*)&rpl, sizeof(rpl), 1) != sizeof(rpl)) {
log_err(LD_BUG,"writing response buf failed. Exiting.");
goto end;
}
log_debug(LD_OR,"finished writing response.");
+ } else if (req.task == CPUWORKER_TASK_SHUTDOWN) {
+ log_info(LD_OR,"Clean shutdown: exiting");
+ goto end;
}
+ memwipe(&req, 0, sizeof(req));
+ memwipe(&rpl, 0, sizeof(req));
}
end:
+ memwipe(&req, 0, sizeof(req));
+ memwipe(&rpl, 0, sizeof(req));
release_server_onion_keys(&onion_keys);
tor_close_socket(fd);
crypto_thread_cleanup();
@@ -394,7 +406,7 @@ static void
process_pending_task(connection_t *cpuworker)
{
or_circuit_t *circ;
- char *onionskin = NULL;
+ create_cell_t *onionskin = NULL;
tor_assert(cpuworker);
@@ -447,10 +459,10 @@ cull_wedged_cpuworkers(void)
*/
int
assign_onionskin_to_cpuworker(connection_t *cpuworker,
- or_circuit_t *circ, char *onionskin)
+ or_circuit_t *circ,
+ create_cell_t *onionskin)
{
- char qbuf[1];
- char tag[TAG_LEN];
+ cpuworker_request_t req;
time_t now = approx_time();
static time_t last_culled_cpuworkers = 0;
@@ -486,7 +498,10 @@ assign_onionskin_to_cpuworker(connection_t *cpuworker,
tor_free(onionskin);
return -1;
}
- tag_pack(tag, circ->p_chan->global_identifier,
+
+ memset(&req, 0, sizeof(req));
+ req.magic = CPUWORKER_REQUEST_MAGIC;
+ tag_pack(req.tag, circ->p_chan->global_identifier,
circ->p_circ_id);
cpuworker->state = CPUWORKER_STATE_BUSY_ONION;
@@ -496,11 +511,13 @@ assign_onionskin_to_cpuworker(connection_t *cpuworker,
cpuworker->timestamp_lastwritten = time(NULL);
num_cpuworkers_busy++;
- qbuf[0] = CPUWORKER_TASK_ONION;
- connection_write_to_buf(qbuf, 1, cpuworker);
- connection_write_to_buf(tag, sizeof(tag), cpuworker);
- connection_write_to_buf(onionskin, TAP_ONIONSKIN_CHALLENGE_LEN, cpuworker);
+ req.task = CPUWORKER_TASK_ONION;
+ memcpy(&req.create_cell, onionskin, sizeof(create_cell_t));
+
tor_free(onionskin);
+
+ connection_write_to_buf((void*)&req, sizeof(req), cpuworker);
+ memwipe(&req, 0, sizeof(req));
}
return 0;
}