aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Goulet <dgoulet@torproject.org>2021-10-04 10:49:27 -0400
committerDavid Goulet <dgoulet@torproject.org>2021-10-04 10:49:27 -0400
commit1873d4c14c62e8c737e24a60b0aca5a3fcd89eb4 (patch)
tree3b663a7275af1fd441037b942b7b9e05ae8d1d9b
parent1a10948260d915d4982415e37dd6495bca9ef545 (diff)
parent7005046bd27968069dd56cad3fc66d644c7f517b (diff)
downloadtor-1873d4c14c62e8c737e24a60b0aca5a3fcd89eb4.tar.gz
tor-1873d4c14c62e8c737e24a60b0aca5a3fcd89eb4.zip
Merge branch 'tor-gitlab/mr/444'
-rw-r--r--changes/ticket404503
-rw-r--r--src/app/main/main.c4
-rw-r--r--src/core/mainloop/connection.c66
-rw-r--r--src/core/mainloop/mainloop.c7
-rw-r--r--src/core/or/channeltls.c3
-rw-r--r--src/core/or/circuitlist.c17
-rw-r--r--src/core/or/circuitmux_ewma.c26
-rw-r--r--src/core/or/congestion_control_common.c193
-rw-r--r--src/core/or/congestion_control_common.h58
-rw-r--r--src/core/or/congestion_control_flow.c710
-rw-r--r--src/core/or/congestion_control_flow.h48
-rw-r--r--src/core/or/connection_edge.c51
-rw-r--r--src/core/or/connection_or.c7
-rw-r--r--src/core/or/edge_connection_st.h55
-rw-r--r--src/core/or/half_edge_st.h12
-rw-r--r--src/core/or/include.am5
-rw-r--r--src/core/or/lttng_cc.inc166
-rw-r--r--src/core/or/or.h15
-rw-r--r--src/core/or/relay.c62
-rw-r--r--src/core/or/sendme.c28
-rw-r--r--src/core/or/sendme.h2
-rw-r--r--src/core/or/trace_probes_cc.c33
-rw-r--r--src/core/or/trace_probes_cc.h22
-rw-r--r--src/feature/nodelist/networkstatus.c4
-rw-r--r--src/test/test_channeltls.c3
-rw-r--r--src/trunnel/flow_control_cells.c382
-rw-r--r--src/trunnel/flow_control_cells.h120
-rw-r--r--src/trunnel/flow_control_cells.trunnel20
-rw-r--r--src/trunnel/include.am3
29 files changed, 2016 insertions, 109 deletions
diff --git a/changes/ticket40450 b/changes/ticket40450
new file mode 100644
index 0000000000..6753bd04f5
--- /dev/null
+++ b/changes/ticket40450
@@ -0,0 +1,3 @@
+ o Major features (congestion control):
+ - Implement support for flow control over congestion controlled circuits.
+ This work comes from proposal 324. Closes ticket 40450.
diff --git a/src/app/main/main.c b/src/app/main/main.c
index 0959b0db71..fe18ea0524 100644
--- a/src/app/main/main.c
+++ b/src/app/main/main.c
@@ -27,6 +27,8 @@
#include "core/or/channel.h"
#include "core/or/channelpadding.h"
#include "core/or/circuitpadding.h"
+#include "core/or/congestion_control_common.h"
+#include "core/or/congestion_control_flow.h"
#include "core/or/circuitlist.h"
#include "core/or/command.h"
#include "core/or/connection_or.h"
@@ -630,6 +632,8 @@ tor_init(int argc, char *argv[])
* until we get a consensus */
channelpadding_new_consensus_params(NULL);
circpad_new_consensus_params(NULL);
+ congestion_control_new_consensus_params(NULL);
+ flow_control_new_consensus_params(NULL);
/* Initialize circuit padding to defaults+torrc until we get a consensus */
circpad_machines_init();
diff --git a/src/core/mainloop/connection.c b/src/core/mainloop/connection.c
index 79e034fb34..9271a70914 100644
--- a/src/core/mainloop/connection.c
+++ b/src/core/mainloop/connection.c
@@ -117,6 +117,7 @@
#include "lib/cc/ctassert.h"
#include "lib/sandbox/sandbox.h"
#include "lib/net/buffers_net.h"
+#include "lib/net/address.h"
#include "lib/tls/tortls.h"
#include "lib/evloop/compat_libevent.h"
#include "lib/compress/compress.h"
@@ -146,6 +147,8 @@
#include "feature/nodelist/routerinfo_st.h"
#include "core/or/socks_request_st.h"
+#include "core/or/congestion_control_flow.h"
+
/**
* On Windows and Linux we cannot reliably bind() a socket to an
* address and port if: 1) There's already a socket bound to wildcard
@@ -612,6 +615,11 @@ entry_connection_new(int type, int socket_family)
entry_conn->entry_cfg.ipv4_traffic = 1;
else if (socket_family == AF_INET6)
entry_conn->entry_cfg.ipv6_traffic = 1;
+
+ /* Initialize the read token bucket to the maximum value which is the same as
+ * no rate limiting. */
+ token_bucket_rw_init(&ENTRY_TO_EDGE_CONN(entry_conn)->bucket, INT32_MAX,
+ INT32_MAX, monotime_coarse_get_stamp());
return entry_conn;
}
@@ -623,6 +631,10 @@ edge_connection_new(int type, int socket_family)
edge_connection_t *edge_conn = tor_malloc_zero(sizeof(edge_connection_t));
tor_assert(type == CONN_TYPE_EXIT);
connection_init(time(NULL), TO_CONN(edge_conn), type, socket_family);
+ /* Initialize the read token bucket to the maximum value which is the same as
+ * no rate limiting. */
+ token_bucket_rw_init(&edge_conn->bucket, INT32_MAX, INT32_MAX,
+ monotime_coarse_get_stamp());
return edge_conn;
}
@@ -3457,6 +3469,19 @@ connection_bucket_read_limit(connection_t *conn, time_t now)
base = get_cell_network_size(or_conn->wide_circ_ids);
}
+ /* Edge connections have their own read bucket due to flow control being able
+ * to set a rate limit for them. However, for exit connections, we still need
+ * to honor the global bucket as well. */
+ if (CONN_IS_EDGE(conn)) {
+ const edge_connection_t *edge_conn = CONST_TO_EDGE_CONN(conn);
+ conn_bucket = token_bucket_rw_get_read(&edge_conn->bucket);
+ if (conn->type == CONN_TYPE_EXIT) {
+ /* Decide between our limit and the global one. */
+ goto end;
+ }
+ return conn_bucket;
+ }
+
if (!connection_is_rate_limited(conn)) {
/* be willing to read on local conns even if our buckets are empty */
return conn_bucket>=0 ? conn_bucket : 1<<14;
@@ -3467,6 +3492,7 @@ connection_bucket_read_limit(connection_t *conn, time_t now)
global_bucket_val = MIN(global_bucket_val, relayed);
}
+ end:
return connection_bucket_get_share(base, priority,
global_bucket_val, conn_bucket);
}
@@ -3644,6 +3670,13 @@ connection_buckets_decrement(connection_t *conn, time_t now,
record_num_bytes_transferred_impl(conn, now, num_read, num_written);
+ /* Edge connections need to decrement the read side of the bucket used by our
+ * congestion control. */
+ if (CONN_IS_EDGE(conn) && num_read > 0) {
+ edge_connection_t *edge_conn = TO_EDGE_CONN(conn);
+ token_bucket_rw_dec(&edge_conn->bucket, num_read, 0);
+ }
+
if (!connection_is_rate_limited(conn))
return; /* local IPs are free */
@@ -3697,14 +3730,16 @@ connection_write_bw_exhausted(connection_t *conn, bool is_global_bw)
void
connection_consider_empty_read_buckets(connection_t *conn)
{
+ int is_global = 1;
const char *reason;
- if (!connection_is_rate_limited(conn))
+ if (CONN_IS_EDGE(conn) &&
+ token_bucket_rw_get_read(&TO_EDGE_CONN(conn)->bucket) <= 0) {
+ reason = "edge connection read bucket exhausted. Pausing.";
+ is_global = false;
+ } else if (!connection_is_rate_limited(conn)) {
return; /* Always okay. */
-
- int is_global = 1;
-
- if (token_bucket_rw_get_read(&global_bucket) <= 0) {
+ } else if (token_bucket_rw_get_read(&global_bucket) <= 0) {
reason = "global read bucket exhausted. Pausing.";
} else if (connection_counts_as_relayed_traffic(conn, approx_time()) &&
token_bucket_rw_get_read(&global_relayed_bucket) <= 0) {
@@ -3714,8 +3749,9 @@ connection_consider_empty_read_buckets(connection_t *conn)
token_bucket_rw_get_read(&TO_OR_CONN(conn)->bucket) <= 0) {
reason = "connection read bucket exhausted. Pausing.";
is_global = false;
- } else
+ } else {
return; /* all good, no need to stop it */
+ }
LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", reason));
connection_read_bw_exhausted(conn, is_global);
@@ -3819,6 +3855,10 @@ connection_bucket_refill_single(connection_t *conn, uint32_t now_ts)
or_connection_t *or_conn = TO_OR_CONN(conn);
token_bucket_rw_refill(&or_conn->bucket, now_ts);
}
+
+ if (CONN_IS_EDGE(conn)) {
+ token_bucket_rw_refill(&TO_EDGE_CONN(conn)->bucket, now_ts);
+ }
}
/**
@@ -4556,9 +4596,9 @@ connection_handle_write_impl(connection_t *conn, int force)
!dont_stop_writing) { /* it's done flushing */
if (connection_finished_flushing(conn) < 0) {
/* already marked */
- return -1;
+ goto err;
}
- return 0;
+ goto done;
}
/* Call even if result is 0, since the global write bucket may
@@ -4568,7 +4608,17 @@ connection_handle_write_impl(connection_t *conn, int force)
if (n_read > 0 && connection_is_reading(conn))
connection_consider_empty_read_buckets(conn);
+ done:
+ /* If this is an edge connection with congestion control, check to see
+ * if it is time to send an xon */
+ if (conn_uses_flow_control(conn)) {
+ flow_control_decide_xon(TO_EDGE_CONN(conn), n_written);
+ }
+
return 0;
+
+ err:
+ return -1;
}
/* DOCDOC connection_handle_write */
diff --git a/src/core/mainloop/mainloop.c b/src/core/mainloop/mainloop.c
index 37b53db92a..cd57dea3d4 100644
--- a/src/core/mainloop/mainloop.c
+++ b/src/core/mainloop/mainloop.c
@@ -641,6 +641,13 @@ connection_start_reading,(connection_t *conn))
if (connection_should_read_from_linked_conn(conn))
connection_start_reading_from_linked_conn(conn);
} else {
+ if (CONN_IS_EDGE(conn) && TO_EDGE_CONN(conn)->xoff_received) {
+ /* We should not get called here if we're waiting for an XON, but
+ * belt-and-suspenders */
+ log_notice(LD_NET,
+ "Request to start reading on an edgeconn blocked with XOFF");
+ return;
+ }
if (event_add(conn->read_event, NULL))
log_warn(LD_NET, "Error from libevent setting read event state for %d "
"to watched: %s",
diff --git a/src/core/or/channeltls.c b/src/core/or/channeltls.c
index 481dafef91..9db8e2392d 100644
--- a/src/core/or/channeltls.c
+++ b/src/core/or/channeltls.c
@@ -64,6 +64,7 @@
#include "trunnel/netinfo.h"
#include "core/or/channelpadding.h"
#include "core/or/extendinfo.h"
+#include "core/or/congestion_control_common.h"
#include "core/or/cell_st.h"
#include "core/or/cell_queue_st.h"
@@ -793,7 +794,7 @@ channel_tls_num_cells_writeable_method(channel_t *chan)
cell_network_size = get_cell_network_size(tlschan->conn->wide_circ_ids);
outbuf_len = connection_get_outbuf_len(TO_CONN(tlschan->conn));
/* Get the number of cells */
- n = CEIL_DIV(OR_CONN_HIGHWATER - outbuf_len, cell_network_size);
+ n = CEIL_DIV(or_conn_highwatermark() - outbuf_len, cell_network_size);
if (n < 0) n = 0;
#if SIZEOF_SIZE_T > SIZEOF_INT
if (n > INT_MAX) n = INT_MAX;
diff --git a/src/core/or/circuitlist.c b/src/core/or/circuitlist.c
index 35d810c660..88c4159294 100644
--- a/src/core/or/circuitlist.c
+++ b/src/core/or/circuitlist.c
@@ -2602,6 +2602,7 @@ circuits_handle_oom(size_t current_allocation)
size_t mem_recovered=0;
int n_circuits_killed=0;
int n_dirconns_killed=0;
+ int n_edgeconns_killed = 0;
uint32_t now_ts;
log_notice(LD_GENERAL, "We're low on memory (cell queues total alloc:"
" %"TOR_PRIuSZ" buffer total alloc: %" TOR_PRIuSZ ","
@@ -2668,12 +2669,19 @@ circuits_handle_oom(size_t current_allocation)
if (conn_age < circ->age_tmp) {
break;
}
- if (conn->type == CONN_TYPE_DIR && conn->linked_conn == NULL) {
+ /* Also consider edge connections so we don't accumulate bytes on the
+ * outbuf due to a malicious destination holding off the read on us. */
+ if ((conn->type == CONN_TYPE_DIR && conn->linked_conn == NULL) ||
+ CONN_IS_EDGE(conn)) {
if (!conn->marked_for_close)
connection_mark_for_close(conn);
mem_recovered += single_conn_free_bytes(conn);
- ++n_dirconns_killed;
+ if (conn->type == CONN_TYPE_DIR) {
+ ++n_dirconns_killed;
+ } else {
+ ++n_edgeconns_killed;
+ }
if (mem_recovered >= mem_to_recover)
goto done_recovering_mem;
@@ -2703,11 +2711,12 @@ circuits_handle_oom(size_t current_allocation)
done_recovering_mem:
log_notice(LD_GENERAL, "Removed %"TOR_PRIuSZ" bytes by killing %d circuits; "
"%d circuits remain alive. Also killed %d non-linked directory "
- "connections.",
+ "connections. Killed %d edge connections",
mem_recovered,
n_circuits_killed,
smartlist_len(circlist) - n_circuits_killed,
- n_dirconns_killed);
+ n_dirconns_killed,
+ n_edgeconns_killed);
return mem_recovered;
}
diff --git a/src/core/or/circuitmux_ewma.c b/src/core/or/circuitmux_ewma.c
index 0382e62f75..adf256ab05 100644
--- a/src/core/or/circuitmux_ewma.c
+++ b/src/core/or/circuitmux_ewma.c
@@ -45,7 +45,10 @@
/*** EWMA parameter #defines ***/
/** How long does a tick last (seconds)? */
-#define EWMA_TICK_LEN 10
+#define EWMA_TICK_LEN_DEFAULT 10
+#define EWMA_TICK_LEN_MIN 1
+#define EWMA_TICK_LEN_MAX 600
+static int ewma_tick_len = EWMA_TICK_LEN_DEFAULT;
/** The default per-tick scale factor, if it hasn't been overridden by a
* consensus or a configuration setting. zero means "disabled". */
@@ -148,7 +151,7 @@ cell_ewma_get_tick(void)
monotime_coarse_get(&now);
int32_t msec_diff = monotime_coarse_diff_msec32(&start_of_current_tick,
&now);
- return current_tick_num + msec_diff / (1000*EWMA_TICK_LEN);
+ return current_tick_num + msec_diff / (1000*ewma_tick_len);
}
/**
@@ -527,15 +530,15 @@ cell_ewma_get_current_tick_and_fraction(double *remainder_out)
monotime_coarse_get(&now);
int32_t msec_diff = monotime_coarse_diff_msec32(&start_of_current_tick,
&now);
- if (msec_diff > (1000*EWMA_TICK_LEN)) {
- unsigned ticks_difference = msec_diff / (1000*EWMA_TICK_LEN);
+ if (msec_diff > (1000*ewma_tick_len)) {
+ unsigned ticks_difference = msec_diff / (1000*ewma_tick_len);
monotime_coarse_add_msec(&start_of_current_tick,
&start_of_current_tick,
- ticks_difference * 1000 * EWMA_TICK_LEN);
+ ticks_difference * 1000 * ewma_tick_len);
current_tick_num += ticks_difference;
- msec_diff %= 1000*EWMA_TICK_LEN;
+ msec_diff %= 1000*ewma_tick_len;
}
- *remainder_out = ((double)msec_diff) / (1.0e3 * EWMA_TICK_LEN);
+ *remainder_out = ((double)msec_diff) / (1.0e3 * ewma_tick_len);
return current_tick_num;
}
@@ -605,15 +608,20 @@ cmux_ewma_set_options(const or_options_t *options,
/* Both options and consensus can be NULL. This assures us to either get a
* valid configured value or the default one. */
halflife = get_circuit_priority_halflife(options, consensus, &source);
+ ewma_tick_len = networkstatus_get_param(consensus,
+ "CircuitPriorityTickSecs",
+ EWMA_TICK_LEN_DEFAULT,
+ EWMA_TICK_LEN_MIN,
+ EWMA_TICK_LEN_MAX);
/* convert halflife into halflife-per-tick. */
- halflife /= EWMA_TICK_LEN;
+ halflife /= ewma_tick_len;
/* compute per-tick scale factor. */
ewma_scale_factor = exp(LOG_ONEHALF / halflife);
log_info(LD_OR,
"Enabled cell_ewma algorithm because of value in %s; "
"scale factor is %f per %d seconds",
- source, ewma_scale_factor, EWMA_TICK_LEN);
+ source, ewma_scale_factor, ewma_tick_len);
}
/** Return the multiplier necessary to convert the value of a cell sent in
diff --git a/src/core/or/congestion_control_common.c b/src/core/or/congestion_control_common.c
index 9db1d7d664..0919f037db 100644
--- a/src/core/or/congestion_control_common.c
+++ b/src/core/or/congestion_control_common.c
@@ -22,28 +22,51 @@
#include "core/or/congestion_control_nola.h"
#include "core/or/congestion_control_westwood.h"
#include "core/or/congestion_control_st.h"
+#include "core/or/trace_probes_cc.h"
#include "lib/time/compat_time.h"
#include "feature/nodelist/networkstatus.h"
-/* Consensus parameter defaults */
+/* Consensus parameter defaults.
+ *
+ * More details for each of the parameters can be found in proposal 324,
+ * section 6.5 including tuning notes. */
#define CIRCWINDOW_INIT (500)
-
-#define CWND_INC_PCT_SS_DFLT (100)
-
-#define SENDME_INC_DFLT (50)
-#define CWND_MIN_DFLT (MAX(100, SENDME_INC_DFLT))
+#define SENDME_INC_DFLT (50)
#define CWND_INC_DFLT (50)
-
+#define CWND_INC_PCT_SS_DFLT (100)
#define CWND_INC_RATE_DFLT (1)
+#define CWND_MAX_DFLT (INT32_MAX)
+#define CWND_MIN_DFLT (MAX(100, SENDME_INC_DFLT))
+
+#define BWE_SENDME_MIN_DFLT (5)
+#define EWMA_CWND_COUNT_DFLT (2)
+/* The BDP algorithm for each congestion control algorithm uses the piecewise
+ * estimator. See section 3.1.4 of proposal 324. */
#define WESTWOOD_BDP_ALG BDP_ALG_PIECEWISE
#define VEGAS_BDP_MIX_ALG BDP_ALG_PIECEWISE
#define NOLA_BDP_ALG BDP_ALG_PIECEWISE
-#define EWMA_CWND_COUNT_DFLT 2
+/* Indicate OR connection buffer limitations used to stop or start accepting
+ * cells in its outbuf.
+ *
+ * These watermarks are historical to tor in a sense that they've been used
+ * almost from the genesis point. And were likely defined to fit the bounds of
+ * TLS records of 16KB which would be around 32 cells.
+ *
+ * These are defaults of the consensus parameter "orconn_high" and "orconn_low"
+ * values. */
+#define OR_CONN_HIGHWATER_DFLT (32*1024)
+#define OR_CONN_LOWWATER_DFLT (16*1024)
-#define BWE_SENDME_MIN_DFLT 5
+/* Low and high values of circuit cell queue sizes. They are used to tell when
+ * to start or stop reading on the streams attached on the circuit.
+ *
+ * These are defaults of the consensus parameters "cellq_high" and "cellq_low".
+ */
+#define CELL_QUEUE_LOW_DFLT (10)
+#define CELL_QUEUE_HIGH_DFLT (256)
static uint64_t congestion_control_update_circuit_rtt(congestion_control_t *,
uint64_t);
@@ -52,6 +75,59 @@ static bool congestion_control_update_circuit_bdp(congestion_control_t *,
const crypt_path_t *,
uint64_t, uint64_t);
+/* Consensus parameters cached. The non static ones are extern. */
+static uint32_t cwnd_max = CWND_MAX_DFLT;
+int32_t cell_queue_high = CELL_QUEUE_HIGH_DFLT;
+int32_t cell_queue_low = CELL_QUEUE_LOW_DFLT;
+uint32_t or_conn_highwater = OR_CONN_HIGHWATER_DFLT;
+uint32_t or_conn_lowwater = OR_CONN_LOWWATER_DFLT;
+
+/**
+ * Update global congestion control related consensus parameter values,
+ * every consensus update.
+ *
+ * Each value is fetched with networkstatus_get_param() from <b>ns</b>,
+ * passing the matching _DFLT, _MIN and _MAX macros, and cached in a
+ * file-level variable: cell_queue_high, cell_queue_low, or_conn_highwater,
+ * or_conn_lowwater and cwnd_max.
+ *
+ * <b>ns</b> is handed unchanged to networkstatus_get_param(); tor_init()
+ * calls this function with NULL before any consensus is available.
+ */
+void
+congestion_control_new_consensus_params(const networkstatus_t *ns)
+{
+#define CELL_QUEUE_HIGH_MIN (1)
+#define CELL_QUEUE_HIGH_MAX (1000)
+  cell_queue_high = networkstatus_get_param(ns, "cellq_high",
+                                            CELL_QUEUE_HIGH_DFLT,
+                                            CELL_QUEUE_HIGH_MIN,
+                                            CELL_QUEUE_HIGH_MAX);
+
+#define CELL_QUEUE_LOW_MIN (1)
+#define CELL_QUEUE_LOW_MAX (1000)
+  cell_queue_low = networkstatus_get_param(ns, "cellq_low",
+                                           CELL_QUEUE_LOW_DFLT,
+                                           CELL_QUEUE_LOW_MIN,
+                                           CELL_QUEUE_LOW_MAX);
+
+#define OR_CONN_HIGHWATER_MIN (CELL_PAYLOAD_SIZE)
+#define OR_CONN_HIGHWATER_MAX (INT32_MAX)
+  or_conn_highwater =
+    networkstatus_get_param(ns, "orconn_high",
+                            OR_CONN_HIGHWATER_DFLT,
+                            OR_CONN_HIGHWATER_MIN,
+                            OR_CONN_HIGHWATER_MAX);
+
+#define OR_CONN_LOWWATER_MIN (CELL_PAYLOAD_SIZE)
+#define OR_CONN_LOWWATER_MAX (INT32_MAX)
+  or_conn_lowwater =
+    networkstatus_get_param(ns, "orconn_low",
+                            OR_CONN_LOWWATER_DFLT,
+                            OR_CONN_LOWWATER_MIN,
+                            OR_CONN_LOWWATER_MAX);
+
+#define CWND_MAX_MIN 500
+#define CWND_MAX_MAX (INT32_MAX)
+  /* Read cc_cwnd_max from the consensus we were handed, like every other
+   * parameter in this function, instead of ignoring <b>ns</b>. */
+  cwnd_max =
+    networkstatus_get_param(ns, "cc_cwnd_max",
+                            CWND_MAX_DFLT,
+                            CWND_MAX_MIN,
+                            CWND_MAX_MAX);
+}
+
/**
* Set congestion control parameters on a circuit's congestion
* control object based on values from the consensus.
@@ -225,24 +301,6 @@ congestion_control_free_(congestion_control_t *cc)
}
/**
- * Compute an N-count EWMA, aka N-EWMA. N-EWMA is defined as:
- * EWMA = alpha*value + (1-alpha)*EWMA_prev
- * with alpha = 2/(N+1).
- *
- * This works out to:
- * EWMA = value*2/(N+1) + EMA_prev*(N-1)/(N+1)
- * = (value*2 + EWMA_prev*(N-1))/(N+1)
- */
-static inline uint64_t
-n_count_ewma(uint64_t curr, uint64_t prev, uint64_t N)
-{
- if (prev == 0)
- return curr;
- else
- return (2*curr + (N-1)*prev)/(N+1);
-}
-
-/**
* Enqueue a u64 timestamp to the end of a queue of timestamps.
*/
static inline void
@@ -558,10 +616,16 @@ time_delta_should_use_heuristics(const congestion_control_t *cc)
return false;
}
+static bool is_monotime_clock_broken = false;
+
/**
* Returns true if the monotime delta is 0, or is significantly
* different than the previous delta. Either case indicates
* that the monotime time source stalled or jumped.
+ *
+ * Also caches the clock state in the is_monotime_clock_broken flag,
+ * so we can also provide a is_monotime_clock_reliable() function,
+ * used by flow control rate timing.
*/
static bool
time_delta_stalled_or_jumped(const congestion_control_t *cc,
@@ -573,22 +637,30 @@ time_delta_stalled_or_jumped(const congestion_control_t *cc,
static ratelim_t stall_info_limit = RATELIM_INIT(60);
log_fn_ratelim(&stall_info_limit, LOG_INFO, LD_CIRC,
"Congestion control cannot measure RTT due to monotime stall.");
- return true;
+
+ /* If delta is ever 0, the monotime clock has stalled, and we should
+ * not use it anywhere. */
+ is_monotime_clock_broken = true;
+
+ return is_monotime_clock_broken;
}
- /* If the old_delta is 0, we have no previous values. So
- * just assume this one is valid (beause it is non-zero) */
- if (old_delta == 0)
- return false;
+ /* If the old_delta is 0, we have no previous values on this circuit.
+ *
+ * So, return the global monotime status from other circuits, and
+ * do not update.
+ */
+ if (old_delta == 0) {
+ return is_monotime_clock_broken;
+ }
/*
* For the heuristic cases, we need at least a few timestamps,
* to average out any previous partial stalls or jumps. So until
- * than point, let's just delcare these time values "good enough
- * to use".
+ * that point, let's just use the cached status from other circuits.
*/
if (!time_delta_should_use_heuristics(cc)) {
- return false;
+ return is_monotime_clock_broken;
}
/* If old_delta is significantly larger than new_delta, then
@@ -601,7 +673,9 @@ time_delta_stalled_or_jumped(const congestion_control_t *cc,
"), likely due to clock jump.",
new_delta/1000, old_delta/1000);
- return true;
+ is_monotime_clock_broken = true;
+
+ return is_monotime_clock_broken;
}
/* If new_delta is significantly larger than old_delta, then
@@ -613,10 +687,24 @@ time_delta_stalled_or_jumped(const congestion_control_t *cc,
"), likely due to clock jump.",
new_delta/1000, old_delta/1000);
- return true;
+ is_monotime_clock_broken = true;
+
+ return is_monotime_clock_broken;
}
- return false;
+ /* All good! Update cached status, too */
+ is_monotime_clock_broken = false;
+
+ return is_monotime_clock_broken;
+}
+
+/**
+ * Returns true if the monotime clock is currently believed to be reliable,
+ * i.e. the cached is_monotime_clock_broken flag is not set.
+ */
+bool
+is_monotime_clock_reliable(void)
+{
+ return !is_monotime_clock_broken;
}
/**
@@ -753,7 +841,7 @@ congestion_control_update_circuit_bdp(congestion_control_t *cc,
SMARTLIST_FOREACH(cc->sendme_arrival_timestamps, uint64_t *, t,
tor_free(t));
smartlist_clear(cc->sendme_arrival_timestamps);
- } else if (curr_rtt_usec) {
+ } else if (curr_rtt_usec && is_monotime_clock_reliable()) {
/* Sendme-based BDP will quickly measure BDP in much less than
* a cwnd worth of data when in use (in 2-10 SENDMEs).
*
@@ -903,7 +991,12 @@ congestion_control_update_circuit_bdp(congestion_control_t *cc,
/* We updated BDP this round if either we had a blocked channel, or
* the curr_rtt_usec was not 0. */
- return (blocked_on_chan || curr_rtt_usec != 0);
+ bool ret = (blocked_on_chan || curr_rtt_usec != 0);
+ if (ret) {
+ tor_trace(TR_SUBSYS(cc), TR_EV(bdp_update), circ, cc, curr_rtt_usec,
+ sendme_rate_bdp);
+ }
+ return ret;
}
/**
@@ -914,20 +1007,32 @@ congestion_control_dispatch_cc_alg(congestion_control_t *cc,
const circuit_t *circ,
const crypt_path_t *layer_hint)
{
+ int ret = -END_CIRC_REASON_INTERNAL;
switch (cc->cc_alg) {
case CC_ALG_WESTWOOD:
- return congestion_control_westwood_process_sendme(cc, circ, layer_hint);
+ ret = congestion_control_westwood_process_sendme(cc, circ, layer_hint);
+ break;
case CC_ALG_VEGAS:
- return congestion_control_vegas_process_sendme(cc, circ, layer_hint);
+ ret = congestion_control_vegas_process_sendme(cc, circ, layer_hint);
+ break;
case CC_ALG_NOLA:
- return congestion_control_nola_process_sendme(cc, circ, layer_hint);
+ ret = congestion_control_nola_process_sendme(cc, circ, layer_hint);
+ break;
case CC_ALG_SENDME:
default:
tor_assert(0);
}
- return -END_CIRC_REASON_INTERNAL;
+ if (cc->cwnd > cwnd_max) {
+ static ratelim_t cwnd_limit = RATELIM_INIT(60);
+ log_fn_ratelim(&cwnd_limit, LOG_NOTICE, LD_CIRC,
+ "Congestion control cwnd %"PRIu64" exceeds max %d, clamping.",
+ cc->cwnd, cwnd_max);
+ cc->cwnd = cwnd_max;
+ }
+
+ return ret;
}
diff --git a/src/core/or/congestion_control_common.h b/src/core/or/congestion_control_common.h
index 4193d94cba..01dbc1ceb4 100644
--- a/src/core/or/congestion_control_common.h
+++ b/src/core/or/congestion_control_common.h
@@ -39,6 +39,64 @@ int congestion_control_get_package_window(const circuit_t *,
int sendme_get_inc_count(const circuit_t *, const crypt_path_t *);
bool circuit_sent_cell_for_sendme(const circuit_t *, const crypt_path_t *);
+bool is_monotime_clock_reliable(void);
+
+void congestion_control_new_consensus_params(const networkstatus_t *ns);
+
+/* Ugh, C.. these four are private. Use the getter instead, when
+ * external to the congestion control code. */
+extern uint32_t or_conn_highwater;
+extern uint32_t or_conn_lowwater;
+extern int32_t cell_queue_high;
+extern int32_t cell_queue_low;
+
+/** Stop writing on an orconn when its outbuf is this large.
+ *
+ * Getter for the cached or_conn_highwater value, which
+ * congestion_control_new_consensus_params() refreshes from the
+ * "orconn_high" consensus parameter. */
+static inline uint32_t
+or_conn_highwatermark(void)
+{
+ return or_conn_highwater;
+}
+
+/** Resume writing on an orconn when its outbuf is less than this.
+ *
+ * Getter for the cached or_conn_lowwater value, which
+ * congestion_control_new_consensus_params() refreshes from the
+ * "orconn_low" consensus parameter. */
+static inline uint32_t
+or_conn_lowwatermark(void)
+{
+ return or_conn_lowwater;
+}
+
+/** Stop reading on edge connections when we have this many cells
+ * waiting on the appropriate queue.
+ *
+ * Getter for the cached cell_queue_high value, which
+ * congestion_control_new_consensus_params() refreshes from the
+ * "cellq_high" consensus parameter. */
+static inline int32_t
+cell_queue_highwatermark(void)
+{
+ return cell_queue_high;
+}
+
+/** Start reading from edge connections again when we get down to this many
+ * cells.
+ *
+ * Getter for the cached cell_queue_low value, which
+ * congestion_control_new_consensus_params() refreshes from the
+ * "cellq_low" consensus parameter. */
+static inline int32_t
+cell_queue_lowwatermark(void)
+{
+ return cell_queue_low;
+}
+
+/**
+ * Fold a new sample into an N-count exponentially weighted moving average.
+ *
+ * With alpha = 2/(N+1), the N-EWMA is
+ *   EWMA = alpha*curr + (1-alpha)*prev
+ *        = (2*curr + (N-1)*prev) / (N+1)
+ * evaluated here in unsigned 64-bit integer arithmetic.
+ *
+ * A <b>prev</b> of 0 is treated as "no average yet": <b>curr</b> is returned
+ * unchanged so the first sample seeds the average.
+ */
+static inline uint64_t
+n_count_ewma(uint64_t curr, uint64_t prev, uint64_t N)
+{
+  /* No previous average: the current sample becomes the average. */
+  if (prev == 0) {
+    return curr;
+  }
+
+  const uint64_t weighted_sum = 2*curr + (N-1)*prev;
+  return weighted_sum / (N+1);
+}
/* Private section starts. */
#ifdef TOR_CONGESTION_CONTROL_PRIVATE
diff --git a/src/core/or/congestion_control_flow.c b/src/core/or/congestion_control_flow.c
new file mode 100644
index 0000000000..0c2baa96e2
--- /dev/null
+++ b/src/core/or/congestion_control_flow.c
@@ -0,0 +1,710 @@
+/* Copyright (c) 2019-2021, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file congestion_control_flow.c
+ * \brief Code that implements flow control for congestion controlled
+ * circuits.
+ */
+
+#define TOR_CONGESTION_CONTROL_FLOW_PRIVATE
+
+#include "core/or/or.h"
+
+#include "core/or/relay.h"
+#include "core/mainloop/connection.h"
+#include "core/or/connection_edge.h"
+#include "core/mainloop/mainloop.h"
+#include "core/or/congestion_control_common.h"
+#include "core/or/congestion_control_flow.h"
+#include "core/or/congestion_control_st.h"
+#include "core/or/circuitlist.h"
+#include "core/or/trace_probes_cc.h"
+#include "feature/nodelist/networkstatus.h"
+#include "trunnel/flow_control_cells.h"
+
+#include "core/or/connection_st.h"
+#include "core/or/cell_st.h"
+#include "app/config/config.h"
+
+/** Cache consensus parameters */
+static uint32_t xoff_client;
+static uint32_t xoff_exit;
+
+static uint32_t xon_change_pct;
+static uint32_t xon_ewma_cnt;
+static uint32_t xon_rate_bytes;
+
+/* In normal operation, we can get a burst of up to 32 cells before returning
+ * to libevent to flush the outbuf. This is a heuristic from hardcoded values
+ * and strange logic in connection_bucket_get_share(). */
+#define MAX_EXPECTED_CELL_BURST 32
+
+/* The following three are for dropmark rate limiting. They define when we
+ * scale down our XON, XOFF, and xmit byte counts. Early scaling is beneficial
+ * because it limits the ability of spurious XON/XOFF to be sent after large
+ * amounts of data without XON/XOFF. At these limits, after 10MB of data (or
+ * more), an adversary can only inject (log2(10MB)-log2(200*500))*100 ~= 1000
+ * cells of fake XOFF/XON before the xmit byte count will be halved enough to
+ * trigger a limit. */
+#define XON_COUNT_SCALE_AT 200
+#define XOFF_COUNT_SCALE_AT 200
+#define ONE_MEGABYTE (UINT64_C(1) << 20)
+#define TOTAL_XMIT_SCALE_AT (10 * ONE_MEGABYTE)
+
+/**
+ * Look up the congestion control object that governs an edge connection.
+ *
+ * If the connection has a cpath_layer, that layer's ccontrol is returned.
+ * Otherwise, if the connection is attached to a circuit, that circuit's
+ * ccontrol is returned.
+ *
+ * Returns NULL when the connection has neither a cpath_layer nor a circuit,
+ * and also when the selected cpath_layer or circuit has no congestion
+ * control object.
+ */
+static inline const congestion_control_t *
+edge_get_ccontrol(const edge_connection_t *edge)
+{
+  /* A cpath layer, when present, takes precedence over the circuit. */
+  if (edge->cpath_layer != NULL) {
+    return edge->cpath_layer->ccontrol;
+  }
+
+  return edge->on_circuit != NULL ? edge->on_circuit->ccontrol : NULL;
+}
+
+/**
+ * Refresh the cached flow control consensus parameters from <b>ns</b>.
+ * Meant to be called on every consensus update.
+ *
+ * cc_xoff_client, cc_xoff_exit and cc_xon_rate are multiplied by
+ * RELAY_PAYLOAD_SIZE before being cached; cc_xon_change_pct and
+ * cc_xon_ewma_cnt are cached as fetched.
+ *
+ * More details for each of the parameters can be found in proposal 324,
+ * section 6.5 including tuning notes.
+ */
+void
+flow_control_new_consensus_params(const networkstatus_t *ns)
+{
+  /* Scratch value for the parameters that get scaled by the relay payload
+   * size before they are stored. */
+  int32_t unscaled;
+
+#define CC_XOFF_CLIENT_DFLT 500
+#define CC_XOFF_CLIENT_MIN 1
+#define CC_XOFF_CLIENT_MAX 10000
+  unscaled = networkstatus_get_param(ns, "cc_xoff_client",
+                                     CC_XOFF_CLIENT_DFLT,
+                                     CC_XOFF_CLIENT_MIN,
+                                     CC_XOFF_CLIENT_MAX);
+  xoff_client = unscaled * RELAY_PAYLOAD_SIZE;
+
+#define CC_XOFF_EXIT_DFLT 500
+#define CC_XOFF_EXIT_MIN 1
+#define CC_XOFF_EXIT_MAX 10000
+  unscaled = networkstatus_get_param(ns, "cc_xoff_exit",
+                                     CC_XOFF_EXIT_DFLT,
+                                     CC_XOFF_EXIT_MIN,
+                                     CC_XOFF_EXIT_MAX);
+  xoff_exit = unscaled * RELAY_PAYLOAD_SIZE;
+
+#define CC_XON_CHANGE_PCT_DFLT 25
+#define CC_XON_CHANGE_PCT_MIN 1
+#define CC_XON_CHANGE_PCT_MAX 99
+  xon_change_pct = networkstatus_get_param(ns, "cc_xon_change_pct",
+                                           CC_XON_CHANGE_PCT_DFLT,
+                                           CC_XON_CHANGE_PCT_MIN,
+                                           CC_XON_CHANGE_PCT_MAX);
+
+#define CC_XON_RATE_BYTES_DFLT (500)
+#define CC_XON_RATE_BYTES_MIN (1)
+#define CC_XON_RATE_BYTES_MAX (5000)
+  unscaled = networkstatus_get_param(ns, "cc_xon_rate",
+                                     CC_XON_RATE_BYTES_DFLT,
+                                     CC_XON_RATE_BYTES_MIN,
+                                     CC_XON_RATE_BYTES_MAX);
+  xon_rate_bytes = unscaled * RELAY_PAYLOAD_SIZE;
+
+#define CC_XON_EWMA_CNT_DFLT (2)
+#define CC_XON_EWMA_CNT_MIN (1)
+#define CC_XON_EWMA_CNT_MAX (100)
+  xon_ewma_cnt = networkstatus_get_param(ns, "cc_xon_ewma_cnt",
+                                         CC_XON_EWMA_CNT_DFLT,
+                                         CC_XON_EWMA_CNT_MIN,
+                                         CC_XON_EWMA_CNT_MAX);
+}
+
+/**
+ * Send an XOFF for this stream, and note that we sent one.
+ *
+ * Encodes a version-0 XOFF cell and sends it on <b>stream</b> as a
+ * RELAY_COMMAND_XOFF. stream->xoff_sent is set only when
+ * connection_edge_send_command() returns 0. If encoding fails, a bug
+ * warning is logged and nothing is sent.
+ */
+static void
+circuit_send_stream_xoff(edge_connection_t *stream)
+{
+  xoff_cell_t xoff;
+  uint8_t payload[CELL_PAYLOAD_SIZE];
+  ssize_t xoff_size;
+
+  memset(&xoff, 0, sizeof(xoff));
+  memset(payload, 0, sizeof(payload));
+
+  xoff_cell_set_version(&xoff, 0);
+
+  if ((xoff_size = xoff_cell_encode(payload, CELL_PAYLOAD_SIZE, &xoff)) < 0) {
+    /* This is the XOFF path: name the right cell type in the warning. */
+    log_warn(LD_BUG, "Failed to encode xoff cell");
+    return;
+  }
+
+  /* Only record the XOFF as sent if the relay command was accepted. */
+  if (connection_edge_send_command(stream, RELAY_COMMAND_XOFF,
+                                   (char*)payload, (size_t)xoff_size) == 0) {
+    stream->xoff_sent = true;
+  }
+}
+
+/**
+ * Compute the recent drain rate (write rate) for this edge
+ * connection and return it, in KB/sec (1000 bytes/sec).
+ *
+ * The rate is stream->prev_drained_bytes divided by the time elapsed
+ * between stream->drain_start_usec and the current monotime_absolute_usec()
+ * reading.
+ *
+ * Returns 0 if the monotime clock is busted or if no time has elapsed,
+ * INT32_MAX if the overflow checks trip, and a value of at least 1
+ * otherwise.
+ */
+static inline uint32_t
+compute_drain_rate(const edge_connection_t *stream)
+{
+ /* Callers are not expected to ask for a rate while the clock is flagged as
+ * unreliable, hence the BUG() wrapper. */
+ if (BUG(!is_monotime_clock_reliable())) {
+ log_warn(LD_BUG, "Computing drain rate with stalled monotime clock");
+ return 0;
+ }
+
+ /* Microseconds elapsed since drain_start_usec.
+ * NOTE(review): presumably drain_start_usec is never later than the current
+ * reading; otherwise this unsigned subtraction would wrap -- confirm. */
+ uint64_t delta = monotime_absolute_usec() - stream->drain_start_usec;
+
+ /* Guard the divisions below against a zero divisor. */
+ if (delta == 0) {
+ log_warn(LD_BUG, "Computing stream drain rate with zero time delta");
+ return 0;
+ }
+
+ /* Overflow checks: saturate at INT32_MAX rather than letting the
+ * multiplication by 1000 below overflow. */
+ if (stream->prev_drained_bytes > INT32_MAX/1000 || /* Intermediate */
+ stream->prev_drained_bytes/delta > INT32_MAX/1000) { /* full value */
+ return INT32_MAX;
+ }
+
+ /* kb/sec = bytes/usec * 1000 usec/msec * 1000 msec/sec * kb/1000bytes */
+ /* MAX(1, ...) keeps a successfully computed rate from ever being 0. */
+ return MAX(1, (uint32_t)(stream->prev_drained_bytes * 1000)/delta);
+}
+
+/**
+ * Send an XON on <b>stream</b>, advertising stream->ewma_drain_rate as the
+ * advisory rate.
+ *
+ * The advertised rate is remembered in stream->ewma_rate_last_sent so a
+ * later change can be detected. If connection_edge_send_command() returns
+ * 0, stream->xoff_sent is cleared so another XOFF may be sent later. If
+ * encoding fails, a bug warning is logged and nothing is sent or recorded.
+ */
+static void
+circuit_send_stream_xon(edge_connection_t *stream)
+{
+  xon_cell_t cell;
+  uint8_t body[CELL_PAYLOAD_SIZE];
+
+  memset(&cell, 0, sizeof(cell));
+  memset(body, 0, sizeof(body));
+
+  xon_cell_set_version(&cell, 0);
+  xon_cell_set_kbps_ewma(&cell, stream->ewma_drain_rate);
+
+  const ssize_t encoded_len = xon_cell_encode(body, CELL_PAYLOAD_SIZE, &cell);
+  if (encoded_len < 0) {
+    log_warn(LD_BUG, "Failed to encode xon cell");
+    return;
+  }
+
+  /* Remember the rate we are about to advertise, so advisory updates can be
+   * sent if it changes. */
+  stream->ewma_rate_last_sent = stream->ewma_drain_rate;
+
+  if (connection_edge_send_command(stream, RELAY_COMMAND_XON, (char*)body,
+                                   (size_t)encoded_len) != 0) {
+    return;
+  }
+
+  /* The XON went out: allow a future XOFF to be sent if need be. */
+  stream->xoff_sent = false;
+}
+
+/**
+ * Handle an XOFF received for an edge connection: validate it, stop
+ * reading from the edge connection, and mark the connection as having
+ * received an XOFF so that reading stays off until an XON arrives.
+ *
+ * The cell payload is not inspected.
+ *
+ * Returns false, without acting on the XOFF, if conn is NULL, if the
+ * cell came from the wrong hop, if the circuit has no congestion
+ * control, or if an XOFF is already outstanding. On the client/onion
+ * service side it also returns false when more XOFFs arrived than the
+ * bytes we sent can account for, but in that case the XOFF is still
+ * acted upon. Returns true otherwise.
+ */
+bool
+circuit_process_stream_xoff(edge_connection_t *conn,
+                            const crypt_path_t *layer_hint,
+                            const cell_t *cell)
+{
+  bool valid = true;
+
+  (void)cell;
+
+  if (BUG(!conn)) {
+    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
+           "Got XOFF on invalid stream?");
+    return false;
+  }
+
+  /* The XOFF must originate from the hop this stream is attached to. */
+  if (layer_hint && layer_hint != conn->cpath_layer) {
+    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
+           "Got XOFF from wrong hop.");
+    return false;
+  }
+
+  if (edge_get_ccontrol(conn) == NULL) {
+    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
+           "Got XOFF for non-congestion control circuit");
+    return false;
+  }
+
+  if (conn->xoff_received) {
+    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
+           "Got multiple XOFF on connection");
+    return false;
+  }
+
+  /* Halve all counters together once the XOFF counter reaches its
+   * scaling threshold, so the ratios between them are kept. */
+  if (conn->num_xoff_recv == XOFF_COUNT_SCALE_AT) {
+    log_info(LD_EDGE, "Scaling down for XOFF count: %d %d %d",
+             conn->total_bytes_xmit,
+             conn->num_xoff_recv,
+             conn->num_xon_recv);
+    conn->total_bytes_xmit /= 2;
+    conn->num_xoff_recv /= 2;
+    conn->num_xon_recv /= 2;
+  }
+
+  conn->num_xoff_recv++;
+
+  /* Client-side dropmark defense: an XOFF that arrives before we have
+   * sent enough bytes to justify it is suspicious. Early cells are the
+   * main side-channel risk, but we also bound the number of XOFFs by
+   * the number of bytes sent. */
+  const bool is_client_side =
+    TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL;
+
+  if (is_client_side) {
+    /* TODO: This limit technically needs to come from negotiation,
+     * and be bounds checked for sanity, because the other endpoint
+     * may have a different consensus */
+    const uint32_t limit = conn->hs_ident ? xoff_client : xoff_exit;
+    const uint32_t bytes_needed = limit*conn->num_xoff_recv;
+
+    if (conn->total_bytes_xmit < bytes_needed) {
+      log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
+             "Got extra XOFF for bytes sent. Got %d, expected max %d",
+             conn->num_xoff_recv, conn->total_bytes_xmit/limit);
+      /* Do not bail out: the only dropmark defenses in C tor are via
+       * the vanguards addon's use of the read valid cells. Report the
+       * cell as invalid protocol data but still honor it. */
+      valid = false;
+    }
+  }
+
+  // TODO: Count how many xoffs we have; log if "too many", for shadow
+  // analysis of chatter. Possibly add to extra-info?
+
+  log_info(LD_EDGE, "Got XOFF!");
+  connection_stop_reading(TO_CONN(conn));
+  conn->xoff_received = true;
+
+  return valid;
+}
+
+/**
+ * Process a stream XON, and if it validates, clear the xoff
+ * flag and resume reading on this edge connection.
+ *
+ * Also, use the rate advertised in the XON to rate limit
+ * reading on this edge (or packaging from it onto
+ * the circuit), to avoid XON/XOFF chatter.
+ *
+ * conn is the edge connection the XON is for, layer_hint is the hop the
+ * cell arrived from (may be NULL), and cell is the relay cell whose
+ * payload carries the encoded XON.
+ *
+ * Returns false, without acting on the XON, if conn is NULL, if the cell
+ * came from the wrong hop, if the circuit has no congestion control, or
+ * if the payload does not parse. On the client/onion service side it
+ * also returns false when more XONs arrived than the bytes we sent can
+ * account for, but in that case the XON is still acted upon. Returns
+ * true otherwise.
+ */
+bool
+circuit_process_stream_xon(edge_connection_t *conn,
+                           const crypt_path_t *layer_hint,
+                           const cell_t *cell)
+{
+  xon_cell_t *xon;
+  bool retval = true;
+
+  if (BUG(!conn)) {
+    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
+           "Got XON on invalid stream?");
+    return false;
+  }
+
+  /* Make sure this XON came from the right hop */
+  if (layer_hint && layer_hint != conn->cpath_layer) {
+    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
+           "Got XON from wrong hop.");
+    return false;
+  }
+
+  if (edge_get_ccontrol(conn) == NULL) {
+    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
+           "Got XON for non-congestion control circuit");
+    return false;
+  }
+
+  /* From here on, xon is owned by this function and is released by the
+   * xon_cell_free() call at the end; there must be no early return
+   * between the parse and that call. */
+  if (xon_cell_parse(&xon, cell->payload+RELAY_HEADER_SIZE,
+                     CELL_PAYLOAD_SIZE-RELAY_HEADER_SIZE) < 0) {
+    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
+           "Received malformed XON cell.");
+    return false;
+  }
+
+  /* If we are near the max, scale everything down */
+  if (conn->num_xon_recv == XON_COUNT_SCALE_AT) {
+    log_info(LD_EDGE, "Scaling down for XON count: %d %d %d",
+             conn->total_bytes_xmit,
+             conn->num_xoff_recv,
+             conn->num_xon_recv);
+    conn->total_bytes_xmit /= 2;
+    conn->num_xoff_recv /= 2;
+    conn->num_xon_recv /= 2;
+  }
+
+  conn->num_xon_recv++;
+
+  /* Client-side check to make sure that XON is not sent too early,
+   * for dropmark attacks. The main sidechannel risk is early cells,
+   * but we also check to see that we did not get more XONs than make
+   * sense for the number of bytes we sent.
+   */
+  if (TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL) {
+    uint32_t limit = 0;
+
+    /* TODO: This limit technically needs to come from negotiation,
+     * and be bounds checked for sanity, because the other endpoint
+     * may have a different consensus */
+    if (conn->hs_ident)
+      limit = MIN(xoff_client, xon_rate_bytes);
+    else
+      limit = MIN(xoff_exit, xon_rate_bytes);
+
+    if (conn->total_bytes_xmit < limit*conn->num_xon_recv) {
+      log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
+             "Got extra XON for bytes sent. Got %d, expected max %d",
+             conn->num_xon_recv, conn->total_bytes_xmit/limit);
+
+      /* We still process this, because the only dropmark defenses
+       * in C tor are via the vanguards addon's use of the read valid
+       * cells. So just signal that we think this is not valid protocol
+       * data and proceed. */
+      retval = false;
+    }
+  }
+
+  log_info(LD_EDGE, "Got XON: %d", xon->kbps_ewma);
+
+  /* Adjust the token bucket of this edge connection with the drain rate in
+   * the XON. The sender fills this field from its drain rate, which
+   * compute_drain_rate() expresses in units of 1000 bytes/sec, so
+   * multiply by 1000 to get bytes/sec.
+   *
+   * Widen to 64 bits BEFORE multiplying: otherwise the product is
+   * computed in the getter's narrower type, where a large advertised
+   * rate would wrap around and slip past the clamp below. */
+  uint64_t rate = (uint64_t)xon_cell_get_kbps_ewma(xon) * 1000;
+  if (rate == 0 || INT32_MAX < rate) {
+    /* No rate, or a rate too large for the token bucket: use the
+     * maximum. */
+    rate = INT32_MAX;
+  }
+  token_bucket_rw_adjust(&conn->bucket, (uint32_t) rate, (uint32_t) rate);
+
+  if (conn->xoff_received) {
+    /* Clear the fact that we got an XOFF, so that this edge can
+     * start and stop reading normally */
+    conn->xoff_received = false;
+    connection_start_reading(TO_CONN(conn));
+  }
+
+  xon_cell_free(xon);
+
+  return retval;
+}
+
+/**
+ * Called from sendme_stream_data_received(), when data arrives
+ * from a circuit to our edge's outbuf, to decide whether an XOFF
+ * must be sent for this stream.
+ *
+ * An XOFF is sent when the outbuf holds more than the applicable limit
+ * (xoff_client for client and onion service edges, xoff_exit otherwise)
+ * and no XOFF is currently outstanding. Independently, when the outbuf
+ * exceeds the expected cell burst size, flow_control_decide_xon() is
+ * invoked with zero written bytes.
+ *
+ * Returns -1 if the stream's circuit has no congestion control, and 0
+ * in every other case.
+ */
+int
+flow_control_decide_xoff(edge_connection_t *stream)
+{
+  const size_t outbuf_len = connection_get_outbuf_len(TO_CONN(stream));
+  uint32_t xoff_threshold = 0;
+
+  if (BUG(edge_get_ccontrol(stream) == NULL)) {
+    log_err(LD_BUG, "Flow control called for non-congestion control circuit");
+    return -1;
+  }
+
+  /* Onion services and clients are typically localhost edges, so they
+   * need different buffering limits than exits do */
+  const bool is_client_side =
+    TO_CONN(stream)->type == CONN_TYPE_AP || stream->hs_ident != NULL;
+
+  if (is_client_side)
+    xoff_threshold = xoff_client;
+  else
+    xoff_threshold = xoff_exit;
+
+  if (outbuf_len > xoff_threshold && !stream->xoff_sent) {
+    log_info(LD_EDGE, "Sending XOFF: %ld %d",
+             outbuf_len, xoff_threshold);
+    tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xoff_sending), stream);
+
+    circuit_send_stream_xoff(stream);
+
+    /* Having had to send an XOFF means the drain rate we had was
+     * wrong, so discard it. */
+    stream->ewma_drain_rate = 0;
+  }
+
+  /* If the outbuf has accumulated more than the expected burst limit of
+   * cells, then assume it is not draining, and call decide_xon. We must
+   * do this because writes only happen when the socket unblocks, so
+   * we may not otherwise notice accumulation of data in the outbuf for
+   * advisory XONs. */
+  if (outbuf_len > MAX_EXPECTED_CELL_BURST*RELAY_PAYLOAD_SIZE) {
+    flow_control_decide_xon(stream, 0);
+  }
+
+  /* Flow control always takes more data; we rely on the oomkiller to
+   * handle misbehavior. */
+  return 0;
+}
+
+/**
+ * Decide whether the stream's EWMA drain rate has moved far enough from
+ * the rate last advertised in an XON to be worth re-advertising.
+ *
+ * "Far enough" means more than xon_change_pct percent above or below
+ * ewma_rate_last_sent.
+ *
+ * Returns false if the monotime clock is unreliable, or if no rate has
+ * been sent yet.
+ */
+static bool
+stream_drain_rate_changed(const edge_connection_t *stream)
+{
+  if (!is_monotime_clock_reliable() || !stream->ewma_rate_last_sent) {
+    return false;
+  }
+
+  /* Both bounds are computed in 64 bits so the multiplication by the
+   * percentage cannot overflow. */
+  const uint64_t upper_bound =
+    (100+(uint64_t)xon_change_pct)*stream->ewma_rate_last_sent/100;
+  const uint64_t lower_bound =
+    (100-(uint64_t)xon_change_pct)*stream->ewma_rate_last_sent/100;
+
+  return stream->ewma_drain_rate > upper_bound ||
+         stream->ewma_drain_rate < lower_bound;
+}
+
+/**
+ * Called whenever we drain an edge connection outbuf by writing on
+ * its socket, to decide if it is time to send an xon. Also called by
+ * flow_control_decide_xoff(), with n_written set to 0, when the outbuf
+ * has grown past the expected burst size.
+ *
+ * The n_written parameter tells us how many bytes we have written
+ * this time, which is used to compute the advisory drain rate fields.
+ *
+ * In order, this function:
+ * - adds n_written to the stream's drained byte count, scaling the
+ * count and the drain start time down first if the count would wrap;
+ * - starts, or (on an unreliable clock) discards, the drain measurement;
+ * - updates the EWMA drain rate once more than xon_rate_bytes have been
+ * drained during a measurement;
+ * - sends an XON, either because the rate changed significantly while no
+ * XOFF is outstanding, or because an XOFF is outstanding and the outbuf
+ * is now empty;
+ * - when the outbuf is empty, clears the drain start time and, after
+ * xon_rate_bytes drained with a rate previously sent, doubles the
+ * EWMA drain rate.
+ */
+void
+flow_control_decide_xon(edge_connection_t *stream, size_t n_written)
+{
+ size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream));
+
+ /* Bounds check the number of drained bytes, and scale */
+ if (stream->drained_bytes >= UINT32_MAX - n_written) {
+ /* Cut the bytes in half, and move the start time up halfway to now
+ * (if we have one). */
+ stream->drained_bytes /= 2;
+
+ if (stream->drain_start_usec) {
+ uint64_t now = monotime_absolute_usec();
+
+ stream->drain_start_usec = now - (now-stream->drain_start_usec)/2;
+ }
+ }
+
+ /* Accumulate drained bytes since last rate computation */
+ stream->drained_bytes += n_written;
+
+ tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon), stream, n_written);
+
+ /* Check for bad monotime clock and bytecount wrap */
+ if (!is_monotime_clock_reliable()) {
+ /* If the monotime clock ever goes wrong, the safest thing to do
+ * is just clear our short-term rate info and wait for the clock to
+ * become reliable again.. */
+ stream->drain_start_usec = 0;
+ stream->drained_bytes = 0;
+ } else {
+ /* If we have no drain start timestamp, and we still have
+ * remaining buffer, start the buffering counter */
+ if (!stream->drain_start_usec && total_buffered > 0) {
+ log_debug(LD_EDGE, "Began edge buffering: %d %d %ld",
+ stream->ewma_rate_last_sent,
+ stream->ewma_drain_rate,
+ total_buffered);
+ tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_start),
+ stream);
+ stream->drain_start_usec = monotime_absolute_usec();
+ stream->drained_bytes = 0;
+ }
+ }
+
+ /* A non-zero drain_start_usec means a drain measurement is in
+ * progress. */
+ if (stream->drain_start_usec) {
+ /* If we have spent enough time in a queued state, update our drain
+ * rate. */
+ if (stream->drained_bytes > xon_rate_bytes) {
+ /* No previous drained bytes means it is the first time we are computing
+ * it so use the value we just drained onto the socket as a baseline. It
+ * won't be accurate but it will be a start towards the right value.
+ *
+ * We have to do this in order to have a drain rate else we could be
+ * sending a drain rate of 0 in an XON which would be undesirable and
+ * basically like sending an XOFF. */
+ if (stream->prev_drained_bytes == 0) {
+ stream->prev_drained_bytes = stream->drained_bytes;
+ }
+ /* Note that compute_drain_rate() reads prev_drained_bytes, not
+ * drained_bytes, so it must run before the assignment below. */
+ uint32_t drain_rate = compute_drain_rate(stream);
+ /* Once the drain rate has been computed, note how many bytes we just
+ * drained so it can be used at the next calculation. We do this here
+ * because it gets reset once the rate is changed. */
+ stream->prev_drained_bytes = stream->drained_bytes;
+
+ /* A drain rate of 0 means the computation failed; in that case the
+ * EWMA and the measurement state are left as they are. */
+ if (drain_rate) {
+ stream->ewma_drain_rate =
+ (uint32_t)n_count_ewma(drain_rate,
+ stream->ewma_drain_rate,
+ xon_ewma_cnt);
+ log_debug(LD_EDGE, "Updating drain rate: %d %d %ld",
+ drain_rate,
+ stream->ewma_drain_rate,
+ total_buffered);
+ tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_update),
+ stream, drain_rate);
+ /* Reset recent byte counts. This prevents us from sending advisory
+ * XONs more frequent than every xon_rate_bytes. */
+ stream->drained_bytes = 0;
+ stream->drain_start_usec = 0;
+ }
+ }
+ }
+
+ /* If we don't have an XOFF outstanding, consider updating an
+ * old rate */
+ if (!stream->xoff_sent) {
+ if (stream_drain_rate_changed(stream)) {
+ /* If we are still buffering and the rate changed, update
+ * advisory XON */
+ log_info(LD_EDGE, "Sending rate-change XON: %d %d %ld",
+ stream->ewma_rate_last_sent,
+ stream->ewma_drain_rate,
+ total_buffered);
+ tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_rate_change), stream);
+ circuit_send_stream_xon(stream);
+ }
+ } else if (total_buffered == 0) {
+ /* An XOFF is outstanding and the outbuf has fully drained: tell the
+ * other side it may resume. (The trace event is named "partial_drain",
+ * but this branch is only taken when the outbuf is empty.) */
+ log_info(LD_EDGE, "Sending XON: %d %d %ld",
+ stream->ewma_rate_last_sent,
+ stream->ewma_drain_rate,
+ total_buffered);
+ tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_partial_drain), stream);
+ circuit_send_stream_xon(stream);
+ }
+
+ /* If the buffer has fully emptied, clear the drain timestamp,
+ * so we can total only bytes drained while outbuf is 0. */
+ if (total_buffered == 0) {
+ stream->drain_start_usec = 0;
+
+ /* After we've spent 'xon_rate_bytes' with the queue fully drained,
+ * double any rate we sent. */
+ if (stream->drained_bytes >= xon_rate_bytes &&
+ stream->ewma_rate_last_sent) {
+ stream->ewma_drain_rate = MIN(INT32_MAX, 2*stream->ewma_drain_rate);
+
+ log_debug(LD_EDGE,
+ "Queue empty for xon_rate_limit bytes: %d %d",
+ stream->ewma_rate_last_sent,
+ stream->ewma_drain_rate);
+ tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_doubled), stream);
+ /* Resetting the drained bytes count. We need to keep its value as a
+ * previous so the drain rate calculation takes into account what was
+ * actually drained the last time. */
+ stream->prev_drained_bytes = stream->drained_bytes;
+ stream->drained_bytes = 0;
+ }
+ }
+
+ return;
+}
+
+/**
+ * Record that len bytes of data were packaged on this stream.
+ *
+ * The running total is what the XON/XOFF processing code compares the
+ * number of received XONs and XOFFs against, to enforce the client-side
+ * dropmark limits.
+ */
+void
+flow_control_note_sent_data(edge_connection_t *stream, size_t len)
+{
+  /* Before the byte counter can reach its scaling threshold, halve it
+   * together with the XON/XOFF counters so the ratios between them are
+   * kept. */
+  if (stream->total_bytes_xmit >= TOTAL_XMIT_SCALE_AT-len) {
+    log_info(LD_EDGE, "Scaling down for flow control xmit bytes:: %d %d %d",
+             stream->total_bytes_xmit,
+             stream->num_xoff_recv,
+             stream->num_xon_recv);
+
+    stream->num_xon_recv /= 2;
+    stream->num_xoff_recv /= 2;
+    stream->total_bytes_xmit /= 2;
+  }
+
+  stream->total_bytes_xmit += len;
+}
+
+/** Returns true if an edge connection uses flow control, which is the
+ * case whenever congestion control is set up either on the circuit the
+ * stream is attached to or on the stream's cpath layer. */
+bool
+edge_uses_flow_control(const edge_connection_t *stream)
+{
+  /* All circuits with congestion control use flow control */
+  if (stream->on_circuit && stream->on_circuit->ccontrol)
+    return true;
+
+  if (stream->cpath_layer && stream->cpath_layer->ccontrol)
+    return true;
+
+  return false;
+}
+
+/**
+ * Returns the maximum RTT, in microseconds, recorded by congestion
+ * control for the circuit that carries this stream.
+ *
+ * The congestion control object attached to the circuit is consulted
+ * first, then the one attached to the stream's cpath layer. Returns 0
+ * if neither exists.
+ */
+uint64_t
+edge_get_max_rtt(const edge_connection_t *stream)
+{
+  const bool circ_has_cc =
+    stream->on_circuit && stream->on_circuit->ccontrol;
+  if (circ_has_cc) {
+    return stream->on_circuit->ccontrol->max_rtt_usec;
+  }
+
+  const bool layer_has_cc =
+    stream->cpath_layer && stream->cpath_layer->ccontrol;
+  if (layer_has_cc) {
+    return stream->cpath_layer->ccontrol->max_rtt_usec;
+  }
+
+  return 0;
+}
+
+/** Returns true if a connection is an edge connection and that edge
+ * connection uses flow control; returns false for every other kind of
+ * connection. */
+bool
+conn_uses_flow_control(connection_t *conn)
+{
+  if (!CONN_IS_EDGE(conn)) {
+    return false;
+  }
+
+  return edge_uses_flow_control(TO_EDGE_CONN(conn));
+}
+
diff --git a/src/core/or/congestion_control_flow.h b/src/core/or/congestion_control_flow.h
new file mode 100644
index 0000000000..6c318027ea
--- /dev/null
+++ b/src/core/or/congestion_control_flow.h
@@ -0,0 +1,48 @@
+/* Copyright (c) 2019-2021, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file congestion_control_flow.h
+ * \brief APIs for stream flow control on congestion controlled circuits.
+ **/
+
+#ifndef TOR_CONGESTION_CONTROL_FLOW_H
+#define TOR_CONGESTION_CONTROL_FLOW_H
+
+#include "core/or/crypt_path_st.h"
+#include "core/or/circuit_st.h"
+#include "core/or/edge_connection_st.h"
+
+void flow_control_new_consensus_params(const struct networkstatus_t *);
+
+bool circuit_process_stream_xoff(edge_connection_t *conn,
+ const crypt_path_t *layer_hint,
+ const cell_t *cell);
+bool circuit_process_stream_xon(edge_connection_t *conn,
+ const crypt_path_t *layer_hint,
+ const cell_t *cell);
+
+int flow_control_decide_xoff(edge_connection_t *stream);
+void flow_control_decide_xon(edge_connection_t *stream, size_t n_written);
+
+void flow_control_note_sent_data(edge_connection_t *stream, size_t len);
+
+bool edge_uses_flow_control(const edge_connection_t *stream);
+
+bool conn_uses_flow_control(connection_t *stream);
+
+uint64_t edge_get_max_rtt(const edge_connection_t *);
+
+/* Private section starts. */
+#ifdef TOR_CONGESTION_CONTROL_FLOW_PRIVATE
+
+/*
+ * Unit test declarations.
+ */
+#ifdef TOR_UNIT_TESTS
+
+#endif /* defined(TOR_UNIT_TESTS) */
+
+#endif /* defined(TOR_CONGESTION_CONTROL_FLOW_PRIVATE) */
+
+#endif /* !defined(TOR_CONGESTION_CONTROL_FLOW_H) */
diff --git a/src/core/or/connection_edge.c b/src/core/or/connection_edge.c
index 6f6f22a0d4..d4d9d2f759 100644
--- a/src/core/or/connection_edge.c
+++ b/src/core/or/connection_edge.c
@@ -69,6 +69,8 @@
#include "core/or/circuituse.h"
#include "core/or/circuitpadding.h"
#include "core/or/connection_edge.h"
+#include "core/or/congestion_control_flow.h"
+#include "core/or/circuitstats.h"
#include "core/or/connection_or.h"
#include "core/or/extendinfo.h"
#include "core/or/policies.h"
@@ -614,20 +616,39 @@ connection_half_edge_add(const edge_connection_t *conn,
half_conn->stream_id = conn->stream_id;
- // How many sendme's should I expect?
- half_conn->sendmes_pending =
- (STREAMWINDOW_START-conn->package_window)/STREAMWINDOW_INCREMENT;
-
// Is there a connected cell pending?
half_conn->connected_pending = conn->base_.state ==
AP_CONN_STATE_CONNECT_WAIT;
- /* Data should only arrive if we're not waiting on a resolved cell.
- * It can arrive after waiting on connected, because of optimistic
- * data. */
- if (conn->base_.state != AP_CONN_STATE_RESOLVE_WAIT) {
- // How many more data cells can arrive on this id?
- half_conn->data_pending = conn->deliver_window;
+ if (edge_uses_flow_control(conn)) {
+ /* If the edge uses the new congestion control flow control, we must use
+ * time-based limits on half-edge activity. */
+ uint64_t timeout_usec = (uint64_t)(get_circuit_build_timeout_ms()*1000);
+ half_conn->used_ccontrol = 1;
+
+ /* If this is an onion service circuit, double the CBT as an approximate
+ * value for the other half of the circuit */
+ if (conn->hs_ident) {
+ timeout_usec *= 2;
+ }
+
+ /* The stream should stop seeing any use after the larger of the circuit
+ * RTT and the overall circuit build timeout */
+ half_conn->end_ack_expected_usec = MAX(timeout_usec,
+ edge_get_max_rtt(conn)) +
+ monotime_absolute_usec();
+ } else {
+ // How many sendme's should I expect?
+ half_conn->sendmes_pending =
+ (STREAMWINDOW_START-conn->package_window)/STREAMWINDOW_INCREMENT;
+
+ /* Data should only arrive if we're not waiting on a resolved cell.
+ * It can arrive after waiting on connected, because of optimistic
+ * data. */
+ if (conn->base_.state != AP_CONN_STATE_RESOLVE_WAIT) {
+ // How many more data cells can arrive on this id?
+ half_conn->data_pending = conn->deliver_window;
+ }
}
insert_at = smartlist_bsearch_idx(circ->half_streams, &half_conn->stream_id,
@@ -688,6 +709,12 @@ connection_half_edge_is_valid_data(const smartlist_t *half_conns,
if (!half)
return 0;
+ if (half->used_ccontrol) {
+ if (monotime_absolute_usec() > half->end_ack_expected_usec)
+ return 0;
+ return 1;
+ }
+
if (half->data_pending > 0) {
half->data_pending--;
return 1;
@@ -740,6 +767,10 @@ connection_half_edge_is_valid_sendme(const smartlist_t *half_conns,
if (!half)
return 0;
+ /* congestion control edges don't use sendmes */
+ if (half->used_ccontrol)
+ return 0;
+
if (half->sendmes_pending > 0) {
half->sendmes_pending--;
return 1;
diff --git a/src/core/or/connection_or.c b/src/core/or/connection_or.c
index dd31638eb3..db9f93e6f6 100644
--- a/src/core/or/connection_or.c
+++ b/src/core/or/connection_or.c
@@ -65,6 +65,7 @@
#include "core/or/scheduler.h"
#include "feature/nodelist/torcert.h"
#include "core/or/channelpadding.h"
+#include "core/or/congestion_control_common.h"
#include "feature/dirauth/authmode.h"
#include "feature/hs/hs_service.h"
@@ -636,7 +637,7 @@ connection_or_flushed_some(or_connection_t *conn)
/* If we're under the low water mark, add cells until we're just over the
* high water mark. */
datalen = connection_get_outbuf_len(TO_CONN(conn));
- if (datalen < OR_CONN_LOWWATER) {
+ if (datalen < or_conn_lowwatermark()) {
/* Let the scheduler know */
scheduler_channel_wants_writes(TLS_CHAN_TO_BASE(conn->chan));
}
@@ -660,9 +661,9 @@ connection_or_num_cells_writeable(or_connection_t *conn)
* used to trigger when to start writing after we've stopped.
*/
datalen = connection_get_outbuf_len(TO_CONN(conn));
- if (datalen < OR_CONN_HIGHWATER) {
+ if (datalen < or_conn_highwatermark()) {
cell_network_size = get_cell_network_size(conn->wide_circ_ids);
- n = CEIL_DIV(OR_CONN_HIGHWATER - datalen, cell_network_size);
+ n = CEIL_DIV(or_conn_highwatermark() - datalen, cell_network_size);
}
return n;
diff --git a/src/core/or/edge_connection_st.h b/src/core/or/edge_connection_st.h
index 0120c3df25..dab32fc8d0 100644
--- a/src/core/or/edge_connection_st.h
+++ b/src/core/or/edge_connection_st.h
@@ -15,6 +15,7 @@
#include "core/or/or.h"
#include "core/or/connection_st.h"
+#include "lib/evloop/token_bucket.h"
/** Subtype of connection_t for an "edge connection" -- that is, an entry (ap)
* connection, or an exit. */
@@ -73,6 +74,60 @@ struct edge_connection_t {
* that's going away and being used on channels instead. We still tag
* edge connections with dirreq_id from circuits, so it's copied here. */
uint64_t dirreq_id;
+
+ /* The following are flow control fields */
+
+ /** Used for rate limiting the read side of this edge connection when
+ * congestion control is enabled on its circuit. The XON cell ewma_drain_rate
+ * parameter is used to set the bucket limits. */
+ token_bucket_rw_t bucket;
+
+ /**
+ * Monotime timestamp of the last time we sent a flow control message
+ * for this edge, used to compute advisory rates */
+ uint64_t drain_start_usec;
+
+ /**
+ * Number of bytes written since we either emptied our buffers,
+ * or sent an advisory drain rate. Can wrap, but if so,
+ * we must reset the usec timestamp above. (Or make this u64, idk).
+ */
+ uint32_t drained_bytes;
+ uint32_t prev_drained_bytes;
+
+ /**
+ * N_EWMA of the drain rate of writes on this edge conn
+ * while buffers were present.
+ */
+ uint32_t ewma_drain_rate;
+
+ /**
+ * The ewma drain rate the last time we sent an xon.
+ */
+ uint32_t ewma_rate_last_sent;
+
+ /**
+ * The following fields are used to count the total bytes sent on this
+ * stream, and compare them to the number of XON and XOFFs received, so
+ * that clients can check rate limits of XOFF/XON to prevent dropmark
+ * attacks. */
+ uint32_t total_bytes_xmit;
+
+ /** Number of XOFFs received */
+ uint8_t num_xoff_recv;
+
+ /** Number of XONs received */
+ uint8_t num_xon_recv;
+
+ /**
+ * Flag that tells us if an XOFF has been sent; cleared when we send an XON.
+ * Used to avoid sending multiple */
+ uint8_t xoff_sent : 1;
+
+ /** Flag that tells us if an XOFF has been received; cleared when we get
+ * an XON. Used to ensure that this edge keeps reads on its edge socket
+ * disabled. */
+ uint8_t xoff_received : 1;
};
#endif /* !defined(EDGE_CONNECTION_ST_H) */
diff --git a/src/core/or/half_edge_st.h b/src/core/or/half_edge_st.h
index c956c7434a..ac97eb19f1 100644
--- a/src/core/or/half_edge_st.h
+++ b/src/core/or/half_edge_st.h
@@ -31,6 +31,18 @@ typedef struct half_edge_t {
* our deliver window */
int data_pending;
+ /**
+ * Monotime timestamp of when the other end should have successfully
+ * shut down the stream and stop sending data, based on the larger
+ * of circuit RTT and CBT. Used if 'used_ccontrol' is true, to expire
+ * the half_edge at this monotime timestamp. */
+ uint64_t end_ack_expected_usec;
+
+ /**
+ * Did this edge use congestion control? If so, use
+ * timer instead of pending data approach */
+ int used_ccontrol : 1;
+
/** Is there a connected cell pending? */
int connected_pending : 1;
} half_edge_t;
diff --git a/src/core/or/include.am b/src/core/or/include.am
index d142062216..278556144c 100644
--- a/src/core/or/include.am
+++ b/src/core/or/include.am
@@ -39,6 +39,7 @@ LIBTOR_APP_A_SOURCES += \
src/core/or/congestion_control_vegas.c \
src/core/or/congestion_control_nola.c \
src/core/or/congestion_control_westwood.c \
+ src/core/or/congestion_control_flow.c \
src/core/or/status.c \
src/core/or/versions.c
@@ -82,6 +83,7 @@ noinst_HEADERS += \
src/core/or/entry_port_cfg_st.h \
src/core/or/extend_info_st.h \
src/core/or/listener_connection_st.h \
+ src/core/or/lttng_cc.inc \
src/core/or/lttng_circuit.inc \
src/core/or/onion.h \
src/core/or/or.h \
@@ -102,6 +104,7 @@ noinst_HEADERS += \
src/core/or/relay_crypto_st.h \
src/core/or/scheduler.h \
src/core/or/sendme.h \
+ src/core/or/congestion_control_flow.h \
src/core/or/congestion_control_common.h \
src/core/or/congestion_control_vegas.h \
src/core/or/congestion_control_nola.h \
@@ -115,7 +118,9 @@ noinst_HEADERS += \
if USE_TRACING_INSTRUMENTATION_LTTNG
LIBTOR_APP_A_SOURCES += \
+ src/core/or/trace_probes_cc.c \
src/core/or/trace_probes_circuit.c
noinst_HEADERS += \
+ src/core/or/trace_probes_cc.h \
src/core/or/trace_probes_circuit.h
endif
diff --git a/src/core/or/lttng_cc.inc b/src/core/or/lttng_cc.inc
new file mode 100644
index 0000000000..b7bf58e196
--- /dev/null
+++ b/src/core/or/lttng_cc.inc
@@ -0,0 +1,166 @@
+/* Copyright (c) 2021, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file lttng_cc.inc
+ * \brief LTTng tracing probe declaration for the congestion control subsystem.
+ * It is in this .inc file due to the non C standard syntax and the way
+ * we guard the header with the LTTng specific
+ * TRACEPOINT_HEADER_MULTI_READ.
+ **/
+
+#include "orconfig.h"
+
+/* We only build the following if LTTng instrumentation has been enabled. */
+#ifdef USE_TRACING_INSTRUMENTATION_LTTNG
+
+/* The following defines are LTTng-UST specific. */
+#undef TRACEPOINT_PROVIDER
+#define TRACEPOINT_PROVIDER tor_cc
+
+#undef TRACEPOINT_INCLUDE
+#define TRACEPOINT_INCLUDE "./src/core/or/lttng_cc.inc"
+
+#if !defined(LTTNG_CC_INC) || defined(TRACEPOINT_HEADER_MULTI_READ)
+#define LTTNG_CC_INC
+
+#include <lttng/tracepoint.h>
+
+/*
+ * Flow Control
+ */
+
+/* Emitted every time the flow_control_decide_xon() function is called. */
+TRACEPOINT_EVENT(tor_cc, flow_decide_xon,
+ TP_ARGS(const edge_connection_t *, stream, size_t, n_written),
+ TP_FIELDS(
+ ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier)
+ ctf_integer(size_t, written_bytes, n_written)
+ ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes)
+ ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes)
+ ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent)
+ ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate)
+ ctf_integer(size_t, outbuf_len,
+ connection_get_outbuf_len(TO_CONN(stream)))
+ )
+)
+
+/* Emitted when flow control starts measuring the drain rate. */
+TRACEPOINT_EVENT(tor_cc, flow_decide_xon_drain_start,
+ TP_ARGS(const edge_connection_t *, stream),
+ TP_FIELDS(
+ ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier)
+ ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes)
+ ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes)
+ ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent)
+ ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate)
+ ctf_integer(size_t, outbuf_len,
+ connection_get_outbuf_len(TO_CONN(stream)))
+ )
+)
+
+/* Emitted when the drain rate is updated. The new_drain_rate value is what was
+ * just computed. */
+TRACEPOINT_EVENT(tor_cc, flow_decide_xon_drain_update,
+ TP_ARGS(const edge_connection_t *, stream, uint32_t, drain_rate),
+ TP_FIELDS(
+ ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier)
+ ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes)
+ ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes)
+ ctf_integer(uint32_t, new_drain_rate, drain_rate)
+ ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent)
+ ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate)
+ ctf_integer(size_t, outbuf_len,
+ connection_get_outbuf_len(TO_CONN(stream)))
+ )
+)
+
+/* Emitted when an XON cell is sent due to a notice in a drain rate change. */
+TRACEPOINT_EVENT(tor_cc, flow_decide_xon_rate_change,
+ TP_ARGS(const edge_connection_t *, stream),
+ TP_FIELDS(
+ ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier)
+ ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes)
+ ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes)
+ ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent)
+ ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate)
+ ctf_integer(size_t, outbuf_len,
+ connection_get_outbuf_len(TO_CONN(stream)))
+ )
+)
+
+/* Emitted when an XON cell is sent because we partially or fully drained the
+ * edge connection buffer. */
+TRACEPOINT_EVENT(tor_cc, flow_decide_xon_partial_drain,
+ TP_ARGS(const edge_connection_t *, stream),
+ TP_FIELDS(
+ ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier)
+ ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes)
+ ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes)
+ ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent)
+ ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate)
+ ctf_integer(size_t, outbuf_len,
+ connection_get_outbuf_len(TO_CONN(stream)))
+ )
+)
+
+/* Emitted when we double the drain rate which is an attempt to see if we can
+ * speed things up. */
+TRACEPOINT_EVENT(tor_cc, flow_decide_xon_drain_doubled,
+ TP_ARGS(const edge_connection_t *, stream),
+ TP_FIELDS(
+ ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier)
+ ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes)
+ ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes)
+ ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent)
+ ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate)
+ ctf_integer(size_t, outbuf_len,
+ connection_get_outbuf_len(TO_CONN(stream)))
+ )
+)
+
+/* XOFF */
+
+/* Emitted when we send an XOFF cell. */
+TRACEPOINT_EVENT(tor_cc, flow_decide_xoff_sending,
+ TP_ARGS(const edge_connection_t *, stream),
+ TP_FIELDS(
+ ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier)
+ ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes)
+ ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes)
+ ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent)
+ ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate)
+ ctf_integer(size_t, outbuf_len,
+ connection_get_outbuf_len(TO_CONN(stream)))
+ )
+)
+
+/*
+ * Congestion Control
+ */
+
+/* Emitted when the BDP value has been updated. */
+TRACEPOINT_EVENT(tor_cc, bdp_update,
+ TP_ARGS(const circuit_t *, circ, const congestion_control_t *, cc,
+ uint64_t, curr_rtt_usec, uint64_t, sendme_rate_bdp),
+ TP_FIELDS(
+ ctf_integer(uint64_t, circuit_ptr, circ)
+ ctf_integer(uint32_t, n_circ_id, circ->n_circ_id)
+ ctf_integer(uint64_t, min_rtt_usec, cc->min_rtt_usec)
+ ctf_integer(uint64_t, curr_rtt_usec, curr_rtt_usec)
+ ctf_integer(uint64_t, ewma_rtt_usec, cc->ewma_rtt_usec)
+ ctf_integer(uint64_t, max_rtt_usec, cc->max_rtt_usec)
+ ctf_integer(uint64_t, bdp_inflight_rtt, cc->bdp[BDP_ALG_INFLIGHT_RTT])
+ ctf_integer(uint64_t, bdp_cwnd_rtt, cc->bdp[BDP_ALG_CWND_RTT])
+ ctf_integer(uint64_t, bdp_sendme_rate, cc->bdp[BDP_ALG_SENDME_RATE])
+ ctf_integer(uint64_t, bdp_piecewise, cc->bdp[BDP_ALG_PIECEWISE])
+ ctf_integer(uint64_t, sendme_rate_bdp, sendme_rate_bdp)
+ )
+)
+
+#endif /* LTTNG_CC_INC || TRACEPOINT_HEADER_MULTI_READ */
+
+/* Must be included after the probes declaration. */
+#include <lttng/tracepoint-event.h>
+
+#endif /* USE_TRACING_INSTRUMENTATION_LTTNG */
diff --git a/src/core/or/or.h b/src/core/or/or.h
index 99948f26e2..392a848ee7 100644
--- a/src/core/or/or.h
+++ b/src/core/or/or.h
@@ -210,6 +210,9 @@ struct curve25519_public_key_t;
#define RELAY_COMMAND_PADDING_NEGOTIATE 41
#define RELAY_COMMAND_PADDING_NEGOTIATED 42
+#define RELAY_COMMAND_XOFF 43
+#define RELAY_COMMAND_XON 44
+
/* Reasons why an OR connection is closed. */
#define END_OR_CONN_REASON_DONE 1
#define END_OR_CONN_REASON_REFUSED 2 /* connection refused */
@@ -591,18 +594,6 @@ typedef struct or_handshake_state_t or_handshake_state_t;
/** Length of Extended ORPort connection identifier. */
#define EXT_OR_CONN_ID_LEN DIGEST_LEN /* 20 */
-/*
- * OR_CONN_HIGHWATER and OR_CONN_LOWWATER moved from connection_or.c so
- * channeltls.c can see them too.
- */
-
-/** When adding cells to an OR connection's outbuf, keep adding until the
- * outbuf is at least this long, or we run out of cells. */
-#define OR_CONN_HIGHWATER (32*1024)
-
-/** Add cells to an OR connection's outbuf whenever the outbuf's data length
- * drops below this size. */
-#define OR_CONN_LOWWATER (16*1024)
typedef struct connection_t connection_t;
typedef struct control_connection_t control_connection_t;
diff --git a/src/core/or/relay.c b/src/core/or/relay.c
index e3d41d7bf0..a2123f991c 100644
--- a/src/core/or/relay.c
+++ b/src/core/or/relay.c
@@ -98,6 +98,7 @@
#include "core/or/socks_request_st.h"
#include "core/or/sendme.h"
#include "core/or/congestion_control_common.h"
+#include "core/or/congestion_control_flow.h"
static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell,
cell_direction_t cell_direction,
@@ -116,13 +117,6 @@ static void adjust_exit_policy_from_exitpolicy_failure(origin_circuit_t *circ,
node_t *node,
const tor_addr_t *addr);
-/** Stop reading on edge connections when we have this many cells
- * waiting on the appropriate queue. */
-#define CELL_QUEUE_HIGHWATER_SIZE 256
-/** Start reading from edge connections again when we get down to this many
- * cells. */
-#define CELL_QUEUE_LOWWATER_SIZE 64
-
/** Stats: how many relay cells have originated at this hop, or have
* been relayed onward (not recognized at this hop)?
*/
@@ -1740,6 +1734,44 @@ handle_relay_cell_command(cell_t *cell, circuit_t *circ,
}
return 0;
+ case RELAY_COMMAND_XOFF:
+ if (!conn) {
+ if (CIRCUIT_IS_ORIGIN(circ)) {
+ origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ);
+ if (relay_crypt_from_last_hop(ocirc, layer_hint) &&
+ connection_half_edge_is_valid_data(ocirc->half_streams,
+ rh->stream_id)) {
+ circuit_read_valid_data(ocirc, rh->length);
+ }
+ }
+ return 0;
+ }
+
+ if (circuit_process_stream_xoff(conn, layer_hint, cell)) {
+ if (CIRCUIT_IS_ORIGIN(circ)) {
+ circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh->length);
+ }
+ }
+ return 0;
+ case RELAY_COMMAND_XON:
+ if (!conn) {
+ if (CIRCUIT_IS_ORIGIN(circ)) {
+ origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ);
+ if (relay_crypt_from_last_hop(ocirc, layer_hint) &&
+ connection_half_edge_is_valid_data(ocirc->half_streams,
+ rh->stream_id)) {
+ circuit_read_valid_data(ocirc, rh->length);
+ }
+ }
+ return 0;
+ }
+
+ if (circuit_process_stream_xon(conn, layer_hint, cell)) {
+ if (CIRCUIT_IS_ORIGIN(circ)) {
+ circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh->length);
+ }
+ }
+ return 0;
case RELAY_COMMAND_END:
reason = rh->length > 0 ?
get_uint8(cell->payload+RELAY_HEADER_SIZE) : END_STREAM_REASON_MISC;
@@ -2287,7 +2319,7 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial,
}
/* Handle the stream-level SENDME package window. */
- if (sendme_note_stream_data_packaged(conn) < 0) {
+ if (sendme_note_stream_data_packaged(conn, length) < 0) {
connection_stop_reading(TO_CONN(conn));
log_debug(domain,"conn->package_window reached 0.");
circuit_consider_stop_edge_reading(circ, cpath_layer);
@@ -2361,8 +2393,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
cells_on_queue = or_circ->p_chan_cells.n;
}
- if (CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue < max_to_package)
- max_to_package = CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue;
+ if (cell_queue_highwatermark() - cells_on_queue < max_to_package)
+ max_to_package = cell_queue_highwatermark() - cells_on_queue;
/* Once we used to start listening on the streams in the order they
* appeared in the linked list. That leads to starvation on the
@@ -2402,7 +2434,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
/* Activate reading starting from the chosen stream */
for (conn=chosen_stream; conn; conn = conn->next_stream) {
/* Start reading for the streams starting from here */
- if (conn->base_.marked_for_close || conn->package_window <= 0)
+ if (conn->base_.marked_for_close || conn->package_window <= 0 ||
+ conn->xoff_received)
continue;
if (!layer_hint || conn->cpath_layer == layer_hint) {
connection_start_reading(TO_CONN(conn));
@@ -2413,7 +2446,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
}
/* Go back and do the ones we skipped, circular-style */
for (conn = first_conn; conn != chosen_stream; conn = conn->next_stream) {
- if (conn->base_.marked_for_close || conn->package_window <= 0)
+ if (conn->base_.marked_for_close || conn->package_window <= 0 ||
+ conn->xoff_received)
continue;
if (!layer_hint || conn->cpath_layer == layer_hint) {
connection_start_reading(TO_CONN(conn));
@@ -3080,7 +3114,7 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max))
/* Is the cell queue low enough to unblock all the streams that are waiting
* to write to this circuit? */
- if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE)
+ if (streams_blocked && queue->n <= cell_queue_lowwatermark())
set_streams_blocked_on_circ(circ, chan, 0, 0); /* unblock streams */
/* If n_flushed < max still, loop around and pick another circuit */
@@ -3198,7 +3232,7 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
/* If we have too many cells on the circuit, we should stop reading from
* the edge streams for a while. */
- if (!streams_blocked && queue->n >= CELL_QUEUE_HIGHWATER_SIZE)
+ if (!streams_blocked && queue->n >= cell_queue_highwatermark())
set_streams_blocked_on_circ(circ, chan, 1, 0); /* block streams */
if (streams_blocked && fromstream) {
diff --git a/src/core/or/sendme.c b/src/core/or/sendme.c
index 900490a892..ee670f9d51 100644
--- a/src/core/or/sendme.c
+++ b/src/core/or/sendme.c
@@ -22,6 +22,7 @@
#include "core/or/relay.h"
#include "core/or/sendme.h"
#include "core/or/congestion_control_common.h"
+#include "core/or/congestion_control_flow.h"
#include "feature/nodelist/networkstatus.h"
#include "lib/ctime/di_ops.h"
#include "trunnel/sendme_cell.h"
@@ -370,6 +371,10 @@ sendme_connection_edge_consider_sending(edge_connection_t *conn)
int log_domain = TO_CONN(conn)->type == CONN_TYPE_AP ? LD_APP : LD_EXIT;
+ /* If we use flow control, we do not send stream sendmes */
+ if (edge_uses_flow_control(conn))
+ goto end;
+
/* Don't send it if we still have data to deliver. */
if (connection_outbuf_too_full(TO_CONN(conn))) {
goto end;
@@ -546,6 +551,12 @@ sendme_process_stream_level(edge_connection_t *conn, circuit_t *circ,
tor_assert(conn);
tor_assert(circ);
+ if (edge_uses_flow_control(conn)) {
+ log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
+ "Congestion control got stream sendme");
+ return -END_CIRC_REASON_TORPROTOCOL;
+ }
+
/* Don't allow the other endpoint to request more than our maximum (i.e.
* initial) stream SENDME window worth of data. Well-behaved stock clients
* will not request more than this max (as per the check in the while loop
@@ -603,7 +614,12 @@ int
sendme_stream_data_received(edge_connection_t *conn)
{
tor_assert(conn);
- return --conn->deliver_window;
+
+ if (edge_uses_flow_control(conn)) {
+ return flow_control_decide_xoff(conn);
+ } else {
+ return --conn->deliver_window;
+ }
}
/* Called when a relay DATA cell is packaged on the given circuit. If
@@ -651,10 +667,18 @@ sendme_note_circuit_data_packaged(circuit_t *circ, crypt_path_t *layer_hint)
/* Called when a relay DATA cell is packaged for the given edge connection
* conn. Update the package window and return its new value. */
int
-sendme_note_stream_data_packaged(edge_connection_t *conn)
+sendme_note_stream_data_packaged(edge_connection_t *conn, size_t len)
{
tor_assert(conn);
+ if (edge_uses_flow_control(conn)) {
+ flow_control_note_sent_data(conn, len);
+ if (conn->xoff_received)
+ return -1;
+ else
+ return 1;
+ }
+
--conn->package_window;
log_debug(LD_APP, "Stream package_window now %d.", conn->package_window);
return conn->package_window;
diff --git a/src/core/or/sendme.h b/src/core/or/sendme.h
index c224d0a921..2abec91a91 100644
--- a/src/core/or/sendme.h
+++ b/src/core/or/sendme.h
@@ -33,7 +33,7 @@ int sendme_circuit_data_received(circuit_t *circ, crypt_path_t *layer_hint);
/* Update package window functions. */
int sendme_note_circuit_data_packaged(circuit_t *circ,
crypt_path_t *layer_hint);
-int sendme_note_stream_data_packaged(edge_connection_t *conn);
+int sendme_note_stream_data_packaged(edge_connection_t *conn, size_t len);
/* Record cell digest on circuit. */
void sendme_record_cell_digest_on_circ(circuit_t *circ, crypt_path_t *cpath);
diff --git a/src/core/or/trace_probes_cc.c b/src/core/or/trace_probes_cc.c
new file mode 100644
index 0000000000..d52646da4f
--- /dev/null
+++ b/src/core/or/trace_probes_cc.c
@@ -0,0 +1,33 @@
+/* Copyright (c) 2021, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file trace_probes_cc.c
+ * \brief Tracepoint provider source file for the cc subsystem. Probes
+ * are generated within this C file for LTTng-UST
+ **/
+
+#include "orconfig.h"
+
+/*
+ * Following section is specific to LTTng-UST.
+ */
+#ifdef USE_TRACING_INSTRUMENTATION_LTTNG
+
+/* Header files that the probes need. */
+#include "core/or/or.h"
+#include "core/or/channel.h"
+#include "core/or/circuit_st.h"
+#include "core/or/circuitlist.h"
+#include "core/or/congestion_control_st.h"
+#include "core/or/connection_st.h"
+#include "core/or/edge_connection_st.h"
+#include "core/or/or_circuit_st.h"
+#include "core/or/origin_circuit_st.h"
+
+#define TRACEPOINT_DEFINE
+#define TRACEPOINT_CREATE_PROBES
+
+#include "core/or/trace_probes_cc.h"
+
+#endif /* defined(USE_TRACING_INSTRUMENTATION_LTTNG) */
diff --git a/src/core/or/trace_probes_cc.h b/src/core/or/trace_probes_cc.h
new file mode 100644
index 0000000000..1f87528723
--- /dev/null
+++ b/src/core/or/trace_probes_cc.h
@@ -0,0 +1,22 @@
+/* Copyright (c) 2021, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file trace_probes_cc.h
+ * \brief The tracing probes for the congestion control subsystem.
+ * Currently, only LTTng-UST probes are available.
+ **/
+
+#ifndef TOR_TRACE_PROBES_CC_H
+#define TOR_TRACE_PROBES_CC_H
+
+#include "lib/trace/events.h"
+
+/* We only build the following if LTTng instrumentation has been enabled. */
+#ifdef USE_TRACING_INSTRUMENTATION_LTTNG
+
+#include "core/or/lttng_cc.inc"
+
+#endif /* USE_TRACING_INSTRUMENTATION_LTTNG */
+
+#endif /* !defined(TOR_TRACE_PROBES_CC_H) */
diff --git a/src/feature/nodelist/networkstatus.c b/src/feature/nodelist/networkstatus.c
index 7a1e73ef60..6867d8c98e 100644
--- a/src/feature/nodelist/networkstatus.c
+++ b/src/feature/nodelist/networkstatus.c
@@ -45,6 +45,8 @@
#include "core/or/channel.h"
#include "core/or/channelpadding.h"
#include "core/or/circuitpadding.h"
+#include "core/or/congestion_control_common.h"
+#include "core/or/congestion_control_flow.h"
#include "core/or/circuitmux.h"
#include "core/or/circuitmux_ewma.h"
#include "core/or/circuitstats.h"
@@ -1699,6 +1701,8 @@ notify_after_networkstatus_changes(void)
channelpadding_new_consensus_params(c);
circpad_new_consensus_params(c);
router_new_consensus_params(c);
+ congestion_control_new_consensus_params(c);
+ flow_control_new_consensus_params(c);
/* Maintenance of our L2 guard list */
maintain_layer2_guards();
diff --git a/src/test/test_channeltls.c b/src/test/test_channeltls.c
index 5219c86097..ca7fee2c53 100644
--- a/src/test/test_channeltls.c
+++ b/src/test/test_channeltls.c
@@ -20,6 +20,7 @@
#include "lib/tls/tortls.h"
#include "core/or/or_connection_st.h"
+#include "core/or/congestion_control_common.h"
/* Test suite stuff */
#include "test/test.h"
@@ -155,7 +156,7 @@ test_channeltls_num_bytes_queued(void *arg)
* - 2 cells.
*/
n = ch->num_cells_writeable(ch);
- tt_int_op(n, OP_EQ, CEIL_DIV(OR_CONN_HIGHWATER, 512) - 2);
+ tt_int_op(n, OP_EQ, CEIL_DIV(or_conn_highwatermark(), 512) - 2);
UNMOCK(buf_datalen);
tlschan_buf_datalen_mock_target = NULL;
tlschan_buf_datalen_mock_size = 0;
diff --git a/src/trunnel/flow_control_cells.c b/src/trunnel/flow_control_cells.c
new file mode 100644
index 0000000000..df44756d6b
--- /dev/null
+++ b/src/trunnel/flow_control_cells.c
@@ -0,0 +1,382 @@
+/* flow_control_cells.c -- generated by Trunnel v1.5.3.
+ * https://gitweb.torproject.org/trunnel.git
+ * You probably shouldn't edit this file.
+ */
+#include <stdlib.h>
+#include "trunnel-impl.h"
+
+#include "flow_control_cells.h"
+
+#define TRUNNEL_SET_ERROR_CODE(obj) /* Mark 'obj' as having had a setter fail. */ \
+ do { \
+ (obj)->trunnel_error_code_ = 1; \
+ } while (0)
+
+#if defined(__COVERITY__) || defined(__clang_analyzer__)
+/* If we're running a static analysis tool, we don't want it to complain
+ * that some of our remaining-bytes checks are dead-code.
+ *
+ * Under those tools OR_DEADCODE_DUMMY appends "|| <dummy global>" to the
+ * CHECK_REMAINING() condition; the dummy is initialized to 0. In every
+ * other build OR_DEADCODE_DUMMY expands to nothing. */
+int flowcontrolcells_deadcode_dummy__ = 0;
+#define OR_DEADCODE_DUMMY || flowcontrolcells_deadcode_dummy__
+#else
+#define OR_DEADCODE_DUMMY
+#endif
+
+#define CHECK_REMAINING(nbytes, label) /* Jump to 'label' when the caller's local 'remaining' is below 'nbytes'. */ \
+ do { \
+ if (remaining < (nbytes) OR_DEADCODE_DUMMY) { \
+ goto label; \
+ } \
+ } while (0)
+
+xoff_cell_t * /* Allocate one xoff_cell_t via trunnel_calloc().
+ * Returns the new object, or NULL if the allocation failed. The caller
+ * releases it with xoff_cell_free(). */
+xoff_cell_new(void)
+{
+ xoff_cell_t *val = trunnel_calloc(1, sizeof(xoff_cell_t));
+ if (NULL == val)
+ return NULL;
+ return val;
+}
+
+/** Release all storage held inside 'obj', but do not free 'obj'.
+ *
+ * This function currently does nothing: it only references 'obj' so the
+ * parameter counts as used.
+ */
+static void
+xoff_cell_clear(xoff_cell_t *obj)
+{
+ (void) obj;
+}
+
+void /* Destroy 'obj': clear it, wipe its bytes with trunnel_memwipe(), then
+ * hand it to trunnel_free_(). A NULL 'obj' is a no-op. */
+xoff_cell_free(xoff_cell_t *obj)
+{
+ if (obj == NULL)
+ return;
+ xoff_cell_clear(obj);
+ trunnel_memwipe(obj, sizeof(xoff_cell_t));
+ trunnel_free_(obj);
+}
+
+uint8_t /* Return the stored version field. 'inp' is dereferenced without a
+ * NULL check. */
+xoff_cell_get_version(const xoff_cell_t *inp)
+{
+ return inp->version;
+}
+int /* Store 'val' as the version of 'inp'. Only 0 is accepted.
+ * Returns 0 on success. For any other value, sets the error code on 'inp',
+ * leaves the stored version untouched and returns -1. */
+xoff_cell_set_version(xoff_cell_t *inp, uint8_t val)
+{
+ if (! ((val == 0))) {
+ TRUNNEL_SET_ERROR_CODE(inp);
+ return -1;
+ }
+ inp->version = val;
+ return 0;
+}
+const char * /* Validate 'obj'. Returns NULL when it is consistent; otherwise
+ * a constant message for, in order: a NULL object, a recorded setter
+ * failure, or a version other than 0. */
+xoff_cell_check(const xoff_cell_t *obj)
+{
+ if (obj == NULL)
+ return "Object was NULL";
+ if (obj->trunnel_error_code_)
+ return "A set function failed on this object";
+ if (! (obj->version == 0))
+ return "Integer out of bounds";
+ return NULL;
+}
+
+ssize_t /* Return the encoded size of 'obj' in bytes (always 1: the version
+ * byte), or -1 if xoff_cell_check() rejects the object. */
+xoff_cell_encoded_len(const xoff_cell_t *obj)
+{
+ ssize_t result = 0;
+
+ if (NULL != xoff_cell_check(obj))
+ return -1;
+
+
+ /* Length of u8 version IN [0] */
+ result += 1;
+ return result;
+}
+int /* Reset the setter-error flag on 'obj' and return the value it had
+ * before the reset (nonzero if an error had been recorded). */
+xoff_cell_clear_errors(xoff_cell_t *obj)
+{
+ int r = obj->trunnel_error_code_;
+ obj->trunnel_error_code_ = 0;
+ return r;
+}
+ssize_t /* Serialize 'obj' into 'output', using at most 'avail' bytes.
+ * Returns the number of bytes written (1) on success, -2 if 'avail' is too
+ * small, or -1 if xoff_cell_check() rejects 'obj'. */
+xoff_cell_encode(uint8_t *output, const size_t avail, const xoff_cell_t *obj)
+{
+ ssize_t result = 0;
+ size_t written = 0;
+ uint8_t *ptr = output;
+ const char *msg;
+#ifdef TRUNNEL_CHECK_ENCODED_LEN
+ const ssize_t encoded_len = xoff_cell_encoded_len(obj);
+#endif
+
+ if (NULL != (msg = xoff_cell_check(obj))) /* refuse to encode an invalid object */
+ goto check_failed;
+
+#ifdef TRUNNEL_CHECK_ENCODED_LEN
+ trunnel_assert(encoded_len >= 0);
+#endif
+
+ /* Encode u8 version IN [0] */
+ trunnel_assert(written <= avail);
+ if (avail - written < 1) /* no room for the single version byte */
+ goto truncated;
+ trunnel_set_uint8(ptr, (obj->version));
+ written += 1; ptr += 1;
+
+
+ trunnel_assert(ptr == output + written);
+#ifdef TRUNNEL_CHECK_ENCODED_LEN
+ {
+ trunnel_assert(encoded_len >= 0);
+ trunnel_assert((size_t)encoded_len == written);
+ }
+
+#endif
+
+ return written;
+
+ truncated:
+ result = -2;
+ goto fail;
+ check_failed:
+ (void)msg; /* the check message is not passed on to the caller */
+ result = -1;
+ goto fail;
+ fail:
+ trunnel_assert(result < 0);
+ return result;
+}
+
+/** As xoff_cell_parse(), but do not allocate the output object.
+ *
+ * Reads one version byte from 'input'. Returns the number of bytes consumed
+ * (1) on success, -2 if 'len_in' is 0, or -1 if the version is not 0. In the
+ * -1 case obj->version has already been overwritten with the rejected value.
+ */
+static ssize_t
+xoff_cell_parse_into(xoff_cell_t *obj, const uint8_t *input, const size_t len_in)
+{
+ const uint8_t *ptr = input;
+ size_t remaining = len_in;
+ ssize_t result = 0;
+ (void)result;
+
+ /* Parse u8 version IN [0] */
+ CHECK_REMAINING(1, truncated);
+ obj->version = (trunnel_get_uint8(ptr));
+ remaining -= 1; ptr += 1;
+ if (! (obj->version == 0))
+ goto fail;
+ trunnel_assert(ptr + remaining == input + len_in);
+ return len_in - remaining; /* bytes consumed; trailing input is left unread */
+
+ truncated:
+ return -2;
+ fail:
+ result = -1;
+ return result;
+}
+
+ssize_t /* Allocate a new xoff_cell_t into *output and parse 'input' into it.
+ * Returns the value of xoff_cell_parse_into() (bytes consumed, or negative
+ * on failure), or -1 if the allocation failed. On any failure *output is
+ * NULL. 'output' itself is written unconditionally, so it must be valid. */
+xoff_cell_parse(xoff_cell_t **output, const uint8_t *input, const size_t len_in)
+{
+ ssize_t result;
+ *output = xoff_cell_new();
+ if (NULL == *output)
+ return -1;
+ result = xoff_cell_parse_into(*output, input, len_in);
+ if (result < 0) {
+ xoff_cell_free(*output); /* do not hand back a half-parsed object */
+ *output = NULL;
+ }
+ return result;
+}
+xon_cell_t * /* Allocate one xon_cell_t via trunnel_calloc().
+ * Returns the new object, or NULL if the allocation failed. The caller
+ * releases it with xon_cell_free(). */
+xon_cell_new(void)
+{
+ xon_cell_t *val = trunnel_calloc(1, sizeof(xon_cell_t));
+ if (NULL == val)
+ return NULL;
+ return val;
+}
+
+/** Release all storage held inside 'obj', but do not free 'obj'.
+ *
+ * This function currently does nothing: it only references 'obj' so the
+ * parameter counts as used.
+ */
+static void
+xon_cell_clear(xon_cell_t *obj)
+{
+ (void) obj;
+}
+
+void /* Destroy 'obj': clear it, wipe its bytes with trunnel_memwipe(), then
+ * hand it to trunnel_free_(). A NULL 'obj' is a no-op. */
+xon_cell_free(xon_cell_t *obj)
+{
+ if (obj == NULL)
+ return;
+ xon_cell_clear(obj);
+ trunnel_memwipe(obj, sizeof(xon_cell_t));
+ trunnel_free_(obj);
+}
+
+uint8_t /* Return the stored version field. 'inp' is dereferenced without a
+ * NULL check. */
+xon_cell_get_version(const xon_cell_t *inp)
+{
+ return inp->version;
+}
+int /* Store 'val' as the version of 'inp'. Only 0 is accepted.
+ * Returns 0 on success. For any other value, sets the error code on 'inp',
+ * leaves the stored version untouched and returns -1. */
+xon_cell_set_version(xon_cell_t *inp, uint8_t val)
+{
+ if (! ((val == 0))) {
+ TRUNNEL_SET_ERROR_CODE(inp);
+ return -1;
+ }
+ inp->version = val;
+ return 0;
+}
+uint32_t /* Return the stored kbps_ewma field. 'inp' is dereferenced without
+ * a NULL check. */
+xon_cell_get_kbps_ewma(const xon_cell_t *inp)
+{
+ return inp->kbps_ewma;
+}
+int /* Store 'val' as the kbps_ewma of 'inp'. No range check is applied, so
+ * this always returns 0. */
+xon_cell_set_kbps_ewma(xon_cell_t *inp, uint32_t val)
+{
+ inp->kbps_ewma = val;
+ return 0;
+}
+const char * /* Validate 'obj'. Returns NULL when it is consistent; otherwise
+ * a constant message for, in order: a NULL object, a recorded setter
+ * failure, or a version other than 0. kbps_ewma is not constrained. */
+xon_cell_check(const xon_cell_t *obj)
+{
+ if (obj == NULL)
+ return "Object was NULL";
+ if (obj->trunnel_error_code_)
+ return "A set function failed on this object";
+ if (! (obj->version == 0))
+ return "Integer out of bounds";
+ return NULL;
+}
+
+ssize_t /* Return the encoded size of 'obj' in bytes (always 5: one version
+ * byte plus a four-byte kbps_ewma), or -1 if xon_cell_check() rejects the
+ * object. */
+xon_cell_encoded_len(const xon_cell_t *obj)
+{
+ ssize_t result = 0;
+
+ if (NULL != xon_cell_check(obj))
+ return -1;
+
+
+ /* Length of u8 version IN [0] */
+ result += 1;
+
+ /* Length of u32 kbps_ewma */
+ result += 4;
+ return result;
+}
+int /* Reset the setter-error flag on 'obj' and return the value it had
+ * before the reset (nonzero if an error had been recorded). */
+xon_cell_clear_errors(xon_cell_t *obj)
+{
+ int r = obj->trunnel_error_code_;
+ obj->trunnel_error_code_ = 0;
+ return r;
+}
+ssize_t /* Serialize 'obj' into 'output', using at most 'avail' bytes.
+ * Returns the number of bytes written (5) on success, -2 if 'avail' is too
+ * small, or -1 if xon_cell_check() rejects 'obj'. When 'avail' is between 1
+ * and 4 the version byte has already been written before -2 is returned. */
+xon_cell_encode(uint8_t *output, const size_t avail, const xon_cell_t *obj)
+{
+ ssize_t result = 0;
+ size_t written = 0;
+ uint8_t *ptr = output;
+ const char *msg;
+#ifdef TRUNNEL_CHECK_ENCODED_LEN
+ const ssize_t encoded_len = xon_cell_encoded_len(obj);
+#endif
+
+ if (NULL != (msg = xon_cell_check(obj))) /* refuse to encode an invalid object */
+ goto check_failed;
+
+#ifdef TRUNNEL_CHECK_ENCODED_LEN
+ trunnel_assert(encoded_len >= 0);
+#endif
+
+ /* Encode u8 version IN [0] */
+ trunnel_assert(written <= avail);
+ if (avail - written < 1)
+ goto truncated;
+ trunnel_set_uint8(ptr, (obj->version));
+ written += 1; ptr += 1;
+
+ /* Encode u32 kbps_ewma (passed through trunnel_htonl() before storing) */
+ trunnel_assert(written <= avail);
+ if (avail - written < 4)
+ goto truncated;
+ trunnel_set_uint32(ptr, trunnel_htonl(obj->kbps_ewma));
+ written += 4; ptr += 4;
+
+
+ trunnel_assert(ptr == output + written);
+#ifdef TRUNNEL_CHECK_ENCODED_LEN
+ {
+ trunnel_assert(encoded_len >= 0);
+ trunnel_assert((size_t)encoded_len == written);
+ }
+
+#endif
+
+ return written;
+
+ truncated:
+ result = -2;
+ goto fail;
+ check_failed:
+ (void)msg; /* the check message is not passed on to the caller */
+ result = -1;
+ goto fail;
+ fail:
+ trunnel_assert(result < 0);
+ return result;
+}
+
+/** As xon_cell_parse(), but do not allocate the output object.
+ *
+ * Reads one version byte followed by a four-byte kbps_ewma from 'input'.
+ * Returns the number of bytes consumed (5) on success, -2 if fewer bytes
+ * than needed remain, or -1 if the version is not 0. On failure 'obj' may
+ * already hold the fields parsed so far, including a rejected version.
+ */
+static ssize_t
+xon_cell_parse_into(xon_cell_t *obj, const uint8_t *input, const size_t len_in)
+{
+ const uint8_t *ptr = input;
+ size_t remaining = len_in;
+ ssize_t result = 0;
+ (void)result;
+
+ /* Parse u8 version IN [0] */
+ CHECK_REMAINING(1, truncated);
+ obj->version = (trunnel_get_uint8(ptr));
+ remaining -= 1; ptr += 1;
+ if (! (obj->version == 0))
+ goto fail;
+
+ /* Parse u32 kbps_ewma (passed through trunnel_ntohl() after reading) */
+ CHECK_REMAINING(4, truncated);
+ obj->kbps_ewma = trunnel_ntohl(trunnel_get_uint32(ptr));
+ remaining -= 4; ptr += 4;
+ trunnel_assert(ptr + remaining == input + len_in);
+ return len_in - remaining; /* bytes consumed; trailing input is left unread */
+
+ truncated:
+ return -2;
+ fail:
+ result = -1;
+ return result;
+}
+
+ssize_t /* Allocate a new xon_cell_t into *output and parse 'input' into it.
+ * Returns the value of xon_cell_parse_into() (bytes consumed, or negative
+ * on failure), or -1 if the allocation failed. On any failure *output is
+ * NULL. 'output' itself is written unconditionally, so it must be valid. */
+xon_cell_parse(xon_cell_t **output, const uint8_t *input, const size_t len_in)
+{
+ ssize_t result;
+ *output = xon_cell_new();
+ if (NULL == *output)
+ return -1;
+ result = xon_cell_parse_into(*output, input, len_in);
+ if (result < 0) {
+ xon_cell_free(*output); /* do not hand back a half-parsed object */
+ *output = NULL;
+ }
+ return result;
+}
diff --git a/src/trunnel/flow_control_cells.h b/src/trunnel/flow_control_cells.h
new file mode 100644
index 0000000000..b8108b9a24
--- /dev/null
+++ b/src/trunnel/flow_control_cells.h
@@ -0,0 +1,120 @@
+/* flow_control_cells.h -- generated by Trunnel v1.5.3.
+ * https://gitweb.torproject.org/trunnel.git
+ * You probably shouldn't edit this file.
+ */
+#ifndef TRUNNEL_FLOW_CONTROL_CELLS_H
+#define TRUNNEL_FLOW_CONTROL_CELLS_H
+
+#include <stdint.h>
+#include "trunnel.h"
+
+#if !defined(TRUNNEL_OPAQUE) && !defined(TRUNNEL_OPAQUE_XOFF_CELL)
+struct xoff_cell_st { /* In-memory form of an XOFF cell body; hidden when either TRUNNEL_OPAQUE macro is defined. */
+ uint8_t version; /* Version field; the setter and parser accept only 0. */
+ uint8_t trunnel_error_code_; /* Set by a failed setter; reset by xoff_cell_clear_errors(). */
+};
+#endif
+typedef struct xoff_cell_st xoff_cell_t;
+#if !defined(TRUNNEL_OPAQUE) && !defined(TRUNNEL_OPAQUE_XON_CELL)
+struct xon_cell_st { /* In-memory form of an XON cell body; hidden when either TRUNNEL_OPAQUE macro is defined. */
+ uint8_t version; /* Version field; the setter and parser accept only 0. */
+ uint32_t kbps_ewma; /* Advisory EWMA drain rate in kilobytes/sec (1000 bytes/sec); 0 means no rate advice. */
+ uint8_t trunnel_error_code_; /* Set by a failed setter; reset by xon_cell_clear_errors(). */
+};
+#endif
+typedef struct xon_cell_st xon_cell_t;
+/** Return a newly allocated xoff_cell with all elements set to zero.
+ */
+xoff_cell_t *xoff_cell_new(void);
+/** Release all storage held by the xoff_cell in 'victim'. (Do nothing
+ * if 'victim' is NULL.)
+ */
+void xoff_cell_free(xoff_cell_t *victim);
+/** Try to parse a xoff_cell from the buffer in 'input', using up to
+ * 'len_in' bytes from the input buffer. On success, return the number
+ * of bytes consumed and set *output to the newly allocated
+ * xoff_cell_t. On failure, return -2 if the input appears truncated,
+ * and -1 if the input is otherwise invalid.
+ */
+ssize_t xoff_cell_parse(xoff_cell_t **output, const uint8_t *input, const size_t len_in);
+/** Return the number of bytes we expect to need to encode the
+ * xoff_cell in 'obj'. On failure, return a negative value. Note that
+ * this value may be an overestimate, and can even be an underestimate
+ * for certain unencodeable objects.
+ */
+ssize_t xoff_cell_encoded_len(const xoff_cell_t *obj);
+/** Try to encode the xoff_cell from 'input' into the buffer at
+ * 'output', using up to 'avail' bytes of the output buffer. On
+ * success, return the number of bytes used. On failure, return -2 if
+ * the buffer was not long enough, and -1 if the input was invalid.
+ */
+ssize_t xoff_cell_encode(uint8_t *output, size_t avail, const xoff_cell_t *input);
+/** Check whether the internal state of the xoff_cell in 'obj' is
+ * consistent. Return NULL if it is, and a short message if it is not.
+ */
+const char *xoff_cell_check(const xoff_cell_t *obj);
+/** Clear any errors that were set on the object 'obj' by its setter
+ * functions. Return true iff errors were cleared.
+ */
+int xoff_cell_clear_errors(xoff_cell_t *obj);
+/** Return the value of the version field of the xoff_cell_t in 'inp'
+ */
+uint8_t xoff_cell_get_version(const xoff_cell_t *inp);
+/** Set the value of the version field of the xoff_cell_t in 'inp' to
+ * 'val'. Return 0 on success; return -1 and set the error code on
+ * 'inp' on failure.
+ */
+int xoff_cell_set_version(xoff_cell_t *inp, uint8_t val);
+/** Return a newly allocated xon_cell with all elements set to zero.
+ */
+xon_cell_t *xon_cell_new(void);
+/** Release all storage held by the xon_cell in 'victim'. (Do nothing
+ * if 'victim' is NULL.)
+ */
+void xon_cell_free(xon_cell_t *victim);
+/** Try to parse a xon_cell from the buffer in 'input', using up to
+ * 'len_in' bytes from the input buffer. On success, return the number
+ * of bytes consumed and set *output to the newly allocated
+ * xon_cell_t. On failure, return -2 if the input appears truncated,
+ * and -1 if the input is otherwise invalid.
+ */
+ssize_t xon_cell_parse(xon_cell_t **output, const uint8_t *input, const size_t len_in);
+/** Return the number of bytes we expect to need to encode the
+ * xon_cell in 'obj'. On failure, return a negative value. Note that
+ * this value may be an overestimate, and can even be an underestimate
+ * for certain unencodeable objects.
+ */
+ssize_t xon_cell_encoded_len(const xon_cell_t *obj);
+/** Try to encode the xon_cell from 'input' into the buffer at
+ * 'output', using up to 'avail' bytes of the output buffer. On
+ * success, return the number of bytes used. On failure, return -2 if
+ * the buffer was not long enough, and -1 if the input was invalid.
+ */
+ssize_t xon_cell_encode(uint8_t *output, size_t avail, const xon_cell_t *input);
+/** Check whether the internal state of the xon_cell in 'obj' is
+ * consistent. Return NULL if it is, and a short message if it is not.
+ */
+const char *xon_cell_check(const xon_cell_t *obj);
+/** Clear any errors that were set on the object 'obj' by its setter
+ * functions. Return true iff errors were cleared.
+ */
+int xon_cell_clear_errors(xon_cell_t *obj);
+/** Return the value of the version field of the xon_cell_t in 'inp'
+ */
+uint8_t xon_cell_get_version(const xon_cell_t *inp);
+/** Set the value of the version field of the xon_cell_t in 'inp' to
+ * 'val'. Return 0 on success; return -1 and set the error code on
+ * 'inp' on failure.
+ */
+int xon_cell_set_version(xon_cell_t *inp, uint8_t val);
+/** Return the value of the kbps_ewma field of the xon_cell_t in 'inp'
+ */
+uint32_t xon_cell_get_kbps_ewma(const xon_cell_t *inp);
+/** Set the value of the kbps_ewma field of the xon_cell_t in 'inp' to
+ * 'val'. Return 0 on success; return -1 and set the error code on
+ * 'inp' on failure.
+ */
+int xon_cell_set_kbps_ewma(xon_cell_t *inp, uint32_t val);
+
+
+#endif
diff --git a/src/trunnel/flow_control_cells.trunnel b/src/trunnel/flow_control_cells.trunnel
new file mode 100644
index 0000000000..9d07b568a9
--- /dev/null
+++ b/src/trunnel/flow_control_cells.trunnel
@@ -0,0 +1,20 @@
+/* This file contains the xon and xoff cell definitions, for flow control. */
+
+/* xoff cell definition. Tells the other endpoint to stop sending, because
+ * we have too much data queued for this stream. */
+struct xoff_cell {
+ /* Version field. */
+ u8 version IN [0x00];
+}
+
+/* xon cell definition. Tells the other endpoint to resume sending and/or
+ * update its sending rate on this stream based on advisory information. */
+struct xon_cell {
+ /* Version field. */
+ u8 version IN [0x00];
+
+ /* Advisory field: The ewma rate of socket drain we have seen so far
+ * on this stream, in kilobytes/sec (1000 bytes/sec). May be zero,
+ * which means no rate advice. */
+ u32 kbps_ewma;
+}
diff --git a/src/trunnel/include.am b/src/trunnel/include.am
index 6c3a5ff06b..00a96536f1 100644
--- a/src/trunnel/include.am
+++ b/src/trunnel/include.am
@@ -12,6 +12,7 @@ TRUNNELINPUTS = \
src/trunnel/pwbox.trunnel \
src/trunnel/channelpadding_negotiation.trunnel \
src/trunnel/sendme_cell.trunnel \
+ src/trunnel/flow_control_cells.trunnel \
src/trunnel/socks5.trunnel \
src/trunnel/circpad_negotiation.trunnel
@@ -26,6 +27,7 @@ TRUNNELSOURCES = \
src/trunnel/hs/cell_rendezvous.c \
src/trunnel/channelpadding_negotiation.c \
src/trunnel/sendme_cell.c \
+ src/trunnel/flow_control_cells.c \
src/trunnel/socks5.c \
src/trunnel/netinfo.c \
src/trunnel/circpad_negotiation.c
@@ -43,6 +45,7 @@ TRUNNELHEADERS = \
src/trunnel/hs/cell_rendezvous.h \
src/trunnel/channelpadding_negotiation.h \
src/trunnel/sendme_cell.h \
+ src/trunnel/flow_control_cells.h \
src/trunnel/socks5.h \
src/trunnel/netinfo.h \
src/trunnel/circpad_negotiation.h