aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMike Perry <mikeperry-git@torproject.org>2021-08-10 21:35:46 +0000
committerDavid Goulet <dgoulet@torproject.org>2021-10-04 10:45:46 -0400
commit0422eb26a70fc1450cc6b57902f189edc4eed10a (patch)
treee5efb02a06ba635ddb837cb66c2c89969466895e /src
parenta89a71cd7b4658eba8465f31b5e1bc21e3325a53 (diff)
downloadtor-0422eb26a70fc1450cc6b57902f189edc4eed10a.tar.gz
tor-0422eb26a70fc1450cc6b57902f189edc4eed10a.zip
Prop#324: Hook up flow control
Diffstat (limited to 'src')
-rw-r--r--src/app/main/main.c2
-rw-r--r--src/core/mainloop/connection.c16
-rw-r--r--src/core/mainloop/mainloop.c7
-rw-r--r--src/core/or/or.h3
-rw-r--r--src/core/or/relay.c47
-rw-r--r--src/core/or/sendme.c28
-rw-r--r--src/core/or/sendme.h2
-rw-r--r--src/feature/nodelist/networkstatus.c2
8 files changed, 99 insertions, 8 deletions
diff --git a/src/app/main/main.c b/src/app/main/main.c
index 89564490e6..0742abe70a 100644
--- a/src/app/main/main.c
+++ b/src/app/main/main.c
@@ -27,6 +27,7 @@
#include "core/or/channel.h"
#include "core/or/channelpadding.h"
#include "core/or/circuitpadding.h"
+#include "core/or/congestion_control_flow.h"
#include "core/or/circuitlist.h"
#include "core/or/command.h"
#include "core/or/connection_or.h"
@@ -630,6 +631,7 @@ tor_init(int argc, char *argv[])
* until we get a consensus */
channelpadding_new_consensus_params(NULL);
circpad_new_consensus_params(NULL);
+ flow_control_new_consensus_params(NULL);
/* Initialize circuit padding to defaults+torrc until we get a consensus */
circpad_machines_init();
diff --git a/src/core/mainloop/connection.c b/src/core/mainloop/connection.c
index 48bea792ae..9271a70914 100644
--- a/src/core/mainloop/connection.c
+++ b/src/core/mainloop/connection.c
@@ -147,6 +147,8 @@
#include "feature/nodelist/routerinfo_st.h"
#include "core/or/socks_request_st.h"
+#include "core/or/congestion_control_flow.h"
+
/**
* On Windows and Linux we cannot reliably bind() a socket to an
* address and port if: 1) There's already a socket bound to wildcard
@@ -4594,9 +4596,9 @@ connection_handle_write_impl(connection_t *conn, int force)
!dont_stop_writing) { /* it's done flushing */
if (connection_finished_flushing(conn) < 0) {
/* already marked */
- return -1;
+ goto err;
}
- return 0;
+ goto done;
}
/* Call even if result is 0, since the global write bucket may
@@ -4606,7 +4608,17 @@ connection_handle_write_impl(connection_t *conn, int force)
if (n_read > 0 && connection_is_reading(conn))
connection_consider_empty_read_buckets(conn);
+ done:
+ /* If this is an edge connection with congestion control, check to see
+ * if it is time to send an xon */
+ if (conn_uses_flow_control(conn)) {
+ flow_control_decide_xon(TO_EDGE_CONN(conn), n_written);
+ }
+
return 0;
+
+ err:
+ return -1;
}
/* DOCDOC connection_handle_write */
diff --git a/src/core/mainloop/mainloop.c b/src/core/mainloop/mainloop.c
index 37b53db92a..cd57dea3d4 100644
--- a/src/core/mainloop/mainloop.c
+++ b/src/core/mainloop/mainloop.c
@@ -641,6 +641,13 @@ connection_start_reading,(connection_t *conn))
if (connection_should_read_from_linked_conn(conn))
connection_start_reading_from_linked_conn(conn);
} else {
+ if (CONN_IS_EDGE(conn) && TO_EDGE_CONN(conn)->xoff_received) {
+ /* We should not get called here if we're waiting for an XON, but
+ * belt-and-suspenders */
+ log_notice(LD_NET,
+ "Request to start reading on an edgeconn blocked with XOFF");
+ return;
+ }
if (event_add(conn->read_event, NULL))
log_warn(LD_NET, "Error from libevent setting read event state for %d "
"to watched: %s",
diff --git a/src/core/or/or.h b/src/core/or/or.h
index 99948f26e2..ad82130301 100644
--- a/src/core/or/or.h
+++ b/src/core/or/or.h
@@ -210,6 +210,9 @@ struct curve25519_public_key_t;
#define RELAY_COMMAND_PADDING_NEGOTIATE 41
#define RELAY_COMMAND_PADDING_NEGOTIATED 42
+#define RELAY_COMMAND_XOFF 43
+#define RELAY_COMMAND_XON 44
+
/* Reasons why an OR connection is closed. */
#define END_OR_CONN_REASON_DONE 1
#define END_OR_CONN_REASON_REFUSED 2 /* connection refused */
diff --git a/src/core/or/relay.c b/src/core/or/relay.c
index e3d41d7bf0..0e889eb348 100644
--- a/src/core/or/relay.c
+++ b/src/core/or/relay.c
@@ -98,6 +98,7 @@
#include "core/or/socks_request_st.h"
#include "core/or/sendme.h"
#include "core/or/congestion_control_common.h"
+#include "core/or/congestion_control_flow.h"
static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell,
cell_direction_t cell_direction,
@@ -1740,6 +1741,44 @@ handle_relay_cell_command(cell_t *cell, circuit_t *circ,
}
return 0;
+ case RELAY_COMMAND_XOFF:
+ if (!conn) {
+ if (CIRCUIT_IS_ORIGIN(circ)) {
+ origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ);
+ if (relay_crypt_from_last_hop(ocirc, layer_hint) &&
+ connection_half_edge_is_valid_data(ocirc->half_streams,
+ rh->stream_id)) {
+ circuit_read_valid_data(ocirc, rh->length);
+ }
+ }
+ return 0;
+ }
+
+ if (circuit_process_stream_xoff(conn, layer_hint, cell)) {
+ if (CIRCUIT_IS_ORIGIN(circ)) {
+ circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh->length);
+ }
+ }
+ return 0;
+ case RELAY_COMMAND_XON:
+ if (!conn) {
+ if (CIRCUIT_IS_ORIGIN(circ)) {
+ origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ);
+ if (relay_crypt_from_last_hop(ocirc, layer_hint) &&
+ connection_half_edge_is_valid_data(ocirc->half_streams,
+ rh->stream_id)) {
+ circuit_read_valid_data(ocirc, rh->length);
+ }
+ }
+ return 0;
+ }
+
+ if (circuit_process_stream_xon(conn, layer_hint, cell)) {
+ if (CIRCUIT_IS_ORIGIN(circ)) {
+ circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh->length);
+ }
+ }
+ return 0;
case RELAY_COMMAND_END:
reason = rh->length > 0 ?
get_uint8(cell->payload+RELAY_HEADER_SIZE) : END_STREAM_REASON_MISC;
@@ -2287,7 +2326,7 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial,
}
/* Handle the stream-level SENDME package window. */
- if (sendme_note_stream_data_packaged(conn) < 0) {
+ if (sendme_note_stream_data_packaged(conn, length) < 0) {
connection_stop_reading(TO_CONN(conn));
log_debug(domain,"conn->package_window reached 0.");
circuit_consider_stop_edge_reading(circ, cpath_layer);
@@ -2402,7 +2441,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
/* Activate reading starting from the chosen stream */
for (conn=chosen_stream; conn; conn = conn->next_stream) {
/* Start reading for the streams starting from here */
- if (conn->base_.marked_for_close || conn->package_window <= 0)
+ if (conn->base_.marked_for_close || conn->package_window <= 0 ||
+ conn->xoff_received)
continue;
if (!layer_hint || conn->cpath_layer == layer_hint) {
connection_start_reading(TO_CONN(conn));
@@ -2413,7 +2453,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
}
/* Go back and do the ones we skipped, circular-style */
for (conn = first_conn; conn != chosen_stream; conn = conn->next_stream) {
- if (conn->base_.marked_for_close || conn->package_window <= 0)
+ if (conn->base_.marked_for_close || conn->package_window <= 0 ||
+ conn->xoff_received)
continue;
if (!layer_hint || conn->cpath_layer == layer_hint) {
connection_start_reading(TO_CONN(conn));
diff --git a/src/core/or/sendme.c b/src/core/or/sendme.c
index 900490a892..ee670f9d51 100644
--- a/src/core/or/sendme.c
+++ b/src/core/or/sendme.c
@@ -22,6 +22,7 @@
#include "core/or/relay.h"
#include "core/or/sendme.h"
#include "core/or/congestion_control_common.h"
+#include "core/or/congestion_control_flow.h"
#include "feature/nodelist/networkstatus.h"
#include "lib/ctime/di_ops.h"
#include "trunnel/sendme_cell.h"
@@ -370,6 +371,10 @@ sendme_connection_edge_consider_sending(edge_connection_t *conn)
int log_domain = TO_CONN(conn)->type == CONN_TYPE_AP ? LD_APP : LD_EXIT;
+ /* If we use flow control, we do not send stream sendmes */
+ if (edge_uses_flow_control(conn))
+ goto end;
+
/* Don't send it if we still have data to deliver. */
if (connection_outbuf_too_full(TO_CONN(conn))) {
goto end;
@@ -546,6 +551,12 @@ sendme_process_stream_level(edge_connection_t *conn, circuit_t *circ,
tor_assert(conn);
tor_assert(circ);
+ if (edge_uses_flow_control(conn)) {
+ log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
+ "Congestion control got stream sendme");
+ return -END_CIRC_REASON_TORPROTOCOL;
+ }
+
/* Don't allow the other endpoint to request more than our maximum (i.e.
* initial) stream SENDME window worth of data. Well-behaved stock clients
* will not request more than this max (as per the check in the while loop
@@ -603,7 +614,12 @@ int
sendme_stream_data_received(edge_connection_t *conn)
{
tor_assert(conn);
- return --conn->deliver_window;
+
+ if (edge_uses_flow_control(conn)) {
+ return flow_control_decide_xoff(conn);
+ } else {
+ return --conn->deliver_window;
+ }
}
/* Called when a relay DATA cell is packaged on the given circuit. If
@@ -651,10 +667,18 @@ sendme_note_circuit_data_packaged(circuit_t *circ, crypt_path_t *layer_hint)
/* Called when a relay DATA cell is packaged for the given edge connection
* conn. Update the package window and return its new value. */
int
-sendme_note_stream_data_packaged(edge_connection_t *conn)
+sendme_note_stream_data_packaged(edge_connection_t *conn, size_t len)
{
tor_assert(conn);
+ if (edge_uses_flow_control(conn)) {
+ flow_control_note_sent_data(conn, len);
+ if (conn->xoff_received)
+ return -1;
+ else
+ return 1;
+ }
+
--conn->package_window;
log_debug(LD_APP, "Stream package_window now %d.", conn->package_window);
return conn->package_window;
diff --git a/src/core/or/sendme.h b/src/core/or/sendme.h
index c224d0a921..2abec91a91 100644
--- a/src/core/or/sendme.h
+++ b/src/core/or/sendme.h
@@ -33,7 +33,7 @@ int sendme_circuit_data_received(circuit_t *circ, crypt_path_t *layer_hint);
/* Update package window functions. */
int sendme_note_circuit_data_packaged(circuit_t *circ,
crypt_path_t *layer_hint);
-int sendme_note_stream_data_packaged(edge_connection_t *conn);
+int sendme_note_stream_data_packaged(edge_connection_t *conn, size_t len);
/* Record cell digest on circuit. */
void sendme_record_cell_digest_on_circ(circuit_t *circ, crypt_path_t *cpath);
diff --git a/src/feature/nodelist/networkstatus.c b/src/feature/nodelist/networkstatus.c
index 7a1e73ef60..0138dff033 100644
--- a/src/feature/nodelist/networkstatus.c
+++ b/src/feature/nodelist/networkstatus.c
@@ -45,6 +45,7 @@
#include "core/or/channel.h"
#include "core/or/channelpadding.h"
#include "core/or/circuitpadding.h"
+#include "core/or/congestion_control_flow.h"
#include "core/or/circuitmux.h"
#include "core/or/circuitmux_ewma.h"
#include "core/or/circuitstats.h"
@@ -1699,6 +1700,7 @@ notify_after_networkstatus_changes(void)
channelpadding_new_consensus_params(c);
circpad_new_consensus_params(c);
router_new_consensus_params(c);
+ flow_control_new_consensus_params(c);
/* Maintenance of our L2 guard list */
maintain_layer2_guards();