aboutsummaryrefslogtreecommitdiff
path: root/src/or/relay.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/or/relay.c')
-rw-r--r--src/or/relay.c574
1 files changed, 449 insertions, 125 deletions
diff --git a/src/or/relay.c b/src/or/relay.c
index 7f06c6e145..f8b0deedb9 100644
--- a/src/or/relay.c
+++ b/src/or/relay.c
@@ -15,6 +15,7 @@
#include "addressmap.h"
#include "buffers.h"
#include "channel.h"
+#include "circpathbias.h"
#include "circuitbuild.h"
#include "circuitlist.h"
#include "circuituse.h"
@@ -58,6 +59,9 @@ static void adjust_exit_policy_from_exitpolicy_failure(origin_circuit_t *circ,
entry_connection_t *conn,
node_t *node,
const tor_addr_t *addr);
+#if 0
+static int get_max_middle_cells(void);
+#endif
/** Stop reading on edge connections when we have this many cells
* waiting on the appropriate queue. */
@@ -968,7 +972,7 @@ remap_event_helper(entry_connection_t *conn, const tor_addr_t *new_addr)
* <b>addr_out</b> to the address we're connected to, and <b>ttl_out</b> to
* the ttl of that address, in seconds, and return 0. On failure, return
* -1. */
-int
+STATIC int
connected_cell_parse(const relay_header_t *rh, const cell_t *cell,
tor_addr_t *addr_out, int *ttl_out)
{
@@ -1005,6 +1009,254 @@ connected_cell_parse(const relay_header_t *rh, const cell_t *cell,
return 0;
}
+/** Drop all storage held by <b>addr</b>. */
+STATIC void
+address_ttl_free(address_ttl_t *addr)
+{
+ if (!addr)
+ return;
+ tor_free(addr->hostname);
+ tor_free(addr);
+}
+
+/** Parse a resolved cell in <b>cell</b>, with parsed header in <b>rh</b>.
+ * Return -1 on parse error. On success, add one or more newly allocated
+ * address_ttl_t to <b>addresses_out</b>; set *<b>errcode_out</b> to
+ * one of 0, RESOLVED_TYPE_ERROR, or RESOLVED_TYPE_ERROR_TRANSIENT, and
+ * return 0. */
+STATIC int
+resolved_cell_parse(const cell_t *cell, const relay_header_t *rh,
+ smartlist_t *addresses_out, int *errcode_out)
+{
+ const uint8_t *cp;
+ uint8_t answer_type;
+ size_t answer_len;
+ address_ttl_t *addr;
+ size_t remaining;
+ int errcode = 0;
+ smartlist_t *addrs;
+
+ tor_assert(cell);
+ tor_assert(rh);
+ tor_assert(addresses_out);
+ tor_assert(errcode_out);
+
+ *errcode_out = 0;
+
+ if (rh->length > RELAY_PAYLOAD_SIZE)
+ return -1;
+
+ addrs = smartlist_new();
+
+ cp = cell->payload + RELAY_HEADER_SIZE;
+
+ remaining = rh->length;
+ while (remaining) {
+ const uint8_t *cp_orig = cp;
+ if (remaining < 2)
+ goto err;
+ answer_type = *cp++;
+ answer_len = *cp++;
+ if (remaining < 2 + answer_len + 4) {
+ goto err;
+ }
+ if (answer_type == RESOLVED_TYPE_IPV4) {
+ if (answer_len != 4) {
+ goto err;
+ }
+ addr = tor_malloc_zero(sizeof(*addr));
+ tor_addr_from_ipv4n(&addr->addr, get_uint32(cp));
+ cp += 4;
+ addr->ttl = ntohl(get_uint32(cp));
+ cp += 4;
+ smartlist_add(addrs, addr);
+ } else if (answer_type == RESOLVED_TYPE_IPV6) {
+ if (answer_len != 16)
+ goto err;
+ addr = tor_malloc_zero(sizeof(*addr));
+ tor_addr_from_ipv6_bytes(&addr->addr, (const char*) cp);
+ cp += 16;
+ addr->ttl = ntohl(get_uint32(cp));
+ cp += 4;
+ smartlist_add(addrs, addr);
+ } else if (answer_type == RESOLVED_TYPE_HOSTNAME) {
+ if (answer_len == 0) {
+ goto err;
+ }
+ addr = tor_malloc_zero(sizeof(*addr));
+ addr->hostname = tor_memdup_nulterm(cp, answer_len);
+ cp += answer_len;
+ addr->ttl = ntohl(get_uint32(cp));
+ cp += 4;
+ smartlist_add(addrs, addr);
+ } else if (answer_type == RESOLVED_TYPE_ERROR_TRANSIENT ||
+ answer_type == RESOLVED_TYPE_ERROR) {
+ errcode = answer_type;
+ /* Ignore the error contents */
+ cp += answer_len + 4;
+ } else {
+ cp += answer_len + 4;
+ }
+ tor_assert(((ssize_t)remaining) >= (cp - cp_orig));
+ remaining -= (cp - cp_orig);
+ }
+
+ if (errcode && smartlist_len(addrs) == 0) {
+ /* Report an error only if there were no results. */
+ *errcode_out = errcode;
+ }
+
+ smartlist_add_all(addresses_out, addrs);
+ smartlist_free(addrs);
+
+ return 0;
+
+ err:
+ /* On parse error, don't report any results */
+ SMARTLIST_FOREACH(addrs, address_ttl_t *, a, address_ttl_free(a));
+ smartlist_free(addrs);
+ return -1;
+}
+
+/** Helper for connection_edge_process_resolved_cell: given an error code,
+ * an entry_connection, and a list of address_ttl_t *, report the best answer
+ * to the entry_connection. */
+static void
+connection_ap_handshake_socks_got_resolved_cell(entry_connection_t *conn,
+ int error_code,
+ smartlist_t *results)
+{
+ address_ttl_t *addr_ipv4 = NULL;
+ address_ttl_t *addr_ipv6 = NULL;
+ address_ttl_t *addr_hostname = NULL;
+ address_ttl_t *addr_best = NULL;
+
+ /* If it's an error code, that's easy. */
+ if (error_code) {
+ tor_assert(error_code == RESOLVED_TYPE_ERROR ||
+ error_code == RESOLVED_TYPE_ERROR_TRANSIENT);
+ connection_ap_handshake_socks_resolved(conn,
+ error_code,0,NULL,-1,-1);
+ return;
+ }
+
+ /* Get the first answer of each type. */
+ SMARTLIST_FOREACH_BEGIN(results, address_ttl_t *, addr) {
+ if (addr->hostname) {
+ if (!addr_hostname) {
+ addr_hostname = addr;
+ }
+ } else if (tor_addr_family(&addr->addr) == AF_INET) {
+ if (!addr_ipv4 && conn->ipv4_traffic_ok) {
+ addr_ipv4 = addr;
+ }
+ } else if (tor_addr_family(&addr->addr) == AF_INET6) {
+ if (!addr_ipv6 && conn->ipv6_traffic_ok) {
+ addr_ipv6 = addr;
+ }
+ }
+ } SMARTLIST_FOREACH_END(addr);
+
+ /* Now figure out which type we wanted to deliver. */
+ if (conn->socks_request->command == SOCKS_COMMAND_RESOLVE_PTR) {
+ if (addr_hostname) {
+ connection_ap_handshake_socks_resolved(conn,
+ RESOLVED_TYPE_HOSTNAME,
+ strlen(addr_hostname->hostname),
+ (uint8_t*)addr_hostname->hostname,
+ addr_hostname->ttl,-1);
+ } else {
+ connection_ap_handshake_socks_resolved(conn,
+ RESOLVED_TYPE_ERROR,0,NULL,-1,-1);
+ }
+ return;
+ }
+
+ if (conn->prefer_ipv6_traffic) {
+ addr_best = addr_ipv6 ? addr_ipv6 : addr_ipv4;
+ } else {
+ addr_best = addr_ipv4 ? addr_ipv4 : addr_ipv6;
+ }
+
+ /* Now convert it to the ugly old interface */
+ if (! addr_best) {
+ connection_ap_handshake_socks_resolved(conn,
+ RESOLVED_TYPE_ERROR,0,NULL,-1,-1);
+ return;
+ }
+
+ connection_ap_handshake_socks_resolved_addr(conn,
+ &addr_best->addr,
+ addr_best->ttl,
+ -1);
+
+ remap_event_helper(conn, &addr_best->addr);
+}
+
+/** Handle a RELAY_COMMAND_RESOLVED cell that we received on a non-open AP
+ * stream. */
+STATIC int
+connection_edge_process_resolved_cell(edge_connection_t *conn,
+ const cell_t *cell,
+ const relay_header_t *rh)
+{
+ entry_connection_t *entry_conn = EDGE_TO_ENTRY_CONN(conn);
+ smartlist_t *resolved_addresses = NULL;
+ int errcode = 0;
+
+ if (conn->base_.state != AP_CONN_STATE_RESOLVE_WAIT) {
+ log_fn(LOG_PROTOCOL_WARN, LD_APP, "Got a 'resolved' cell while "
+ "not in state resolve_wait. Dropping.");
+ return 0;
+ }
+ tor_assert(SOCKS_COMMAND_IS_RESOLVE(entry_conn->socks_request->command));
+
+ resolved_addresses = smartlist_new();
+ if (resolved_cell_parse(cell, rh, resolved_addresses, &errcode)) {
+ log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
+ "Dropping malformed 'resolved' cell");
+ connection_mark_unattached_ap(entry_conn, END_STREAM_REASON_TORPROTOCOL);
+ goto done;
+ }
+
+ if (get_options()->ClientDNSRejectInternalAddresses) {
+ int orig_len = smartlist_len(resolved_addresses);
+ SMARTLIST_FOREACH_BEGIN(resolved_addresses, address_ttl_t *, addr) {
+ if (addr->hostname == NULL && tor_addr_is_internal(&addr->addr, 0)) {
+ log_info(LD_APP, "Got a resolved cell with answer %s; dropping that "
+ "answer.",
+ safe_str_client(fmt_addr(&addr->addr)));
+ address_ttl_free(addr);
+ SMARTLIST_DEL_CURRENT(resolved_addresses, addr);
+ }
+ } SMARTLIST_FOREACH_END(addr);
+ if (orig_len && smartlist_len(resolved_addresses) == 0) {
+ log_info(LD_APP, "Got a resolved cell with only private addresses; "
+ "dropping it.");
+ connection_ap_handshake_socks_resolved(entry_conn,
+ RESOLVED_TYPE_ERROR_TRANSIENT,
+ 0, NULL, 0, TIME_MAX);
+ connection_mark_unattached_ap(entry_conn,
+ END_STREAM_REASON_TORPROTOCOL);
+ goto done;
+ }
+ }
+
+ connection_ap_handshake_socks_got_resolved_cell(entry_conn,
+ errcode,
+ resolved_addresses);
+
+ connection_mark_unattached_ap(entry_conn,
+ END_STREAM_REASON_DONE |
+ END_STREAM_REASON_FLAG_ALREADY_SOCKS_REPLIED);
+
+ done:
+ SMARTLIST_FOREACH(resolved_addresses, address_ttl_t *, addr,
+ address_ttl_free(addr));
+ smartlist_free(resolved_addresses);
+ return 0;
+}
+
/** An incoming relay cell has arrived from circuit <b>circ</b> to
* stream <b>conn</b>.
*
@@ -1106,8 +1358,9 @@ connection_edge_process_relay_cell_not_open(
break;
case DIR_PURPOSE_FETCH_SERVERDESC:
case DIR_PURPOSE_FETCH_MICRODESC:
- control_event_bootstrap(BOOTSTRAP_STATUS_LOADING_DESCRIPTORS,
- count_loading_descriptors_progress());
+ if (TO_DIR_CONN(dirconn)->router_purpose == ROUTER_PURPOSE_GENERAL)
+ control_event_bootstrap(BOOTSTRAP_STATUS_LOADING_DESCRIPTORS,
+ count_loading_descriptors_progress());
break;
}
}
@@ -1128,67 +1381,7 @@ connection_edge_process_relay_cell_not_open(
}
if (conn->base_.type == CONN_TYPE_AP &&
rh->command == RELAY_COMMAND_RESOLVED) {
- int ttl;
- int answer_len;
- uint8_t answer_type;
- entry_connection_t *entry_conn = EDGE_TO_ENTRY_CONN(conn);
- if (conn->base_.state != AP_CONN_STATE_RESOLVE_WAIT) {
- log_fn(LOG_PROTOCOL_WARN, LD_APP, "Got a 'resolved' cell while "
- "not in state resolve_wait. Dropping.");
- return 0;
- }
- tor_assert(SOCKS_COMMAND_IS_RESOLVE(entry_conn->socks_request->command));
- answer_len = cell->payload[RELAY_HEADER_SIZE+1];
- if (rh->length < 2 || answer_len+2>rh->length) {
- log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
- "Dropping malformed 'resolved' cell");
- connection_mark_unattached_ap(entry_conn, END_STREAM_REASON_TORPROTOCOL);
- return 0;
- }
- answer_type = cell->payload[RELAY_HEADER_SIZE];
- if (rh->length >= answer_len+6)
- ttl = (int)ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+
- 2+answer_len));
- else
- ttl = -1;
- if (answer_type == RESOLVED_TYPE_IPV4 ||
- answer_type == RESOLVED_TYPE_IPV6) {
- tor_addr_t addr;
- if (decode_address_from_payload(&addr, cell->payload+RELAY_HEADER_SIZE,
- rh->length) &&
- tor_addr_is_internal(&addr, 0) &&
- get_options()->ClientDNSRejectInternalAddresses) {
- log_info(LD_APP,"Got a resolve with answer %s. Rejecting.",
- fmt_addr(&addr));
- connection_ap_handshake_socks_resolved(entry_conn,
- RESOLVED_TYPE_ERROR_TRANSIENT,
- 0, NULL, 0, TIME_MAX);
- connection_mark_unattached_ap(entry_conn,
- END_STREAM_REASON_TORPROTOCOL);
- return 0;
- }
- }
- connection_ap_handshake_socks_resolved(entry_conn,
- answer_type,
- cell->payload[RELAY_HEADER_SIZE+1], /*answer_len*/
- cell->payload+RELAY_HEADER_SIZE+2, /*answer*/
- ttl,
- -1);
- if (answer_type == RESOLVED_TYPE_IPV4 && answer_len == 4) {
- tor_addr_t addr;
- tor_addr_from_ipv4n(&addr,
- get_uint32(cell->payload+RELAY_HEADER_SIZE+2));
- remap_event_helper(entry_conn, &addr);
- } else if (answer_type == RESOLVED_TYPE_IPV6 && answer_len == 16) {
- tor_addr_t addr;
- tor_addr_from_ipv6_bytes(&addr,
- (char*)(cell->payload+RELAY_HEADER_SIZE+2));
- remap_event_helper(entry_conn, &addr);
- }
- connection_mark_unattached_ap(entry_conn,
- END_STREAM_REASON_DONE |
- END_STREAM_REASON_FLAG_ALREADY_SOCKS_REPLIED);
- return 0;
+ return connection_edge_process_resolved_cell(conn, cell, rh);
}
log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
@@ -1497,7 +1690,8 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
if (layer_hint) {
if (layer_hint->package_window + CIRCWINDOW_INCREMENT >
CIRCWINDOW_START_MAX) {
- log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
+ static struct ratelim_t exit_warn_ratelim = RATELIM_INIT(600);
+ log_fn_ratelim(&exit_warn_ratelim, LOG_WARN, LD_PROTOCOL,
"Unexpected sendme cell from exit relay. "
"Closing circ.");
return -END_CIRC_REASON_TORPROTOCOL;
@@ -1509,7 +1703,8 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
} else {
if (circ->package_window + CIRCWINDOW_INCREMENT >
CIRCWINDOW_START_MAX) {
- log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
+ static struct ratelim_t client_warn_ratelim = RATELIM_INIT(600);
+ log_fn_ratelim(&client_warn_ratelim, LOG_WARN, LD_PROTOCOL,
"Unexpected sendme cell from client. "
"Closing circ (window %d).",
circ->package_window);
@@ -2050,8 +2245,8 @@ init_cell_pool(void)
cell_pool = mp_pool_new(sizeof(packed_cell_t), 128*1024);
}
-/** Free all storage used to hold cells (and insertion times if we measure
- * cell statistics). */
+/** Free all storage used to hold cells (and insertion times/commands if we
+ * measure cell statistics and/or if CELL_STATS events are enabled). */
void
free_cell_pool(void)
{
@@ -2079,7 +2274,7 @@ packed_cell_free_unchecked(packed_cell_t *cell)
}
/** Allocate and return a new packed_cell_t. */
-static INLINE packed_cell_t *
+STATIC packed_cell_t *
packed_cell_new(void)
{
++total_cells_allocated;
@@ -2090,6 +2285,8 @@ packed_cell_new(void)
void
packed_cell_free(packed_cell_t *cell)
{
+ if (!cell)
+ return;
packed_cell_free_unchecked(cell);
}
@@ -2101,7 +2298,7 @@ dump_cell_pool_usage(int severity)
circuit_t *c;
int n_circs = 0;
int n_cells = 0;
- for (c = circuit_get_global_list_(); c; c = c->next) {
+ TOR_LIST_FOREACH(c, circuit_get_global_list(), head) {
n_cells += c->n_chan_cells.n;
if (!CIRCUIT_IS_ORIGIN(c))
n_cells += TO_OR_CIRCUIT(c)->p_chan_cells.n;
@@ -2119,7 +2316,6 @@ packed_cell_copy(const cell_t *cell, int wide_circ_ids)
{
packed_cell_t *c = packed_cell_new();
cell_pack(c, cell, wide_circ_ids);
- c->next = NULL;
return c;
}
@@ -2127,58 +2323,61 @@ packed_cell_copy(const cell_t *cell, int wide_circ_ids)
void
cell_queue_append(cell_queue_t *queue, packed_cell_t *cell)
{
- if (queue->tail) {
- tor_assert(!queue->tail->next);
- queue->tail->next = cell;
- } else {
- queue->head = cell;
- }
- queue->tail = cell;
- cell->next = NULL;
+ TOR_SIMPLEQ_INSERT_TAIL(&queue->head, cell, next);
++queue->n;
}
-/** Append a newly allocated copy of <b>cell</b> to the end of <b>queue</b> */
+/** Append a newly allocated copy of <b>cell</b> to the end of the
+ * <b>exitward</b> (or app-ward) <b>queue</b> of <b>circ</b>. If
+ * <b>use_stats</b> is true, record statistics about the cell.
+ */
void
-cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell,
- int wide_circ_ids)
+cell_queue_append_packed_copy(circuit_t *circ, cell_queue_t *queue,
+ int exitward, const cell_t *cell,
+ int wide_circ_ids, int use_stats)
{
struct timeval now;
packed_cell_t *copy = packed_cell_copy(cell, wide_circ_ids);
- tor_gettimeofday_cached(&now);
+ (void)circ;
+ (void)exitward;
+ (void)use_stats;
+ tor_gettimeofday_cached_monotonic(&now);
+
copy->inserted_time = (uint32_t)tv_to_msec(&now);
cell_queue_append(queue, copy);
}
+/** Initialize <b>queue</b> as an empty cell queue. */
+void
+cell_queue_init(cell_queue_t *queue)
+{
+ memset(queue, 0, sizeof(cell_queue_t));
+ TOR_SIMPLEQ_INIT(&queue->head);
+}
+
/** Remove and free every cell in <b>queue</b>. */
void
cell_queue_clear(cell_queue_t *queue)
{
- packed_cell_t *cell, *next;
- cell = queue->head;
- while (cell) {
- next = cell->next;
+ packed_cell_t *cell;
+ while ((cell = TOR_SIMPLEQ_FIRST(&queue->head))) {
+ TOR_SIMPLEQ_REMOVE_HEAD(&queue->head, next);
packed_cell_free_unchecked(cell);
- cell = next;
}
- queue->head = queue->tail = NULL;
+ TOR_SIMPLEQ_INIT(&queue->head);
queue->n = 0;
}
/** Extract and return the cell at the head of <b>queue</b>; return NULL if
* <b>queue</b> is empty. */
-static INLINE packed_cell_t *
+STATIC packed_cell_t *
cell_queue_pop(cell_queue_t *queue)
{
- packed_cell_t *cell = queue->head;
+ packed_cell_t *cell = TOR_SIMPLEQ_FIRST(&queue->head);
if (!cell)
return NULL;
- queue->head = cell->next;
- if (cell == queue->tail) {
- tor_assert(!queue->head);
- queue->tail = NULL;
- }
+ TOR_SIMPLEQ_REMOVE_HEAD(&queue->head, next);
--queue->n;
return cell;
}
@@ -2191,13 +2390,21 @@ packed_cell_mem_cost(void)
return sizeof(packed_cell_t) + MP_POOL_ITEM_OVERHEAD;
}
+/** DOCDOC */
+STATIC size_t
+cell_queues_get_total_allocation(void)
+{
+ return total_cells_allocated * packed_cell_mem_cost();
+}
+
/** Check whether we've got too much space used for cells. If so,
* call the OOM handler and return 1. Otherwise, return 0. */
-static int
+STATIC int
cell_queues_check_size(void)
{
- size_t alloc = total_cells_allocated * packed_cell_mem_cost();
- if (alloc >= get_options()->MaxMemInCellQueues) {
+ size_t alloc = cell_queues_get_total_allocation();
+ alloc += buf_get_total_allocation();
+ if (alloc >= get_options()->MaxMemInQueues) {
circuits_handle_oom(alloc);
return 1;
}
@@ -2252,14 +2459,18 @@ update_circuit_on_cmux_(circuit_t *circ, cell_direction_t direction,
assert_cmux_ok_paranoid(chan);
}
-/** Remove all circuits from the cmux on <b>chan</b>. */
+/** Remove all circuits from the cmux on <b>chan</b>.
+ *
+ * If <b>circuits_out</b> is non-NULL, add all detached circuits to
+ * <b>circuits_out</b>.
+ **/
void
-channel_unlink_all_circuits(channel_t *chan)
+channel_unlink_all_circuits(channel_t *chan, smartlist_t *circuits_out)
{
tor_assert(chan);
tor_assert(chan->cmux);
- circuitmux_detach_all_circuits(chan->cmux);
+ circuitmux_detach_all_circuits(chan->cmux, circuits_out);
chan->num_n_circuits = 0;
chan->num_p_circuits = 0;
}
@@ -2318,6 +2529,17 @@ set_streams_blocked_on_circ(circuit_t *circ, channel_t *chan,
return n;
}
+/** Extract the command from a packed cell. */
+static uint8_t
+packed_cell_get_command(const packed_cell_t *cell, int wide_circ_ids)
+{
+ if (wide_circ_ids) {
+ return get_uint8(cell->body+4);
+ } else {
+ return get_uint8(cell->body+2);
+ }
+}
+
/** Pull as many cells as possible (but no more than <b>max</b>) from the
* queue of the first active circuit on <b>chan</b>, and write them to
* <b>chan</b>-&gt;outbuf. Return the number of cells written. Advance
@@ -2327,7 +2549,7 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max)
{
circuitmux_t *cmux = NULL;
int n_flushed = 0;
- cell_queue_t *queue;
+ cell_queue_t *queue, *destroy_queue=NULL;
circuit_t *circ;
or_circuit_t *or_circ;
int streams_blocked;
@@ -2340,7 +2562,18 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max)
/* Main loop: pick a circuit, send a cell, update the cmux */
while (n_flushed < max) {
- circ = circuitmux_get_first_active_circuit(cmux);
+ circ = circuitmux_get_first_active_circuit(cmux, &destroy_queue);
+ if (destroy_queue) {
+ /* this code is duplicated from some of the logic below. Ugly! XXXX */
+ tor_assert(destroy_queue->n > 0);
+ cell = cell_queue_pop(destroy_queue);
+ channel_write_packed_cell(chan, cell);
+ /* Update the cmux destroy counter */
+ circuitmux_notify_xmit_destroy(cmux);
+ cell = NULL;
+ ++n_flushed;
+ continue;
+ }
/* If it returns NULL, no cells left to send */
if (!circ) break;
assert_cmux_ok_paranoid(chan);
@@ -2366,15 +2599,33 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max)
cell = cell_queue_pop(queue);
/* Calculate the exact time that this cell has spent in the queue. */
- if (get_options()->CellStatistics && !CIRCUIT_IS_ORIGIN(circ)) {
+ if (get_options()->CellStatistics ||
+ get_options()->TestingEnableCellStatsEvent) {
uint32_t msec_waiting;
struct timeval tvnow;
- or_circ = TO_OR_CIRCUIT(circ);
tor_gettimeofday_cached(&tvnow);
msec_waiting = ((uint32_t)tv_to_msec(&tvnow)) - cell->inserted_time;
- or_circ->total_cell_waiting_time += msec_waiting;
- or_circ->processed_cells++;
+ if (get_options()->CellStatistics && !CIRCUIT_IS_ORIGIN(circ)) {
+ or_circ = TO_OR_CIRCUIT(circ);
+ or_circ->total_cell_waiting_time += msec_waiting;
+ or_circ->processed_cells++;
+ }
+
+ if (get_options()->TestingEnableCellStatsEvent) {
+ uint8_t command = packed_cell_get_command(cell, chan->wide_circ_ids);
+
+ testing_cell_stats_entry_t *ent =
+ tor_malloc_zero(sizeof(testing_cell_stats_entry_t));
+ ent->command = command;
+ ent->waiting_time = msec_waiting / 10;
+ ent->removed = 1;
+ if (circ->n_chan == chan)
+ ent->exitward = 1;
+ if (!circ->testing_cell_stats)
+ circ->testing_cell_stats = smartlist_new();
+ smartlist_add(circ->testing_cell_stats, ent);
+ }
}
/* If we just flushed our queue and this circuit is used for a
@@ -2420,6 +2671,20 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max)
return n_flushed;
}
+#if 0
+/** Indicate the current preferred cap for middle circuits; zero disables
+ * the cap. Right now it's just a constant, ORCIRC_MAX_MIDDLE_CELLS, but
+ * the logic in append_cell_to_circuit_queue() is written to be correct
+ * if we want to base it on a consensus param or something that might change
+ * in the future.
+ */
+static int
+get_max_middle_cells(void)
+{
+ return ORCIRC_MAX_MIDDLE_CELLS;
+}
+#endif
+
/** Add <b>cell</b> to the queue of <b>circ</b> writing to <b>chan</b>
* transmitting in <b>direction</b>. */
void
@@ -2430,11 +2695,16 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
or_circuit_t *orcirc = NULL;
cell_queue_t *queue;
int streams_blocked;
+#if 0
+ uint32_t tgt_max_middle_cells, p_len, n_len, tmp, hard_max_middle_cells;
+#endif
+ int exitward;
if (circ->marked_for_close)
return;
- if (direction == CELL_DIRECTION_OUT) {
+ exitward = (direction == CELL_DIRECTION_OUT);
+ if (exitward) {
queue = &circ->n_chan_cells;
streams_blocked = circ->streams_blocked_on_n_chan;
} else {
@@ -2451,28 +2721,82 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
if ((circ->n_chan != NULL) && CIRCUIT_IS_ORCIRC(circ)) {
orcirc = TO_OR_CIRCUIT(circ);
if (orcirc->p_chan) {
- if (queue->n + 1 >= ORCIRC_MAX_MIDDLE_CELLS) {
- /* Queueing this cell would put queue over the cap */
- log_warn(LD_CIRC,
- "Got a cell exceeding the cap of %u in the %s direction "
- "on middle circ ID %u on chan ID " U64_FORMAT
- "; killing the circuit.",
- ORCIRC_MAX_MIDDLE_CELLS,
- (direction == CELL_DIRECTION_OUT) ? "n" : "p",
- (direction == CELL_DIRECTION_OUT) ?
- circ->n_circ_id : orcirc->p_circ_id,
- U64_PRINTF_ARG(
+ /* We are a middle circuit if we have both n_chan and p_chan */
+ /* We'll need to know the current preferred maximum */
+ tgt_max_middle_cells = get_max_middle_cells();
+ if (tgt_max_middle_cells > 0) {
+ /* Do we need to initialize middle_max_cells? */
+ if (orcirc->max_middle_cells == 0) {
+ orcirc->max_middle_cells = tgt_max_middle_cells;
+ } else {
+ if (tgt_max_middle_cells > orcirc->max_middle_cells) {
+ /* If we want to increase the cap, we can do so right away */
+ orcirc->max_middle_cells = tgt_max_middle_cells;
+ } else if (tgt_max_middle_cells < orcirc->max_middle_cells) {
+ /*
+ * If we're shrinking the cap, we can't shrink past either queue;
+ * compare tgt_max_middle_cells rather than tgt_max_middle_cells *
+ * ORCIRC_MAX_MIDDLE_KILL_THRESH so the queues don't shrink enough
+ * to generate spurious warnings, either.
+ */
+ n_len = circ->n_chan_cells.n;
+ p_len = orcirc->p_chan_cells.n;
+ tmp = tgt_max_middle_cells;
+ if (tmp < n_len) tmp = n_len;
+ if (tmp < p_len) tmp = p_len;
+ orcirc->max_middle_cells = tmp;
+ }
+ /* else no change */
+ }
+ } else {
+ /* tgt_max_middle_cells == 0 indicates we should disable the cap */
+ orcirc->max_middle_cells = 0;
+ }
+
+ /* Now we know orcirc->max_middle_cells is set correctly */
+ if (orcirc->max_middle_cells > 0) {
+ hard_max_middle_cells =
+ (uint32_t)(((double)orcirc->max_middle_cells) *
+ ORCIRC_MAX_MIDDLE_KILL_THRESH);
+
+ if ((unsigned)queue->n + 1 >= hard_max_middle_cells) {
+ /* Queueing this cell would put queue over the kill theshold */
+ log_warn(LD_CIRC,
+ "Got a cell exceeding the hard cap of %u in the "
+ "%s direction on middle circ ID %u on chan ID "
+ U64_FORMAT "; killing the circuit.",
+ hard_max_middle_cells,
+ (direction == CELL_DIRECTION_OUT) ? "n" : "p",
(direction == CELL_DIRECTION_OUT) ?
- circ->n_chan->global_identifier :
- orcirc->p_chan->global_identifier));
- circuit_mark_for_close(circ, END_CIRC_REASON_RESOURCELIMIT);
- return;
+ circ->n_circ_id : orcirc->p_circ_id,
+ U64_PRINTF_ARG(
+ (direction == CELL_DIRECTION_OUT) ?
+ circ->n_chan->global_identifier :
+ orcirc->p_chan->global_identifier));
+ circuit_mark_for_close(circ, END_CIRC_REASON_RESOURCELIMIT);
+ return;
+ } else if ((unsigned)queue->n + 1 == orcirc->max_middle_cells) {
+ /* Only use ==, not >= for this test so we don't spam the log */
+ log_warn(LD_CIRC,
+ "While trying to queue a cell, reached the soft cap of %u "
+ "in the %s direction on middle circ ID %u "
+ "on chan ID " U64_FORMAT ".",
+ orcirc->max_middle_cells,
+ (direction == CELL_DIRECTION_OUT) ? "n" : "p",
+ (direction == CELL_DIRECTION_OUT) ?
+ circ->n_circ_id : orcirc->p_circ_id,
+ U64_PRINTF_ARG(
+ (direction == CELL_DIRECTION_OUT) ?
+ circ->n_chan->global_identifier :
+ orcirc->p_chan->global_identifier));
+ }
}
}
}
#endif
- cell_queue_append_packed_copy(queue, cell, chan->wide_circ_ids);
+ cell_queue_append_packed_copy(circ, queue, exitward, cell,
+ chan->wide_circ_ids, 1);
if (PREDICT_UNLIKELY(cell_queues_check_size())) {
/* We ran the OOM handler */