aboutsummaryrefslogtreecommitdiff
path: root/src/or/relay.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/or/relay.c')
-rw-r--r--src/or/relay.c1149
1 files changed, 528 insertions, 621 deletions
diff --git a/src/or/relay.c b/src/or/relay.c
index a17c333310..1da993269d 100644
--- a/src/or/relay.c
+++ b/src/or/relay.c
@@ -1,7 +1,7 @@
/* Copyright (c) 2001 Matej Pfajfar.
* Copyright (c) 2001-2004, Roger Dingledine.
* Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
- * Copyright (c) 2007-2012, The Tor Project, Inc. */
+ * Copyright (c) 2007-2013, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
@@ -10,12 +10,14 @@
* receiving from circuits, plus queuing on circuits.
**/
-#include <math.h>
#define RELAY_PRIVATE
#include "or.h"
+#include "addressmap.h"
#include "buffers.h"
+#include "channel.h"
#include "circuitbuild.h"
#include "circuitlist.h"
+#include "circuituse.h"
#include "config.h"
#include "connection.h"
#include "connection_edge.h"
@@ -26,6 +28,7 @@
#include "mempool.h"
#include "networkstatus.h"
#include "nodelist.h"
+#include "onion.h"
#include "policies.h"
#include "reasons.h"
#include "relay.h"
@@ -51,6 +54,10 @@ static int circuit_resume_edge_reading_helper(edge_connection_t *conn,
static int circuit_consider_stop_edge_reading(circuit_t *circ,
crypt_path_t *layer_hint);
static int circuit_queue_streams_are_blocked(circuit_t *circ);
+static void adjust_exit_policy_from_exitpolicy_failure(origin_circuit_t *circ,
+ entry_connection_t *conn,
+ node_t *node,
+ const tor_addr_t *addr);
/** Stop reading on edge connections when we have this many cells
* waiting on the appropriate queue. */
@@ -68,6 +75,9 @@ uint64_t stats_n_relay_cells_relayed = 0;
*/
uint64_t stats_n_relay_cells_delivered = 0;
+/** Used to tell which stream to read from first on a circuit. */
+static tor_weak_rng_t stream_choice_rng = TOR_WEAK_RNG_INIT;
+
/** Update digest from the payload of cell. Assign integrity part to
* cell.
*/
@@ -166,7 +176,7 @@ int
circuit_receive_relay_cell(cell_t *cell, circuit_t *circ,
cell_direction_t cell_direction)
{
- or_connection_t *or_conn=NULL;
+ channel_t *chan = NULL;
crypt_path_t *layer_hint=NULL;
char recognized=0;
int reason;
@@ -184,7 +194,17 @@ circuit_receive_relay_cell(cell_t *cell, circuit_t *circ,
}
if (recognized) {
- edge_connection_t *conn = relay_lookup_conn(circ, cell, cell_direction,
+ edge_connection_t *conn = NULL;
+
+ if (circ->purpose == CIRCUIT_PURPOSE_PATH_BIAS_TESTING) {
+ pathbias_check_probe_response(circ, cell);
+
+ /* We need to drop this cell no matter what to avoid code that expects
+ * a certain purpose (such as the hidserv code). */
+ return 0;
+ }
+
+ conn = relay_lookup_conn(circ, cell, cell_direction,
layer_hint);
if (cell_direction == CELL_DIRECTION_OUT) {
++stats_n_relay_cells_delivered;
@@ -213,24 +233,32 @@ circuit_receive_relay_cell(cell_t *cell, circuit_t *circ,
/* not recognized. pass it on. */
if (cell_direction == CELL_DIRECTION_OUT) {
cell->circ_id = circ->n_circ_id; /* switch it */
- or_conn = circ->n_conn;
+ chan = circ->n_chan;
} else if (! CIRCUIT_IS_ORIGIN(circ)) {
cell->circ_id = TO_OR_CIRCUIT(circ)->p_circ_id; /* switch it */
- or_conn = TO_OR_CIRCUIT(circ)->p_conn;
+ chan = TO_OR_CIRCUIT(circ)->p_chan;
} else {
log_fn(LOG_PROTOCOL_WARN, LD_OR,
"Dropping unrecognized inbound cell on origin circuit.");
- return 0;
+ /* If we see unrecognized cells on path bias testing circs,
+ * it's bad mojo. Those circuits need to die.
+ * XXX: Shouldn't they always die? */
+ if (circ->purpose == CIRCUIT_PURPOSE_PATH_BIAS_TESTING) {
+ TO_ORIGIN_CIRCUIT(circ)->path_state = PATH_STATE_USE_FAILED;
+ return -END_CIRC_REASON_TORPROTOCOL;
+ } else {
+ return 0;
+ }
}
- if (!or_conn) {
+ if (!chan) {
// XXXX Can this splice stuff be done more cleanly?
if (! CIRCUIT_IS_ORIGIN(circ) &&
TO_OR_CIRCUIT(circ)->rend_splice &&
cell_direction == CELL_DIRECTION_OUT) {
or_circuit_t *splice = TO_OR_CIRCUIT(circ)->rend_splice;
tor_assert(circ->purpose == CIRCUIT_PURPOSE_REND_ESTABLISHED);
- tor_assert(splice->_base.purpose == CIRCUIT_PURPOSE_REND_ESTABLISHED);
+ tor_assert(splice->base_.purpose == CIRCUIT_PURPOSE_REND_ESTABLISHED);
cell->circ_id = splice->p_circ_id;
cell->command = CELL_RELAY; /* can't be relay_early anyway */
if ((reason = circuit_receive_relay_cell(cell, TO_CIRCUIT(splice),
@@ -254,7 +282,7 @@ circuit_receive_relay_cell(cell_t *cell, circuit_t *circ,
* we might kill the circ before we relay
* the cells. */
- append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction, 0);
+ append_cell_to_circuit_queue(circ, chan, cell, cell_direction, 0);
return 0;
}
@@ -353,13 +381,13 @@ circuit_package_relay_cell(cell_t *cell, circuit_t *circ,
cell_direction_t cell_direction,
crypt_path_t *layer_hint, streamid_t on_stream)
{
- or_connection_t *conn; /* where to send the cell */
+ channel_t *chan; /* where to send the cell */
if (cell_direction == CELL_DIRECTION_OUT) {
crypt_path_t *thishop; /* counter for repeated crypts */
- conn = circ->n_conn;
- if (!CIRCUIT_IS_ORIGIN(circ) || !conn) {
- log_warn(LD_BUG,"outgoing relay cell has n_conn==NULL. Dropping.");
+ chan = circ->n_chan;
+ if (!CIRCUIT_IS_ORIGIN(circ) || !chan) {
+ log_warn(LD_BUG,"outgoing relay cell has n_chan==NULL. Dropping.");
return 0; /* just drop it */
}
@@ -388,14 +416,14 @@ circuit_package_relay_cell(cell_t *cell, circuit_t *circ,
return 0; /* just drop it */
}
or_circ = TO_OR_CIRCUIT(circ);
- conn = or_circ->p_conn;
+ chan = or_circ->p_chan;
relay_set_digest(or_circ->p_digest, cell);
if (relay_crypt_one_payload(or_circ->p_crypto, cell->payload, 1) < 0)
return -1;
}
++stats_n_relay_cells_relayed;
- append_cell_to_circuit_queue(circ, conn, cell, cell_direction, on_stream);
+ append_cell_to_circuit_queue(circ, chan, cell, cell_direction, on_stream);
return 0;
}
@@ -422,7 +450,7 @@ relay_lookup_conn(circuit_t *circ, cell_t *cell,
for (tmpconn = TO_ORIGIN_CIRCUIT(circ)->p_streams; tmpconn;
tmpconn=tmpconn->next_stream) {
if (rh.stream_id == tmpconn->stream_id &&
- !tmpconn->_base.marked_for_close &&
+ !tmpconn->base_.marked_for_close &&
tmpconn->cpath_layer == layer_hint) {
log_debug(LD_APP,"found conn for stream %d.", rh.stream_id);
return tmpconn;
@@ -432,7 +460,7 @@ relay_lookup_conn(circuit_t *circ, cell_t *cell,
for (tmpconn = TO_OR_CIRCUIT(circ)->n_streams; tmpconn;
tmpconn=tmpconn->next_stream) {
if (rh.stream_id == tmpconn->stream_id &&
- !tmpconn->_base.marked_for_close) {
+ !tmpconn->base_.marked_for_close) {
log_debug(LD_EXIT,"found conn for stream %d.", rh.stream_id);
if (cell_direction == CELL_DIRECTION_OUT ||
connection_edge_is_rendezvous_stream(tmpconn))
@@ -442,7 +470,7 @@ relay_lookup_conn(circuit_t *circ, cell_t *cell,
for (tmpconn = TO_OR_CIRCUIT(circ)->resolving_streams; tmpconn;
tmpconn=tmpconn->next_stream) {
if (rh.stream_id == tmpconn->stream_id &&
- !tmpconn->_base.marked_for_close) {
+ !tmpconn->base_.marked_for_close) {
log_debug(LD_EXIT,"found conn for stream %d.", rh.stream_id);
return tmpconn;
}
@@ -561,15 +589,16 @@ relay_send_command_from_edge(streamid_t stream_id, circuit_t *circ,
geoip_change_dirreq_state(circ->dirreq_id, DIRREQ_TUNNELED,
DIRREQ_END_CELL_SENT);
- if (cell_direction == CELL_DIRECTION_OUT && circ->n_conn) {
+ if (cell_direction == CELL_DIRECTION_OUT && circ->n_chan) {
/* if we're using relaybandwidthrate, this conn wants priority */
- circ->n_conn->client_used = approx_time();
+ channel_timestamp_client(circ->n_chan);
}
if (cell_direction == CELL_DIRECTION_OUT) {
origin_circuit_t *origin_circ = TO_ORIGIN_CIRCUIT(circ);
if (origin_circ->remaining_relay_early_cells > 0 &&
(relay_command == RELAY_COMMAND_EXTEND ||
+ relay_command == RELAY_COMMAND_EXTEND2 ||
cpath_layer != origin_circ->cpath)) {
/* If we've got any relay_early cells left and (we're sending
* an extend cell or we're not talking to the first hop), use
@@ -583,7 +612,8 @@ relay_send_command_from_edge(streamid_t stream_id, circuit_t *circ,
* task 878. */
origin_circ->relay_early_commands[
origin_circ->relay_early_cells_sent++] = relay_command;
- } else if (relay_command == RELAY_COMMAND_EXTEND) {
+ } else if (relay_command == RELAY_COMMAND_EXTEND ||
+ relay_command == RELAY_COMMAND_EXTEND2) {
/* If no RELAY_EARLY cells can be sent over this circuit, log which
* commands have been sent as RELAY_EARLY cells before; helps debug
* task 878. */
@@ -631,16 +661,16 @@ connection_edge_send_command(edge_connection_t *fromconn,
tor_assert(fromconn);
circ = fromconn->on_circuit;
- if (fromconn->_base.marked_for_close) {
+ if (fromconn->base_.marked_for_close) {
log_warn(LD_BUG,
"called on conn that's already marked for close at %s:%d.",
- fromconn->_base.marked_for_close_file,
- fromconn->_base.marked_for_close);
+ fromconn->base_.marked_for_close_file,
+ fromconn->base_.marked_for_close);
return 0;
}
if (!circ) {
- if (fromconn->_base.type == CONN_TYPE_AP) {
+ if (fromconn->base_.type == CONN_TYPE_AP) {
log_info(LD_APP,"no circ. Closing conn.");
connection_mark_unattached_ap(EDGE_TO_ENTRY_CONN(fromconn),
END_STREAM_REASON_INTERNAL);
@@ -685,14 +715,39 @@ connection_ap_process_end_not_open(
relay_header_t *rh, cell_t *cell, origin_circuit_t *circ,
entry_connection_t *conn, crypt_path_t *layer_hint)
{
- struct in_addr in;
node_t *exitrouter;
int reason = *(cell->payload+RELAY_HEADER_SIZE);
- int control_reason = reason | END_STREAM_REASON_FLAG_REMOTE;
+ int control_reason;
edge_connection_t *edge_conn = ENTRY_TO_EDGE_CONN(conn);
(void) layer_hint; /* unused */
- if (rh->length > 0 && edge_reason_is_retriable(reason) &&
+ if (rh->length > 0) {
+ if (reason == END_STREAM_REASON_TORPROTOCOL ||
+ reason == END_STREAM_REASON_INTERNAL ||
+ reason == END_STREAM_REASON_DESTROY) {
+ /* All three of these reasons could mean a failed tag
+ * hit the exit and it complained. Do not probe.
+ * Fail the circuit. */
+ circ->path_state = PATH_STATE_USE_FAILED;
+ return -END_CIRC_REASON_TORPROTOCOL;
+ } else {
+ /* Path bias: If we get a valid reason code from the exit,
+ * it wasn't due to tagging.
+ *
+ * We rely on recognized+digest being strong enough to make
+ * tags unlikely to allow us to get tagged, yet 'recognized'
+ * reason codes here. */
+ pathbias_mark_use_success(circ);
+ }
+ }
+
+ if (rh->length == 0) {
+ reason = END_STREAM_REASON_MISC;
+ }
+
+ control_reason = reason | END_STREAM_REASON_FLAG_REMOTE;
+
+ if (edge_reason_is_retriable(reason) &&
/* avoid retry if rend */
!connection_edge_is_rendezvous_stream(edge_conn)) {
const char *chosen_exit_digest =
@@ -702,48 +757,67 @@ connection_ap_process_end_not_open(
stream_end_reason_to_string(reason));
exitrouter = node_get_mutable_by_id(chosen_exit_digest);
switch (reason) {
- case END_STREAM_REASON_EXITPOLICY:
+ case END_STREAM_REASON_EXITPOLICY: {
+ tor_addr_t addr;
+ tor_addr_make_unspec(&addr);
if (rh->length >= 5) {
- uint32_t addr = ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+1));
- int ttl;
- if (!addr) {
+ int ttl = -1;
+ tor_addr_make_unspec(&addr);
+ if (rh->length == 5 || rh->length == 9) {
+ tor_addr_from_ipv4n(&addr,
+ get_uint32(cell->payload+RELAY_HEADER_SIZE+1));
+ if (rh->length == 9)
+ ttl = (int)ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+5));
+ } else if (rh->length == 17 || rh->length == 21) {
+ tor_addr_from_ipv6_bytes(&addr,
+ (char*)(cell->payload+RELAY_HEADER_SIZE+1));
+ if (rh->length == 21)
+ ttl = (int)ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+17));
+ }
+ if (tor_addr_is_null(&addr)) {
log_info(LD_APP,"Address '%s' resolved to 0.0.0.0. Closing,",
safe_str(conn->socks_request->address));
connection_mark_unattached_ap(conn, END_STREAM_REASON_TORPROTOCOL);
return 0;
}
- if (rh->length >= 9)
- ttl = (int)ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+5));
- else
- ttl = -1;
+ if ((tor_addr_family(&addr) == AF_INET && !conn->ipv4_traffic_ok) ||
+ (tor_addr_family(&addr) == AF_INET6 && !conn->ipv6_traffic_ok)) {
+ log_fn(LOG_PROTOCOL_WARN, LD_APP,
+ "Got an EXITPOLICY failure on a connection with a "
+ "mismatched family. Closing.");
+ connection_mark_unattached_ap(conn, END_STREAM_REASON_TORPROTOCOL);
+ return 0;
+ }
if (get_options()->ClientDNSRejectInternalAddresses &&
- is_internal_IP(addr, 0)) {
+ tor_addr_is_internal(&addr, 0)) {
log_info(LD_APP,"Address '%s' resolved to internal. Closing,",
safe_str(conn->socks_request->address));
connection_mark_unattached_ap(conn, END_STREAM_REASON_TORPROTOCOL);
return 0;
}
- client_dns_set_addressmap(conn->socks_request->address, addr,
+
+ client_dns_set_addressmap(conn,
+ conn->socks_request->address, &addr,
conn->chosen_exit_name, ttl);
+
+ {
+ char new_addr[TOR_ADDR_BUF_LEN];
+ tor_addr_to_str(new_addr, &addr, sizeof(new_addr), 1);
+ if (strcmp(conn->socks_request->address, new_addr)) {
+ strlcpy(conn->socks_request->address, new_addr,
+ sizeof(conn->socks_request->address));
+ control_event_stream_status(conn, STREAM_EVENT_REMAP, 0);
+ }
+ }
}
/* check if he *ought* to have allowed it */
- if (exitrouter &&
- (rh->length < 5 ||
- (tor_inet_aton(conn->socks_request->address, &in) &&
- !conn->chosen_exit_name))) {
- log_info(LD_APP,
- "Exitrouter %s seems to be more restrictive than its exit "
- "policy. Not using this router as exit for now.",
- node_describe(exitrouter));
- policies_set_node_exitpolicy_to_reject_all(exitrouter);
- }
- /* rewrite it to an IP if we learned one. */
- if (addressmap_rewrite(conn->socks_request->address,
- sizeof(conn->socks_request->address),
- NULL, NULL)) {
- control_event_stream_status(conn, STREAM_EVENT_REMAP, 0);
- }
+
+ adjust_exit_policy_from_exitpolicy_failure(circ,
+ conn,
+ exitrouter,
+ &addr);
+
if (conn->chosen_exit_optional ||
conn->chosen_exit_retries) {
/* stop wanting a specific exit */
@@ -762,6 +836,7 @@ connection_ap_process_end_not_open(
return 0;
/* else, conn will get closed below */
break;
+ }
case END_STREAM_REASON_CONNECTREFUSED:
if (!conn->chosen_exit_optional)
break; /* break means it'll close, below */
@@ -776,9 +851,7 @@ connection_ap_process_end_not_open(
/* We haven't retried too many times; reattach the connection. */
circuit_log_path(LOG_INFO,LD_APP,circ);
/* Mark this circuit "unusable for new streams". */
- /* XXXX024 this is a kludgy way to do this. */
- tor_assert(circ->_base.timestamp_dirty);
- circ->_base.timestamp_dirty -= get_options()->MaxCircuitDirtiness;
+ mark_circuit_unusable_for_new_conns(circ);
if (conn->chosen_exit_optional) {
/* stop wanting a specific exit */
@@ -826,21 +899,102 @@ connection_ap_process_end_not_open(
return 0;
}
+/** Called when we have gotten an END_REASON_EXITPOLICY failure on <b>circ</b>
+ * for <b>conn</b>, while attempting to connect via <b>node</b>. If the node
+ * told us which address it rejected, then <b>addr</b> is that address;
+ * otherwise it is AF_UNSPEC.
+ *
+ * If we are sure the node should have allowed this address, mark the node as
+ * having a reject *:* exit policy. Otherwise, mark the circuit as unusable
+ * for this particular address.
+ **/
+static void
+adjust_exit_policy_from_exitpolicy_failure(origin_circuit_t *circ,
+ entry_connection_t *conn,
+ node_t *node,
+ const tor_addr_t *addr)
+{
+ int make_reject_all = 0;
+ const sa_family_t family = tor_addr_family(addr);
+
+ if (node) {
+ tor_addr_t tmp;
+ int asked_for_family = tor_addr_parse(&tmp, conn->socks_request->address);
+ if (family == AF_UNSPEC) {
+ make_reject_all = 1;
+ } else if (node_exit_policy_is_exact(node, family) &&
+ asked_for_family != -1 && !conn->chosen_exit_name) {
+ make_reject_all = 1;
+ }
+
+ if (make_reject_all) {
+ log_info(LD_APP,
+ "Exitrouter %s seems to be more restrictive than its exit "
+ "policy. Not using this router as exit for now.",
+ node_describe(node));
+ policies_set_node_exitpolicy_to_reject_all(node);
+ }
+ }
+
+ if (family != AF_UNSPEC)
+ addr_policy_append_reject_addr(&circ->prepend_policy, addr);
+}
+
/** Helper: change the socks_request-&gt;address field on conn to the
- * dotted-quad representation of <b>new_addr</b> (given in host order),
+ * dotted-quad representation of <b>new_addr</b>,
* and send an appropriate REMAP event. */
static void
-remap_event_helper(entry_connection_t *conn, uint32_t new_addr)
+remap_event_helper(entry_connection_t *conn, const tor_addr_t *new_addr)
{
- struct in_addr in;
-
- in.s_addr = htonl(new_addr);
- tor_inet_ntoa(&in, conn->socks_request->address,
- sizeof(conn->socks_request->address));
+ tor_addr_to_str(conn->socks_request->address, new_addr,
+ sizeof(conn->socks_request->address),
+ 1);
control_event_stream_status(conn, STREAM_EVENT_REMAP,
REMAP_STREAM_SOURCE_EXIT);
}
+/** Extract the contents of a connected cell in <b>cell</b>, whose relay
+ * header has already been parsed into <b>rh</b>. On success, set
+ * <b>addr_out</b> to the address we're connected to, and <b>ttl_out</b> to
+ * the ttl of that address, in seconds, and return 0. On failure, return
+ * -1. */
+int
+connected_cell_parse(const relay_header_t *rh, const cell_t *cell,
+ tor_addr_t *addr_out, int *ttl_out)
+{
+ uint32_t bytes;
+ const uint8_t *payload = cell->payload + RELAY_HEADER_SIZE;
+
+ tor_addr_make_unspec(addr_out);
+ *ttl_out = -1;
+ if (rh->length == 0)
+ return 0;
+ if (rh->length < 4)
+ return -1;
+ bytes = ntohl(get_uint32(payload));
+
+ /* If bytes is 0, this is maybe a v6 address. Otherwise it's a v4 address */
+ if (bytes != 0) {
+ /* v4 address */
+ tor_addr_from_ipv4h(addr_out, bytes);
+ if (rh->length >= 8) {
+ bytes = ntohl(get_uint32(payload + 4));
+ if (bytes <= INT32_MAX)
+ *ttl_out = bytes;
+ }
+ } else {
+ if (rh->length < 25) /* 4 bytes of 0s, 1 addr, 16 ipv4, 4 ttl. */
+ return -1;
+ if (get_uint8(payload + 4) != 6)
+ return -1;
+ tor_addr_from_ipv6_bytes(addr_out, (char*)(payload + 5));
+ bytes = ntohl(get_uint32(payload + 21));
+ if (bytes <= INT32_MAX)
+ *ttl_out = (int) bytes;
+ }
+ return 0;
+}
+
/** An incoming relay cell has arrived from circuit <b>circ</b> to
* stream <b>conn</b>.
*
@@ -854,7 +1008,7 @@ connection_edge_process_relay_cell_not_open(
edge_connection_t *conn, crypt_path_t *layer_hint)
{
if (rh->command == RELAY_COMMAND_END) {
- if (CIRCUIT_IS_ORIGIN(circ) && conn->_base.type == CONN_TYPE_AP) {
+ if (CIRCUIT_IS_ORIGIN(circ) && conn->base_.type == CONN_TYPE_AP) {
return connection_ap_process_end_not_open(rh, cell,
TO_ORIGIN_CIRCUIT(circ),
EDGE_TO_ENTRY_CONN(conn),
@@ -869,38 +1023,55 @@ connection_edge_process_relay_cell_not_open(
}
}
- if (conn->_base.type == CONN_TYPE_AP &&
+ if (conn->base_.type == CONN_TYPE_AP &&
rh->command == RELAY_COMMAND_CONNECTED) {
+ tor_addr_t addr;
+ int ttl;
entry_connection_t *entry_conn = EDGE_TO_ENTRY_CONN(conn);
tor_assert(CIRCUIT_IS_ORIGIN(circ));
- if (conn->_base.state != AP_CONN_STATE_CONNECT_WAIT) {
+ if (conn->base_.state != AP_CONN_STATE_CONNECT_WAIT) {
log_fn(LOG_PROTOCOL_WARN, LD_APP,
"Got 'connected' while not in state connect_wait. Dropping.");
return 0;
}
- conn->_base.state = AP_CONN_STATE_OPEN;
+ conn->base_.state = AP_CONN_STATE_OPEN;
log_info(LD_APP,"'connected' received after %d seconds.",
- (int)(time(NULL) - conn->_base.timestamp_lastread));
- if (rh->length >= 4) {
- uint32_t addr = ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE));
- int ttl;
- if (!addr || (get_options()->ClientDNSRejectInternalAddresses &&
- is_internal_IP(addr, 0))) {
+ (int)(time(NULL) - conn->base_.timestamp_lastread));
+ if (connected_cell_parse(rh, cell, &addr, &ttl) < 0) {
+ log_fn(LOG_PROTOCOL_WARN, LD_APP,
+ "Got a badly formatted connected cell. Closing.");
+ connection_edge_end(conn, END_STREAM_REASON_TORPROTOCOL);
+ connection_mark_unattached_ap(entry_conn, END_STREAM_REASON_TORPROTOCOL);
+ }
+ if (tor_addr_family(&addr) != AF_UNSPEC) {
+ const sa_family_t family = tor_addr_family(&addr);
+ if (tor_addr_is_null(&addr) ||
+ (get_options()->ClientDNSRejectInternalAddresses &&
+ tor_addr_is_internal(&addr, 0))) {
log_info(LD_APP, "...but it claims the IP address was %s. Closing.",
- fmt_addr32(addr));
+ fmt_addr(&addr));
+ connection_edge_end(conn, END_STREAM_REASON_TORPROTOCOL);
+ connection_mark_unattached_ap(entry_conn,
+ END_STREAM_REASON_TORPROTOCOL);
+ return 0;
+ }
+
+ if ((family == AF_INET && ! entry_conn->ipv4_traffic_ok) ||
+ (family == AF_INET6 && ! entry_conn->ipv6_traffic_ok)) {
+ log_fn(LOG_PROTOCOL_WARN, LD_APP,
+ "Got a connected cell to %s with unsupported address family."
+ " Closing.", fmt_addr(&addr));
connection_edge_end(conn, END_STREAM_REASON_TORPROTOCOL);
connection_mark_unattached_ap(entry_conn,
END_STREAM_REASON_TORPROTOCOL);
return 0;
}
- if (rh->length >= 8)
- ttl = (int)ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+4));
- else
- ttl = -1;
- client_dns_set_addressmap(entry_conn->socks_request->address, addr,
+
+ client_dns_set_addressmap(entry_conn,
+ entry_conn->socks_request->address, &addr,
entry_conn->chosen_exit_name, ttl);
- remap_event_helper(entry_conn, addr);
+ remap_event_helper(entry_conn, &addr);
}
circuit_log_path(LOG_INFO,LD_APP,TO_ORIGIN_CIRCUIT(circ));
/* don't send a socks reply to transparent conns */
@@ -944,13 +1115,13 @@ connection_edge_process_relay_cell_not_open(
}
return 0;
}
- if (conn->_base.type == CONN_TYPE_AP &&
+ if (conn->base_.type == CONN_TYPE_AP &&
rh->command == RELAY_COMMAND_RESOLVED) {
int ttl;
int answer_len;
uint8_t answer_type;
entry_connection_t *entry_conn = EDGE_TO_ENTRY_CONN(conn);
- if (conn->_base.state != AP_CONN_STATE_RESOLVE_WAIT) {
+ if (conn->base_.state != AP_CONN_STATE_RESOLVE_WAIT) {
log_fn(LOG_PROTOCOL_WARN, LD_APP, "Got a 'resolved' cell while "
"not in state resolve_wait. Dropping.");
return 0;
@@ -990,8 +1161,15 @@ connection_edge_process_relay_cell_not_open(
ttl,
-1);
if (answer_type == RESOLVED_TYPE_IPV4 && answer_len == 4) {
- uint32_t addr = ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+2));
- remap_event_helper(entry_conn, addr);
+ tor_addr_t addr;
+ tor_addr_from_ipv4n(&addr,
+ get_uint32(cell->payload+RELAY_HEADER_SIZE+2));
+ remap_event_helper(entry_conn, &addr);
+ } else if (answer_type == RESOLVED_TYPE_IPV6 && answer_len == 16) {
+ tor_addr_t addr;
+ tor_addr_from_ipv6_bytes(&addr,
+ (char*)(cell->payload+RELAY_HEADER_SIZE+2));
+ remap_event_helper(entry_conn, &addr);
}
connection_mark_unattached_ap(entry_conn,
END_STREAM_REASON_DONE |
@@ -1001,8 +1179,8 @@ connection_edge_process_relay_cell_not_open(
log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
"Got an unexpected relay command %d, in state %d (%s). Dropping.",
- rh->command, conn->_base.state,
- conn_state_to_string(conn->_base.type, conn->_base.state));
+ rh->command, conn->base_.state,
+ conn_state_to_string(conn->base_.type, conn->base_.state));
return 0; /* for forward compatibility, don't kill the circuit */
// connection_edge_end(conn, END_STREAM_REASON_TORPROTOCOL);
// connection_mark_for_close(conn);
@@ -1067,9 +1245,9 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
* conn points to the recognized stream. */
if (conn && !connection_state_is_open(TO_CONN(conn))) {
- if (conn->_base.type == CONN_TYPE_EXIT &&
- (conn->_base.state == EXIT_CONN_STATE_CONNECTING ||
- conn->_base.state == EXIT_CONN_STATE_RESOLVING) &&
+ if (conn->base_.type == CONN_TYPE_EXIT &&
+ (conn->base_.state == EXIT_CONN_STATE_CONNECTING ||
+ conn->base_.state == EXIT_CONN_STATE_RESOLVING) &&
rh.command == RELAY_COMMAND_DATA) {
/* Allow DATA cells to be delivered to an exit node in state
* EXIT_CONN_STATE_CONNECTING or EXIT_CONN_STATE_RESOLVING.
@@ -1112,7 +1290,7 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
* and linked. */
static uint64_t next_id = 0;
circ->dirreq_id = ++next_id;
- TO_CONN(TO_OR_CIRCUIT(circ)->p_conn)->dirreq_id = circ->dirreq_id;
+ TO_OR_CIRCUIT(circ)->p_chan->dirreq_id = circ->dirreq_id;
}
return connection_exit_begin_conn(cell, circ);
@@ -1168,11 +1346,12 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
return 0;
}
/* XXX add to this log_fn the exit node's nickname? */
- log_info(domain,"%d: end cell (%s) for stream %d. Removing stream.",
- conn->_base.s,
+ log_info(domain,TOR_SOCKET_T_FORMAT": end cell (%s) for stream %d. "
+ "Removing stream.",
+ conn->base_.s,
stream_end_reason_to_string(reason),
conn->stream_id);
- if (conn->_base.type == CONN_TYPE_AP) {
+ if (conn->base_.type == CONN_TYPE_AP) {
entry_connection_t *entry_conn = EDGE_TO_ENTRY_CONN(conn);
if (entry_conn->socks_request &&
!entry_conn->socks_request->has_finished)
@@ -1183,16 +1362,17 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
conn->edge_has_sent_end = 1;
if (!conn->end_reason)
conn->end_reason = reason | END_STREAM_REASON_FLAG_REMOTE;
- if (!conn->_base.marked_for_close) {
+ if (!conn->base_.marked_for_close) {
/* only mark it if not already marked. it's possible to
* get the 'end' right around when the client hangs up on us. */
connection_mark_and_flush(TO_CONN(conn));
}
return 0;
- case RELAY_COMMAND_EXTEND: {
+ case RELAY_COMMAND_EXTEND:
+ case RELAY_COMMAND_EXTEND2: {
static uint64_t total_n_extend=0, total_nonearly=0;
total_n_extend++;
- if (conn) {
+ if (rh.stream_id) {
log_fn(LOG_PROTOCOL_WARN, domain,
"'extend' cell received for non-zero stream. Dropping.");
return 0;
@@ -1224,17 +1404,27 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
return circuit_extend(cell, circ);
}
case RELAY_COMMAND_EXTENDED:
+ case RELAY_COMMAND_EXTENDED2:
if (!layer_hint) {
log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
"'extended' unsupported at non-origin. Dropping.");
return 0;
}
log_debug(domain,"Got an extended cell! Yay.");
- if ((reason = circuit_finish_handshake(TO_ORIGIN_CIRCUIT(circ),
- CELL_CREATED,
- cell->payload+RELAY_HEADER_SIZE)) < 0) {
- log_warn(domain,"circuit_finish_handshake failed.");
- return reason;
+ {
+ extended_cell_t extended_cell;
+ if (extended_cell_parse(&extended_cell, rh.command,
+ (const uint8_t*)cell->payload+RELAY_HEADER_SIZE,
+ rh.length)<0) {
+ log_warn(LD_PROTOCOL,
+ "Can't parse EXTENDED cell; killing circuit.");
+ return -END_CIRC_REASON_TORPROTOCOL;
+ }
+ if ((reason = circuit_finish_handshake(TO_ORIGIN_CIRCUIT(circ),
+ &extended_cell.created_cell)) < 0) {
+ log_warn(domain,"circuit_finish_handshake failed.");
+ return reason;
+ }
}
if ((reason=circuit_send_next_onion_skin(TO_ORIGIN_CIRCUIT(circ)))<0) {
log_info(domain,"circuit_send_next_onion_skin() failed.");
@@ -1247,12 +1437,20 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
"'truncate' unsupported at origin. Dropping.");
return 0;
}
- if (circ->n_conn) {
- uint8_t trunc_reason = *(uint8_t*)(cell->payload + RELAY_HEADER_SIZE);
- circuit_clear_cell_queue(circ, circ->n_conn);
- connection_or_send_destroy(circ->n_circ_id, circ->n_conn,
- trunc_reason);
- circuit_set_n_circid_orconn(circ, 0, NULL);
+ if (circ->n_hop) {
+ if (circ->n_chan)
+ log_warn(LD_BUG, "n_chan and n_hop set on the same circuit!");
+ extend_info_free(circ->n_hop);
+ circ->n_hop = NULL;
+ tor_free(circ->n_chan_create_cell);
+ circuit_set_state(circ, CIRCUIT_STATE_OPEN);
+ }
+ if (circ->n_chan) {
+ uint8_t trunc_reason = get_uint8(cell->payload + RELAY_HEADER_SIZE);
+ circuit_clear_cell_queue(circ, circ->n_chan);
+ channel_send_destroy(circ->n_circ_id, circ->n_chan,
+ trunc_reason);
+ circuit_set_n_circid_chan(circ, 0, NULL);
}
log_debug(LD_EXIT, "Processed 'truncate', replying.");
{
@@ -1268,7 +1466,8 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
"'truncated' unsupported at non-origin. Dropping.");
return 0;
}
- circuit_truncated(TO_ORIGIN_CIRCUIT(circ), layer_hint);
+ circuit_truncated(TO_ORIGIN_CIRCUIT(circ), layer_hint,
+ get_uint8(cell->payload + RELAY_HEADER_SIZE));
return 0;
case RELAY_COMMAND_CONNECTED:
if (conn) {
@@ -1284,7 +1483,8 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
if (layer_hint) {
if (layer_hint->package_window + CIRCWINDOW_INCREMENT >
CIRCWINDOW_START_MAX) {
- log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
+ /*XXXX024: Downgrade this back to LOG_PROTOCOL_WARN after a while*/
+ log_fn(LOG_WARN, LD_PROTOCOL,
"Bug/attack: unexpected sendme cell from exit relay. "
"Closing circ.");
return -END_CIRC_REASON_TORPROTOCOL;
@@ -1296,9 +1496,11 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
} else {
if (circ->package_window + CIRCWINDOW_INCREMENT >
CIRCWINDOW_START_MAX) {
- log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
+ /*XXXX024: Downgrade this back to LOG_PROTOCOL_WARN after a while*/
+ log_fn(LOG_WARN, LD_PROTOCOL,
"Bug/attack: unexpected sendme cell from client. "
- "Closing circ.");
+ "Closing circ (window %d).",
+ circ->package_window);
return -END_CIRC_REASON_TORPROTOCOL;
}
circ->package_window += CIRCWINDOW_INCREMENT;
@@ -1406,21 +1608,22 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial,
size_t bytes_to_process, length;
char payload[CELL_PAYLOAD_SIZE];
circuit_t *circ;
- const unsigned domain = conn->_base.type == CONN_TYPE_AP ? LD_APP : LD_EXIT;
+ const unsigned domain = conn->base_.type == CONN_TYPE_AP ? LD_APP : LD_EXIT;
int sending_from_optimistic = 0;
- const int sending_optimistically =
- conn->_base.type == CONN_TYPE_AP &&
- conn->_base.state != AP_CONN_STATE_OPEN;
entry_connection_t *entry_conn =
- conn->_base.type == CONN_TYPE_AP ? EDGE_TO_ENTRY_CONN(conn) : NULL;
+ conn->base_.type == CONN_TYPE_AP ? EDGE_TO_ENTRY_CONN(conn) : NULL;
+ const int sending_optimistically =
+ entry_conn &&
+ conn->base_.type == CONN_TYPE_AP &&
+ conn->base_.state != AP_CONN_STATE_OPEN;
crypt_path_t *cpath_layer = conn->cpath_layer;
tor_assert(conn);
- if (conn->_base.marked_for_close) {
+ if (conn->base_.marked_for_close) {
log_warn(LD_BUG,
"called on conn that's already marked for close at %s:%d.",
- conn->_base.marked_for_close_file, conn->_base.marked_for_close);
+ conn->base_.marked_for_close_file, conn->base_.marked_for_close);
return 0;
}
@@ -1487,7 +1690,8 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial,
connection_fetch_from_buf(payload, length, TO_CONN(conn));
}
- log_debug(domain,"(%d) Packaging %d bytes (%d waiting).", conn->_base.s,
+ log_debug(domain,TOR_SOCKET_T_FORMAT": Packaging %d bytes (%d waiting).",
+ conn->base_.s,
(int)length, (int)connection_get_inbuf_len(TO_CONN(conn)));
if (sending_optimistically && !sending_from_optimistic) {
@@ -1553,9 +1757,9 @@ connection_edge_consider_sending_sendme(edge_connection_t *conn)
}
while (conn->deliver_window <= STREAMWINDOW_START - STREAMWINDOW_INCREMENT) {
- log_debug(conn->_base.type == CONN_TYPE_AP ?LD_APP:LD_EXIT,
+ log_debug(conn->base_.type == CONN_TYPE_AP ?LD_APP:LD_EXIT,
"Outbuf %d, Queuing stream sendme.",
- (int)conn->_base.outbuf_flushlen);
+ (int)conn->base_.outbuf_flushlen);
conn->deliver_window += STREAMWINDOW_INCREMENT;
if (connection_edge_send_command(conn, RELAY_COMMAND_SENDME,
NULL, 0) < 0) {
@@ -1587,6 +1791,12 @@ circuit_resume_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
circ, layer_hint);
}
+void
+stream_choice_seed_weak_rng(void)
+{
+ crypto_seed_weak_rng(&stream_choice_rng);
+}
+
/** A helper function for circuit_resume_edge_reading() above.
* The arguments are the same, except that <b>conn</b> is the head
* of a linked list of edge streams that should each be considered.
@@ -1602,16 +1812,23 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
int cells_on_queue;
int cells_per_conn;
edge_connection_t *chosen_stream = NULL;
+ int max_to_package;
+
+ if (first_conn == NULL) {
+ /* Don't bother to try to do the rest of this if there are no connections
+ * to resume. */
+ return 0;
+ }
/* How many cells do we have space for? It will be the minimum of
* the number needed to exhaust the package window, and the minimum
* needed to fill the cell queue. */
- int max_to_package = circ->package_window;
+ max_to_package = circ->package_window;
if (CIRCUIT_IS_ORIGIN(circ)) {
- cells_on_queue = circ->n_conn_cells.n;
+ cells_on_queue = circ->n_chan_cells.n;
} else {
or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
- cells_on_queue = or_circ->p_conn_cells.n;
+ cells_on_queue = or_circ->p_chan_cells.n;
}
if (CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue < max_to_package)
max_to_package = CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue;
@@ -1631,10 +1848,19 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
int num_streams = 0;
for (conn = first_conn; conn; conn = conn->next_stream) {
num_streams++;
- if ((tor_weak_random() % num_streams)==0)
+ if (tor_weak_random_one_in_n(&stream_choice_rng, num_streams)) {
chosen_stream = conn;
+ }
/* Invariant: chosen_stream has been chosen uniformly at random from
- * among the first num_streams streams on first_conn. */
+ * among the first num_streams streams on first_conn.
+ *
+ * (Note that we iterate over every stream on the circuit, so that after
+ * we've considered the first stream, we've chosen it with P=1; and
+ * after we consider the second stream, we've switched to it with P=1/2
+ * and stayed with the first stream with P=1/2; and after we've
+ * considered the third stream, we've switched to it with P=1/3 and
+ * remained with one of the first two streams with P=(2/3), giving each
+ * one P=(1/2)(2/3) )=(1/3).) */
}
}
@@ -1644,7 +1870,7 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
/* Activate reading starting from the chosen stream */
for (conn=chosen_stream; conn; conn = conn->next_stream) {
/* Start reading for the streams starting from here */
- if (conn->_base.marked_for_close || conn->package_window <= 0)
+ if (conn->base_.marked_for_close || conn->package_window <= 0)
continue;
if (!layer_hint || conn->cpath_layer == layer_hint) {
connection_start_reading(TO_CONN(conn));
@@ -1655,7 +1881,7 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
}
/* Go back and do the ones we skipped, circular-style */
for (conn = first_conn; conn != chosen_stream; conn = conn->next_stream) {
- if (conn->_base.marked_for_close || conn->package_window <= 0)
+ if (conn->base_.marked_for_close || conn->package_window <= 0)
continue;
if (!layer_hint || conn->cpath_layer == layer_hint) {
connection_start_reading(TO_CONN(conn));
@@ -1681,7 +1907,7 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
* package.
*/
for (conn=first_conn; conn; conn=conn->next_stream) {
- if (conn->_base.marked_for_close || conn->package_window <= 0)
+ if (conn->base_.marked_for_close || conn->package_window <= 0)
continue;
if (!layer_hint || conn->cpath_layer == layer_hint) {
int n = cells_per_conn, r;
@@ -1792,10 +2018,10 @@ circuit_consider_sending_sendme(circuit_t *circ, crypt_path_t *layer_hint)
}
#ifdef ACTIVE_CIRCUITS_PARANOIA
-#define assert_active_circuits_ok_paranoid(conn) \
- assert_active_circuits_ok(conn)
+#define assert_cmux_ok_paranoid(chan) \
+ assert_circuit_mux_okay(chan)
#else
-#define assert_active_circuits_ok_paranoid(conn)
+#define assert_cmux_ok_paranoid(chan)
#endif
/** The total number of cells we have allocated from the memory pool. */
@@ -1850,12 +2076,19 @@ packed_cell_free_unchecked(packed_cell_t *cell)
/** Allocate and return a new packed_cell_t. */
static INLINE packed_cell_t *
-packed_cell_alloc(void)
+packed_cell_new(void)
{
++total_cells_allocated;
return mp_pool_get(cell_pool);
}
+/** Return a packed cell used outside by channel_t lower layer */
+void
+packed_cell_free(packed_cell_t *cell)
+{
+ packed_cell_free_unchecked(cell);
+}
+
/** Log current statistics for cell pool allocation at log level
* <b>severity</b>. */
void
@@ -1864,23 +2097,24 @@ dump_cell_pool_usage(int severity)
circuit_t *c;
int n_circs = 0;
int n_cells = 0;
- for (c = _circuit_get_global_list(); c; c = c->next) {
- n_cells += c->n_conn_cells.n;
+ for (c = circuit_get_global_list_(); c; c = c->next) {
+ n_cells += c->n_chan_cells.n;
if (!CIRCUIT_IS_ORIGIN(c))
- n_cells += TO_OR_CIRCUIT(c)->p_conn_cells.n;
+ n_cells += TO_OR_CIRCUIT(c)->p_chan_cells.n;
++n_circs;
}
- log(severity, LD_MM, "%d cells allocated on %d circuits. %d cells leaked.",
- n_cells, n_circs, total_cells_allocated - n_cells);
+ tor_log(severity, LD_MM,
+ "%d cells allocated on %d circuits. %d cells leaked.",
+ n_cells, n_circs, total_cells_allocated - n_cells);
mp_pool_log_status(cell_pool, severity);
}
/** Allocate a new copy of packed <b>cell</b>. */
static INLINE packed_cell_t *
-packed_cell_copy(const cell_t *cell)
+packed_cell_copy(const cell_t *cell, int wide_circ_ids)
{
- packed_cell_t *c = packed_cell_alloc();
- cell_pack(c, cell);
+ packed_cell_t *c = packed_cell_new();
+ cell_pack(c, cell, wide_circ_ids);
c->next = NULL;
return c;
}
@@ -1902,9 +2136,10 @@ cell_queue_append(cell_queue_t *queue, packed_cell_t *cell)
/** Append a newly allocated copy of <b>cell</b> to the end of <b>queue</b> */
void
-cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell)
+cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell,
+ int wide_circ_ids)
{
- packed_cell_t *copy = packed_cell_copy(cell);
+ packed_cell_t *copy = packed_cell_copy(cell, wide_circ_ids);
/* Remember the time when this cell was put in the queue. */
if (get_options()->CellStatistics) {
struct timeval now;
@@ -1978,363 +2213,68 @@ cell_queue_pop(cell_queue_t *queue)
return cell;
}
-/** Return a pointer to the "next_active_on_{n,p}_conn" pointer of <b>circ</b>,
- * depending on whether <b>conn</b> matches n_conn or p_conn. */
-static INLINE circuit_t **
-next_circ_on_conn_p(circuit_t *circ, or_connection_t *conn)
-{
- tor_assert(circ);
- tor_assert(conn);
- if (conn == circ->n_conn) {
- return &circ->next_active_on_n_conn;
- } else {
- or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
- tor_assert(conn == orcirc->p_conn);
- return &orcirc->next_active_on_p_conn;
- }
-}
-
-/** Return a pointer to the "prev_active_on_{n,p}_conn" pointer of <b>circ</b>,
- * depending on whether <b>conn</b> matches n_conn or p_conn. */
-static INLINE circuit_t **
-prev_circ_on_conn_p(circuit_t *circ, or_connection_t *conn)
-{
- tor_assert(circ);
- tor_assert(conn);
- if (conn == circ->n_conn) {
- return &circ->prev_active_on_n_conn;
- } else {
- or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
- tor_assert(conn == orcirc->p_conn);
- return &orcirc->prev_active_on_p_conn;
- }
-}
-
-/** Helper for sorting cell_ewma_t values in their priority queue. */
-static int
-compare_cell_ewma_counts(const void *p1, const void *p2)
-{
- const cell_ewma_t *e1=p1, *e2=p2;
- if (e1->cell_count < e2->cell_count)
- return -1;
- else if (e1->cell_count > e2->cell_count)
- return 1;
- else
- return 0;
-}
-
-/** Given a cell_ewma_t, return a pointer to the circuit containing it. */
-static circuit_t *
-cell_ewma_to_circuit(cell_ewma_t *ewma)
-{
- if (ewma->is_for_p_conn) {
- /* This is an or_circuit_t's p_cell_ewma. */
- or_circuit_t *orcirc = SUBTYPE_P(ewma, or_circuit_t, p_cell_ewma);
- return TO_CIRCUIT(orcirc);
- } else {
- /* This is some circuit's n_cell_ewma. */
- return SUBTYPE_P(ewma, circuit_t, n_cell_ewma);
- }
-}
-
-/* ==== Functions for scaling cell_ewma_t ====
-
- When choosing which cells to relay first, we favor circuits that have been
- quiet recently. This gives better latency on connections that aren't
- pushing lots of data, and makes the network feel more interactive.
-
- Conceptually, we take an exponentially weighted mean average of the number
- of cells a circuit has sent, and allow active circuits (those with cells to
- relay) to send cells in reverse order of their exponentially-weighted mean
- average (EWMA) cell count. [That is, a cell sent N seconds ago 'counts'
- F^N times as much as a cell sent now, for 0<F<1.0, and we favor the
- circuit that has sent the fewest cells]
-
- If 'double' had infinite precision, we could do this simply by counting a
- cell sent at startup as having weight 1.0, and a cell sent N seconds later
- as having weight F^-N. This way, we would never need to re-scale
- any already-sent cells.
-
- To prevent double from overflowing, we could count a cell sent now as
- having weight 1.0 and a cell sent N seconds ago as having weight F^N.
- This, however, would mean we'd need to re-scale *ALL* old circuits every
- time we wanted to send a cell.
-
- So as a compromise, we divide time into 'ticks' (currently, 10-second
- increments) and say that a cell sent at the start of a current tick is
- worth 1.0, a cell sent N seconds before the start of the current tick is
- worth F^N, and a cell sent N seconds after the start of the current tick is
- worth F^-N. This way we don't overflow, and we don't need to constantly
- rescale.
- */
-
-/** How long does a tick last (seconds)? */
-#define EWMA_TICK_LEN 10
-
-/** The default per-tick scale factor, if it hasn't been overridden by a
- * consensus or a configuration setting. zero means "disabled". */
-#define EWMA_DEFAULT_HALFLIFE 0.0
-
-/** Given a timeval <b>now</b>, compute the cell_ewma tick in which it occurs
- * and the fraction of the tick that has elapsed between the start of the tick
- * and <b>now</b>. Return the former and store the latter in
- * *<b>remainder_out</b>.
- *
- * These tick values are not meant to be shared between Tor instances, or used
- * for other purposes. */
-static unsigned
-cell_ewma_tick_from_timeval(const struct timeval *now,
- double *remainder_out)
-{
- unsigned res = (unsigned) (now->tv_sec / EWMA_TICK_LEN);
- /* rem */
- double rem = (now->tv_sec % EWMA_TICK_LEN) +
- ((double)(now->tv_usec)) / 1.0e6;
- *remainder_out = rem / EWMA_TICK_LEN;
- return res;
-}
-
-/** Compute and return the current cell_ewma tick. */
-unsigned
-cell_ewma_get_tick(void)
-{
- return ((unsigned)approx_time() / EWMA_TICK_LEN);
-}
-
-/** The per-tick scale factor to be used when computing cell-count EWMA
- * values. (A cell sent N ticks before the start of the current tick
- * has value ewma_scale_factor ** N.)
+/**
+ * Update the number of cells available on the circuit's n_chan or p_chan's
+ * circuit mux.
*/
-static double ewma_scale_factor = 0.1;
-/* DOCDOC ewma_enabled */
-static int ewma_enabled = 0;
-
-/*DOCDOC*/
-#define EPSILON 0.00001
-/*DOCDOC*/
-#define LOG_ONEHALF -0.69314718055994529
-
-/** Adjust the global cell scale factor based on <b>options</b> */
-void
-cell_ewma_set_scale_factor(const or_options_t *options,
- const networkstatus_t *consensus)
-{
- int32_t halflife_ms;
- double halflife;
- const char *source;
- if (options && options->CircuitPriorityHalflife >= -EPSILON) {
- halflife = options->CircuitPriorityHalflife;
- source = "CircuitPriorityHalflife in configuration";
- } else if (consensus && (halflife_ms = networkstatus_get_param(
- consensus, "CircuitPriorityHalflifeMsec",
- -1, -1, INT32_MAX)) >= 0) {
- halflife = ((double)halflife_ms)/1000.0;
- source = "CircuitPriorityHalflifeMsec in consensus";
- } else {
- halflife = EWMA_DEFAULT_HALFLIFE;
- source = "Default value";
- }
-
- if (halflife <= EPSILON) {
- /* The cell EWMA algorithm is disabled. */
- ewma_scale_factor = 0.1;
- ewma_enabled = 0;
- log_info(LD_OR,
- "Disabled cell_ewma algorithm because of value in %s",
- source);
- } else {
- /* convert halflife into halflife-per-tick. */
- halflife /= EWMA_TICK_LEN;
- /* compute per-tick scale factor. */
- ewma_scale_factor = exp( LOG_ONEHALF / halflife );
- ewma_enabled = 1;
- log_info(LD_OR,
- "Enabled cell_ewma algorithm because of value in %s; "
- "scale factor is %f per %d seconds",
- source, ewma_scale_factor, EWMA_TICK_LEN);
- }
-}
-
-/** Return the multiplier necessary to convert the value of a cell sent in
- * 'from_tick' to one sent in 'to_tick'. */
-static INLINE double
-get_scale_factor(unsigned from_tick, unsigned to_tick)
-{
- /* This math can wrap around, but that's okay: unsigned overflow is
- well-defined */
- int diff = (int)(to_tick - from_tick);
- return pow(ewma_scale_factor, diff);
-}
-
-/** Adjust the cell count of <b>ewma</b> so that it is scaled with respect to
- * <b>cur_tick</b> */
-static void
-scale_single_cell_ewma(cell_ewma_t *ewma, unsigned cur_tick)
-{
- double factor = get_scale_factor(ewma->last_adjusted_tick, cur_tick);
- ewma->cell_count *= factor;
- ewma->last_adjusted_tick = cur_tick;
-}
-
-/** Adjust the cell count of every active circuit on <b>conn</b> so
- * that they are scaled with respect to <b>cur_tick</b> */
-static void
-scale_active_circuits(or_connection_t *conn, unsigned cur_tick)
-{
-
- double factor = get_scale_factor(
- conn->active_circuit_pqueue_last_recalibrated,
- cur_tick);
- /** Ordinarily it isn't okay to change the value of an element in a heap,
- * but it's okay here, since we are preserving the order. */
- SMARTLIST_FOREACH(conn->active_circuit_pqueue, cell_ewma_t *, e, {
- tor_assert(e->last_adjusted_tick ==
- conn->active_circuit_pqueue_last_recalibrated);
- e->cell_count *= factor;
- e->last_adjusted_tick = cur_tick;
- });
- conn->active_circuit_pqueue_last_recalibrated = cur_tick;
-}
-
-/** Rescale <b>ewma</b> to the same scale as <b>conn</b>, and add it to
- * <b>conn</b>'s priority queue of active circuits */
-static void
-add_cell_ewma_to_conn(or_connection_t *conn, cell_ewma_t *ewma)
-{
- tor_assert(ewma->heap_index == -1);
- scale_single_cell_ewma(ewma,
- conn->active_circuit_pqueue_last_recalibrated);
-
- smartlist_pqueue_add(conn->active_circuit_pqueue,
- compare_cell_ewma_counts,
- STRUCT_OFFSET(cell_ewma_t, heap_index),
- ewma);
-}
-
-/** Remove <b>ewma</b> from <b>conn</b>'s priority queue of active circuits */
-static void
-remove_cell_ewma_from_conn(or_connection_t *conn, cell_ewma_t *ewma)
-{
- tor_assert(ewma->heap_index != -1);
- smartlist_pqueue_remove(conn->active_circuit_pqueue,
- compare_cell_ewma_counts,
- STRUCT_OFFSET(cell_ewma_t, heap_index),
- ewma);
-}
-
-/** Remove and return the first cell_ewma_t from conn's priority queue of
- * active circuits. Requires that the priority queue is nonempty. */
-static cell_ewma_t *
-pop_first_cell_ewma_from_conn(or_connection_t *conn)
-{
- return smartlist_pqueue_pop(conn->active_circuit_pqueue,
- compare_cell_ewma_counts,
- STRUCT_OFFSET(cell_ewma_t, heap_index));
-}
-
-/** Add <b>circ</b> to the list of circuits with pending cells on
- * <b>conn</b>. No effect if <b>circ</b> is already linked. */
void
-make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn)
+update_circuit_on_cmux_(circuit_t *circ, cell_direction_t direction,
+ const char *file, int lineno)
{
- circuit_t **nextp = next_circ_on_conn_p(circ, conn);
- circuit_t **prevp = prev_circ_on_conn_p(circ, conn);
+ channel_t *chan = NULL;
+ or_circuit_t *or_circ = NULL;
+ circuitmux_t *cmux = NULL;
- if (*nextp && *prevp) {
- /* Already active. */
- return;
- }
-
- assert_active_circuits_ok_paranoid(conn);
+ tor_assert(circ);
- if (! conn->active_circuits) {
- conn->active_circuits = circ;
- *prevp = *nextp = circ;
+ /* Okay, get the channel */
+ if (direction == CELL_DIRECTION_OUT) {
+ chan = circ->n_chan;
} else {
- circuit_t *head = conn->active_circuits;
- circuit_t *old_tail = *prev_circ_on_conn_p(head, conn);
- *next_circ_on_conn_p(old_tail, conn) = circ;
- *nextp = head;
- *prev_circ_on_conn_p(head, conn) = circ;
- *prevp = old_tail;
+ or_circ = TO_OR_CIRCUIT(circ);
+ chan = or_circ->p_chan;
}
- if (circ->n_conn == conn) {
- add_cell_ewma_to_conn(conn, &circ->n_cell_ewma);
- } else {
- or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
- tor_assert(conn == orcirc->p_conn);
- add_cell_ewma_to_conn(conn, &orcirc->p_cell_ewma);
- }
+ tor_assert(chan);
+ tor_assert(chan->cmux);
- assert_active_circuits_ok_paranoid(conn);
-}
+ /* Now get the cmux */
+ cmux = chan->cmux;
-/** Remove <b>circ</b> from the list of circuits with pending cells on
- * <b>conn</b>. No effect if <b>circ</b> is already unlinked. */
-void
-make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn)
-{
- circuit_t **nextp = next_circ_on_conn_p(circ, conn);
- circuit_t **prevp = prev_circ_on_conn_p(circ, conn);
- circuit_t *next = *nextp, *prev = *prevp;
-
- if (!next && !prev) {
- /* Already inactive. */
+ /* Cmux sanity check */
+ if (! circuitmux_is_circuit_attached(cmux, circ)) {
+ log_warn(LD_BUG, "called on non-attachd circuit from %s:%d",
+ file, lineno);
return;
}
+ tor_assert(circuitmux_attached_circuit_direction(cmux, circ) == direction);
- assert_active_circuits_ok_paranoid(conn);
-
- tor_assert(next && prev);
- tor_assert(*prev_circ_on_conn_p(next, conn) == circ);
- tor_assert(*next_circ_on_conn_p(prev, conn) == circ);
+ assert_cmux_ok_paranoid(chan);
- if (next == circ) {
- conn->active_circuits = NULL;
- } else {
- *prev_circ_on_conn_p(next, conn) = prev;
- *next_circ_on_conn_p(prev, conn) = next;
- if (conn->active_circuits == circ)
- conn->active_circuits = next;
- }
- *prevp = *nextp = NULL;
-
- if (circ->n_conn == conn) {
- remove_cell_ewma_from_conn(conn, &circ->n_cell_ewma);
+ /* Update the number of cells we have for the circuit mux */
+ if (direction == CELL_DIRECTION_OUT) {
+ circuitmux_set_num_cells(cmux, circ, circ->n_chan_cells.n);
} else {
- or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
- tor_assert(conn == orcirc->p_conn);
- remove_cell_ewma_from_conn(conn, &orcirc->p_cell_ewma);
+ circuitmux_set_num_cells(cmux, circ, or_circ->p_chan_cells.n);
}
- assert_active_circuits_ok_paranoid(conn);
+ assert_cmux_ok_paranoid(chan);
}
-/** Remove all circuits from the list of circuits with pending cells on
- * <b>conn</b>. */
+/** Remove all circuits from the cmux on <b>chan</b>. */
void
-connection_or_unlink_all_active_circs(or_connection_t *orconn)
+channel_unlink_all_circuits(channel_t *chan)
{
- circuit_t *head = orconn->active_circuits;
- circuit_t *cur = head;
- if (! head)
- return;
- do {
- circuit_t *next = *next_circ_on_conn_p(cur, orconn);
- *prev_circ_on_conn_p(cur, orconn) = NULL;
- *next_circ_on_conn_p(cur, orconn) = NULL;
- cur = next;
- } while (cur != head);
- orconn->active_circuits = NULL;
-
- SMARTLIST_FOREACH(orconn->active_circuit_pqueue, cell_ewma_t *, e,
- e->heap_index = -1);
- smartlist_clear(orconn->active_circuit_pqueue);
+ tor_assert(chan);
+ tor_assert(chan->cmux);
+
+ circuitmux_detach_all_circuits(chan->cmux);
+ chan->num_n_circuits = 0;
+ chan->num_p_circuits = 0;
}
/** Block (if <b>block</b> is true) or unblock (if <b>block</b> is false)
- * every edge connection that is using <b>circ</b> to write to <b>orconn</b>,
+ * every edge connection that is using <b>circ</b> to write to <b>chan</b>,
* and start or stop reading as appropriate.
*
* If <b>stream_id</b> is nonzero, block only the edge connection whose
@@ -2343,17 +2283,17 @@ connection_or_unlink_all_active_circs(or_connection_t *orconn)
* Returns the number of streams whose status we changed.
*/
static int
-set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn,
+set_streams_blocked_on_circ(circuit_t *circ, channel_t *chan,
int block, streamid_t stream_id)
{
edge_connection_t *edge = NULL;
int n = 0;
- if (circ->n_conn == orconn) {
- circ->streams_blocked_on_n_conn = block;
+ if (circ->n_chan == chan) {
+ circ->streams_blocked_on_n_chan = block;
if (CIRCUIT_IS_ORIGIN(circ))
edge = TO_ORIGIN_CIRCUIT(circ)->p_streams;
} else {
- circ->streams_blocked_on_p_conn = block;
+ circ->streams_blocked_on_p_chan = block;
tor_assert(!CIRCUIT_IS_ORIGIN(circ));
edge = TO_OR_CIRCUIT(circ)->n_streams;
}
@@ -2388,58 +2328,51 @@ set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn,
}
/** Pull as many cells as possible (but no more than <b>max</b>) from the
- * queue of the first active circuit on <b>conn</b>, and write them to
- * <b>conn</b>-&gt;outbuf. Return the number of cells written. Advance
+ * queue of the first active circuit on <b>chan</b>, and write them to
+ * <b>chan</b>-&gt;outbuf. Return the number of cells written. Advance
* the active circuit pointer to the next active circuit in the ring. */
int
-connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
- time_t now)
+channel_flush_from_first_active_circuit(channel_t *chan, int max)
{
- int n_flushed;
+ circuitmux_t *cmux = NULL;
+ int n_flushed = 0;
cell_queue_t *queue;
circuit_t *circ;
+ or_circuit_t *or_circ;
int streams_blocked;
-
- /* The current (hi-res) time */
- struct timeval now_hires;
-
- /* The EWMA cell counter for the circuit we're flushing. */
- cell_ewma_t *cell_ewma = NULL;
- double ewma_increment = -1;
-
- circ = conn->active_circuits;
- if (!circ) return 0;
- assert_active_circuits_ok_paranoid(conn);
-
- /* See if we're doing the ewma circuit selection algorithm. */
- if (ewma_enabled) {
- unsigned tick;
- double fractional_tick;
- tor_gettimeofday_cached(&now_hires);
- tick = cell_ewma_tick_from_timeval(&now_hires, &fractional_tick);
-
- if (tick != conn->active_circuit_pqueue_last_recalibrated) {
- scale_active_circuits(conn, tick);
+ packed_cell_t *cell;
+
+ /* Get the cmux */
+ tor_assert(chan);
+ tor_assert(chan->cmux);
+ cmux = chan->cmux;
+
+ /* Main loop: pick a circuit, send a cell, update the cmux */
+ while (n_flushed < max) {
+ circ = circuitmux_get_first_active_circuit(cmux);
+ /* If it returns NULL, no cells left to send */
+ if (!circ) break;
+ assert_cmux_ok_paranoid(chan);
+
+ if (circ->n_chan == chan) {
+ queue = &circ->n_chan_cells;
+ streams_blocked = circ->streams_blocked_on_n_chan;
+ } else {
+ or_circ = TO_OR_CIRCUIT(circ);
+ tor_assert(or_circ->p_chan == chan);
+ queue = &TO_OR_CIRCUIT(circ)->p_chan_cells;
+ streams_blocked = circ->streams_blocked_on_p_chan;
}
- ewma_increment = pow(ewma_scale_factor, -fractional_tick);
-
- cell_ewma = smartlist_get(conn->active_circuit_pqueue, 0);
- circ = cell_ewma_to_circuit(cell_ewma);
- }
-
- if (circ->n_conn == conn) {
- queue = &circ->n_conn_cells;
- streams_blocked = circ->streams_blocked_on_n_conn;
- } else {
- queue = &TO_OR_CIRCUIT(circ)->p_conn_cells;
- streams_blocked = circ->streams_blocked_on_p_conn;
- }
- tor_assert(*next_circ_on_conn_p(circ,conn));
+ /* Circuitmux told us this was active, so it should have cells */
+ tor_assert(queue->n > 0);
- for (n_flushed = 0; n_flushed < max && queue->head; ) {
- packed_cell_t *cell = cell_queue_pop(queue);
- tor_assert(*next_circ_on_conn_p(circ,conn));
+ /*
+ * Get just one cell here; once we've sent it, that can change the circuit
+ * selection, so we have to loop around for another even if this circuit
+ * has more than one.
+ */
+ cell = cell_queue_pop(queue);
/* Calculate the exact time that this cell has spent in the queue. */
if (get_options()->CellStatistics && !CIRCUIT_IS_ORIGIN(circ)) {
@@ -2455,8 +2388,8 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
"Looks like the CellStatistics option was "
"recently enabled.");
} else {
- or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
insertion_time_elem_t *elem = it_queue->first;
+ or_circ = TO_OR_CIRCUIT(circ);
cell_waiting_time =
(uint32_t)((flushed * 10L + SECONDS_IN_A_DAY * 1000L -
elem->insertion_time * 10L) %
@@ -2469,66 +2402,58 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
it_queue->last = NULL;
mp_pool_release(elem);
}
- orcirc->total_cell_waiting_time += cell_waiting_time;
- orcirc->processed_cells++;
+ or_circ->total_cell_waiting_time += cell_waiting_time;
+ or_circ->processed_cells++;
}
}
/* If we just flushed our queue and this circuit is used for a
* tunneled directory request, possibly advance its state. */
- if (queue->n == 0 && TO_CONN(conn)->dirreq_id)
- geoip_change_dirreq_state(TO_CONN(conn)->dirreq_id,
+ if (queue->n == 0 && chan->dirreq_id)
+ geoip_change_dirreq_state(chan->dirreq_id,
DIRREQ_TUNNELED,
DIRREQ_CIRC_QUEUE_FLUSHED);
- connection_write_to_buf(cell->body, CELL_NETWORK_SIZE, TO_CONN(conn));
+ /* Now send the cell */
+ channel_write_packed_cell(chan, cell);
+ cell = NULL;
- packed_cell_free_unchecked(cell);
+ /*
+ * Don't packed_cell_free_unchecked(cell) here because the channel will
+ * do so when it gets out of the channel queue (probably already did, in
+ * which case that was an immediate double-free bug).
+ */
+
+ /* Update the counter */
++n_flushed;
- if (cell_ewma) {
- cell_ewma_t *tmp;
- cell_ewma->cell_count += ewma_increment;
- /* We pop and re-add the cell_ewma_t here, not above, since we need to
- * re-add it immediately to keep the priority queue consistent with
- * the linked-list implementation */
- tmp = pop_first_cell_ewma_from_conn(conn);
- tor_assert(tmp == cell_ewma);
- add_cell_ewma_to_conn(conn, cell_ewma);
- }
- if (!ewma_enabled && circ != conn->active_circuits) {
- /* If this happens, the current circuit just got made inactive by
- * a call in connection_write_to_buf(). That's nothing to worry about:
- * circuit_make_inactive_on_conn() already advanced conn->active_circuits
- * for us.
- */
- assert_active_circuits_ok_paranoid(conn);
- goto done;
- }
- }
- tor_assert(*next_circ_on_conn_p(circ,conn));
- assert_active_circuits_ok_paranoid(conn);
- conn->active_circuits = *next_circ_on_conn_p(circ, conn);
- /* Is the cell queue low enough to unblock all the streams that are waiting
- * to write to this circuit? */
- if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE)
- set_streams_blocked_on_circ(circ, conn, 0, 0); /* unblock streams */
+ /*
+ * Now update the cmux; tell it we've just sent a cell, and how many
+ * we have left.
+ */
+ circuitmux_notify_xmit_cells(cmux, circ, 1);
+ circuitmux_set_num_cells(cmux, circ, queue->n);
+ if (queue->n == 0)
+ log_debug(LD_GENERAL, "Made a circuit inactive.");
+
+ /* Is the cell queue low enough to unblock all the streams that are waiting
+ * to write to this circuit? */
+ if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE)
+ set_streams_blocked_on_circ(circ, chan, 0, 0); /* unblock streams */
- /* Did we just run out of cells on this circuit's queue? */
- if (queue->n == 0) {
- log_debug(LD_GENERAL, "Made a circuit inactive.");
- make_circuit_inactive_on_conn(circ, conn);
+ /* If n_flushed < max still, loop around and pick another circuit */
}
- done:
- if (n_flushed)
- conn->timestamp_last_added_nonpadding = now;
+
+ /* Okay, we're done sending now */
+ assert_cmux_ok_paranoid(chan);
+
return n_flushed;
}
-/** Add <b>cell</b> to the queue of <b>circ</b> writing to <b>orconn</b>
+/** Add <b>cell</b> to the queue of <b>circ</b> writing to <b>chan</b>
* transmitting in <b>direction</b>. */
void
-append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
+append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
cell_t *cell, cell_direction_t direction,
streamid_t fromstream)
{
@@ -2538,40 +2463,40 @@ append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
return;
if (direction == CELL_DIRECTION_OUT) {
- queue = &circ->n_conn_cells;
- streams_blocked = circ->streams_blocked_on_n_conn;
+ queue = &circ->n_chan_cells;
+ streams_blocked = circ->streams_blocked_on_n_chan;
} else {
or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
- queue = &orcirc->p_conn_cells;
- streams_blocked = circ->streams_blocked_on_p_conn;
+ queue = &orcirc->p_chan_cells;
+ streams_blocked = circ->streams_blocked_on_p_chan;
}
- cell_queue_append_packed_copy(queue, cell);
+ cell_queue_append_packed_copy(queue, cell, chan->wide_circ_ids);
/* If we have too many cells on the circuit, we should stop reading from
* the edge streams for a while. */
if (!streams_blocked && queue->n >= CELL_QUEUE_HIGHWATER_SIZE)
- set_streams_blocked_on_circ(circ, orconn, 1, 0); /* block streams */
+ set_streams_blocked_on_circ(circ, chan, 1, 0); /* block streams */
if (streams_blocked && fromstream) {
/* This edge connection is apparently not blocked; block it. */
- set_streams_blocked_on_circ(circ, orconn, 1, fromstream);
+ set_streams_blocked_on_circ(circ, chan, 1, fromstream);
}
+ update_circuit_on_cmux(circ, direction);
if (queue->n == 1) {
- /* This was the first cell added to the queue. We need to make this
+ /* This was the first cell added to the queue. We just made this
* circuit active. */
log_debug(LD_GENERAL, "Made a circuit active.");
- make_circuit_active_on_conn(circ, orconn);
}
- if (! connection_get_outbuf_len(TO_CONN(orconn))) {
+ if (!channel_has_queued_writes(chan)) {
/* There is no data at all waiting to be sent on the outbuf. Add a
* cell, so that we can notice when it gets flushed, flushed_some can
* get called, and we can start putting more data onto the buffer then.
*/
log_debug(LD_GENERAL, "Primed a buffer.");
- connection_or_flush_from_first_active_circuit(orconn, 1, approx_time());
+ channel_flush_from_first_active_circuit(chan, 1);
}
}
@@ -2635,58 +2560,40 @@ decode_address_from_payload(tor_addr_t *addr_out, const uint8_t *payload,
return payload + 2 + payload[1];
}
-/** Remove all the cells queued on <b>circ</b> for <b>orconn</b>. */
+/** Remove all the cells queued on <b>circ</b> for <b>chan</b>. */
void
-circuit_clear_cell_queue(circuit_t *circ, or_connection_t *orconn)
+circuit_clear_cell_queue(circuit_t *circ, channel_t *chan)
{
cell_queue_t *queue;
- if (circ->n_conn == orconn) {
- queue = &circ->n_conn_cells;
+ cell_direction_t direction;
+
+ if (circ->n_chan == chan) {
+ queue = &circ->n_chan_cells;
+ direction = CELL_DIRECTION_OUT;
} else {
or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
- tor_assert(orcirc->p_conn == orconn);
- queue = &orcirc->p_conn_cells;
+ tor_assert(orcirc->p_chan == chan);
+ queue = &orcirc->p_chan_cells;
+ direction = CELL_DIRECTION_IN;
}
- if (queue->n)
- make_circuit_inactive_on_conn(circ,orconn);
-
+ /* Clear the queue */
cell_queue_clear(queue);
+
+ /* Update the cell counter in the cmux */
+ if (chan->cmux && circuitmux_is_circuit_attached(chan->cmux, circ))
+ update_circuit_on_cmux(circ, direction);
}
-/** Fail with an assert if the active circuits ring on <b>orconn</b> is
- * corrupt. */
+/** Fail with an assert if the circuit mux on chan is corrupt
+ */
void
-assert_active_circuits_ok(or_connection_t *orconn)
+assert_circuit_mux_okay(channel_t *chan)
{
- circuit_t *head = orconn->active_circuits;
- circuit_t *cur = head;
- int n = 0;
- if (! head)
- return;
- do {
- circuit_t *next = *next_circ_on_conn_p(cur, orconn);
- circuit_t *prev = *prev_circ_on_conn_p(cur, orconn);
- cell_ewma_t *ewma;
- tor_assert(next);
- tor_assert(prev);
- tor_assert(*next_circ_on_conn_p(prev, orconn) == cur);
- tor_assert(*prev_circ_on_conn_p(next, orconn) == cur);
- if (orconn == cur->n_conn) {
- ewma = &cur->n_cell_ewma;
- tor_assert(!ewma->is_for_p_conn);
- } else {
- ewma = &TO_OR_CIRCUIT(cur)->p_cell_ewma;
- tor_assert(ewma->is_for_p_conn);
- }
- tor_assert(ewma->heap_index != -1);
- tor_assert(ewma == smartlist_get(orconn->active_circuit_pqueue,
- ewma->heap_index));
- n++;
- cur = next;
- } while (cur != head);
-
- tor_assert(n == smartlist_len(orconn->active_circuit_pqueue));
+ tor_assert(chan);
+ tor_assert(chan->cmux);
+
+ circuitmux_assert_okay(chan->cmux);
}
/** Return 1 if we shouldn't restart reading on this circuit, even if
@@ -2696,9 +2603,9 @@ static int
circuit_queue_streams_are_blocked(circuit_t *circ)
{
if (CIRCUIT_IS_ORIGIN(circ)) {
- return circ->streams_blocked_on_n_conn;
+ return circ->streams_blocked_on_n_chan;
} else {
- return circ->streams_blocked_on_p_conn;
+ return circ->streams_blocked_on_p_chan;
}
}