From 326d5c156dce64c115a76427a4dabb0e93a90033 Mon Sep 17 00:00:00 2001 From: Ian Goldberg Date: Mon, 18 Jul 2011 12:56:45 -0400 Subject: Implement the client side of optimistic data (proposal 174) --- src/or/connection_edge.c | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/or/connection_edge.c b/src/or/connection_edge.c index f2ddfc76dc..7551c39439 100644 --- a/src/or/connection_edge.c +++ b/src/or/connection_edge.c @@ -157,13 +157,22 @@ connection_edge_process_inbuf(edge_connection_t *conn, int package_partial) case EXIT_CONN_STATE_CONNECTING: case AP_CONN_STATE_RENDDESC_WAIT: case AP_CONN_STATE_CIRCUIT_WAIT: - case AP_CONN_STATE_CONNECT_WAIT: case AP_CONN_STATE_RESOLVE_WAIT: case AP_CONN_STATE_CONTROLLER_WAIT: log_info(LD_EDGE, "data from edge while in '%s' state. Leaving it on buffer.", conn_state_to_string(conn->_base.type, conn->_base.state)); return 0; + case AP_CONN_STATE_CONNECT_WAIT: + log_info(LD_EDGE, + "data from edge while in '%s' state. Sending it anyway. package_partial=%d, buflen=%d", + conn_state_to_string(conn->_base.type, conn->_base.state), package_partial, buf_datalen(TO_CONN(conn)->inbuf)); + if (connection_edge_package_raw_inbuf(conn, package_partial, NULL) < 0) { + /* (We already sent an end cell if possible) */ + connection_mark_for_close(TO_CONN(conn)); + return -1; + } + return 0; } log_warn(LD_BUG,"Got unexpected state %d. Closing.",conn->_base.state); tor_fragile_assert(); @@ -2395,6 +2404,13 @@ connection_ap_handshake_send_begin(edge_connection_t *ap_conn) log_info(LD_APP,"Address/port sent, ap socket %d, n_circ_id %d", ap_conn->_base.s, circ->_base.n_circ_id); control_event_stream_status(ap_conn, STREAM_EVENT_SENT_CONNECT, 0); + + /* If there's queued-up data, send it now */ + log_warn(LD_APP, "Possibly sending queued-up data: %d", buf_datalen(TO_CONN(ap_conn)->inbuf)); + if (connection_edge_package_raw_inbuf(ap_conn, 1, NULL) < 0) { + connection_mark_for_close(TO_CONN(ap_conn)); + } + return 0; } -- cgit v1.2.3-54-g00ecf From ba5d75810493f237edbb7f4f149d61f1ca08e605 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 18 Jul 2011 13:00:48 -0400 Subject: Initial optimistic_client fixes - Conform to make check-spaces - Build without warnings from passing size_t to %d - Use connection_get_inbuf_len(), not buf_datalen (otherwise bufferevents won't work). - Don't log that we're using this feature at warn. --- src/or/connection_edge.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/or/connection_edge.c b/src/or/connection_edge.c index 7551c39439..6f91d1a293 100644 --- a/src/or/connection_edge.c +++ b/src/or/connection_edge.c @@ -165,8 +165,10 @@ connection_edge_process_inbuf(edge_connection_t *conn, int package_partial) return 0; case AP_CONN_STATE_CONNECT_WAIT: log_info(LD_EDGE, - "data from edge while in '%s' state. Sending it anyway. package_partial=%d, buflen=%d", - conn_state_to_string(conn->_base.type, conn->_base.state), package_partial, buf_datalen(TO_CONN(conn)->inbuf)); + "data from edge while in '%s' state. Sending it anyway. " + "package_partial=%d, buflen=%ld", + conn_state_to_string(conn->_base.type, conn->_base.state), + package_partial, connection_get_inbuf_len(TO_CONN(conn))); if (connection_edge_package_raw_inbuf(conn, package_partial, NULL) < 0) { /* (We already sent an end cell if possible) */ connection_mark_for_close(TO_CONN(conn)); @@ -2406,7 +2408,8 @@ connection_ap_handshake_send_begin(edge_connection_t *ap_conn) control_event_stream_status(ap_conn, STREAM_EVENT_SENT_CONNECT, 0); /* If there's queued-up data, send it now */ - log_warn(LD_APP, "Possibly sending queued-up data: %d", buf_datalen(TO_CONN(ap_conn)->inbuf)); + log_info(LD_APP, "Possibly sending queued-up data: %ld", + connection_get_inbuf_len(TO_CONN(ap_conn))); if (connection_edge_package_raw_inbuf(ap_conn, 1, NULL) < 0) { connection_mark_for_close(TO_CONN(ap_conn)); } -- cgit v1.2.3-54-g00ecf From 1e441df2d0d06aa66eaf2c0e00a42d7fe9c39c87 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 18 Jul 2011 13:56:22 -0400 Subject: Only use optimistic data with exits that support it This adds a little code complexity: we need to remember for each node whether it supports the right feature, and then check for each connection whether it's exiting at such a node. We store this in a flag in the edge_connection_t, and set that flag at link time. --- src/or/circuituse.c | 21 ++++++++++++++++++ src/or/connection_edge.c | 56 ++++++++++++++++++++++++++++++++++-------------- src/or/or.h | 8 +++++++ src/or/routerparse.c | 3 +++ 4 files changed, 72 insertions(+), 16 deletions(-) (limited to 'src') diff --git a/src/or/circuituse.c b/src/or/circuituse.c index 460c41f75d..3c8cacb2cd 100644 --- a/src/or/circuituse.c +++ b/src/or/circuituse.c @@ -677,6 +677,7 @@ circuit_detach_stream(circuit_t *circ, edge_connection_t *conn) tor_assert(conn); conn->cpath_layer = NULL; /* make sure we don't keep a stale pointer */ + conn->exit_allows_optimistic_data = 0; conn->on_circuit = NULL; if (CIRCUIT_IS_ORIGIN(circ)) { @@ -1449,6 +1450,8 @@ static void link_apconn_to_circ(edge_connection_t *apconn, origin_circuit_t *circ, crypt_path_t *cpath) { + const node_t *exitnode; + /* add it into the linked list of streams on this circuit */ log_debug(LD_APP|LD_CIRC, "attaching new conn to circ. n_circ_id %d.", circ->_base.n_circ_id); @@ -1468,6 +1471,24 @@ link_apconn_to_circ(edge_connection_t *apconn, origin_circuit_t *circ, tor_assert(circ->cpath->prev->state == CPATH_STATE_OPEN); apconn->cpath_layer = circ->cpath->prev; } + + /* See if we can use optimistic data on this circuit */ + if (apconn->cpath_layer->extend_info && + (exitnode = node_get_by_id( + apconn->cpath_layer->extend_info->identity_digest)) && + exitnode->rs) { + /* Okay; we know what exit node this is. */ + if (circ->_base.purpose == CIRCUIT_PURPOSE_C_GENERAL && + exitnode->rs->version_supports_optimistic_data) + apconn->exit_allows_optimistic_data = 1; + else + apconn->exit_allows_optimistic_data = 0; + log_info(LD_APP, "Looks like completed circuit to %s %s allow " + "optimistic data for connection to %s", + safe_str_client(node_describe(exitnode)), + apconn->exit_allows_optimistic_data ? "does" : "doesn't", + safe_str_client(apconn->socks_request->address)); + } } /** Return true iff address is matched by one of the entries in diff --git a/src/or/connection_edge.c b/src/or/connection_edge.c index 6f91d1a293..20f8bc9160 100644 --- a/src/or/connection_edge.c +++ b/src/or/connection_edge.c @@ -57,6 +57,7 @@ static int connection_exit_connect_dir(edge_connection_t *exitconn); static int address_is_in_virtual_range(const char *addr); static int consider_plaintext_ports(edge_connection_t *conn, uint16_t port); static void clear_trackexithost_mappings(const char *exitname); +static int connection_ap_supports_optimistic_data(const edge_connection_t *); /** An AP stream has failed/finished. If it hasn't already sent back * a socks reply, send one now (based on endreason). Also set @@ -154,6 +155,22 @@ connection_edge_process_inbuf(edge_connection_t *conn, int package_partial) return -1; } return 0; + case AP_CONN_STATE_CONNECT_WAIT: + if (connection_ap_supports_optimistic_data(conn)) { + log_info(LD_EDGE, + "data from edge while in '%s' state. Sending it anyway. " + "package_partial=%d, buflen=%ld", + conn_state_to_string(conn->_base.type, conn->_base.state), + package_partial, connection_get_inbuf_len(TO_CONN(conn))); + if (connection_edge_package_raw_inbuf(conn, package_partial, NULL)<0) { + /* (We already sent an end cell if possible) */ + connection_mark_for_close(TO_CONN(conn)); + return -1; + } + return 0; + } + /* Fall through if the connection is on a circuit without optimistic + * data support. */ case EXIT_CONN_STATE_CONNECTING: case AP_CONN_STATE_RENDDESC_WAIT: case AP_CONN_STATE_CIRCUIT_WAIT: @@ -163,18 +180,6 @@ connection_edge_process_inbuf(edge_connection_t *conn, int package_partial) "data from edge while in '%s' state. Leaving it on buffer.", conn_state_to_string(conn->_base.type, conn->_base.state)); return 0; - case AP_CONN_STATE_CONNECT_WAIT: - log_info(LD_EDGE, - "data from edge while in '%s' state. Sending it anyway. " - "package_partial=%d, buflen=%ld", - conn_state_to_string(conn->_base.type, conn->_base.state), - package_partial, connection_get_inbuf_len(TO_CONN(conn))); - if (connection_edge_package_raw_inbuf(conn, package_partial, NULL) < 0) { - /* (We already sent an end cell if possible) */ - connection_mark_for_close(TO_CONN(conn)); - return -1; - } - return 0; } log_warn(LD_BUG,"Got unexpected state %d. Closing.",conn->_base.state); tor_fragile_assert(); @@ -2345,6 +2350,22 @@ get_unique_stream_id_by_circ(origin_circuit_t *circ) return test_stream_id; } +/** Return true iff conn is linked to a circuit and configured to use + * an exit that supports optimistic data. */ +static int +connection_ap_supports_optimistic_data(const edge_connection_t *conn) +{ + tor_assert(conn->_base.type == CONN_TYPE_AP); + /* We can only send optimistic data if we're connected to an open + general circuit. */ + if (conn->on_circuit == NULL || + conn->on_circuit->state != CIRCUIT_STATE_OPEN || + conn->on_circuit->purpose != CIRCUIT_PURPOSE_C_GENERAL) + return 0; + + return conn->exit_allows_optimistic_data; +} + /** Write a relay begin cell, using destaddr and destport from ap_conn's * socks_request field, and send it down circ. * @@ -2408,10 +2429,13 @@ connection_ap_handshake_send_begin(edge_connection_t *ap_conn) control_event_stream_status(ap_conn, STREAM_EVENT_SENT_CONNECT, 0); /* If there's queued-up data, send it now */ - log_info(LD_APP, "Possibly sending queued-up data: %ld", - connection_get_inbuf_len(TO_CONN(ap_conn))); - if (connection_edge_package_raw_inbuf(ap_conn, 1, NULL) < 0) { - connection_mark_for_close(TO_CONN(ap_conn)); + if (connection_get_inbuf_len(TO_CONN(ap_conn)) && + connection_ap_supports_optimistic_data(ap_conn)) { + log_info(LD_APP, "Sending up to %ld bytes of queued-up data", + connection_get_inbuf_len(TO_CONN(ap_conn))); + if (connection_edge_package_raw_inbuf(ap_conn, 1, NULL) < 0) { + connection_mark_for_close(TO_CONN(ap_conn)); + } } return 0; diff --git a/src/or/or.h b/src/or/or.h index 7a2bde59fe..f805215b14 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -1232,6 +1232,11 @@ typedef struct edge_connection_t { * NATd connection */ unsigned int is_transparent_ap:1; + /** Set if this connection's target exit node allows optimistic data. + * (That is, data sent on this stream before the exit has sent a + * CONNECTED cell.)*/ + unsigned int exit_allows_optimistic_data : 1; + /** If this is a DNSPort connection, this field holds the pending DNS * request that we're going to try to answer. */ struct evdns_server_request *dns_server_request; @@ -1667,6 +1672,9 @@ typedef struct routerstatus_t { /** True iff this router is a version that, if it caches directory info, * we can get microdescriptors from. */ unsigned int version_supports_microdesc_cache:1; + /** True iff this router is a version that allows DATA cells to arrive on + * a stream before it has sent a CONNECTED cell. */ + unsigned int version_supports_optimistic_data:1; unsigned int has_bandwidth:1; /**< The vote/consensus had bw info */ unsigned int has_exitsummary:1; /**< The vote/consensus had exit summaries */ diff --git a/src/or/routerparse.c b/src/or/routerparse.c index d1b2cd0fb7..5f160d054e 100644 --- a/src/or/routerparse.c +++ b/src/or/routerparse.c @@ -2092,6 +2092,7 @@ routerstatus_parse_entry_from_string(memarea_t *area, rs->version_supports_extrainfo_upload = 1; rs->version_supports_conditional_consensus = 1; rs->version_supports_microdesc_cache = 1; + rs->version_supports_optimistic_data = 1; } else { rs->version_supports_begindir = tor_version_as_new_as(tok->args[0], "0.2.0.1-alpha"); @@ -2109,6 +2110,8 @@ routerstatus_parse_entry_from_string(memarea_t *area, */ rs->version_supports_microdesc_cache = tor_version_as_new_as(tok->args[0], "0.2.3.0-alpha"); + rs->version_supports_optimistic_data = + tor_version_as_new_as(tok->args[0], "0.2.3.1-alpha"); } if (vote_rs) { vote_rs->version = tor_strdup(tok->args[0]); -- cgit v1.2.3-54-g00ecf From 34a52534bb43b30e19267968807a540ba33abe3b Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 18 Jul 2011 15:36:20 -0400 Subject: Add a generic_buffer_t to use the best buffer type we have on hand Also add a quick function to copy all the data in a buffer. (This one could be done much better, but let's see if it matters.) --- src/or/buffers.c | 70 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/or/buffers.h | 19 +++++++++++++++ src/or/or.h | 5 ++++ 3 files changed, 94 insertions(+) (limited to 'src') diff --git a/src/or/buffers.c b/src/or/buffers.c index 256b507729..63132db204 100644 --- a/src/or/buffers.c +++ b/src/or/buffers.c @@ -557,6 +557,39 @@ buf_free(buf_t *buf) tor_free(buf); } +/** Return a new copy of in_chunk */ +static chunk_t * +chunk_copy(const chunk_t *in_chunk) +{ + chunk_t *newch = tor_memdup(in_chunk, CHUNK_ALLOC_SIZE(in_chunk->memlen)); + newch->next = NULL; + if (in_chunk->data) { + off_t offset = in_chunk->data - in_chunk->mem; + newch->data = newch->mem + offset; + } + return newch; +} + +/** Return a new copy of buf */ +buf_t * +buf_copy(const buf_t *buf) +{ + chunk_t *ch; + buf_t *out = buf_new(); + out->default_chunk_size = buf->default_chunk_size; + for (ch = buf->head; ch; ch = ch->next) { + chunk_t *newch = chunk_copy(ch); + if (out->tail) { + out->tail->next = newch; + out->tail = newch; + } else { + out->head = out->tail = newch; + } + } + out->datalen = buf->datalen; + return out; +} + /** Append a new chunk with enough capacity to hold capacity bytes to * the tail of buf. If capped, don't allocate a chunk bigger * than MAX_CHUNK_ALLOC. */ @@ -2374,6 +2407,43 @@ write_to_evbuffer_zlib(struct evbuffer *buf, tor_zlib_state_t *state, } #endif +/** Set *output to contain a copy of the data in *input */ +int +generic_buffer_set_to_copy(generic_buffer_t **output, + const generic_buffer_t *input) +{ +#ifdef USE_BUFFEREVENTS + struct evbuffer_ptr ptr; + size_t remaining = evbuffer_get_length(input); + if (*output) { + evbuffer_drain(*output, evbuffer_get_length(*output)); + } else { + if (!(*output = evbuffer_new())) + return -1; + } + evbuffer_ptr_set((struct evbuffer*)input, &ptr, 0, EVBUFFER_PTR_SET); + while (remaining) { + struct evbuffer_iovec v[4]; + int n_used, i; + n_used = evbuffer_peek((struct evbuffer*)input, -1, &ptr, v, 4); + if (n_used < 0) + return -1; + for (i=0;ibuf is corrupted. */ void diff --git a/src/or/buffers.h b/src/or/buffers.h index b0161b9c2c..7b2a2acc3c 100644 --- a/src/or/buffers.h +++ b/src/or/buffers.h @@ -16,6 +16,7 @@ buf_t *buf_new(void); buf_t *buf_new_with_capacity(size_t size); void buf_free(buf_t *buf); void buf_clear(buf_t *buf); +buf_t *buf_copy(const buf_t *buf); void buf_shrink(buf_t *buf); void buf_shrink_freelists(int free_all); void buf_dump_freelist_sizes(int severity); @@ -67,6 +68,24 @@ int write_to_evbuffer_zlib(struct evbuffer *buf, tor_zlib_state_t *state, int done); #endif +#ifdef USE_BUFFEREVENTS +#define generic_buffer_new() evbuffer_new() +#define generic_buffer_len(b) evbuffer_get_length((b)) +#define generic_buffer_add(b,dat,len) evbuffer_add((b),(dat),(len)) +#define generic_buffer_get(b,buf,buflen) evbuffer_remove((b),(buf),(buflen)) +#define generic_buffer_clear(b) evbuffer_drain((b), evbuffer_get_length((b))) +#define generic_buffer_free(b) evbuffer_free((b)) +#else +#define generic_buffer_new() buf_new() +#define generic_buffer_len(b) buf_datalen((b)) +#define generic_buffer_add(b,dat,len) write_to_buf((dat),(len),(b)) +#define generic_buffer_get(b,buf,buflen) fetch_from_buf((buf),(buflen),(b)) +#define generic_buffer_clear(b) buf_clear((b)) +#define generic_buffer_free(b) buf_free((b)) +#endif +int generic_buffer_set_to_copy(generic_buffer_t **output, + const generic_buffer_t *input); + void assert_buf_ok(buf_t *buf); #ifdef BUFFERS_PRIVATE diff --git a/src/or/or.h b/src/or/or.h index f805215b14..c784267333 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -930,6 +930,11 @@ typedef struct { typedef struct buf_t buf_t; typedef struct socks_request_t socks_request_t; +#ifdef USE_BUFFEREVENTS +#define generic_buffer_t struct evbuffer +#else +#define generic_buffer_t buf_t +#endif /* Values for connection_t.magic: used to make sure that downcasts (casts from * connection_t to foo_connection_t) are safe. */ -- cgit v1.2.3-54-g00ecf From 218e84b634d10c25a6d5d002a983ddb051278bdf Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 18 Jul 2011 15:38:05 -0400 Subject: Remember optimistically sent data until we have gotten a CONNECTED Since we can retry failed streams under some circumstances, we need to be ready to send data queued on them. --- src/or/connection.c | 7 ++++++- src/or/connection_edge.c | 9 ++++++++- src/or/or.h | 15 ++++++++++++--- src/or/relay.c | 44 ++++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 68 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/or/connection.c b/src/or/connection.c index 8b9fb126d3..1ccd2b6608 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -444,7 +444,12 @@ _connection_free(connection_t *conn) tor_free(edge_conn->chosen_exit_name); if (edge_conn->socks_request) socks_request_free(edge_conn->socks_request); - + if (edge_conn->pending_optimistic_data) { + generic_buffer_free(edge_conn->pending_optimistic_data); + } + if (edge_conn->sending_optimistic_data) { + generic_buffer_free(edge_conn->sending_optimistic_data); + } rend_data_free(edge_conn->rend_data); } if (conn->type == CONN_TYPE_CONTROL) { diff --git a/src/or/connection_edge.c b/src/or/connection_edge.c index 20f8bc9160..79f0db037d 100644 --- a/src/or/connection_edge.c +++ b/src/or/connection_edge.c @@ -738,6 +738,12 @@ connection_ap_detach_retriable(edge_connection_t *conn, origin_circuit_t *circ, { control_event_stream_status(conn, STREAM_EVENT_FAILED_RETRIABLE, reason); conn->_base.timestamp_lastread = time(NULL); + + if (conn->pending_optimistic_data) { + generic_buffer_set_to_copy(&conn->sending_optimistic_data, + conn->pending_optimistic_data); + } + if (!get_options()->LeaveStreamsUnattached || conn->use_begindir) { /* If we're attaching streams ourself, or if this connection is * a tunneled directory connection, then just attach it. */ @@ -2429,7 +2435,8 @@ connection_ap_handshake_send_begin(edge_connection_t *ap_conn) control_event_stream_status(ap_conn, STREAM_EVENT_SENT_CONNECT, 0); /* If there's queued-up data, send it now */ - if (connection_get_inbuf_len(TO_CONN(ap_conn)) && + if ((connection_get_inbuf_len(TO_CONN(ap_conn)) || + ap_conn->sending_optimistic_data) && connection_ap_supports_optimistic_data(ap_conn)) { log_info(LD_APP, "Sending up to %ld bytes of queued-up data", connection_get_inbuf_len(TO_CONN(ap_conn))); diff --git a/src/or/or.h b/src/or/or.h index c784267333..7097b5f0fb 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -1237,11 +1237,20 @@ typedef struct edge_connection_t { * NATd connection */ unsigned int is_transparent_ap:1; - /** Set if this connection's target exit node allows optimistic data. - * (That is, data sent on this stream before the exit has sent a - * CONNECTED cell.)*/ + /** For AP connections only: Set if this connection's target exit node + * allows optimistic data. (That is, data sent on this stream before + * the exit has sent a CONNECTED cell.)*/ unsigned int exit_allows_optimistic_data : 1; + /** For AP connections only: buffer for data that we have sent + * optimistically, which we might need to re-send if we have to + * retry this connection. */ + generic_buffer_t *pending_optimistic_data; + /* For AP connections only: buffer for data that we previously sent + * optimistically which we are currently re-sending as we retry this + * connection. */ + generic_buffer_t *sending_optimistic_data; + /** If this is a DNSPort connection, this field holds the pending DNS * request that we're going to try to answer. */ struct evdns_server_request *dns_server_request; diff --git a/src/or/relay.c b/src/or/relay.c index df6d0a8a5f..3fa31a3f33 100644 --- a/src/or/relay.c +++ b/src/or/relay.c @@ -944,6 +944,12 @@ connection_edge_process_relay_cell_not_open( break; } } + /* This is definitely a success, so forget about any pending data we + * had sent. */ + if (conn->pending_optimistic_data) { + generic_buffer_free(conn->pending_optimistic_data); + conn->pending_optimistic_data = NULL; + } /* handle anything that might have queued */ if (connection_edge_package_raw_inbuf(conn, 1, NULL) < 0) { @@ -1343,6 +1349,10 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial, char payload[CELL_PAYLOAD_SIZE]; circuit_t *circ; unsigned domain = conn->cpath_layer ? LD_APP : LD_EXIT; + int sending_from_optimistic = 0; + const int sending_optimistically = + conn->_base.type == CONN_TYPE_AP && + conn->_base.state != AP_CONN_STATE_OPEN; tor_assert(conn); @@ -1375,7 +1385,18 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial, return 0; } - amount_to_process = connection_get_inbuf_len(TO_CONN(conn)); + sending_from_optimistic = conn->sending_optimistic_data != NULL; + + if (PREDICT_UNLIKELY(sending_from_optimistic)) { + amount_to_process = generic_buffer_len(conn->sending_optimistic_data); + if (PREDICT_UNLIKELY(!amount_to_process)) { + log_warn(LD_BUG, "sending_optimistic_data was non-NULL but empty"); + amount_to_process = connection_get_inbuf_len(TO_CONN(conn)); + sending_from_optimistic = 0; + } + } else { + amount_to_process = connection_get_inbuf_len(TO_CONN(conn)); + } if (!amount_to_process) return 0; @@ -1391,11 +1412,30 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial, stats_n_data_bytes_packaged += length; stats_n_data_cells_packaged += 1; - connection_fetch_from_buf(payload, length, TO_CONN(conn)); + if (PREDICT_UNLIKELY(sending_from_optimistic)) { + /* XXX023 We could be more efficient here by sometimes packing + * previously-sent optimistic data in the same cell with data + * from the inbuf. */ + generic_buffer_get(conn->sending_optimistic_data, payload, length); + if (!generic_buffer_len(conn->sending_optimistic_data)) { + generic_buffer_free(conn->sending_optimistic_data); + conn->sending_optimistic_data = NULL; + } + } else { + connection_fetch_from_buf(payload, length, TO_CONN(conn)); + } log_debug(domain,"(%d) Packaging %d bytes (%d waiting).", conn->_base.s, (int)length, (int)connection_get_inbuf_len(TO_CONN(conn))); + if (sending_optimistically && !sending_from_optimistic) { + /* This is new optimistic data; remember it in case we need to detach and + retry */ + if (!conn->pending_optimistic_data) + conn->pending_optimistic_data = generic_buffer_new(); + generic_buffer_add(conn->pending_optimistic_data, payload, length); + } + if (connection_edge_send_command(conn, RELAY_COMMAND_DATA, payload, length) < 0 ) /* circuit got marked for close, don't continue, don't need to mark conn */ -- cgit v1.2.3-54-g00ecf From 9a7c16fb00c6ffc32ef7d6cc7fbede5258fe4390 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Wed, 20 Jul 2011 09:34:19 -0400 Subject: Unit test for generic_buffer_set_to_copy --- src/test/test.c | 66 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) (limited to 'src') diff --git a/src/test/test.c b/src/test/test.c index 05f5922a5b..e7d4ccbd79 100644 --- a/src/test/test.c +++ b/src/test/test.c @@ -569,6 +569,71 @@ test_socks_5_auth_before_negotiation(void *ptr) ; } +static void +test_buffer_copy(void *arg) +{ + generic_buffer_t *buf=NULL, *buf2=NULL; + const char *s; + size_t len; + char b[256]; + int i; + (void)arg; + + buf = generic_buffer_new(); + tt_assert(buf); + + /* Copy an empty buffer. */ + tt_int_op(0, ==, generic_buffer_set_to_copy(&buf2, buf)); + tt_assert(buf2); + tt_int_op(0, ==, generic_buffer_len(buf2)); + + /* Now try with a short buffer. */ + s = "And now comes an act of enormous enormance!"; + len = strlen(s); + generic_buffer_add(buf, s, len); + tt_int_op(len, ==, generic_buffer_len(buf)); + /* Add junk to buf2 so we can test replacing.*/ + generic_buffer_add(buf2, "BLARG", 5); + tt_int_op(0, ==, generic_buffer_set_to_copy(&buf2, buf)); + tt_int_op(len, ==, generic_buffer_len(buf2)); + generic_buffer_get(buf2, b, len); + test_mem_op(b, ==, s, len); + /* Now free buf2 and retry so we can test allocating */ + generic_buffer_free(buf2); + buf2 = NULL; + tt_int_op(0, ==, generic_buffer_set_to_copy(&buf2, buf)); + tt_int_op(len, ==, generic_buffer_len(buf2)); + generic_buffer_get(buf2, b, len); + test_mem_op(b, ==, s, len); + /* Clear buf for next test */ + generic_buffer_get(buf, b, len); + tt_int_op(generic_buffer_len(buf),==,0); + + /* Okay, now let's try a bigger buffer. */ + s = "Quis autem vel eum iure reprehenderit qui in ea voluptate velit " + "esse quam nihil molestiae consequatur, vel illum qui dolorem eum " + "fugiat quo voluptas nulla pariatur?"; + len = strlen(s); + for (i = 0; i < 256; ++i) { + b[0]=i; + generic_buffer_add(buf, b, 1); + generic_buffer_add(buf, s, len); + } + tt_int_op(0, ==, generic_buffer_set_to_copy(&buf2, buf)); + tt_int_op(generic_buffer_len(buf2), ==, generic_buffer_len(buf)); + for (i = 0; i < 256; ++i) { + generic_buffer_get(buf2, b, len+1); + tt_int_op((unsigned char)b[0],==,i); + test_mem_op(b+1, ==, s, len); + } + + done: + if (buf) + generic_buffer_free(buf); + if (buf2) + generic_buffer_free(buf2); +} + /** Run unit tests for buffers.c */ static void test_buffers(void) @@ -1612,6 +1677,7 @@ const struct testcase_setup_t legacy_setup = { static struct testcase_t test_array[] = { ENT(buffers), + { "buffer_copy", test_buffer_copy, 0, NULL, NULL }, ENT(onion_handshake), ENT(circuit_timeout), ENT(policies), -- cgit v1.2.3-54-g00ecf