diff options
Diffstat (limited to 'src/or/relay.c')
-rw-r--r-- | src/or/relay.c | 329 |
1 files changed, 217 insertions, 112 deletions
diff --git a/src/or/relay.c b/src/or/relay.c index b637fadf59..5f7fcd8b7c 100644 --- a/src/or/relay.c +++ b/src/or/relay.c @@ -1,7 +1,7 @@ /* Copyright (c) 2001 Matej Pfajfar. * Copyright (c) 2001-2004, Roger Dingledine. * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. - * Copyright (c) 2007-2011, The Tor Project, Inc. */ + * Copyright (c) 2007-2012, The Tor Project, Inc. */ /* See LICENSE for licensing information */ /** @@ -11,6 +11,7 @@ **/ #include <math.h> +#define RELAY_PRIVATE #include "or.h" #include "buffers.h" #include "circuitbuild.h" @@ -24,6 +25,7 @@ #include "main.h" #include "mempool.h" #include "networkstatus.h" +#include "nodelist.h" #include "policies.h" #include "reasons.h" #include "relay.h" @@ -32,9 +34,6 @@ #include "routerlist.h" #include "routerparse.h" -static int relay_crypt(circuit_t *circ, cell_t *cell, - cell_direction_t cell_direction, - crypt_path_t **layer_hint, char *recognized); static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell, cell_direction_t cell_direction, crypt_path_t *layer_hint); @@ -53,11 +52,6 @@ static int circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint); static int circuit_queue_streams_are_blocked(circuit_t *circ); -/* XXXX023 move this all to compat_libevent */ -/** Cache the current hi-res time; the cache gets reset when libevent - * calls us. */ -static struct timeval cached_time_hires = {0, 0}; - /** Stop reading on edge connections when we have this many cells * waiting on the appropriate queue. */ #define CELL_QUEUE_HIGHWATER_SIZE 256 @@ -65,21 +59,6 @@ static struct timeval cached_time_hires = {0, 0}; * cells. */ #define CELL_QUEUE_LOWWATER_SIZE 64 -static void -tor_gettimeofday_cached(struct timeval *tv) -{ - if (cached_time_hires.tv_sec == 0) { - tor_gettimeofday(&cached_time_hires); - } - *tv = cached_time_hires; -} - -void -tor_gettimeofday_cache_clear(void) -{ - cached_time_hires.tv_sec = 0; -} - /** Stats: how many relay cells have originated at this hop, or have * been relayed onward (not recognized at this hop)? */ @@ -93,7 +72,7 @@ uint64_t stats_n_relay_cells_delivered = 0; * cell. */ static void -relay_set_digest(crypto_digest_env_t *digest, cell_t *cell) +relay_set_digest(crypto_digest_t *digest, cell_t *cell) { char integrity[4]; relay_header_t rh; @@ -114,11 +93,11 @@ relay_set_digest(crypto_digest_env_t *digest, cell_t *cell) * and cell to their original state and return 0. */ static int -relay_digest_matches(crypto_digest_env_t *digest, cell_t *cell) +relay_digest_matches(crypto_digest_t *digest, cell_t *cell) { char received_integrity[4], calculated_integrity[4]; relay_header_t rh; - crypto_digest_env_t *backup_digest=NULL; + crypto_digest_t *backup_digest=NULL; backup_digest = crypto_digest_dup(digest); @@ -142,10 +121,10 @@ relay_digest_matches(crypto_digest_env_t *digest, cell_t *cell) /* restore the relay header */ memcpy(rh.integrity, received_integrity, 4); relay_header_pack(cell->payload, &rh); - crypto_free_digest_env(backup_digest); + crypto_digest_free(backup_digest); return 0; } - crypto_free_digest_env(backup_digest); + crypto_digest_free(backup_digest); return 1; } @@ -157,7 +136,7 @@ relay_digest_matches(crypto_digest_env_t *digest, cell_t *cell) * Return -1 if the crypto fails, else return 0. */ static int -relay_crypt_one_payload(crypto_cipher_env_t *cipher, uint8_t *in, +relay_crypt_one_payload(crypto_cipher_t *cipher, uint8_t *in, int encrypt_mode) { int r; @@ -296,7 +275,7 @@ circuit_receive_relay_cell(cell_t *cell, circuit_t *circ, * Return -1 to indicate that we should mark the circuit for close, * else return 0. */ -static int +int relay_crypt(circuit_t *circ, cell_t *cell, cell_direction_t cell_direction, crypt_path_t **layer_hint, char *recognized) { @@ -608,7 +587,7 @@ relay_send_command_from_edge(streamid_t stream_id, circuit_t *circ, /* If no RELAY_EARLY cells can be sent over this circuit, log which * commands have been sent as RELAY_EARLY cells before; helps debug * task 878. */ - smartlist_t *commands_list = smartlist_create(); + smartlist_t *commands_list = smartlist_new(); int i = 0; char *commands = NULL; for (; i < origin_circ->relay_early_cells_sent; i++) @@ -648,6 +627,7 @@ connection_edge_send_command(edge_connection_t *fromconn, { /* XXXX NM Split this function into a separate versions per circuit type? */ circuit_t *circ; + crypt_path_t *cpath_layer = fromconn->cpath_layer; tor_assert(fromconn); circ = fromconn->on_circuit; @@ -662,7 +642,8 @@ connection_edge_send_command(edge_connection_t *fromconn, if (!circ) { if (fromconn->_base.type == CONN_TYPE_AP) { log_info(LD_APP,"no circ. Closing conn."); - connection_mark_unattached_ap(fromconn, END_STREAM_REASON_INTERNAL); + connection_mark_unattached_ap(EDGE_TO_ENTRY_CONN(fromconn), + END_STREAM_REASON_INTERNAL); } else { log_info(LD_EXIT,"no circ. Closing conn."); fromconn->edge_has_sent_end = 1; /* no circ to send to */ @@ -674,7 +655,7 @@ connection_edge_send_command(edge_connection_t *fromconn, return relay_send_command_from_edge(fromconn->stream_id, circ, relay_command, payload, - payload_len, fromconn->cpath_layer); + payload_len, cpath_layer); } /** How many times will I retry a stream that fails due to DNS @@ -702,22 +683,24 @@ edge_reason_is_retriable(int reason) static int connection_ap_process_end_not_open( relay_header_t *rh, cell_t *cell, origin_circuit_t *circ, - edge_connection_t *conn, crypt_path_t *layer_hint) + entry_connection_t *conn, crypt_path_t *layer_hint) { struct in_addr in; - routerinfo_t *exitrouter; + node_t *exitrouter; int reason = *(cell->payload+RELAY_HEADER_SIZE); int control_reason = reason | END_STREAM_REASON_FLAG_REMOTE; + edge_connection_t *edge_conn = ENTRY_TO_EDGE_CONN(conn); (void) layer_hint; /* unused */ if (rh->length > 0 && edge_reason_is_retriable(reason) && - !connection_edge_is_rendezvous_stream(conn) /* avoid retry if rend */ - ) { + /* avoid retry if rend */ + !connection_edge_is_rendezvous_stream(edge_conn)) { + const char *chosen_exit_digest = + circ->build_state->chosen_exit->identity_digest; log_info(LD_APP,"Address '%s' refused due to '%s'. Considering retrying.", safe_str(conn->socks_request->address), stream_end_reason_to_string(reason)); - exitrouter = - router_get_by_digest(circ->build_state->chosen_exit->identity_digest); + exitrouter = node_get_mutable_by_id(chosen_exit_digest); switch (reason) { case END_STREAM_REASON_EXITPOLICY: if (rh->length >= 5) { @@ -752,13 +735,13 @@ connection_ap_process_end_not_open( log_info(LD_APP, "Exitrouter %s seems to be more restrictive than its exit " "policy. Not using this router as exit for now.", - router_describe(exitrouter)); - policies_set_router_exitpolicy_to_reject_all(exitrouter); + node_describe(exitrouter)); + policies_set_node_exitpolicy_to_reject_all(exitrouter); } /* rewrite it to an IP if we learned one. */ if (addressmap_rewrite(conn->socks_request->address, sizeof(conn->socks_request->address), - NULL)) { + NULL, NULL)) { control_event_stream_status(conn, STREAM_EVENT_REMAP, 0); } if (conn->chosen_exit_optional || @@ -793,7 +776,7 @@ connection_ap_process_end_not_open( /* We haven't retried too many times; reattach the connection. */ circuit_log_path(LOG_INFO,LD_APP,circ); /* Mark this circuit "unusable for new streams". */ - /* XXXX023 this is a kludgy way to do this. */ + /* XXXX024 this is a kludgy way to do this. */ tor_assert(circ->_base.timestamp_dirty); circ->_base.timestamp_dirty -= get_options()->MaxCircuitDirtiness; @@ -818,7 +801,7 @@ connection_ap_process_end_not_open( case END_STREAM_REASON_HIBERNATING: case END_STREAM_REASON_RESOURCELIMIT: if (exitrouter) { - policies_set_router_exitpolicy_to_reject_all(exitrouter); + policies_set_node_exitpolicy_to_reject_all(exitrouter); } if (conn->chosen_exit_optional) { /* stop wanting a specific exit */ @@ -838,7 +821,7 @@ connection_ap_process_end_not_open( stream_end_reason_to_string(rh->length > 0 ? reason : -1)); circuit_log_path(LOG_INFO,LD_APP,circ); /* need to test because of detach_retriable */ - if (!conn->_base.marked_for_close) + if (!ENTRY_TO_CONN(conn)->marked_for_close) connection_mark_unattached_ap(conn, control_reason); return 0; } @@ -847,7 +830,7 @@ connection_ap_process_end_not_open( * dotted-quad representation of <b>new_addr</b> (given in host order), * and send an appropriate REMAP event. */ static void -remap_event_helper(edge_connection_t *conn, uint32_t new_addr) +remap_event_helper(entry_connection_t *conn, uint32_t new_addr) { struct in_addr in; @@ -873,7 +856,8 @@ connection_edge_process_relay_cell_not_open( if (rh->command == RELAY_COMMAND_END) { if (CIRCUIT_IS_ORIGIN(circ) && conn->_base.type == CONN_TYPE_AP) { return connection_ap_process_end_not_open(rh, cell, - TO_ORIGIN_CIRCUIT(circ), conn, + TO_ORIGIN_CIRCUIT(circ), + EDGE_TO_ENTRY_CONN(conn), layer_hint); } else { /* we just got an 'end', don't need to send one */ @@ -887,6 +871,7 @@ connection_edge_process_relay_cell_not_open( if (conn->_base.type == CONN_TYPE_AP && rh->command == RELAY_COMMAND_CONNECTED) { + entry_connection_t *entry_conn = EDGE_TO_ENTRY_CONN(conn); tor_assert(CIRCUIT_IS_ORIGIN(circ)); if (conn->_base.state != AP_CONN_STATE_CONNECT_WAIT) { log_fn(LOG_PROTOCOL_WARN, LD_APP, @@ -901,29 +886,27 @@ connection_edge_process_relay_cell_not_open( int ttl; if (!addr || (get_options()->ClientDNSRejectInternalAddresses && is_internal_IP(addr, 0))) { - char buf[INET_NTOA_BUF_LEN]; - struct in_addr a; - a.s_addr = htonl(addr); - tor_inet_ntoa(&a, buf, sizeof(buf)); - log_info(LD_APP, - "...but it claims the IP address was %s. Closing.", buf); + log_info(LD_APP, "...but it claims the IP address was %s. Closing.", + fmt_addr32(addr)); connection_edge_end(conn, END_STREAM_REASON_TORPROTOCOL); - connection_mark_unattached_ap(conn, END_STREAM_REASON_TORPROTOCOL); + connection_mark_unattached_ap(entry_conn, + END_STREAM_REASON_TORPROTOCOL); return 0; } if (rh->length >= 8) ttl = (int)ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+4)); else ttl = -1; - client_dns_set_addressmap(conn->socks_request->address, addr, - conn->chosen_exit_name, ttl); + client_dns_set_addressmap(entry_conn->socks_request->address, addr, + entry_conn->chosen_exit_name, ttl); - remap_event_helper(conn, addr); + remap_event_helper(entry_conn, addr); } circuit_log_path(LOG_INFO,LD_APP,TO_ORIGIN_CIRCUIT(circ)); /* don't send a socks reply to transparent conns */ - if (!conn->socks_request->has_finished) - connection_ap_handshake_socks_reply(conn, NULL, 0, 0); + tor_assert(entry_conn->socks_request != NULL); + if (!entry_conn->socks_request->has_finished) + connection_ap_handshake_socks_reply(entry_conn, NULL, 0, 0); /* Was it a linked dir conn? If so, a dir request just started to * fetch something; this could be a bootstrap status milestone. */ @@ -946,6 +929,12 @@ connection_edge_process_relay_cell_not_open( break; } } + /* This is definitely a success, so forget about any pending data we + * had sent. */ + if (entry_conn->pending_optimistic_data) { + generic_buffer_free(entry_conn->pending_optimistic_data); + entry_conn->pending_optimistic_data = NULL; + } /* handle anything that might have queued */ if (connection_edge_package_raw_inbuf(conn, 1, NULL) < 0) { @@ -960,17 +949,18 @@ connection_edge_process_relay_cell_not_open( int ttl; int answer_len; uint8_t answer_type; + entry_connection_t *entry_conn = EDGE_TO_ENTRY_CONN(conn); if (conn->_base.state != AP_CONN_STATE_RESOLVE_WAIT) { log_fn(LOG_PROTOCOL_WARN, LD_APP, "Got a 'resolved' cell while " "not in state resolve_wait. Dropping."); return 0; } - tor_assert(SOCKS_COMMAND_IS_RESOLVE(conn->socks_request->command)); + tor_assert(SOCKS_COMMAND_IS_RESOLVE(entry_conn->socks_request->command)); answer_len = cell->payload[RELAY_HEADER_SIZE+1]; if (rh->length < 2 || answer_len+2>rh->length) { log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL, "Dropping malformed 'resolved' cell"); - connection_mark_unattached_ap(conn, END_STREAM_REASON_TORPROTOCOL); + connection_mark_unattached_ap(entry_conn, END_STREAM_REASON_TORPROTOCOL); return 0; } answer_type = cell->payload[RELAY_HEADER_SIZE]; @@ -983,19 +973,17 @@ connection_edge_process_relay_cell_not_open( uint32_t addr = ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+2)); if (get_options()->ClientDNSRejectInternalAddresses && is_internal_IP(addr, 0)) { - char buf[INET_NTOA_BUF_LEN]; - struct in_addr a; - a.s_addr = htonl(addr); - tor_inet_ntoa(&a, buf, sizeof(buf)); - log_info(LD_APP,"Got a resolve with answer %s. Rejecting.", buf); - connection_ap_handshake_socks_resolved(conn, + log_info(LD_APP,"Got a resolve with answer %s. Rejecting.", + fmt_addr32(addr)); + connection_ap_handshake_socks_resolved(entry_conn, RESOLVED_TYPE_ERROR_TRANSIENT, 0, NULL, 0, TIME_MAX); - connection_mark_unattached_ap(conn, END_STREAM_REASON_TORPROTOCOL); + connection_mark_unattached_ap(entry_conn, + END_STREAM_REASON_TORPROTOCOL); return 0; } } - connection_ap_handshake_socks_resolved(conn, + connection_ap_handshake_socks_resolved(entry_conn, answer_type, cell->payload[RELAY_HEADER_SIZE+1], /*answer_len*/ cell->payload+RELAY_HEADER_SIZE+2, /*answer*/ @@ -1003,9 +991,9 @@ connection_edge_process_relay_cell_not_open( -1); if (answer_type == RESOLVED_TYPE_IPV4 && answer_len == 4) { uint32_t addr = ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+2)); - remap_event_helper(conn, addr); + remap_event_helper(entry_conn, addr); } - connection_mark_unattached_ap(conn, + connection_mark_unattached_ap(entry_conn, END_STREAM_REASON_DONE | END_STREAM_REASON_FLAG_ALREADY_SOCKS_REPLIED); return 0; @@ -1039,6 +1027,9 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, relay_header_t rh; unsigned domain = layer_hint?LD_APP:LD_EXIT; int reason; + int optimistic_data = 0; /* Set to 1 if we receive data on a stream + * that's in the EXIT_CONN_STATE_RESOLVING + * or EXIT_CONN_STATE_CONNECTING states. */ tor_assert(cell); tor_assert(circ); @@ -1058,9 +1049,20 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, /* either conn is NULL, in which case we've got a control cell, or else * conn points to the recognized stream. */ - if (conn && !connection_state_is_open(TO_CONN(conn))) - return connection_edge_process_relay_cell_not_open( - &rh, cell, circ, conn, layer_hint); + if (conn && !connection_state_is_open(TO_CONN(conn))) { + if (conn->_base.type == CONN_TYPE_EXIT && + (conn->_base.state == EXIT_CONN_STATE_CONNECTING || + conn->_base.state == EXIT_CONN_STATE_RESOLVING) && + rh.command == RELAY_COMMAND_DATA) { + /* Allow DATA cells to be delivered to an exit node in state + * EXIT_CONN_STATE_CONNECTING or EXIT_CONN_STATE_RESOLVING. + * This speeds up HTTP, for example. */ + optimistic_data = 1; + } else { + return connection_edge_process_relay_cell_not_open( + &rh, cell, circ, conn, layer_hint); + } + } switch (rh.command) { case RELAY_COMMAND_DROP: @@ -1103,8 +1105,12 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, (!layer_hint && --circ->deliver_window < 0)) { log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL, "(relay data) circ deliver_window below 0. Killing."); - connection_edge_end(conn, END_STREAM_REASON_TORPROTOCOL); - connection_mark_for_close(TO_CONN(conn)); + if (conn) { + /* XXXX Do we actually need to do this? Will killing the circuit + * not send an END and mark the stream for close as appropriate? */ + connection_edge_end(conn, END_STREAM_REASON_TORPROTOCOL); + connection_mark_for_close(TO_CONN(conn)); + } return -END_CIRC_REASON_TORPROTOCOL; } log_debug(domain,"circ deliver_window now %d.", layer_hint ? @@ -1127,7 +1133,14 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, stats_n_data_bytes_received += rh.length; connection_write_to_buf((char*)(cell->payload + RELAY_HEADER_SIZE), rh.length, TO_CONN(conn)); - connection_edge_consider_sending_sendme(conn); + + if (!optimistic_data) { + /* Only send a SENDME if we're not getting optimistic data; otherwise + * a SENDME could arrive before the CONNECTED. + */ + connection_edge_consider_sending_sendme(conn); + } + return 0; case RELAY_COMMAND_END: reason = rh.length > 0 ? @@ -1142,9 +1155,13 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, conn->_base.s, stream_end_reason_to_string(reason), conn->stream_id); - if (conn->socks_request && !conn->socks_request->has_finished) - log_warn(LD_BUG, - "open stream hasn't sent socks answer yet? Closing."); + if (conn->_base.type == CONN_TYPE_AP) { + entry_connection_t *entry_conn = EDGE_TO_ENTRY_CONN(conn); + if (entry_conn->socks_request && + !entry_conn->socks_request->has_finished) + log_warn(LD_BUG, + "open stream hasn't sent socks answer yet? Closing."); + } /* We just *got* an end; no reason to send one. */ conn->edge_has_sent_end = 1; if (!conn->end_reason) @@ -1152,17 +1169,43 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, if (!conn->_base.marked_for_close) { /* only mark it if not already marked. it's possible to * get the 'end' right around when the client hangs up on us. */ - connection_mark_for_close(TO_CONN(conn)); - conn->_base.hold_open_until_flushed = 1; + connection_mark_and_flush(TO_CONN(conn)); } return 0; - case RELAY_COMMAND_EXTEND: + case RELAY_COMMAND_EXTEND: { + static uint64_t total_n_extend=0, total_nonearly=0; + total_n_extend++; if (conn) { log_fn(LOG_PROTOCOL_WARN, domain, "'extend' cell received for non-zero stream. Dropping."); return 0; } + if (cell->command != CELL_RELAY_EARLY && + !networkstatus_get_param(NULL,"AllowNonearlyExtend",0,0,1)) { +#define EARLY_WARNING_INTERVAL 3600 + static ratelim_t early_warning_limit = + RATELIM_INIT(EARLY_WARNING_INTERVAL); + char *m; + if (cell->command == CELL_RELAY) { + ++total_nonearly; + if ((m = rate_limit_log(&early_warning_limit, approx_time()))) { + double percentage = ((double)total_nonearly)/total_n_extend; + percentage *= 100; + log_fn(LOG_PROTOCOL_WARN, domain, "EXTEND cell received, " + "but not via RELAY_EARLY. Dropping.%s", m); + log_fn(LOG_PROTOCOL_WARN, domain, " (We have dropped %.02f%% of " + "all EXTEND cells for this reason)", percentage); + tor_free(m); + } + } else { + log_fn(LOG_WARN, domain, + "EXTEND cell received, in a cell with type %d! Dropping.", + cell->command); + } + return 0; + } return circuit_extend(cell, circ); + } case RELAY_COMMAND_EXTENDED: if (!layer_hint) { log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL, @@ -1220,13 +1263,27 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, "'connected' received, no conn attached anymore. Ignoring."); return 0; case RELAY_COMMAND_SENDME: - if (!conn) { + if (!rh.stream_id) { if (layer_hint) { + if (layer_hint->package_window + CIRCWINDOW_INCREMENT > + CIRCWINDOW_START_MAX) { + log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL, + "Bug/attack: unexpected sendme cell from exit relay. " + "Closing circ."); + return -END_CIRC_REASON_TORPROTOCOL; + } layer_hint->package_window += CIRCWINDOW_INCREMENT; log_debug(LD_APP,"circ-level sendme at origin, packagewindow %d.", layer_hint->package_window); circuit_resume_edge_reading(circ, layer_hint); } else { + if (circ->package_window + CIRCWINDOW_INCREMENT > + CIRCWINDOW_START_MAX) { + log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL, + "Bug/attack: unexpected sendme cell from client. " + "Closing circ."); + return -END_CIRC_REASON_TORPROTOCOL; + } circ->package_window += CIRCWINDOW_INCREMENT; log_debug(LD_APP, "circ-level sendme at non-origin, packagewindow %d.", @@ -1235,6 +1292,11 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, } return 0; } + if (!conn) { + log_info(domain,"sendme cell dropped, unknown stream (streamid %d).", + rh.stream_id); + return 0; + } conn->package_window += STREAMWINDOW_INCREMENT; log_debug(domain,"stream-level sendme, packagewindow now %d.", conn->package_window); @@ -1324,10 +1386,17 @@ int connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial, int *max_cells) { - size_t amount_to_process, length; + size_t bytes_to_process, length; char payload[CELL_PAYLOAD_SIZE]; circuit_t *circ; - unsigned domain = conn->cpath_layer ? LD_APP : LD_EXIT; + const unsigned domain = conn->_base.type == CONN_TYPE_AP ? LD_APP : LD_EXIT; + int sending_from_optimistic = 0; + const int sending_optimistically = + conn->_base.type == CONN_TYPE_AP && + conn->_base.state != AP_CONN_STATE_OPEN; + entry_connection_t *entry_conn = + conn->_base.type == CONN_TYPE_AP ? EDGE_TO_ENTRY_CONN(conn) : NULL; + crypt_path_t *cpath_layer = conn->cpath_layer; tor_assert(conn); @@ -1350,7 +1419,7 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial, return -1; } - if (circuit_consider_stop_edge_reading(circ, conn->cpath_layer)) + if (circuit_consider_stop_edge_reading(circ, cpath_layer)) return 0; if (conn->package_window <= 0) { @@ -1360,44 +1429,75 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial, return 0; } - amount_to_process = buf_datalen(conn->_base.inbuf); + sending_from_optimistic = entry_conn && + entry_conn->sending_optimistic_data != NULL; + + if (PREDICT_UNLIKELY(sending_from_optimistic)) { + bytes_to_process = generic_buffer_len(entry_conn->sending_optimistic_data); + if (PREDICT_UNLIKELY(!bytes_to_process)) { + log_warn(LD_BUG, "sending_optimistic_data was non-NULL but empty"); + bytes_to_process = connection_get_inbuf_len(TO_CONN(conn)); + sending_from_optimistic = 0; + } + } else { + bytes_to_process = connection_get_inbuf_len(TO_CONN(conn)); + } - if (!amount_to_process) + if (!bytes_to_process) return 0; - if (!package_partial && amount_to_process < RELAY_PAYLOAD_SIZE) + if (!package_partial && bytes_to_process < RELAY_PAYLOAD_SIZE) return 0; - if (amount_to_process > RELAY_PAYLOAD_SIZE) { + if (bytes_to_process > RELAY_PAYLOAD_SIZE) { length = RELAY_PAYLOAD_SIZE; } else { - length = amount_to_process; + length = bytes_to_process; } stats_n_data_bytes_packaged += length; stats_n_data_cells_packaged += 1; - connection_fetch_from_buf(payload, length, TO_CONN(conn)); + if (PREDICT_UNLIKELY(sending_from_optimistic)) { + /* XXXX We could be more efficient here by sometimes packing + * previously-sent optimistic data in the same cell with data + * from the inbuf. */ + generic_buffer_get(entry_conn->sending_optimistic_data, payload, length); + if (!generic_buffer_len(entry_conn->sending_optimistic_data)) { + generic_buffer_free(entry_conn->sending_optimistic_data); + entry_conn->sending_optimistic_data = NULL; + } + } else { + connection_fetch_from_buf(payload, length, TO_CONN(conn)); + } log_debug(domain,"(%d) Packaging %d bytes (%d waiting).", conn->_base.s, - (int)length, (int)buf_datalen(conn->_base.inbuf)); + (int)length, (int)connection_get_inbuf_len(TO_CONN(conn))); + + if (sending_optimistically && !sending_from_optimistic) { + /* This is new optimistic data; remember it in case we need to detach and + retry */ + if (!entry_conn->pending_optimistic_data) + entry_conn->pending_optimistic_data = generic_buffer_new(); + generic_buffer_add(entry_conn->pending_optimistic_data, payload, length); + } if (connection_edge_send_command(conn, RELAY_COMMAND_DATA, payload, length) < 0 ) /* circuit got marked for close, don't continue, don't need to mark conn */ return 0; - if (!conn->cpath_layer) { /* non-rendezvous exit */ + if (!cpath_layer) { /* non-rendezvous exit */ tor_assert(circ->package_window > 0); circ->package_window--; } else { /* we're an AP, or an exit on a rendezvous circ */ - tor_assert(conn->cpath_layer->package_window > 0); - conn->cpath_layer->package_window--; + tor_assert(cpath_layer->package_window > 0); + cpath_layer->package_window--; } if (--conn->package_window <= 0) { /* is it 0 after decrement? */ connection_stop_reading(TO_CONN(conn)); log_debug(domain,"conn->package_window reached 0."); - circuit_consider_stop_edge_reading(circ, conn->cpath_layer); + circuit_consider_stop_edge_reading(circ, cpath_layer); return 0; /* don't process the inbuf any more */ } log_debug(domain,"conn->package_window is now %d",conn->package_window); @@ -1436,7 +1536,7 @@ connection_edge_consider_sending_sendme(edge_connection_t *conn) } while (conn->deliver_window <= STREAMWINDOW_START - STREAMWINDOW_INCREMENT) { - log_debug(conn->cpath_layer?LD_APP:LD_EXIT, + log_debug(conn->_base.type == CONN_TYPE_AP ?LD_APP:LD_EXIT, "Outbuf %d, Queuing stream sendme.", (int)conn->_base.outbuf_flushlen); conn->deliver_window += STREAMWINDOW_INCREMENT; @@ -1532,7 +1632,7 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, if (!layer_hint || conn->cpath_layer == layer_hint) { connection_start_reading(TO_CONN(conn)); - if (buf_datalen(conn->_base.inbuf) > 0) + if (connection_get_inbuf_len(TO_CONN(conn)) > 0) ++n_packaging_streams; } } @@ -1543,7 +1643,7 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, if (!layer_hint || conn->cpath_layer == layer_hint) { connection_start_reading(TO_CONN(conn)); - if (buf_datalen(conn->_base.inbuf) > 0) + if (connection_get_inbuf_len(TO_CONN(conn)) > 0) ++n_packaging_streams; } } @@ -1582,7 +1682,7 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, } /* If there's still data to read, we'll be coming back to this stream. */ - if (buf_datalen(conn->_base.inbuf)) + if (connection_get_inbuf_len(TO_CONN(conn))) ++n_streams_left; /* If the circuit won't accept any more data, return without looking @@ -1638,9 +1738,10 @@ circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint) if (layer_hint->package_window <= 0) { log_debug(domain,"yes, at-origin. stopped."); for (conn = TO_ORIGIN_CIRCUIT(circ)->p_streams; conn; - conn=conn->next_stream) + conn=conn->next_stream) { if (conn->cpath_layer == layer_hint) connection_stop_reading(TO_CONN(conn)); + } return 1; } return 0; @@ -1988,14 +2089,18 @@ cell_ewma_get_tick(void) * has value ewma_scale_factor ** N.) */ static double ewma_scale_factor = 0.1; +/* DOCDOC ewma_enabled */ static int ewma_enabled = 0; +/*DOCDOC*/ #define EPSILON 0.00001 +/*DOCDOC*/ #define LOG_ONEHALF -0.69314718055994529 /** Adjust the global cell scale factor based on <b>options</b> */ void -cell_ewma_set_scale_factor(or_options_t *options, networkstatus_t *consensus) +cell_ewma_set_scale_factor(const or_options_t *options, + const networkstatus_t *consensus) { int32_t halflife_ms; double halflife; @@ -2246,7 +2351,7 @@ set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn, edge->edge_blocked_on_circ = block; } - if (!conn->read_event) { + if (!conn->read_event && !HAS_BUFFEREVENT(conn)) { /* This connection is a placeholder for something; probably a DNS * request. It can't actually stop or start reading.*/ continue; @@ -2321,13 +2426,13 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max, /* Calculate the exact time that this cell has spent in the queue. */ if (get_options()->CellStatistics && !CIRCUIT_IS_ORIGIN(circ)) { - struct timeval now; + struct timeval tvnow; uint32_t flushed; uint32_t cell_waiting_time; insertion_time_queue_t *it_queue = queue->insertion_times; - tor_gettimeofday_cached(&now); - flushed = (uint32_t)((now.tv_sec % SECONDS_IN_A_DAY) * 100L + - (uint32_t)now.tv_usec / (uint32_t)10000L); + tor_gettimeofday_cached(&tvnow); + flushed = (uint32_t)((tvnow.tv_sec % SECONDS_IN_A_DAY) * 100L + + (uint32_t)tvnow.tv_usec / (uint32_t)10000L); if (!it_queue || !it_queue->first) { log_info(LD_GENERAL, "Cannot determine insertion time of cell. " "Looks like the CellStatistics option was " @@ -2373,7 +2478,7 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max, tor_assert(tmp == cell_ewma); add_cell_ewma_to_conn(conn, cell_ewma); } - if (circ != conn->active_circuits) { + if (!ewma_enabled && circ != conn->active_circuits) { /* If this happens, the current circuit just got made inactive by * a call in connection_write_to_buf(). That's nothing to worry about: * circuit_make_inactive_on_conn() already advanced conn->active_circuits @@ -2443,7 +2548,7 @@ append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn, make_circuit_active_on_conn(circ, orconn); } - if (! buf_datalen(orconn->_base.outbuf)) { + if (! connection_get_outbuf_len(TO_CONN(orconn))) { /* There is no data at all waiting to be sent on the outbuf. Add a * cell, so that we can notice when it gets flushed, flushed_some can * get called, and we can start putting more data onto the buffer then. |