diff options
author | Nick Mathewson <nickm@torproject.org> | 2011-07-20 09:50:53 -0400 |
---|---|---|
committer | Nick Mathewson <nickm@torproject.org> | 2011-07-20 09:50:53 -0400 |
commit | eaa1c05397c1a6cf2f58b7c41e388311d5aa8ffb (patch) | |
tree | b304aaca9b8d5364517d4ab23ffd232cf420858d /src | |
parent | 195bcb6150eeaebab31a44998e2c567d78f9b936 (diff) | |
parent | 9a7c16fb00c6ffc32ef7d6cc7fbede5258fe4390 (diff) | |
download | tor-eaa1c05397c1a6cf2f58b7c41e388311d5aa8ffb.tar.gz tor-eaa1c05397c1a6cf2f58b7c41e388311d5aa8ffb.zip |
Merge branch 'optimistic-client'
The conflicts are with the proposal 171 circuit isolation code, and
they're all trivial: they're just a matter of both branches adding
some unrelated code in the same places.
Conflicts:
src/or/circuituse.c
src/or/connection.c
Diffstat (limited to 'src')
-rw-r--r-- | src/or/buffers.c | 70 | ||||
-rw-r--r-- | src/or/buffers.h | 19 | ||||
-rw-r--r-- | src/or/circuituse.c | 21 | ||||
-rw-r--r-- | src/or/connection.c | 6 | ||||
-rw-r--r-- | src/or/connection_edge.c | 52 | ||||
-rw-r--r-- | src/or/or.h | 22 | ||||
-rw-r--r-- | src/or/relay.c | 44 | ||||
-rw-r--r-- | src/or/routerparse.c | 3 | ||||
-rw-r--r-- | src/test/test.c | 66 |
9 files changed, 300 insertions, 3 deletions
diff --git a/src/or/buffers.c b/src/or/buffers.c index e2b0bd0e6f..5b9e55ebd5 100644 --- a/src/or/buffers.c +++ b/src/or/buffers.c @@ -557,6 +557,39 @@ buf_free(buf_t *buf) tor_free(buf); } +/** Return a new copy of <b>in_chunk</b> */ +static chunk_t * +chunk_copy(const chunk_t *in_chunk) +{ + chunk_t *newch = tor_memdup(in_chunk, CHUNK_ALLOC_SIZE(in_chunk->memlen)); + newch->next = NULL; + if (in_chunk->data) { + off_t offset = in_chunk->data - in_chunk->mem; + newch->data = newch->mem + offset; + } + return newch; +} + +/** Return a new copy of <b>buf</b> */ +buf_t * +buf_copy(const buf_t *buf) +{ + chunk_t *ch; + buf_t *out = buf_new(); + out->default_chunk_size = buf->default_chunk_size; + for (ch = buf->head; ch; ch = ch->next) { + chunk_t *newch = chunk_copy(ch); + if (out->tail) { + out->tail->next = newch; + out->tail = newch; + } else { + out->head = out->tail = newch; + } + } + out->datalen = buf->datalen; + return out; +} + /** Append a new chunk with enough capacity to hold <b>capacity</b> bytes to * the tail of <b>buf</b>. If <b>capped</b>, don't allocate a chunk bigger * than MAX_CHUNK_ALLOC. */ @@ -2374,6 +2407,43 @@ write_to_evbuffer_zlib(struct evbuffer *buf, tor_zlib_state_t *state, } #endif +/** Set *<b>output</b> to contain a copy of the data in *<b>input</b> */ +int +generic_buffer_set_to_copy(generic_buffer_t **output, + const generic_buffer_t *input) +{ +#ifdef USE_BUFFEREVENTS + struct evbuffer_ptr ptr; + size_t remaining = evbuffer_get_length(input); + if (*output) { + evbuffer_drain(*output, evbuffer_get_length(*output)); + } else { + if (!(*output = evbuffer_new())) + return -1; + } + evbuffer_ptr_set((struct evbuffer*)input, &ptr, 0, EVBUFFER_PTR_SET); + while (remaining) { + struct evbuffer_iovec v[4]; + int n_used, i; + n_used = evbuffer_peek((struct evbuffer*)input, -1, &ptr, v, 4); + if (n_used < 0) + return -1; + for (i=0;i<n_used;++i) { + evbuffer_add(*output, v[i].iov_base, v[i].iov_len); + tor_assert(v[i].iov_len <= remaining); + remaining -= v[i].iov_len; + evbuffer_ptr_set((struct evbuffer*)input, + &ptr, v[i].iov_len, EVBUFFER_PTR_ADD); + } + } +#else + if (*output) + buf_free(*output); + *output = buf_copy(input); +#endif + return 0; +} + /** Log an error and exit if <b>buf</b> is corrupted. */ void diff --git a/src/or/buffers.h b/src/or/buffers.h index b0161b9c2c..7b2a2acc3c 100644 --- a/src/or/buffers.h +++ b/src/or/buffers.h @@ -16,6 +16,7 @@ buf_t *buf_new(void); buf_t *buf_new_with_capacity(size_t size); void buf_free(buf_t *buf); void buf_clear(buf_t *buf); +buf_t *buf_copy(const buf_t *buf); void buf_shrink(buf_t *buf); void buf_shrink_freelists(int free_all); void buf_dump_freelist_sizes(int severity); @@ -67,6 +68,24 @@ int write_to_evbuffer_zlib(struct evbuffer *buf, tor_zlib_state_t *state, int done); #endif +#ifdef USE_BUFFEREVENTS +#define generic_buffer_new() evbuffer_new() +#define generic_buffer_len(b) evbuffer_get_length((b)) +#define generic_buffer_add(b,dat,len) evbuffer_add((b),(dat),(len)) +#define generic_buffer_get(b,buf,buflen) evbuffer_remove((b),(buf),(buflen)) +#define generic_buffer_clear(b) evbuffer_drain((b), evbuffer_get_length((b))) +#define generic_buffer_free(b) evbuffer_free((b)) +#else +#define generic_buffer_new() buf_new() +#define generic_buffer_len(b) buf_datalen((b)) +#define generic_buffer_add(b,dat,len) write_to_buf((dat),(len),(b)) +#define generic_buffer_get(b,buf,buflen) fetch_from_buf((buf),(buflen),(b)) +#define generic_buffer_clear(b) buf_clear((b)) +#define generic_buffer_free(b) buf_free((b)) +#endif +int generic_buffer_set_to_copy(generic_buffer_t **output, + const generic_buffer_t *input); + void assert_buf_ok(buf_t *buf); #ifdef BUFFERS_PRIVATE diff --git a/src/or/circuituse.c b/src/or/circuituse.c index b4860440cb..1bc518b7d4 100644 --- a/src/or/circuituse.c +++ b/src/or/circuituse.c @@ -738,6 +738,7 @@ circuit_detach_stream(circuit_t *circ, edge_connection_t *conn) tor_assert(conn); conn->cpath_layer = NULL; /* make sure we don't keep a stale pointer */ + conn->exit_allows_optimistic_data = 0; conn->on_circuit = NULL; if (CIRCUIT_IS_ORIGIN(circ)) { @@ -1548,6 +1549,8 @@ static void link_apconn_to_circ(edge_connection_t *apconn, origin_circuit_t *circ, crypt_path_t *cpath) { + const node_t *exitnode; + /* add it into the linked list of streams on this circuit */ log_debug(LD_APP|LD_CIRC, "attaching new conn to circ. n_circ_id %d.", circ->_base.n_circ_id); @@ -1570,6 +1573,24 @@ link_apconn_to_circ(edge_connection_t *apconn, origin_circuit_t *circ, circ->isolation_any_streams_attached = 1; connection_edge_update_circuit_isolation(apconn, circ, 0); + + /* See if we can use optimistic data on this circuit */ + if (apconn->cpath_layer->extend_info && + (exitnode = node_get_by_id( + apconn->cpath_layer->extend_info->identity_digest)) && + exitnode->rs) { + /* Okay; we know what exit node this is. */ + if (circ->_base.purpose == CIRCUIT_PURPOSE_C_GENERAL && + exitnode->rs->version_supports_optimistic_data) + apconn->exit_allows_optimistic_data = 1; + else + apconn->exit_allows_optimistic_data = 0; + log_info(LD_APP, "Looks like completed circuit to %s %s allow " + "optimistic data for connection to %s", + safe_str_client(node_describe(exitnode)), + apconn->exit_allows_optimistic_data ? "does" : "doesn't", + safe_str_client(apconn->socks_request->address)); + } } /** Return true iff <b>address</b> is matched by one of the entries in diff --git a/src/or/connection.c b/src/or/connection.c index 59a7b80deb..3950f90152 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -470,6 +470,12 @@ _connection_free(connection_t *conn) tor_free(edge_conn->original_dest_address); if (edge_conn->socks_request) socks_request_free(edge_conn->socks_request); + if (edge_conn->pending_optimistic_data) { + generic_buffer_free(edge_conn->pending_optimistic_data); + } + if (edge_conn->sending_optimistic_data) { + generic_buffer_free(edge_conn->sending_optimistic_data); + } rend_data_free(edge_conn->rend_data); } if (conn->type == CONN_TYPE_CONTROL) { diff --git a/src/or/connection_edge.c b/src/or/connection_edge.c index 4bbb080124..72394e8777 100644 --- a/src/or/connection_edge.c +++ b/src/or/connection_edge.c @@ -57,6 +57,7 @@ static int connection_exit_connect_dir(edge_connection_t *exitconn); static int address_is_in_virtual_range(const char *addr); static int consider_plaintext_ports(edge_connection_t *conn, uint16_t port); static void clear_trackexithost_mappings(const char *exitname); +static int connection_ap_supports_optimistic_data(const edge_connection_t *); /** An AP stream has failed/finished. If it hasn't already sent back * a socks reply, send one now (based on endreason). Also set @@ -154,10 +155,25 @@ connection_edge_process_inbuf(edge_connection_t *conn, int package_partial) return -1; } return 0; + case AP_CONN_STATE_CONNECT_WAIT: + if (connection_ap_supports_optimistic_data(conn)) { + log_info(LD_EDGE, + "data from edge while in '%s' state. Sending it anyway. " + "package_partial=%d, buflen=%ld", + conn_state_to_string(conn->_base.type, conn->_base.state), + package_partial, connection_get_inbuf_len(TO_CONN(conn))); + if (connection_edge_package_raw_inbuf(conn, package_partial, NULL)<0) { + /* (We already sent an end cell if possible) */ + connection_mark_for_close(TO_CONN(conn)); + return -1; + } + return 0; + } + /* Fall through if the connection is on a circuit without optimistic + * data support. */ case EXIT_CONN_STATE_CONNECTING: case AP_CONN_STATE_RENDDESC_WAIT: case AP_CONN_STATE_CIRCUIT_WAIT: - case AP_CONN_STATE_CONNECT_WAIT: case AP_CONN_STATE_RESOLVE_WAIT: case AP_CONN_STATE_CONTROLLER_WAIT: log_info(LD_EDGE, @@ -722,6 +738,12 @@ connection_ap_detach_retriable(edge_connection_t *conn, origin_circuit_t *circ, { control_event_stream_status(conn, STREAM_EVENT_FAILED_RETRIABLE, reason); conn->_base.timestamp_lastread = time(NULL); + + if (conn->pending_optimistic_data) { + generic_buffer_set_to_copy(&conn->sending_optimistic_data, + conn->pending_optimistic_data); + } + if (!get_options()->LeaveStreamsUnattached || conn->use_begindir) { /* If we're attaching streams ourself, or if this connection is * a tunneled directory connection, then just attach it. */ @@ -2337,6 +2359,22 @@ get_unique_stream_id_by_circ(origin_circuit_t *circ) return test_stream_id; } +/** Return true iff <b>conn</b> is linked to a circuit and configured to use + * an exit that supports optimistic data. */ +static int +connection_ap_supports_optimistic_data(const edge_connection_t *conn) +{ + tor_assert(conn->_base.type == CONN_TYPE_AP); + /* We can only send optimistic data if we're connected to an open + general circuit. */ + if (conn->on_circuit == NULL || + conn->on_circuit->state != CIRCUIT_STATE_OPEN || + conn->on_circuit->purpose != CIRCUIT_PURPOSE_C_GENERAL) + return 0; + + return conn->exit_allows_optimistic_data; +} + /** Write a relay begin cell, using destaddr and destport from ap_conn's * socks_request field, and send it down circ. * @@ -2398,6 +2436,18 @@ connection_ap_handshake_send_begin(edge_connection_t *ap_conn) log_info(LD_APP,"Address/port sent, ap socket %d, n_circ_id %d", ap_conn->_base.s, circ->_base.n_circ_id); control_event_stream_status(ap_conn, STREAM_EVENT_SENT_CONNECT, 0); + + /* If there's queued-up data, send it now */ + if ((connection_get_inbuf_len(TO_CONN(ap_conn)) || + ap_conn->sending_optimistic_data) && + connection_ap_supports_optimistic_data(ap_conn)) { + log_info(LD_APP, "Sending up to %ld bytes of queued-up data", + connection_get_inbuf_len(TO_CONN(ap_conn))); + if (connection_edge_package_raw_inbuf(ap_conn, 1, NULL) < 0) { + connection_mark_for_close(TO_CONN(ap_conn)); + } + } + return 0; } diff --git a/src/or/or.h b/src/or/or.h index 47cee35e5b..16c792771e 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -930,6 +930,11 @@ typedef struct { typedef struct buf_t buf_t; typedef struct socks_request_t socks_request_t; +#ifdef USE_BUFFEREVENTS +#define generic_buffer_t struct evbuffer +#else +#define generic_buffer_t buf_t +#endif /* Values for connection_t.magic: used to make sure that downcasts (casts from * connection_t to foo_connection_t) are safe. */ @@ -1263,6 +1268,20 @@ typedef struct edge_connection_t { * NATd connection */ unsigned int is_transparent_ap:1; + /** For AP connections only: Set if this connection's target exit node + * allows optimistic data. (That is, data sent on this stream before + * the exit has sent a CONNECTED cell.)*/ + unsigned int exit_allows_optimistic_data : 1; + + /** For AP connections only: buffer for data that we have sent + * optimistically, which we might need to re-send if we have to + * retry this connection. */ + generic_buffer_t *pending_optimistic_data; + /* For AP connections only: buffer for data that we previously sent + * optimistically which we are currently re-sending as we retry this + * connection. */ + generic_buffer_t *sending_optimistic_data; + /** If this is a DNSPort connection, this field holds the pending DNS * request that we're going to try to answer. */ struct evdns_server_request *dns_server_request; @@ -1706,6 +1725,9 @@ typedef struct routerstatus_t { /** True iff this router is a version that, if it caches directory info, * we can get microdescriptors from. */ unsigned int version_supports_microdesc_cache:1; + /** True iff this router is a version that allows DATA cells to arrive on + * a stream before it has sent a CONNECTED cell. */ + unsigned int version_supports_optimistic_data:1; unsigned int has_bandwidth:1; /**< The vote/consensus had bw info */ unsigned int has_exitsummary:1; /**< The vote/consensus had exit summaries */ diff --git a/src/or/relay.c b/src/or/relay.c index df6d0a8a5f..3fa31a3f33 100644 --- a/src/or/relay.c +++ b/src/or/relay.c @@ -944,6 +944,12 @@ connection_edge_process_relay_cell_not_open( break; } } + /* This is definitely a success, so forget about any pending data we + * had sent. */ + if (conn->pending_optimistic_data) { + generic_buffer_free(conn->pending_optimistic_data); + conn->pending_optimistic_data = NULL; + } /* handle anything that might have queued */ if (connection_edge_package_raw_inbuf(conn, 1, NULL) < 0) { @@ -1343,6 +1349,10 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial, char payload[CELL_PAYLOAD_SIZE]; circuit_t *circ; unsigned domain = conn->cpath_layer ? LD_APP : LD_EXIT; + int sending_from_optimistic = 0; + const int sending_optimistically = + conn->_base.type == CONN_TYPE_AP && + conn->_base.state != AP_CONN_STATE_OPEN; tor_assert(conn); @@ -1375,7 +1385,18 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial, return 0; } - amount_to_process = connection_get_inbuf_len(TO_CONN(conn)); + sending_from_optimistic = conn->sending_optimistic_data != NULL; + + if (PREDICT_UNLIKELY(sending_from_optimistic)) { + amount_to_process = generic_buffer_len(conn->sending_optimistic_data); + if (PREDICT_UNLIKELY(!amount_to_process)) { + log_warn(LD_BUG, "sending_optimistic_data was non-NULL but empty"); + amount_to_process = connection_get_inbuf_len(TO_CONN(conn)); + sending_from_optimistic = 0; + } + } else { + amount_to_process = connection_get_inbuf_len(TO_CONN(conn)); + } if (!amount_to_process) return 0; @@ -1391,11 +1412,30 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial, stats_n_data_bytes_packaged += length; stats_n_data_cells_packaged += 1; - connection_fetch_from_buf(payload, length, TO_CONN(conn)); + if (PREDICT_UNLIKELY(sending_from_optimistic)) { + /* XXX023 We could be more efficient here by sometimes packing + * previously-sent optimistic data in the same cell with data + * from the inbuf. */ + generic_buffer_get(conn->sending_optimistic_data, payload, length); + if (!generic_buffer_len(conn->sending_optimistic_data)) { + generic_buffer_free(conn->sending_optimistic_data); + conn->sending_optimistic_data = NULL; + } + } else { + connection_fetch_from_buf(payload, length, TO_CONN(conn)); + } log_debug(domain,"(%d) Packaging %d bytes (%d waiting).", conn->_base.s, (int)length, (int)connection_get_inbuf_len(TO_CONN(conn))); + if (sending_optimistically && !sending_from_optimistic) { + /* This is new optimistic data; remember it in case we need to detach and + retry */ + if (!conn->pending_optimistic_data) + conn->pending_optimistic_data = generic_buffer_new(); + generic_buffer_add(conn->pending_optimistic_data, payload, length); + } + if (connection_edge_send_command(conn, RELAY_COMMAND_DATA, payload, length) < 0 ) /* circuit got marked for close, don't continue, don't need to mark conn */ diff --git a/src/or/routerparse.c b/src/or/routerparse.c index d1b2cd0fb7..5f160d054e 100644 --- a/src/or/routerparse.c +++ b/src/or/routerparse.c @@ -2092,6 +2092,7 @@ routerstatus_parse_entry_from_string(memarea_t *area, rs->version_supports_extrainfo_upload = 1; rs->version_supports_conditional_consensus = 1; rs->version_supports_microdesc_cache = 1; + rs->version_supports_optimistic_data = 1; } else { rs->version_supports_begindir = tor_version_as_new_as(tok->args[0], "0.2.0.1-alpha"); @@ -2109,6 +2110,8 @@ routerstatus_parse_entry_from_string(memarea_t *area, */ rs->version_supports_microdesc_cache = tor_version_as_new_as(tok->args[0], "0.2.3.0-alpha"); + rs->version_supports_optimistic_data = + tor_version_as_new_as(tok->args[0], "0.2.3.1-alpha"); } if (vote_rs) { vote_rs->version = tor_strdup(tok->args[0]); diff --git a/src/test/test.c b/src/test/test.c index 05f5922a5b..e7d4ccbd79 100644 --- a/src/test/test.c +++ b/src/test/test.c @@ -569,6 +569,71 @@ test_socks_5_auth_before_negotiation(void *ptr) ; } +static void +test_buffer_copy(void *arg) +{ + generic_buffer_t *buf=NULL, *buf2=NULL; + const char *s; + size_t len; + char b[256]; + int i; + (void)arg; + + buf = generic_buffer_new(); + tt_assert(buf); + + /* Copy an empty buffer. */ + tt_int_op(0, ==, generic_buffer_set_to_copy(&buf2, buf)); + tt_assert(buf2); + tt_int_op(0, ==, generic_buffer_len(buf2)); + + /* Now try with a short buffer. */ + s = "And now comes an act of enormous enormance!"; + len = strlen(s); + generic_buffer_add(buf, s, len); + tt_int_op(len, ==, generic_buffer_len(buf)); + /* Add junk to buf2 so we can test replacing.*/ + generic_buffer_add(buf2, "BLARG", 5); + tt_int_op(0, ==, generic_buffer_set_to_copy(&buf2, buf)); + tt_int_op(len, ==, generic_buffer_len(buf2)); + generic_buffer_get(buf2, b, len); + test_mem_op(b, ==, s, len); + /* Now free buf2 and retry so we can test allocating */ + generic_buffer_free(buf2); + buf2 = NULL; + tt_int_op(0, ==, generic_buffer_set_to_copy(&buf2, buf)); + tt_int_op(len, ==, generic_buffer_len(buf2)); + generic_buffer_get(buf2, b, len); + test_mem_op(b, ==, s, len); + /* Clear buf for next test */ + generic_buffer_get(buf, b, len); + tt_int_op(generic_buffer_len(buf),==,0); + + /* Okay, now let's try a bigger buffer. */ + s = "Quis autem vel eum iure reprehenderit qui in ea voluptate velit " + "esse quam nihil molestiae consequatur, vel illum qui dolorem eum " + "fugiat quo voluptas nulla pariatur?"; + len = strlen(s); + for (i = 0; i < 256; ++i) { + b[0]=i; + generic_buffer_add(buf, b, 1); + generic_buffer_add(buf, s, len); + } + tt_int_op(0, ==, generic_buffer_set_to_copy(&buf2, buf)); + tt_int_op(generic_buffer_len(buf2), ==, generic_buffer_len(buf)); + for (i = 0; i < 256; ++i) { + generic_buffer_get(buf2, b, len+1); + tt_int_op((unsigned char)b[0],==,i); + test_mem_op(b+1, ==, s, len); + } + + done: + if (buf) + generic_buffer_free(buf); + if (buf2) + generic_buffer_free(buf2); +} + /** Run unit tests for buffers.c */ static void test_buffers(void) @@ -1612,6 +1677,7 @@ const struct testcase_setup_t legacy_setup = { static struct testcase_t test_array[] = { ENT(buffers), + { "buffer_copy", test_buffer_copy, 0, NULL, NULL }, ENT(onion_handshake), ENT(circuit_timeout), ENT(policies), |