aboutsummaryrefslogtreecommitdiff
path: root/src/or/relay.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/or/relay.c')
-rw-r--r--src/or/relay.c329
1 files changed, 217 insertions, 112 deletions
diff --git a/src/or/relay.c b/src/or/relay.c
index b637fadf59..5f7fcd8b7c 100644
--- a/src/or/relay.c
+++ b/src/or/relay.c
@@ -1,7 +1,7 @@
/* Copyright (c) 2001 Matej Pfajfar.
* Copyright (c) 2001-2004, Roger Dingledine.
* Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
- * Copyright (c) 2007-2011, The Tor Project, Inc. */
+ * Copyright (c) 2007-2012, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
@@ -11,6 +11,7 @@
**/
#include <math.h>
+#define RELAY_PRIVATE
#include "or.h"
#include "buffers.h"
#include "circuitbuild.h"
@@ -24,6 +25,7 @@
#include "main.h"
#include "mempool.h"
#include "networkstatus.h"
+#include "nodelist.h"
#include "policies.h"
#include "reasons.h"
#include "relay.h"
@@ -32,9 +34,6 @@
#include "routerlist.h"
#include "routerparse.h"
-static int relay_crypt(circuit_t *circ, cell_t *cell,
- cell_direction_t cell_direction,
- crypt_path_t **layer_hint, char *recognized);
static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell,
cell_direction_t cell_direction,
crypt_path_t *layer_hint);
@@ -53,11 +52,6 @@ static int circuit_consider_stop_edge_reading(circuit_t *circ,
crypt_path_t *layer_hint);
static int circuit_queue_streams_are_blocked(circuit_t *circ);
-/* XXXX023 move this all to compat_libevent */
-/** Cache the current hi-res time; the cache gets reset when libevent
- * calls us. */
-static struct timeval cached_time_hires = {0, 0};
-
/** Stop reading on edge connections when we have this many cells
* waiting on the appropriate queue. */
#define CELL_QUEUE_HIGHWATER_SIZE 256
@@ -65,21 +59,6 @@ static struct timeval cached_time_hires = {0, 0};
* cells. */
#define CELL_QUEUE_LOWWATER_SIZE 64
-static void
-tor_gettimeofday_cached(struct timeval *tv)
-{
- if (cached_time_hires.tv_sec == 0) {
- tor_gettimeofday(&cached_time_hires);
- }
- *tv = cached_time_hires;
-}
-
-void
-tor_gettimeofday_cache_clear(void)
-{
- cached_time_hires.tv_sec = 0;
-}
-
/** Stats: how many relay cells have originated at this hop, or have
* been relayed onward (not recognized at this hop)?
*/
@@ -93,7 +72,7 @@ uint64_t stats_n_relay_cells_delivered = 0;
* cell.
*/
static void
-relay_set_digest(crypto_digest_env_t *digest, cell_t *cell)
+relay_set_digest(crypto_digest_t *digest, cell_t *cell)
{
char integrity[4];
relay_header_t rh;
@@ -114,11 +93,11 @@ relay_set_digest(crypto_digest_env_t *digest, cell_t *cell)
* and cell to their original state and return 0.
*/
static int
-relay_digest_matches(crypto_digest_env_t *digest, cell_t *cell)
+relay_digest_matches(crypto_digest_t *digest, cell_t *cell)
{
char received_integrity[4], calculated_integrity[4];
relay_header_t rh;
- crypto_digest_env_t *backup_digest=NULL;
+ crypto_digest_t *backup_digest=NULL;
backup_digest = crypto_digest_dup(digest);
@@ -142,10 +121,10 @@ relay_digest_matches(crypto_digest_env_t *digest, cell_t *cell)
/* restore the relay header */
memcpy(rh.integrity, received_integrity, 4);
relay_header_pack(cell->payload, &rh);
- crypto_free_digest_env(backup_digest);
+ crypto_digest_free(backup_digest);
return 0;
}
- crypto_free_digest_env(backup_digest);
+ crypto_digest_free(backup_digest);
return 1;
}
@@ -157,7 +136,7 @@ relay_digest_matches(crypto_digest_env_t *digest, cell_t *cell)
* Return -1 if the crypto fails, else return 0.
*/
static int
-relay_crypt_one_payload(crypto_cipher_env_t *cipher, uint8_t *in,
+relay_crypt_one_payload(crypto_cipher_t *cipher, uint8_t *in,
int encrypt_mode)
{
int r;
@@ -296,7 +275,7 @@ circuit_receive_relay_cell(cell_t *cell, circuit_t *circ,
* Return -1 to indicate that we should mark the circuit for close,
* else return 0.
*/
-static int
+int
relay_crypt(circuit_t *circ, cell_t *cell, cell_direction_t cell_direction,
crypt_path_t **layer_hint, char *recognized)
{
@@ -608,7 +587,7 @@ relay_send_command_from_edge(streamid_t stream_id, circuit_t *circ,
/* If no RELAY_EARLY cells can be sent over this circuit, log which
* commands have been sent as RELAY_EARLY cells before; helps debug
* task 878. */
- smartlist_t *commands_list = smartlist_create();
+ smartlist_t *commands_list = smartlist_new();
int i = 0;
char *commands = NULL;
for (; i < origin_circ->relay_early_cells_sent; i++)
@@ -648,6 +627,7 @@ connection_edge_send_command(edge_connection_t *fromconn,
{
/* XXXX NM Split this function into a separate versions per circuit type? */
circuit_t *circ;
+ crypt_path_t *cpath_layer = fromconn->cpath_layer;
tor_assert(fromconn);
circ = fromconn->on_circuit;
@@ -662,7 +642,8 @@ connection_edge_send_command(edge_connection_t *fromconn,
if (!circ) {
if (fromconn->_base.type == CONN_TYPE_AP) {
log_info(LD_APP,"no circ. Closing conn.");
- connection_mark_unattached_ap(fromconn, END_STREAM_REASON_INTERNAL);
+ connection_mark_unattached_ap(EDGE_TO_ENTRY_CONN(fromconn),
+ END_STREAM_REASON_INTERNAL);
} else {
log_info(LD_EXIT,"no circ. Closing conn.");
fromconn->edge_has_sent_end = 1; /* no circ to send to */
@@ -674,7 +655,7 @@ connection_edge_send_command(edge_connection_t *fromconn,
return relay_send_command_from_edge(fromconn->stream_id, circ,
relay_command, payload,
- payload_len, fromconn->cpath_layer);
+ payload_len, cpath_layer);
}
/** How many times will I retry a stream that fails due to DNS
@@ -702,22 +683,24 @@ edge_reason_is_retriable(int reason)
static int
connection_ap_process_end_not_open(
relay_header_t *rh, cell_t *cell, origin_circuit_t *circ,
- edge_connection_t *conn, crypt_path_t *layer_hint)
+ entry_connection_t *conn, crypt_path_t *layer_hint)
{
struct in_addr in;
- routerinfo_t *exitrouter;
+ node_t *exitrouter;
int reason = *(cell->payload+RELAY_HEADER_SIZE);
int control_reason = reason | END_STREAM_REASON_FLAG_REMOTE;
+ edge_connection_t *edge_conn = ENTRY_TO_EDGE_CONN(conn);
(void) layer_hint; /* unused */
if (rh->length > 0 && edge_reason_is_retriable(reason) &&
- !connection_edge_is_rendezvous_stream(conn) /* avoid retry if rend */
- ) {
+ /* avoid retry if rend */
+ !connection_edge_is_rendezvous_stream(edge_conn)) {
+ const char *chosen_exit_digest =
+ circ->build_state->chosen_exit->identity_digest;
log_info(LD_APP,"Address '%s' refused due to '%s'. Considering retrying.",
safe_str(conn->socks_request->address),
stream_end_reason_to_string(reason));
- exitrouter =
- router_get_by_digest(circ->build_state->chosen_exit->identity_digest);
+ exitrouter = node_get_mutable_by_id(chosen_exit_digest);
switch (reason) {
case END_STREAM_REASON_EXITPOLICY:
if (rh->length >= 5) {
@@ -752,13 +735,13 @@ connection_ap_process_end_not_open(
log_info(LD_APP,
"Exitrouter %s seems to be more restrictive than its exit "
"policy. Not using this router as exit for now.",
- router_describe(exitrouter));
- policies_set_router_exitpolicy_to_reject_all(exitrouter);
+ node_describe(exitrouter));
+ policies_set_node_exitpolicy_to_reject_all(exitrouter);
}
/* rewrite it to an IP if we learned one. */
if (addressmap_rewrite(conn->socks_request->address,
sizeof(conn->socks_request->address),
- NULL)) {
+ NULL, NULL)) {
control_event_stream_status(conn, STREAM_EVENT_REMAP, 0);
}
if (conn->chosen_exit_optional ||
@@ -793,7 +776,7 @@ connection_ap_process_end_not_open(
/* We haven't retried too many times; reattach the connection. */
circuit_log_path(LOG_INFO,LD_APP,circ);
/* Mark this circuit "unusable for new streams". */
- /* XXXX023 this is a kludgy way to do this. */
+ /* XXXX024 this is a kludgy way to do this. */
tor_assert(circ->_base.timestamp_dirty);
circ->_base.timestamp_dirty -= get_options()->MaxCircuitDirtiness;
@@ -818,7 +801,7 @@ connection_ap_process_end_not_open(
case END_STREAM_REASON_HIBERNATING:
case END_STREAM_REASON_RESOURCELIMIT:
if (exitrouter) {
- policies_set_router_exitpolicy_to_reject_all(exitrouter);
+ policies_set_node_exitpolicy_to_reject_all(exitrouter);
}
if (conn->chosen_exit_optional) {
/* stop wanting a specific exit */
@@ -838,7 +821,7 @@ connection_ap_process_end_not_open(
stream_end_reason_to_string(rh->length > 0 ? reason : -1));
circuit_log_path(LOG_INFO,LD_APP,circ);
/* need to test because of detach_retriable */
- if (!conn->_base.marked_for_close)
+ if (!ENTRY_TO_CONN(conn)->marked_for_close)
connection_mark_unattached_ap(conn, control_reason);
return 0;
}
@@ -847,7 +830,7 @@ connection_ap_process_end_not_open(
* dotted-quad representation of <b>new_addr</b> (given in host order),
* and send an appropriate REMAP event. */
static void
-remap_event_helper(edge_connection_t *conn, uint32_t new_addr)
+remap_event_helper(entry_connection_t *conn, uint32_t new_addr)
{
struct in_addr in;
@@ -873,7 +856,8 @@ connection_edge_process_relay_cell_not_open(
if (rh->command == RELAY_COMMAND_END) {
if (CIRCUIT_IS_ORIGIN(circ) && conn->_base.type == CONN_TYPE_AP) {
return connection_ap_process_end_not_open(rh, cell,
- TO_ORIGIN_CIRCUIT(circ), conn,
+ TO_ORIGIN_CIRCUIT(circ),
+ EDGE_TO_ENTRY_CONN(conn),
layer_hint);
} else {
/* we just got an 'end', don't need to send one */
@@ -887,6 +871,7 @@ connection_edge_process_relay_cell_not_open(
if (conn->_base.type == CONN_TYPE_AP &&
rh->command == RELAY_COMMAND_CONNECTED) {
+ entry_connection_t *entry_conn = EDGE_TO_ENTRY_CONN(conn);
tor_assert(CIRCUIT_IS_ORIGIN(circ));
if (conn->_base.state != AP_CONN_STATE_CONNECT_WAIT) {
log_fn(LOG_PROTOCOL_WARN, LD_APP,
@@ -901,29 +886,27 @@ connection_edge_process_relay_cell_not_open(
int ttl;
if (!addr || (get_options()->ClientDNSRejectInternalAddresses &&
is_internal_IP(addr, 0))) {
- char buf[INET_NTOA_BUF_LEN];
- struct in_addr a;
- a.s_addr = htonl(addr);
- tor_inet_ntoa(&a, buf, sizeof(buf));
- log_info(LD_APP,
- "...but it claims the IP address was %s. Closing.", buf);
+ log_info(LD_APP, "...but it claims the IP address was %s. Closing.",
+ fmt_addr32(addr));
connection_edge_end(conn, END_STREAM_REASON_TORPROTOCOL);
- connection_mark_unattached_ap(conn, END_STREAM_REASON_TORPROTOCOL);
+ connection_mark_unattached_ap(entry_conn,
+ END_STREAM_REASON_TORPROTOCOL);
return 0;
}
if (rh->length >= 8)
ttl = (int)ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+4));
else
ttl = -1;
- client_dns_set_addressmap(conn->socks_request->address, addr,
- conn->chosen_exit_name, ttl);
+ client_dns_set_addressmap(entry_conn->socks_request->address, addr,
+ entry_conn->chosen_exit_name, ttl);
- remap_event_helper(conn, addr);
+ remap_event_helper(entry_conn, addr);
}
circuit_log_path(LOG_INFO,LD_APP,TO_ORIGIN_CIRCUIT(circ));
/* don't send a socks reply to transparent conns */
- if (!conn->socks_request->has_finished)
- connection_ap_handshake_socks_reply(conn, NULL, 0, 0);
+ tor_assert(entry_conn->socks_request != NULL);
+ if (!entry_conn->socks_request->has_finished)
+ connection_ap_handshake_socks_reply(entry_conn, NULL, 0, 0);
/* Was it a linked dir conn? If so, a dir request just started to
* fetch something; this could be a bootstrap status milestone. */
@@ -946,6 +929,12 @@ connection_edge_process_relay_cell_not_open(
break;
}
}
+ /* This is definitely a success, so forget about any pending data we
+ * had sent. */
+ if (entry_conn->pending_optimistic_data) {
+ generic_buffer_free(entry_conn->pending_optimistic_data);
+ entry_conn->pending_optimistic_data = NULL;
+ }
/* handle anything that might have queued */
if (connection_edge_package_raw_inbuf(conn, 1, NULL) < 0) {
@@ -960,17 +949,18 @@ connection_edge_process_relay_cell_not_open(
int ttl;
int answer_len;
uint8_t answer_type;
+ entry_connection_t *entry_conn = EDGE_TO_ENTRY_CONN(conn);
if (conn->_base.state != AP_CONN_STATE_RESOLVE_WAIT) {
log_fn(LOG_PROTOCOL_WARN, LD_APP, "Got a 'resolved' cell while "
"not in state resolve_wait. Dropping.");
return 0;
}
- tor_assert(SOCKS_COMMAND_IS_RESOLVE(conn->socks_request->command));
+ tor_assert(SOCKS_COMMAND_IS_RESOLVE(entry_conn->socks_request->command));
answer_len = cell->payload[RELAY_HEADER_SIZE+1];
if (rh->length < 2 || answer_len+2>rh->length) {
log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
"Dropping malformed 'resolved' cell");
- connection_mark_unattached_ap(conn, END_STREAM_REASON_TORPROTOCOL);
+ connection_mark_unattached_ap(entry_conn, END_STREAM_REASON_TORPROTOCOL);
return 0;
}
answer_type = cell->payload[RELAY_HEADER_SIZE];
@@ -983,19 +973,17 @@ connection_edge_process_relay_cell_not_open(
uint32_t addr = ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+2));
if (get_options()->ClientDNSRejectInternalAddresses &&
is_internal_IP(addr, 0)) {
- char buf[INET_NTOA_BUF_LEN];
- struct in_addr a;
- a.s_addr = htonl(addr);
- tor_inet_ntoa(&a, buf, sizeof(buf));
- log_info(LD_APP,"Got a resolve with answer %s. Rejecting.", buf);
- connection_ap_handshake_socks_resolved(conn,
+ log_info(LD_APP,"Got a resolve with answer %s. Rejecting.",
+ fmt_addr32(addr));
+ connection_ap_handshake_socks_resolved(entry_conn,
RESOLVED_TYPE_ERROR_TRANSIENT,
0, NULL, 0, TIME_MAX);
- connection_mark_unattached_ap(conn, END_STREAM_REASON_TORPROTOCOL);
+ connection_mark_unattached_ap(entry_conn,
+ END_STREAM_REASON_TORPROTOCOL);
return 0;
}
}
- connection_ap_handshake_socks_resolved(conn,
+ connection_ap_handshake_socks_resolved(entry_conn,
answer_type,
cell->payload[RELAY_HEADER_SIZE+1], /*answer_len*/
cell->payload+RELAY_HEADER_SIZE+2, /*answer*/
@@ -1003,9 +991,9 @@ connection_edge_process_relay_cell_not_open(
-1);
if (answer_type == RESOLVED_TYPE_IPV4 && answer_len == 4) {
uint32_t addr = ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+2));
- remap_event_helper(conn, addr);
+ remap_event_helper(entry_conn, addr);
}
- connection_mark_unattached_ap(conn,
+ connection_mark_unattached_ap(entry_conn,
END_STREAM_REASON_DONE |
END_STREAM_REASON_FLAG_ALREADY_SOCKS_REPLIED);
return 0;
@@ -1039,6 +1027,9 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
relay_header_t rh;
unsigned domain = layer_hint?LD_APP:LD_EXIT;
int reason;
+ int optimistic_data = 0; /* Set to 1 if we receive data on a stream
+ * that's in the EXIT_CONN_STATE_RESOLVING
+ * or EXIT_CONN_STATE_CONNECTING states. */
tor_assert(cell);
tor_assert(circ);
@@ -1058,9 +1049,20 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
/* either conn is NULL, in which case we've got a control cell, or else
* conn points to the recognized stream. */
- if (conn && !connection_state_is_open(TO_CONN(conn)))
- return connection_edge_process_relay_cell_not_open(
- &rh, cell, circ, conn, layer_hint);
+ if (conn && !connection_state_is_open(TO_CONN(conn))) {
+ if (conn->_base.type == CONN_TYPE_EXIT &&
+ (conn->_base.state == EXIT_CONN_STATE_CONNECTING ||
+ conn->_base.state == EXIT_CONN_STATE_RESOLVING) &&
+ rh.command == RELAY_COMMAND_DATA) {
+ /* Allow DATA cells to be delivered to an exit node in state
+ * EXIT_CONN_STATE_CONNECTING or EXIT_CONN_STATE_RESOLVING.
+ * This speeds up HTTP, for example. */
+ optimistic_data = 1;
+ } else {
+ return connection_edge_process_relay_cell_not_open(
+ &rh, cell, circ, conn, layer_hint);
+ }
+ }
switch (rh.command) {
case RELAY_COMMAND_DROP:
@@ -1103,8 +1105,12 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
(!layer_hint && --circ->deliver_window < 0)) {
log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
"(relay data) circ deliver_window below 0. Killing.");
- connection_edge_end(conn, END_STREAM_REASON_TORPROTOCOL);
- connection_mark_for_close(TO_CONN(conn));
+ if (conn) {
+ /* XXXX Do we actually need to do this? Will killing the circuit
+ * not send an END and mark the stream for close as appropriate? */
+ connection_edge_end(conn, END_STREAM_REASON_TORPROTOCOL);
+ connection_mark_for_close(TO_CONN(conn));
+ }
return -END_CIRC_REASON_TORPROTOCOL;
}
log_debug(domain,"circ deliver_window now %d.", layer_hint ?
@@ -1127,7 +1133,14 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
stats_n_data_bytes_received += rh.length;
connection_write_to_buf((char*)(cell->payload + RELAY_HEADER_SIZE),
rh.length, TO_CONN(conn));
- connection_edge_consider_sending_sendme(conn);
+
+ if (!optimistic_data) {
+ /* Only send a SENDME if we're not getting optimistic data; otherwise
+ * a SENDME could arrive before the CONNECTED.
+ */
+ connection_edge_consider_sending_sendme(conn);
+ }
+
return 0;
case RELAY_COMMAND_END:
reason = rh.length > 0 ?
@@ -1142,9 +1155,13 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
conn->_base.s,
stream_end_reason_to_string(reason),
conn->stream_id);
- if (conn->socks_request && !conn->socks_request->has_finished)
- log_warn(LD_BUG,
- "open stream hasn't sent socks answer yet? Closing.");
+ if (conn->_base.type == CONN_TYPE_AP) {
+ entry_connection_t *entry_conn = EDGE_TO_ENTRY_CONN(conn);
+ if (entry_conn->socks_request &&
+ !entry_conn->socks_request->has_finished)
+ log_warn(LD_BUG,
+ "open stream hasn't sent socks answer yet? Closing.");
+ }
/* We just *got* an end; no reason to send one. */
conn->edge_has_sent_end = 1;
if (!conn->end_reason)
@@ -1152,17 +1169,43 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
if (!conn->_base.marked_for_close) {
/* only mark it if not already marked. it's possible to
* get the 'end' right around when the client hangs up on us. */
- connection_mark_for_close(TO_CONN(conn));
- conn->_base.hold_open_until_flushed = 1;
+ connection_mark_and_flush(TO_CONN(conn));
}
return 0;
- case RELAY_COMMAND_EXTEND:
+ case RELAY_COMMAND_EXTEND: {
+ static uint64_t total_n_extend=0, total_nonearly=0;
+ total_n_extend++;
if (conn) {
log_fn(LOG_PROTOCOL_WARN, domain,
"'extend' cell received for non-zero stream. Dropping.");
return 0;
}
+ if (cell->command != CELL_RELAY_EARLY &&
+ !networkstatus_get_param(NULL,"AllowNonearlyExtend",0,0,1)) {
+#define EARLY_WARNING_INTERVAL 3600
+ static ratelim_t early_warning_limit =
+ RATELIM_INIT(EARLY_WARNING_INTERVAL);
+ char *m;
+ if (cell->command == CELL_RELAY) {
+ ++total_nonearly;
+ if ((m = rate_limit_log(&early_warning_limit, approx_time()))) {
+ double percentage = ((double)total_nonearly)/total_n_extend;
+ percentage *= 100;
+ log_fn(LOG_PROTOCOL_WARN, domain, "EXTEND cell received, "
+ "but not via RELAY_EARLY. Dropping.%s", m);
+ log_fn(LOG_PROTOCOL_WARN, domain, " (We have dropped %.02f%% of "
+ "all EXTEND cells for this reason)", percentage);
+ tor_free(m);
+ }
+ } else {
+ log_fn(LOG_WARN, domain,
+ "EXTEND cell received, in a cell with type %d! Dropping.",
+ cell->command);
+ }
+ return 0;
+ }
return circuit_extend(cell, circ);
+ }
case RELAY_COMMAND_EXTENDED:
if (!layer_hint) {
log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
@@ -1220,13 +1263,27 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
"'connected' received, no conn attached anymore. Ignoring.");
return 0;
case RELAY_COMMAND_SENDME:
- if (!conn) {
+ if (!rh.stream_id) {
if (layer_hint) {
+ if (layer_hint->package_window + CIRCWINDOW_INCREMENT >
+ CIRCWINDOW_START_MAX) {
+ log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
+ "Bug/attack: unexpected sendme cell from exit relay. "
+ "Closing circ.");
+ return -END_CIRC_REASON_TORPROTOCOL;
+ }
layer_hint->package_window += CIRCWINDOW_INCREMENT;
log_debug(LD_APP,"circ-level sendme at origin, packagewindow %d.",
layer_hint->package_window);
circuit_resume_edge_reading(circ, layer_hint);
} else {
+ if (circ->package_window + CIRCWINDOW_INCREMENT >
+ CIRCWINDOW_START_MAX) {
+ log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
+ "Bug/attack: unexpected sendme cell from client. "
+ "Closing circ.");
+ return -END_CIRC_REASON_TORPROTOCOL;
+ }
circ->package_window += CIRCWINDOW_INCREMENT;
log_debug(LD_APP,
"circ-level sendme at non-origin, packagewindow %d.",
@@ -1235,6 +1292,11 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
}
return 0;
}
+ if (!conn) {
+ log_info(domain,"sendme cell dropped, unknown stream (streamid %d).",
+ rh.stream_id);
+ return 0;
+ }
conn->package_window += STREAMWINDOW_INCREMENT;
log_debug(domain,"stream-level sendme, packagewindow now %d.",
conn->package_window);
@@ -1324,10 +1386,17 @@ int
connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial,
int *max_cells)
{
- size_t amount_to_process, length;
+ size_t bytes_to_process, length;
char payload[CELL_PAYLOAD_SIZE];
circuit_t *circ;
- unsigned domain = conn->cpath_layer ? LD_APP : LD_EXIT;
+ const unsigned domain = conn->_base.type == CONN_TYPE_AP ? LD_APP : LD_EXIT;
+ int sending_from_optimistic = 0;
+ const int sending_optimistically =
+ conn->_base.type == CONN_TYPE_AP &&
+ conn->_base.state != AP_CONN_STATE_OPEN;
+ entry_connection_t *entry_conn =
+ conn->_base.type == CONN_TYPE_AP ? EDGE_TO_ENTRY_CONN(conn) : NULL;
+ crypt_path_t *cpath_layer = conn->cpath_layer;
tor_assert(conn);
@@ -1350,7 +1419,7 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial,
return -1;
}
- if (circuit_consider_stop_edge_reading(circ, conn->cpath_layer))
+ if (circuit_consider_stop_edge_reading(circ, cpath_layer))
return 0;
if (conn->package_window <= 0) {
@@ -1360,44 +1429,75 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial,
return 0;
}
- amount_to_process = buf_datalen(conn->_base.inbuf);
+ sending_from_optimistic = entry_conn &&
+ entry_conn->sending_optimistic_data != NULL;
+
+ if (PREDICT_UNLIKELY(sending_from_optimistic)) {
+ bytes_to_process = generic_buffer_len(entry_conn->sending_optimistic_data);
+ if (PREDICT_UNLIKELY(!bytes_to_process)) {
+ log_warn(LD_BUG, "sending_optimistic_data was non-NULL but empty");
+ bytes_to_process = connection_get_inbuf_len(TO_CONN(conn));
+ sending_from_optimistic = 0;
+ }
+ } else {
+ bytes_to_process = connection_get_inbuf_len(TO_CONN(conn));
+ }
- if (!amount_to_process)
+ if (!bytes_to_process)
return 0;
- if (!package_partial && amount_to_process < RELAY_PAYLOAD_SIZE)
+ if (!package_partial && bytes_to_process < RELAY_PAYLOAD_SIZE)
return 0;
- if (amount_to_process > RELAY_PAYLOAD_SIZE) {
+ if (bytes_to_process > RELAY_PAYLOAD_SIZE) {
length = RELAY_PAYLOAD_SIZE;
} else {
- length = amount_to_process;
+ length = bytes_to_process;
}
stats_n_data_bytes_packaged += length;
stats_n_data_cells_packaged += 1;
- connection_fetch_from_buf(payload, length, TO_CONN(conn));
+ if (PREDICT_UNLIKELY(sending_from_optimistic)) {
+ /* XXXX We could be more efficient here by sometimes packing
+ * previously-sent optimistic data in the same cell with data
+ * from the inbuf. */
+ generic_buffer_get(entry_conn->sending_optimistic_data, payload, length);
+ if (!generic_buffer_len(entry_conn->sending_optimistic_data)) {
+ generic_buffer_free(entry_conn->sending_optimistic_data);
+ entry_conn->sending_optimistic_data = NULL;
+ }
+ } else {
+ connection_fetch_from_buf(payload, length, TO_CONN(conn));
+ }
log_debug(domain,"(%d) Packaging %d bytes (%d waiting).", conn->_base.s,
- (int)length, (int)buf_datalen(conn->_base.inbuf));
+ (int)length, (int)connection_get_inbuf_len(TO_CONN(conn)));
+
+ if (sending_optimistically && !sending_from_optimistic) {
+ /* This is new optimistic data; remember it in case we need to detach and
+ retry */
+ if (!entry_conn->pending_optimistic_data)
+ entry_conn->pending_optimistic_data = generic_buffer_new();
+ generic_buffer_add(entry_conn->pending_optimistic_data, payload, length);
+ }
if (connection_edge_send_command(conn, RELAY_COMMAND_DATA,
payload, length) < 0 )
/* circuit got marked for close, don't continue, don't need to mark conn */
return 0;
- if (!conn->cpath_layer) { /* non-rendezvous exit */
+ if (!cpath_layer) { /* non-rendezvous exit */
tor_assert(circ->package_window > 0);
circ->package_window--;
} else { /* we're an AP, or an exit on a rendezvous circ */
- tor_assert(conn->cpath_layer->package_window > 0);
- conn->cpath_layer->package_window--;
+ tor_assert(cpath_layer->package_window > 0);
+ cpath_layer->package_window--;
}
if (--conn->package_window <= 0) { /* is it 0 after decrement? */
connection_stop_reading(TO_CONN(conn));
log_debug(domain,"conn->package_window reached 0.");
- circuit_consider_stop_edge_reading(circ, conn->cpath_layer);
+ circuit_consider_stop_edge_reading(circ, cpath_layer);
return 0; /* don't process the inbuf any more */
}
log_debug(domain,"conn->package_window is now %d",conn->package_window);
@@ -1436,7 +1536,7 @@ connection_edge_consider_sending_sendme(edge_connection_t *conn)
}
while (conn->deliver_window <= STREAMWINDOW_START - STREAMWINDOW_INCREMENT) {
- log_debug(conn->cpath_layer?LD_APP:LD_EXIT,
+ log_debug(conn->_base.type == CONN_TYPE_AP ?LD_APP:LD_EXIT,
"Outbuf %d, Queuing stream sendme.",
(int)conn->_base.outbuf_flushlen);
conn->deliver_window += STREAMWINDOW_INCREMENT;
@@ -1532,7 +1632,7 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
if (!layer_hint || conn->cpath_layer == layer_hint) {
connection_start_reading(TO_CONN(conn));
- if (buf_datalen(conn->_base.inbuf) > 0)
+ if (connection_get_inbuf_len(TO_CONN(conn)) > 0)
++n_packaging_streams;
}
}
@@ -1543,7 +1643,7 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
if (!layer_hint || conn->cpath_layer == layer_hint) {
connection_start_reading(TO_CONN(conn));
- if (buf_datalen(conn->_base.inbuf) > 0)
+ if (connection_get_inbuf_len(TO_CONN(conn)) > 0)
++n_packaging_streams;
}
}
@@ -1582,7 +1682,7 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
}
/* If there's still data to read, we'll be coming back to this stream. */
- if (buf_datalen(conn->_base.inbuf))
+ if (connection_get_inbuf_len(TO_CONN(conn)))
++n_streams_left;
/* If the circuit won't accept any more data, return without looking
@@ -1638,9 +1738,10 @@ circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
if (layer_hint->package_window <= 0) {
log_debug(domain,"yes, at-origin. stopped.");
for (conn = TO_ORIGIN_CIRCUIT(circ)->p_streams; conn;
- conn=conn->next_stream)
+ conn=conn->next_stream) {
if (conn->cpath_layer == layer_hint)
connection_stop_reading(TO_CONN(conn));
+ }
return 1;
}
return 0;
@@ -1988,14 +2089,18 @@ cell_ewma_get_tick(void)
* has value ewma_scale_factor ** N.)
*/
static double ewma_scale_factor = 0.1;
+/* DOCDOC ewma_enabled */
static int ewma_enabled = 0;
+/*DOCDOC*/
#define EPSILON 0.00001
+/*DOCDOC*/
#define LOG_ONEHALF -0.69314718055994529
/** Adjust the global cell scale factor based on <b>options</b> */
void
-cell_ewma_set_scale_factor(or_options_t *options, networkstatus_t *consensus)
+cell_ewma_set_scale_factor(const or_options_t *options,
+ const networkstatus_t *consensus)
{
int32_t halflife_ms;
double halflife;
@@ -2246,7 +2351,7 @@ set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn,
edge->edge_blocked_on_circ = block;
}
- if (!conn->read_event) {
+ if (!conn->read_event && !HAS_BUFFEREVENT(conn)) {
/* This connection is a placeholder for something; probably a DNS
* request. It can't actually stop or start reading.*/
continue;
@@ -2321,13 +2426,13 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
/* Calculate the exact time that this cell has spent in the queue. */
if (get_options()->CellStatistics && !CIRCUIT_IS_ORIGIN(circ)) {
- struct timeval now;
+ struct timeval tvnow;
uint32_t flushed;
uint32_t cell_waiting_time;
insertion_time_queue_t *it_queue = queue->insertion_times;
- tor_gettimeofday_cached(&now);
- flushed = (uint32_t)((now.tv_sec % SECONDS_IN_A_DAY) * 100L +
- (uint32_t)now.tv_usec / (uint32_t)10000L);
+ tor_gettimeofday_cached(&tvnow);
+ flushed = (uint32_t)((tvnow.tv_sec % SECONDS_IN_A_DAY) * 100L +
+ (uint32_t)tvnow.tv_usec / (uint32_t)10000L);
if (!it_queue || !it_queue->first) {
log_info(LD_GENERAL, "Cannot determine insertion time of cell. "
"Looks like the CellStatistics option was "
@@ -2373,7 +2478,7 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
tor_assert(tmp == cell_ewma);
add_cell_ewma_to_conn(conn, cell_ewma);
}
- if (circ != conn->active_circuits) {
+ if (!ewma_enabled && circ != conn->active_circuits) {
/* If this happens, the current circuit just got made inactive by
* a call in connection_write_to_buf(). That's nothing to worry about:
* circuit_make_inactive_on_conn() already advanced conn->active_circuits
@@ -2443,7 +2548,7 @@ append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
make_circuit_active_on_conn(circ, orconn);
}
- if (! buf_datalen(orconn->_base.outbuf)) {
+ if (! connection_get_outbuf_len(TO_CONN(orconn))) {
/* There is no data at all waiting to be sent on the outbuf. Add a
* cell, so that we can notice when it gets flushed, flushed_some can
* get called, and we can start putting more data onto the buffer then.