diff options
Diffstat (limited to 'src/core')
52 files changed, 4597 insertions, 323 deletions
diff --git a/src/core/crypto/include.am b/src/core/crypto/include.am index 28b7e22905..2d53b3cb0b 100644 --- a/src/core/crypto/include.am +++ b/src/core/crypto/include.am @@ -5,6 +5,7 @@ LIBTOR_APP_A_SOURCES += \ src/core/crypto/onion_crypto.c \ src/core/crypto/onion_fast.c \ src/core/crypto/onion_ntor.c \ + src/core/crypto/onion_ntor_v3.c \ src/core/crypto/onion_tap.c \ src/core/crypto/relay_crypto.c @@ -14,5 +15,6 @@ noinst_HEADERS += \ src/core/crypto/onion_crypto.h \ src/core/crypto/onion_fast.h \ src/core/crypto/onion_ntor.h \ + src/core/crypto/onion_ntor_v3.h \ src/core/crypto/onion_tap.h \ src/core/crypto/relay_crypto.h diff --git a/src/core/crypto/onion_ntor_v3.c b/src/core/crypto/onion_ntor_v3.c new file mode 100644 index 0000000000..491c69cf8d --- /dev/null +++ b/src/core/crypto/onion_ntor_v3.c @@ -0,0 +1,760 @@ +/* Copyright (c) 2001 Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2021, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * @file onion_ntor_v3.c + * @brief Implements the version 3 ntor handshake as first specified in + * proposal 332. + * + * The v3 ntor handshake differs from the earlier versions (ntor and hs-ntor) + * primarily in that it allows the client to send an authenticated encrypted + * message as part of its onion skin, and allows the relay to send and + * encrypted authenticated reply as part of its response. + * + * It also takes a "verification string" -- the handshake cannot succeed + * unless both parties use the same value for their verification stream. 
+ **/ + +#define ONION_NTOR_V3_PRIVATE + +#include "orconfig.h" +#include "core/crypto/onion_ntor_v3.h" + +#include "lib/arch/bytes.h" +#include "lib/crypt_ops/crypto_digest.h" +#include "lib/crypt_ops/crypto_rand.h" +#include "lib/crypt_ops/crypto_util.h" +#include "lib/ctime/di_ops.h" +#include "lib/log/util_bug.h" + +#include <string.h> + +/* Parameters used to keep the outputs of this handshake from colliding with + * others. These are defined in the specification. */ +#define PROTOID "ntor3-curve25519-sha3_256-1" +#define TWEAK(A) (PROTOID ":" A) + +#define T_MSGKDF TWEAK("kdf_phase1") +#define T_MSGMAC TWEAK("msg_mac") +#define T_KEY_SEED TWEAK("key_seed") +#define T_VERIFY TWEAK("verify") +#define T_FINAL TWEAK("kdf_final") +#define T_AUTH TWEAK("auth_final") + +/** + * Add @a len bytes of @a data as input to the provided @a xof. + * + * (This is provided just for abbreviation). + **/ +#define xof_add(xof, data, len) crypto_xof_add_bytes((xof), (data), (len)) +/** + * Add @a len bytes of @a data as input to the provided @a xof, + * prefixed with an encoding of the length. + * + * This is equivalent to ENCAP(data) in the spec. + **/ +static void +xof_add_encap(crypto_xof_t *xof, const uint8_t *data, size_t len) +{ + uint64_t len64 = tor_htonll(len); + xof_add(xof, (uint8_t *)(&len64), 8); + xof_add(xof, data, len); +} +/** + * Add an encapsulated tweak to the provided xof. + **/ +#define xof_add_tweak(d, s) xof_add_encap((d), (const uint8_t *)(s), strlen(s)) + +/** + * Add @a len bytes of @a data to the provided @a digest. + * + * This is provided as an abbreviation, and to get the types right. + **/ +static void +d_add(crypto_digest_t *digest, const uint8_t *data, size_t len) +{ + crypto_digest_add_bytes(digest, (const char *)data, len); +} +/** + * Add @a len bytes of @a data to the provided @a digest, prefixed + * with the encoded length. + * + * This is equivalent to ENCAP(data) from the spec. 
+ **/ +static void +d_add_encap(crypto_digest_t *digest, const uint8_t *data, size_t len) +{ + uint64_t len64 = tor_htonll(len); + d_add(digest, (const uint8_t *)(&len64), 8); + d_add(digest, data, len); +} +/** + * Add an encapsulated tweak to the provided digest. + **/ +#define d_add_tweak(d, s) d_add_encap((d), (const uint8_t *)(s), strlen(s)) + +/** + * Helper: copy @a len bytes of @a data onto *@a ptr, and advance @a ptr + * forward by @a len bytes. + * + * Asserts that @a ptr will not be advanced beyond @a endptr. + **/ +static void +push(uint8_t **ptr, const uint8_t *endptr, const uint8_t *data, size_t len) +{ + size_t remaining = endptr - *ptr; + tor_assert(len <= remaining); + memcpy(*ptr, data, len); + *ptr += len; +} + +/** + * Helper: Drop storage held by @a state, after wiping it. + **/ +void +ntor3_handshake_state_free_(ntor3_handshake_state_t *state) +{ + if (!state) + return; + + memwipe(state, 0, sizeof(*state)); + tor_free(state); +} + +/** + * Perform a client-side v3 ntor handshake with a given relay. + * + * As inputs this function takes the relay's Ed25519 identity (@a relay_id), + * the relay's current ntor onion key (@a relay_key), a verification string + * (@a verification_len bytes at @a verification), and a message to send + * as part of the handshake (@a message_len bytes at @a message). + * + * The message will be encrypted and authenticated to the relay, but will not + * receive the same forward secrecy as the rest of the handshake. We should + * not put any super-confidential data in it. + * + * The handshake will only succeed if the relay uses the same verification + * string as we are using. + * + * As outputs, this function returns 0 on success and -1 on failure. On + * success, it sets @a onion_skin_out and @a onion_skin_len_out to a newly + * allocated handshake message that the client can send as part of its CREATE2 + * or EXTEND2 cell. 
It also sets @a handshake_state_out to a newly
+ * allocated handshake state object; the client needs to use this object to
+ * process the relay's eventual reply.
+ **/
+int
+onion_skin_ntor3_create(const ed25519_public_key_t *relay_id,
+                        const curve25519_public_key_t *relay_key,
+                        const uint8_t *verification,
+                        const size_t verification_len,
+                        const uint8_t *message,
+                        const size_t message_len,
+                        ntor3_handshake_state_t **handshake_state_out,
+                        uint8_t **onion_skin_out,
+                        size_t *onion_skin_len_out)
+{
+  curve25519_keypair_t client_keypair;
+  if (curve25519_keypair_generate(&client_keypair, 0) < 0) {
+    return -1;
+  }
+  int r = onion_skin_ntor3_create_nokeygen(
+                                   &client_keypair,
+                                   relay_id,
+                                   relay_key,
+                                   verification,
+                                   verification_len,
+                                   message,
+                                   message_len,
+                                   handshake_state_out,
+                                   onion_skin_out,
+                                   onion_skin_len_out);
+  memwipe(&client_keypair, 0, sizeof(client_keypair));
+  return r;
+}
+
+/**
+ * Like onion_skin_ntor3_create, but do not generate a new ephemeral keypair.
+ * Instead, take the ephemeral keypair (x,X) from @a client_keypair.
+ *
+ * (Having a separate function for this lets us test the code for correct
+ * behavior.)
+ **/
+STATIC int
+onion_skin_ntor3_create_nokeygen(
+                   const curve25519_keypair_t *client_keypair,
+                   const ed25519_public_key_t *relay_id,
+                   const curve25519_public_key_t *relay_key,
+                   const uint8_t *verification,
+                   const size_t verification_len,
+                   const uint8_t *message,
+                   const size_t message_len,
+                   ntor3_handshake_state_t **handshake_state_out,
+                   uint8_t **onion_skin_out,
+                   size_t *onion_skin_len_out)
+{
+  *handshake_state_out = NULL;
+  *onion_skin_out = NULL;
+  *onion_skin_len_out = 0;
+
+  // Set up the handshake state object.
+ *handshake_state_out = tor_malloc_zero(sizeof(ntor3_handshake_state_t)); + memcpy(&(*handshake_state_out)->client_keypair, client_keypair, + sizeof(*client_keypair)); + memcpy(&(*handshake_state_out)->relay_id, relay_id, sizeof(*relay_id)); + memcpy(&(*handshake_state_out)->relay_key, relay_key, sizeof(*relay_key)); + + // Perform the first DH handshake. + curve25519_handshake((*handshake_state_out)->bx, + &client_keypair->seckey, relay_key); + if (safe_mem_is_zero((*handshake_state_out)->bx, CURVE25519_OUTPUT_LEN)) { + // Okay to return early here, since our behavior here doesn't + // cause a visible timing sidechannel. + return -1; + } + + // Compute phase1_keys. + uint8_t enc_key[CIPHER256_KEY_LEN]; + uint8_t mac_key[DIGEST256_LEN]; + { + crypto_xof_t *xof = crypto_xof_new(); + // secret_input_phase1 = Bx | ID | X | B | PROTOID | ENCAP(VER) + xof_add_tweak(xof, T_MSGKDF); + xof_add(xof, (*handshake_state_out)->bx, CURVE25519_OUTPUT_LEN); + xof_add(xof, relay_id->pubkey, ED25519_PUBKEY_LEN); + xof_add(xof, client_keypair->pubkey.public_key, CURVE25519_PUBKEY_LEN); + xof_add(xof, relay_key->public_key, CURVE25519_PUBKEY_LEN); + xof_add(xof, (const uint8_t *)PROTOID, strlen(PROTOID)); + xof_add_encap(xof, verification, verification_len); + crypto_xof_squeeze_bytes(xof, enc_key, sizeof(enc_key)); + crypto_xof_squeeze_bytes(xof, mac_key, sizeof(mac_key)); + crypto_xof_free(xof); + } + + // Compute encrypted message. + uint8_t *encrypted_message = tor_memdup(message, message_len); + { + crypto_cipher_t *c = + crypto_cipher_new_with_bits((const char *)enc_key, 256); + crypto_cipher_crypt_inplace(c, (char *)encrypted_message, message_len); + crypto_cipher_free(c); + } + + // Compute the MAC value. 
+ { + crypto_digest_t *m = crypto_digest256_new(DIGEST_SHA3_256); + d_add_tweak(m, T_MSGMAC); + d_add_encap(m, mac_key, sizeof(mac_key)); + d_add(m, relay_id->pubkey, ED25519_PUBKEY_LEN); + d_add(m, relay_key->public_key, CURVE25519_PUBKEY_LEN); + d_add(m, client_keypair->pubkey.public_key, CURVE25519_PUBKEY_LEN); + d_add(m, encrypted_message, message_len); + crypto_digest_get_digest(m, + (char *)(*handshake_state_out)->msg_mac, + DIGEST256_LEN); + crypto_digest_free(m); + } + + // Build the onionskin. + *onion_skin_len_out = (ED25519_PUBKEY_LEN + CURVE25519_PUBKEY_LEN*2 + + DIGEST256_LEN + message_len); + *onion_skin_out = tor_malloc(*onion_skin_len_out); + { + uint8_t *ptr = *onion_skin_out, *end = ptr + *onion_skin_len_out; + + push(&ptr, end, relay_id->pubkey, ED25519_PUBKEY_LEN); + push(&ptr, end, relay_key->public_key, CURVE25519_PUBKEY_LEN); + push(&ptr, end, client_keypair->pubkey.public_key, CURVE25519_PUBKEY_LEN); + push(&ptr, end, encrypted_message, message_len); + push(&ptr, end, (*handshake_state_out)->msg_mac, DIGEST256_LEN); + tor_assert(ptr == end); + } + + memwipe(&enc_key, 0, sizeof(enc_key)); + memwipe(&mac_key, 0, sizeof(mac_key)); + memwipe(encrypted_message, 0, message_len); + tor_free(encrypted_message); + + return 0; +} + +/** + * Complete a client-side v3 ntor handshake. + * + * Takes a @a handshake_state returned earlier by `onion_skin_ntor3_create()`, + * and the relay's reply to that handshake (@a reply_len bytes at @a + * handshake_reply). Also takes a verification string (@a verification_len + * bytes at @a verification). + * + * Returns 0 on success and -1 on failure. On success, generates @a key_len + * bytes of key material into the provided @a keys_out buffer, and sets @a + * message_out to the message that the relay sent in reply to our message (and + * sets @a message_out_len to that message's length). 
+ **/ +int +onion_ntor3_client_handshake(const ntor3_handshake_state_t *handshake_state, + const uint8_t *handshake_reply, + size_t reply_len, + const uint8_t *verification, + size_t verification_len, + uint8_t *keys_out, + size_t keys_out_len, + uint8_t **message_out, + size_t *message_len_out) +{ + *message_out = NULL; + *message_len_out = 0; + + int problems = 0; + + // Parse the relay's message. + curve25519_public_key_t relay_Y; + uint8_t relay_auth[DIGEST256_LEN]; + size_t encrypted_msg_len; + const uint8_t *encrypted_msg; + { + if (reply_len < CURVE25519_PUBKEY_LEN + DIGEST256_LEN) { + // Okay to return early here, since the message is completely + // ill-formed, so we can't leak anything. + ++problems; + goto done; + } + encrypted_msg_len = reply_len - (CURVE25519_PUBKEY_LEN + DIGEST256_LEN); + + memcpy(&relay_Y.public_key, handshake_reply, CURVE25519_PUBKEY_LEN); + handshake_reply += CURVE25519_PUBKEY_LEN; + memcpy(&relay_auth, handshake_reply, DIGEST256_LEN); + handshake_reply += DIGEST256_LEN; + encrypted_msg = handshake_reply; + } + + // Finish the second diffie hellman handshake. + uint8_t yx[CURVE25519_OUTPUT_LEN]; + curve25519_handshake(yx, &handshake_state->client_keypair.seckey, &relay_Y); + problems |= safe_mem_is_zero(yx, sizeof(yx)); + + // Compute two tweaked hashes of secret_input. 
+ uint8_t key_seed[DIGEST256_LEN], verify[DIGEST256_LEN]; + { + crypto_digest_t *ks = crypto_digest256_new(DIGEST_SHA3_256); + crypto_digest_t *v = crypto_digest256_new(DIGEST_SHA3_256); + d_add_tweak(ks, T_KEY_SEED); + d_add_tweak(v, T_VERIFY); +#define ADD2(s,len) STMT_BEGIN { \ + d_add(ks, (s),(len)); d_add(v, (s), (len)); \ + } STMT_END +#define ADD2_ENCAP(s,len) STMT_BEGIN { \ + d_add_encap(ks, (s),(len)); d_add_encap(v, (s), (len)); \ + } STMT_END + + ADD2(yx, sizeof(yx)); + ADD2(handshake_state->bx, sizeof(handshake_state->bx)); + ADD2(handshake_state->relay_id.pubkey, ED25519_PUBKEY_LEN); + ADD2(handshake_state->relay_key.public_key, CURVE25519_PUBKEY_LEN); + ADD2(handshake_state->client_keypair.pubkey.public_key, + CURVE25519_PUBKEY_LEN); + ADD2(relay_Y.public_key, CURVE25519_PUBKEY_LEN); + ADD2((const uint8_t *)PROTOID, strlen(PROTOID)); + ADD2_ENCAP(verification, verification_len); + + crypto_digest_get_digest(ks, (char*) key_seed, DIGEST256_LEN); + crypto_digest_get_digest(v, (char*) verify, DIGEST256_LEN); + crypto_digest_free(ks); + crypto_digest_free(v); + } + + // compute expected auth value. + uint8_t auth_computed[DIGEST256_LEN]; + { + crypto_digest_t *d = crypto_digest256_new(DIGEST_SHA3_256); + d_add_tweak(d, T_AUTH); + d_add(d, verify, sizeof(verify)); + d_add(d, handshake_state->relay_id.pubkey, ED25519_PUBKEY_LEN); + d_add(d, handshake_state->relay_key.public_key, CURVE25519_PUBKEY_LEN); + d_add(d, relay_Y.public_key, CURVE25519_PUBKEY_LEN); + d_add(d, handshake_state->client_keypair.pubkey.public_key, + CURVE25519_PUBKEY_LEN); + d_add(d, handshake_state->msg_mac, DIGEST256_LEN); + d_add_encap(d, encrypted_msg, encrypted_msg_len); + d_add(d, (const uint8_t*)PROTOID, strlen(PROTOID)); + d_add(d, (const uint8_t*)"Server", strlen("Server")); + crypto_digest_get_digest(d, (char *)auth_computed, DIGEST256_LEN); + crypto_digest_free(d); + } + + // Check authentication value. 
+ problems |= tor_memneq(auth_computed, relay_auth, DIGEST256_LEN); + + // Compute keystream, decrypt message, and return. + *message_out = tor_malloc(encrypted_msg_len); + *message_len_out = encrypted_msg_len; + uint8_t enc_key[CIPHER256_KEY_LEN]; + { + crypto_xof_t *xof = crypto_xof_new(); + xof_add_tweak(xof, T_FINAL); + xof_add(xof, key_seed, sizeof(key_seed)); + crypto_xof_squeeze_bytes(xof, enc_key, sizeof(enc_key)); + crypto_xof_squeeze_bytes(xof, (uint8_t *)keys_out, keys_out_len); + crypto_xof_free(xof); + + crypto_cipher_t *c = + crypto_cipher_new_with_bits((const char *)enc_key, 256); + crypto_cipher_decrypt(c, (char *)*message_out, + (const char *)encrypted_msg, encrypted_msg_len); + crypto_cipher_free(c); + } + + done: + memwipe(&relay_Y, 0, sizeof(relay_Y)); + memwipe(&relay_auth, 0, sizeof(relay_auth)); + memwipe(&yx, 0, sizeof(yx)); + memwipe(key_seed, 0, sizeof(key_seed)); + memwipe(verify, 0, sizeof(verify)); + memwipe(enc_key, 0, sizeof(enc_key)); + if (problems) { + if (*message_out) { + memwipe(*message_out, 0, *message_len_out); + tor_free(*message_out); // Sets it to NULL. + } + *message_len_out = 0; + crypto_rand((char*)keys_out, keys_out_len); // In case bad code uses it. + return -1; + } + + return 0; +} + +/** + * Wipe a server handshake state, and release the storage it holds. + **/ +void +ntor3_server_handshake_state_free_(ntor3_server_handshake_state_t *state) +{ + if (state == NULL) + return; + + memwipe(state, 0, sizeof(ntor3_server_handshake_state_t)); + tor_free(state); +} + +/** + * As a relay, start handling a client's v3 ntor handshake. + * + * This function performs the _first half_ of the handshake, up to the point + * where the client's message is decoded. After calling it, the relay should + * decide how and whether to reply to the client's message, compose its reply, + * and call `onion_skin_ntor3_server_handshake_part2`. 
+ * + * It takes as input a map of the relay's known onion keys in @a private_keys, + * along with a fake @a junk_key to use if there is a complete mismatch. It + * takes the relay's ed25519 identity in @a my_id, along with the client's + * handshake message (@a client_handshake_len bytes in @a client_handshake), + * and a verification string (@a verification_len bytes in @a verification). + * + * Return 0 on success, and -1 on failure. On success, sets @a + * client_message_out to a newly allocated string holding the plaintext of the + * message that the client sent as part of its handshake, and @a + * client_message_out_len to its length. Also sets @a state_out to a newly + * allocated state object holding the intermediate computation for this + * handshake. + **/ +int +onion_skin_ntor3_server_handshake_part1( + const di_digest256_map_t *private_keys, + const curve25519_keypair_t *junk_key, + const ed25519_public_key_t *my_id, + const uint8_t *client_handshake, + size_t client_handshake_len, + const uint8_t *verification, + size_t verification_len, + uint8_t **client_message_out, + size_t *client_message_len_out, + ntor3_server_handshake_state_t **state_out) +{ + *client_message_out = NULL; + *client_message_len_out = 0; + *state_out = NULL; + + int problems = 0; + + // Initialize state. + (*state_out) = tor_malloc_zero(sizeof(ntor3_server_handshake_state_t)); + memcpy(&(*state_out)->my_id, my_id, sizeof(*my_id)); + + const uint8_t *wanted_id; // [ED25519_PUBKEY_LEN] + const uint8_t *wanted_key; // [CURVE25519_PUBKEY_LEN] + const uint8_t *encrypted_message; + size_t encrypted_message_len; + // Unpack the client handshake. + { + const uint8_t *ptr = client_handshake; + const uint8_t *end = ptr + client_handshake_len; + + if (client_handshake_len < + ED25519_PUBKEY_LEN + CURVE25519_PUBKEY_LEN * 2 + DIGEST256_LEN) { + // Okay to end early; the client knows this is unparseable already. 
+ ++problems; + goto done; + } + wanted_id = ptr; + ptr += ED25519_PUBKEY_LEN; + wanted_key = ptr; + ptr += CURVE25519_PUBKEY_LEN; + memcpy((*state_out)->client_key.public_key, ptr, CURVE25519_PUBKEY_LEN); + ptr += CURVE25519_PUBKEY_LEN; + size_t remaining = (end-ptr); + if (BUG(remaining < DIGEST256_LEN)) { + // Okay to end early; this is a bug. + ++problems; + goto done; + } + encrypted_message = ptr; + encrypted_message_len = remaining - DIGEST256_LEN; + ptr += encrypted_message_len; + remaining = (end-ptr); + tor_assert(remaining == DIGEST256_LEN); + memcpy((*state_out)->msg_mac, ptr, DIGEST256_LEN); + } + + // Check the identity. + problems |= tor_memneq(my_id->pubkey, wanted_id, ED25519_PUBKEY_LEN); + + // Find the correct keypair. + const curve25519_keypair_t *keypair = + dimap_search(private_keys, wanted_key, (void *)junk_key); + tor_assert(keypair); + memcpy(&(*state_out)->my_key, &keypair->pubkey, + sizeof(curve25519_public_key_t)); + + // Do the first diffie hellman handshake. + curve25519_handshake((*state_out)->xb, + &keypair->seckey, &(*state_out)->client_key); + problems |= safe_mem_is_zero((*state_out)->xb, CURVE25519_OUTPUT_LEN); + + // Derive the encryption and mac keys + uint8_t enc_key[CIPHER256_KEY_LEN], mac_key[DIGEST256_LEN]; + { + crypto_xof_t *xof = crypto_xof_new(); + xof_add_tweak(xof, T_MSGKDF); + xof_add(xof, (*state_out)->xb, CURVE25519_OUTPUT_LEN); + xof_add(xof, wanted_id, ED25519_PUBKEY_LEN); + xof_add(xof, (*state_out)->client_key.public_key, CURVE25519_PUBKEY_LEN); + xof_add(xof, keypair->pubkey.public_key, CURVE25519_PUBKEY_LEN); + xof_add(xof, (const uint8_t *)PROTOID, strlen(PROTOID)); + xof_add_encap(xof, verification, verification_len); + crypto_xof_squeeze_bytes(xof, enc_key, sizeof(enc_key)); + crypto_xof_squeeze_bytes(xof, mac_key, sizeof(mac_key)); + crypto_xof_free(xof); + } + + // Check the MAC. 
+ uint8_t computed_mac[DIGEST256_LEN]; + { + crypto_digest_t *d = crypto_digest256_new(DIGEST_SHA3_256); + d_add_tweak(d, T_MSGMAC); + d_add_encap(d, mac_key, sizeof(mac_key)); + d_add(d, my_id->pubkey, ED25519_PUBKEY_LEN); + d_add(d, keypair->pubkey.public_key, CURVE25519_PUBKEY_LEN); + d_add(d, (*state_out)->client_key.public_key, CURVE25519_PUBKEY_LEN); + d_add(d, encrypted_message, encrypted_message_len); + crypto_digest_get_digest(d, (char *)computed_mac, DIGEST256_LEN); + crypto_digest_free(d); + } + + problems |= tor_memneq((*state_out)->msg_mac, computed_mac, DIGEST256_LEN); + + // Decrypt the message. + *client_message_out = tor_malloc(encrypted_message_len); + *client_message_len_out = encrypted_message_len; + { + crypto_cipher_t *c = + crypto_cipher_new_with_bits((const char *)enc_key, 256); + crypto_cipher_decrypt(c, (char *)*client_message_out, + (const char *)encrypted_message, + encrypted_message_len); + crypto_cipher_free(c); + } + + done: + memwipe(enc_key, 0, sizeof(enc_key)); + memwipe(mac_key, 0, sizeof(mac_key)); + memwipe(computed_mac, 0, sizeof(computed_mac)); + if (problems) { + if (*client_message_out) { + memwipe(*client_message_out, 0, *client_message_len_out); + tor_free(*client_message_out); // Sets it to NULL. + } + *client_message_len_out = 0; + ntor3_server_handshake_state_free(*state_out); + return -1; + } + + return 0; +} + +/** + * Finish the relay side of an ntor v3 handshake. + * + * The relay calls this function after it has decided to respond to the + * client's original encrypted message. This function receives the relay's + * message in @a server_message and its length in @a server_message_len, and + * completes the handshake. + * + * Returns 0 on success and -1 on failure. On success, stores the newly + * allocated handshake for the relay to send in @a handshake_out, and its + * length in @a handshake_len_out. Stores @a keys_out_len bytes of generated + * keys in the provided buffer at @a keys_out. 
+ **/ +int +onion_skin_ntor3_server_handshake_part2( + const ntor3_server_handshake_state_t *state, + const uint8_t *verification, + size_t verification_len, + const uint8_t *server_message, + size_t server_message_len, + uint8_t **handshake_out, + size_t *handshake_len_out, + uint8_t *keys_out, + size_t keys_out_len) +{ + curve25519_keypair_t relay_keypair; + if (curve25519_keypair_generate(&relay_keypair, 0) < 0) { + return -1; + } + int r = onion_skin_ntor3_server_handshake_part2_nokeygen( + &relay_keypair, + state, + verification, + verification_len, + server_message, + server_message_len, + handshake_out, + handshake_len_out, + keys_out, + keys_out_len); + memwipe(&relay_keypair, 0, sizeof(relay_keypair)); + return r; +} + +/** + * Like `onion_skin_ntor3_server_handshake_part2`, but do not generate + * an ephemeral (y,Y) keypair. + * + * Instead, this function takes that keypair as @a relay_keypair_y. + * + * (Having a separate function for this lets us test the code for correct + * behavior.) + **/ +STATIC int +onion_skin_ntor3_server_handshake_part2_nokeygen( + const curve25519_keypair_t *relay_keypair_y, + const ntor3_server_handshake_state_t *state, + const uint8_t *verification, + size_t verification_len, + const uint8_t *server_message, + size_t server_message_len, + uint8_t **handshake_out, + size_t *handshake_len_out, + uint8_t *keys_out, + size_t keys_out_len) +{ + *handshake_out = NULL; + *handshake_len_out = 0; + + int problems = 0; + + // Second diffie-hellman handshake. + uint8_t xy[CURVE25519_OUTPUT_LEN]; + curve25519_handshake(xy, &relay_keypair_y->seckey, &state->client_key); + problems |= safe_mem_is_zero(xy, sizeof(xy)); + + // Compute two tweaked hashes of secret_input. 
+ uint8_t key_seed[DIGEST256_LEN], verify[DIGEST256_LEN]; + { + crypto_digest_t *ks = crypto_digest256_new(DIGEST_SHA3_256); + crypto_digest_t *v = crypto_digest256_new(DIGEST_SHA3_256); + d_add_tweak(ks, T_KEY_SEED); + d_add_tweak(v, T_VERIFY); + ADD2(xy, sizeof(xy)); + ADD2(state->xb, sizeof(state->xb)); + ADD2(state->my_id.pubkey, ED25519_PUBKEY_LEN); + ADD2(state->my_key.public_key, CURVE25519_PUBKEY_LEN); + ADD2(state->client_key.public_key, CURVE25519_PUBKEY_LEN); + ADD2(relay_keypair_y->pubkey.public_key, CURVE25519_PUBKEY_LEN); + ADD2((const uint8_t *)PROTOID, strlen(PROTOID)); + ADD2_ENCAP(verification, verification_len); + crypto_digest_get_digest(ks, (char*) key_seed, DIGEST256_LEN); + crypto_digest_get_digest(v, (char*) verify, DIGEST256_LEN); + crypto_digest_free(ks); + crypto_digest_free(v); + } + + // Compute enc_key and keystream. + uint8_t enc_key[CIPHER256_KEY_LEN]; + { + crypto_xof_t *xof = crypto_xof_new(); + xof_add_tweak(xof, T_FINAL); + xof_add(xof, key_seed, sizeof(key_seed)); + crypto_xof_squeeze_bytes(xof, enc_key, sizeof(enc_key)); + crypto_xof_squeeze_bytes(xof, keys_out, keys_out_len); + crypto_xof_free(xof); + } + + // Encrypt message. + uint8_t *encrypted_message = tor_memdup(server_message, server_message_len); + { + crypto_cipher_t *c = + crypto_cipher_new_with_bits((const char *)enc_key, 256); + crypto_cipher_crypt_inplace( + c, (char *)encrypted_message, server_message_len); + crypto_cipher_free(c); + } + + // Compute AUTH digest. 
+ uint8_t auth[DIGEST256_LEN]; + { + crypto_digest_t *d = crypto_digest256_new(DIGEST_SHA3_256); + d_add_tweak(d, T_AUTH); + d_add(d, verify, sizeof(verify)); + d_add(d, state->my_id.pubkey, ED25519_PUBKEY_LEN); + d_add(d, state->my_key.public_key, CURVE25519_PUBKEY_LEN); + d_add(d, relay_keypair_y->pubkey.public_key, CURVE25519_PUBKEY_LEN); + d_add(d, state->client_key.public_key, CURVE25519_PUBKEY_LEN); + d_add(d, state->msg_mac, DIGEST256_LEN); + d_add_encap(d, encrypted_message, server_message_len); + d_add(d, (const uint8_t*)PROTOID, strlen(PROTOID)); + d_add(d, (const uint8_t*)"Server", strlen("Server")); + crypto_digest_get_digest(d, (char *)auth, DIGEST256_LEN); + crypto_digest_free(d); + } + + // Compose the reply. + *handshake_len_out = CURVE25519_PUBKEY_LEN + DIGEST256_LEN + + server_message_len; + *handshake_out = tor_malloc(*handshake_len_out); + uint8_t *ptr = *handshake_out, *end = ptr + *handshake_len_out; + push(&ptr, end, relay_keypair_y->pubkey.public_key, CURVE25519_PUBKEY_LEN); + push(&ptr, end, auth, sizeof(auth)); + push(&ptr, end, encrypted_message, server_message_len); + tor_assert(ptr == end); + + // Clean up and return. + memwipe(xy, 0, sizeof(xy)); + memwipe(key_seed, 0, sizeof(key_seed)); + memwipe(verify, 0, sizeof(verify)); + memwipe(enc_key, 0, sizeof(enc_key)); + memwipe(encrypted_message, 0, server_message_len); + tor_free(encrypted_message); + + if (problems) { + memwipe(*handshake_out, 0, *handshake_len_out); + tor_free(*handshake_out); // Sets it to NULL. + *handshake_len_out = 0; + crypto_rand((char*)keys_out, keys_out_len); // In case bad code uses it. + return -1; + } + return 0; +} diff --git a/src/core/crypto/onion_ntor_v3.h b/src/core/crypto/onion_ntor_v3.h new file mode 100644 index 0000000000..4449eb237d --- /dev/null +++ b/src/core/crypto/onion_ntor_v3.h @@ -0,0 +1,140 @@ +/* Copyright (c) 2001 Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. 
+ * Copyright (c) 2007-2021, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * @file onion_ntor_v3.h + * @brief Header for core/crypto/onion_ntor_v3.c + **/ + +#ifndef TOR_CORE_CRYPTO_ONION_NTOR_V3_H +#define TOR_CORE_CRYPTO_ONION_NTOR_V3_H + +#include "lib/cc/torint.h" +#include "lib/testsupport/testsupport.h" +#include "lib/crypt_ops/crypto_cipher.h" +#include "lib/crypt_ops/crypto_curve25519.h" +#include "lib/crypt_ops/crypto_ed25519.h" +#include "lib/malloc/malloc.h" + +/** + * Client-side state held while an ntor v3 handshake is in progress. + **/ +typedef struct ntor3_handshake_state_t ntor3_handshake_state_t; + +/** + * Server-side state held while the relay is handling a client's + * encapsulated message, before replying to the v3 handshake. + **/ +typedef struct ntor3_server_handshake_state_t ntor3_server_handshake_state_t; + +void ntor3_handshake_state_free_(ntor3_handshake_state_t *st); +#define ntor3_handshake_state_free(ptr) \ + FREE_AND_NULL(ntor3_handshake_state_t, ntor3_handshake_state_free_, (ptr)) +void ntor3_server_handshake_state_free_(ntor3_server_handshake_state_t *st); +#define ntor3_server_handshake_state_free(ptr) \ + FREE_AND_NULL(ntor3_server_handshake_state_t, \ + ntor3_server_handshake_state_free_, (ptr)) + +int onion_skin_ntor3_create(const ed25519_public_key_t *relay_id, + const curve25519_public_key_t *relay_key, + const uint8_t *verification, + const size_t verification_len, + const uint8_t *message, + const size_t message_len, + ntor3_handshake_state_t **handshake_state_out, + uint8_t **onion_skin_out, + size_t *onion_skin_len_out); + +int onion_ntor3_client_handshake( + const ntor3_handshake_state_t *handshake_state, + const uint8_t *handshake_reply, + size_t reply_len, + const uint8_t *verification, + size_t verification_len, + uint8_t *keys_out, + size_t keys_out_len, + uint8_t **message_out, + size_t *message_len_out); + +struct di_digest256_map_t; +int onion_skin_ntor3_server_handshake_part1( + 
const struct di_digest256_map_t *private_keys, + const curve25519_keypair_t *junk_key, + const ed25519_public_key_t *my_id, + const uint8_t *client_handshake, + size_t client_handshake_len, + const uint8_t *verification, + size_t verification_len, + uint8_t **client_message_out, + size_t *client_message_len_out, + ntor3_server_handshake_state_t **state_out); + +int onion_skin_ntor3_server_handshake_part2( + const ntor3_server_handshake_state_t *state, + const uint8_t *verification, + size_t verification_len, + const uint8_t *server_message, + size_t server_message_len, + uint8_t **handshake_out, + size_t *handshake_len_out, + uint8_t *keys_out, + size_t keys_out_len); + +#ifdef ONION_NTOR_V3_PRIVATE +struct ntor3_handshake_state_t { + /** Ephemeral (x,X) keypair. */ + curve25519_keypair_t client_keypair; + /** Relay's ed25519 identity key (ID) */ + ed25519_public_key_t relay_id; + /** Relay's public key (B) */ + curve25519_public_key_t relay_key; + /** Shared secret (Bx). */ + uint8_t bx[CURVE25519_OUTPUT_LEN]; + /** MAC of the client's encrypted message data (MAC) */ + uint8_t msg_mac[DIGEST256_LEN]; +}; + +struct ntor3_server_handshake_state_t { + /** Relay's ed25519 identity key (ID) */ + ed25519_public_key_t my_id; + /** Relay's public key (B) */ + curve25519_public_key_t my_key; + /** Client's public ephemeral key (X). 
*/ + curve25519_public_key_t client_key; + + /** Shared secret (Xb) */ + uint8_t xb[CURVE25519_OUTPUT_LEN]; + /** MAC of the client's encrypted message data */ + uint8_t msg_mac[DIGEST256_LEN]; +}; + +STATIC int onion_skin_ntor3_create_nokeygen( + const curve25519_keypair_t *client_keypair, + const ed25519_public_key_t *relay_id, + const curve25519_public_key_t *relay_key, + const uint8_t *verification, + const size_t verification_len, + const uint8_t *message, + const size_t message_len, + ntor3_handshake_state_t **handshake_state_out, + uint8_t **onion_skin_out, + size_t *onion_skin_len_out); + +STATIC int onion_skin_ntor3_server_handshake_part2_nokeygen( + const curve25519_keypair_t *relay_keypair_y, + const ntor3_server_handshake_state_t *state, + const uint8_t *verification, + size_t verification_len, + const uint8_t *server_message, + size_t server_message_len, + uint8_t **handshake_out, + size_t *handshake_len_out, + uint8_t *keys_out, + size_t keys_out_len); + +#endif + +#endif /* !defined(TOR_CORE_CRYPTO_ONION_NTOR_V3_H) */ diff --git a/src/core/mainloop/connection.c b/src/core/mainloop/connection.c index b17d7bf2bd..9271a70914 100644 --- a/src/core/mainloop/connection.c +++ b/src/core/mainloop/connection.c @@ -117,6 +117,7 @@ #include "lib/cc/ctassert.h" #include "lib/sandbox/sandbox.h" #include "lib/net/buffers_net.h" +#include "lib/net/address.h" #include "lib/tls/tortls.h" #include "lib/evloop/compat_libevent.h" #include "lib/compress/compress.h" @@ -146,6 +147,8 @@ #include "feature/nodelist/routerinfo_st.h" #include "core/or/socks_request_st.h" +#include "core/or/congestion_control_flow.h" + /** * On Windows and Linux we cannot reliably bind() a socket to an * address and port if: 1) There's already a socket bound to wildcard @@ -250,13 +253,13 @@ CONST_TO_LISTENER_CONN(const connection_t *c) } size_t -connection_get_inbuf_len(connection_t *conn) +connection_get_inbuf_len(const connection_t *conn) { return conn->inbuf ? 
buf_datalen(conn->inbuf) : 0; } size_t -connection_get_outbuf_len(connection_t *conn) +connection_get_outbuf_len(const connection_t *conn) { return conn->outbuf ? buf_datalen(conn->outbuf) : 0; } @@ -612,6 +615,11 @@ entry_connection_new(int type, int socket_family) entry_conn->entry_cfg.ipv4_traffic = 1; else if (socket_family == AF_INET6) entry_conn->entry_cfg.ipv6_traffic = 1; + + /* Initialize the read token bucket to the maximum value which is the same as + * no rate limiting. */ + token_bucket_rw_init(&ENTRY_TO_EDGE_CONN(entry_conn)->bucket, INT32_MAX, + INT32_MAX, monotime_coarse_get_stamp()); return entry_conn; } @@ -623,6 +631,10 @@ edge_connection_new(int type, int socket_family) edge_connection_t *edge_conn = tor_malloc_zero(sizeof(edge_connection_t)); tor_assert(type == CONN_TYPE_EXIT); connection_init(time(NULL), TO_CONN(edge_conn), type, socket_family); + /* Initialize the read token bucket to the maximum value which is the same as + * no rate limiting. */ + token_bucket_rw_init(&edge_conn->bucket, INT32_MAX, INT32_MAX, + monotime_coarse_get_stamp()); return edge_conn; } @@ -1261,7 +1273,7 @@ socket_failed_from_resource_exhaustion(void) */ if (get_max_sockets() > 65535) { /* TCP port exhaustion */ - rep_hist_note_overload(OVERLOAD_GENERAL); + rep_hist_note_tcp_exhaustion(); } else { /* File descriptor exhaustion */ rep_hist_note_overload(OVERLOAD_FD_EXHAUSTED); @@ -3457,6 +3469,19 @@ connection_bucket_read_limit(connection_t *conn, time_t now) base = get_cell_network_size(or_conn->wide_circ_ids); } + /* Edge connection have their own read bucket due to flow control being able + * to set a rate limit for them. However, for exit connections, we still need + * to honor the global bucket as well. */ + if (CONN_IS_EDGE(conn)) { + const edge_connection_t *edge_conn = CONST_TO_EDGE_CONN(conn); + conn_bucket = token_bucket_rw_get_read(&edge_conn->bucket); + if (conn->type == CONN_TYPE_EXIT) { + /* Decide between our limit and the global one. 
*/ + goto end; + } + return conn_bucket; + } + if (!connection_is_rate_limited(conn)) { /* be willing to read on local conns even if our buckets are empty */ return conn_bucket>=0 ? conn_bucket : 1<<14; @@ -3467,6 +3492,7 @@ connection_bucket_read_limit(connection_t *conn, time_t now) global_bucket_val = MIN(global_bucket_val, relayed); } + end: return connection_bucket_get_share(base, priority, global_bucket_val, conn_bucket); } @@ -3644,6 +3670,13 @@ connection_buckets_decrement(connection_t *conn, time_t now, record_num_bytes_transferred_impl(conn, now, num_read, num_written); + /* Edge connection need to decrement the read side of the bucket used by our + * congestion control. */ + if (CONN_IS_EDGE(conn) && num_read > 0) { + edge_connection_t *edge_conn = TO_EDGE_CONN(conn); + token_bucket_rw_dec(&edge_conn->bucket, num_read, 0); + } + if (!connection_is_rate_limited(conn)) return; /* local IPs are free */ @@ -3697,14 +3730,16 @@ connection_write_bw_exhausted(connection_t *conn, bool is_global_bw) void connection_consider_empty_read_buckets(connection_t *conn) { + int is_global = 1; const char *reason; - if (!connection_is_rate_limited(conn)) + if (CONN_IS_EDGE(conn) && + token_bucket_rw_get_read(&TO_EDGE_CONN(conn)->bucket) <= 0) { + reason = "edge connection read bucket exhausted. Pausing."; + is_global = false; + } else if (!connection_is_rate_limited(conn)) { return; /* Always okay. */ - - int is_global = 1; - - if (token_bucket_rw_get_read(&global_bucket) <= 0) { + } else if (token_bucket_rw_get_read(&global_bucket) <= 0) { reason = "global read bucket exhausted. Pausing."; } else if (connection_counts_as_relayed_traffic(conn, approx_time()) && token_bucket_rw_get_read(&global_relayed_bucket) <= 0) { @@ -3714,8 +3749,9 @@ connection_consider_empty_read_buckets(connection_t *conn) token_bucket_rw_get_read(&TO_OR_CONN(conn)->bucket) <= 0) { reason = "connection read bucket exhausted. 
Pausing."; is_global = false; - } else + } else { return; /* all good, no need to stop it */ + } LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", reason)); connection_read_bw_exhausted(conn, is_global); @@ -3819,6 +3855,10 @@ connection_bucket_refill_single(connection_t *conn, uint32_t now_ts) or_connection_t *or_conn = TO_OR_CONN(conn); token_bucket_rw_refill(&or_conn->bucket, now_ts); } + + if (CONN_IS_EDGE(conn)) { + token_bucket_rw_refill(&TO_EDGE_CONN(conn)->bucket, now_ts); + } } /** @@ -4556,9 +4596,9 @@ connection_handle_write_impl(connection_t *conn, int force) !dont_stop_writing) { /* it's done flushing */ if (connection_finished_flushing(conn) < 0) { /* already marked */ - return -1; + goto err; } - return 0; + goto done; } /* Call even if result is 0, since the global write bucket may @@ -4568,7 +4608,17 @@ connection_handle_write_impl(connection_t *conn, int force) if (n_read > 0 && connection_is_reading(conn)) connection_consider_empty_read_buckets(conn); + done: + /* If this is an edge connection with congestion control, check to see + * if it is time to send an xon */ + if (conn_uses_flow_control(conn)) { + flow_control_decide_xon(TO_EDGE_CONN(conn), n_written); + } + return 0; + + err: + return -1; } /* DOCDOC connection_handle_write */ diff --git a/src/core/mainloop/connection.h b/src/core/mainloop/connection.h index 36c94d6570..8b378b15a4 100644 --- a/src/core/mainloop/connection.h +++ b/src/core/mainloop/connection.h @@ -274,8 +274,8 @@ void connection_buf_add_compress(const char *string, size_t len, struct dir_connection_t *conn, int done); void connection_buf_add_buf(struct connection_t *conn, struct buf_t *buf); -size_t connection_get_inbuf_len(struct connection_t *conn); -size_t connection_get_outbuf_len(struct connection_t *conn); +size_t connection_get_inbuf_len(const struct connection_t *conn); +size_t connection_get_outbuf_len(const struct connection_t *conn); struct connection_t *connection_get_by_global_id(uint64_t id); struct connection_t 
*connection_get_by_type(int type); diff --git a/src/core/mainloop/mainloop.c b/src/core/mainloop/mainloop.c index 69606c0d53..cd57dea3d4 100644 --- a/src/core/mainloop/mainloop.c +++ b/src/core/mainloop/mainloop.c @@ -641,6 +641,13 @@ connection_start_reading,(connection_t *conn)) if (connection_should_read_from_linked_conn(conn)) connection_start_reading_from_linked_conn(conn); } else { + if (CONN_IS_EDGE(conn) && TO_EDGE_CONN(conn)->xoff_received) { + /* We should not get called here if we're waiting for an XON, but + * belt-and-suspenders */ + log_notice(LD_NET, + "Request to start reading on an edgeconn blocked with XOFF"); + return; + } if (event_add(conn->read_event, NULL)) log_warn(LD_NET, "Error from libevent setting read event state for %d " "to watched: %s", @@ -1293,6 +1300,7 @@ signewnym_impl(time_t now) circuit_mark_all_dirty_circs_as_unusable(); addressmap_clear_transient(); hs_client_purge_state(); + purge_vanguards_lite(); time_of_last_signewnym = now; signewnym_is_pending = 0; @@ -1370,6 +1378,7 @@ CALLBACK(save_state); CALLBACK(write_stats_file); CALLBACK(control_per_second_events); CALLBACK(second_elapsed); +CALLBACK(manage_vglite); #undef CALLBACK @@ -1392,6 +1401,9 @@ STATIC periodic_event_item_t mainloop_periodic_events[] = { CALLBACK(second_elapsed, NET_PARTICIPANT, FL(RUN_ON_DISABLE)), + /* Update vanguards-lite once per hour, if we have networking */ + CALLBACK(manage_vglite, NET_PARTICIPANT, FL(NEED_NET)), + /* XXXX Do we have a reason to do this on a callback? Does it do any good at * all? For now, if we're dormant, we can let our listeners decay. 
*/ CALLBACK(retry_listeners, NET_PARTICIPANT, FL(NEED_NET)), @@ -1662,6 +1674,21 @@ mainloop_schedule_shutdown(int delay_sec) mainloop_event_schedule(scheduled_shutdown_ev, &delay_tv); } +/** + * Update vanguards-lite layer2 nodes, once every 15 minutes + */ +static int +manage_vglite_callback(time_t now, const or_options_t *options) +{ + (void)now; + (void)options; +#define VANGUARDS_LITE_INTERVAL (15*60) + + maintain_layer2_guards(); + + return VANGUARDS_LITE_INTERVAL; +} + /** Perform regular maintenance tasks. This function gets run once per * second. */ diff --git a/src/core/or/channel.c b/src/core/or/channel.c index c4f3e76fc8..c46fa93e58 100644 --- a/src/core/or/channel.c +++ b/src/core/or/channel.c @@ -2629,24 +2629,42 @@ channel_dump_statistics, (channel_t *chan, int severity)) circuitmux_num_circuits(chan->cmux) : 0); /* Describe timestamps */ - tor_log(severity, LD_GENERAL, - " * Channel %"PRIu64 " was last used by a " - "client at %"PRIu64 " (%"PRIu64 " seconds ago)", - (chan->global_identifier), - (uint64_t)(chan->timestamp_client), - (uint64_t)(now - chan->timestamp_client)); - tor_log(severity, LD_GENERAL, - " * Channel %"PRIu64 " last received a cell " - "at %"PRIu64 " (%"PRIu64 " seconds ago)", - (chan->global_identifier), - (uint64_t)(chan->timestamp_recv), - (uint64_t)(now - chan->timestamp_recv)); - tor_log(severity, LD_GENERAL, - " * Channel %"PRIu64 " last transmitted a cell " - "at %"PRIu64 " (%"PRIu64 " seconds ago)", - (chan->global_identifier), - (uint64_t)(chan->timestamp_xmit), - (uint64_t)(now - chan->timestamp_xmit)); + if (chan->timestamp_client == 0) { + tor_log(severity, LD_GENERAL, + " * Channel %"PRIu64 " was never used by a " + "client", (chan->global_identifier)); + } else { + tor_log(severity, LD_GENERAL, + " * Channel %"PRIu64 " was last used by a " + "client at %"PRIu64 " (%"PRIu64 " seconds ago)", + (chan->global_identifier), + (uint64_t)(chan->timestamp_client), + (uint64_t)(now - chan->timestamp_client)); + } + if 
(chan->timestamp_recv == 0) { + tor_log(severity, LD_GENERAL, + " * Channel %"PRIu64 " never received a cell", + (chan->global_identifier)); + } else { + tor_log(severity, LD_GENERAL, + " * Channel %"PRIu64 " last received a cell " + "at %"PRIu64 " (%"PRIu64 " seconds ago)", + (chan->global_identifier), + (uint64_t)(chan->timestamp_recv), + (uint64_t)(now - chan->timestamp_recv)); + } + if (chan->timestamp_xmit == 0) { + tor_log(severity, LD_GENERAL, + " * Channel %"PRIu64 " never transmitted a cell", + (chan->global_identifier)); + } else { + tor_log(severity, LD_GENERAL, + " * Channel %"PRIu64 " last transmitted a cell " + "at %"PRIu64 " (%"PRIu64 " seconds ago)", + (chan->global_identifier), + (uint64_t)(chan->timestamp_xmit), + (uint64_t)(now - chan->timestamp_xmit)); + } /* Describe counters and rates */ tor_log(severity, LD_GENERAL, diff --git a/src/core/or/channeltls.c b/src/core/or/channeltls.c index 481dafef91..9db8e2392d 100644 --- a/src/core/or/channeltls.c +++ b/src/core/or/channeltls.c @@ -64,6 +64,7 @@ #include "trunnel/netinfo.h" #include "core/or/channelpadding.h" #include "core/or/extendinfo.h" +#include "core/or/congestion_control_common.h" #include "core/or/cell_st.h" #include "core/or/cell_queue_st.h" @@ -793,7 +794,7 @@ channel_tls_num_cells_writeable_method(channel_t *chan) cell_network_size = get_cell_network_size(tlschan->conn->wide_circ_ids); outbuf_len = connection_get_outbuf_len(TO_CONN(tlschan->conn)); /* Get the number of cells */ - n = CEIL_DIV(OR_CONN_HIGHWATER - outbuf_len, cell_network_size); + n = CEIL_DIV(or_conn_highwatermark() - outbuf_len, cell_network_size); if (n < 0) n = 0; #if SIZEOF_SIZE_T > SIZEOF_INT if (n > INT_MAX) n = INT_MAX; diff --git a/src/core/or/circuit_st.h b/src/core/or/circuit_st.h index 870bcbf7cf..be6429438a 100644 --- a/src/core/or/circuit_st.h +++ b/src/core/or/circuit_st.h @@ -22,6 +22,7 @@ struct hs_token_t; struct circpad_machine_spec_t; struct circpad_machine_runtime_t; +struct congestion_control_t; 
/** Number of padding state machines on a circuit. */ #define CIRCPAD_MAX_MACHINES (2) @@ -244,6 +245,9 @@ struct circuit_t { * that STOP commands actually correspond to the current machine, * and not a previous one. */ uint32_t padding_machine_ctr; + + /** Congestion control fields */ + struct congestion_control_t *ccontrol; }; #endif /* !defined(CIRCUIT_ST_H) */ diff --git a/src/core/or/circuitbuild.c b/src/core/or/circuitbuild.c index 2bcc642a97..31e3868b65 100644 --- a/src/core/or/circuitbuild.c +++ b/src/core/or/circuitbuild.c @@ -1359,7 +1359,9 @@ route_len_for_purpose(uint8_t purpose, extend_info_t *exit_ei) int routelen = DEFAULT_ROUTE_LEN; int known_purpose = 0; - if (circuit_should_use_vanguards(purpose)) { + /* If we're using L3 vanguards, we need longer paths for onion services */ + if (circuit_purpose_is_hidden_service(purpose) && + get_options()->HSLayer3Nodes) { /* Clients want an extra hop for rends to avoid linkability. * Services want it for intro points to avoid publishing their * layer3 guards. They want it for hsdir posts to use @@ -1374,14 +1376,6 @@ route_len_for_purpose(uint8_t purpose, extend_info_t *exit_ei) purpose == CIRCUIT_PURPOSE_S_ESTABLISH_INTRO) return routelen+1; - /* If we only have Layer2 vanguards, then we do not need - * the extra hop for linkabilty reasons (see below). - * This means all hops can be of the form: - * S/C - G - L2 - M - R/HSDir/I - */ - if (get_options()->HSLayer2Nodes && !get_options()->HSLayer3Nodes) - return routelen+1; - /* For connections to hsdirs, clients want two extra hops * when using layer3 guards, to avoid linkability. * Same goes for intro points. Note that the route len @@ -1400,16 +1394,14 @@ route_len_for_purpose(uint8_t purpose, extend_info_t *exit_ei) return routelen; switch (purpose) { - /* These two purposes connect to a router that we chose, so - * DEFAULT_ROUTE_LEN is safe. 
*/ - case CIRCUIT_PURPOSE_S_ESTABLISH_INTRO: - /* hidden service connecting to introduction point */ + /* These purposes connect to a router that we chose, so DEFAULT_ROUTE_LEN + * is safe: */ case CIRCUIT_PURPOSE_TESTING: /* router reachability testing */ known_purpose = 1; break; - /* These three purposes connect to a router that someone else + /* These purposes connect to a router that someone else * might have chosen, so add an extra hop to protect anonymity. */ case CIRCUIT_PURPOSE_C_GENERAL: case CIRCUIT_PURPOSE_C_HSDIR_GET: @@ -1419,6 +1411,9 @@ route_len_for_purpose(uint8_t purpose, extend_info_t *exit_ei) /* client connecting to introduction point */ case CIRCUIT_PURPOSE_S_CONNECT_REND: /* hidden service connecting to rendezvous point */ + case CIRCUIT_PURPOSE_S_ESTABLISH_INTRO: + /* hidden service connecting to intro point. In this case we want an extra + hop to avoid linkability attacks by the introduction point. */ known_purpose = 1; routelen++; break; @@ -2019,7 +2014,7 @@ cpath_build_state_to_crn_ipv6_extend_flag(const cpath_build_state_t *state, } /** Decide a suitable length for circ's cpath, and pick an exit - * router (or use <b>exit</b> if provided). Store these in the + * router (or use <b>exit_ei</b> if provided). Store these in the * cpath. * * If <b>is_hs_v3_rp_circuit</b> is set, then this exit should be suitable to @@ -2072,7 +2067,7 @@ onion_pick_cpath_exit(origin_circuit_t *circ, extend_info_t *exit_ei, return 0; } -/** Give <b>circ</b> a new exit destination to <b>exit</b>, and add a +/** Give <b>circ</b> a new exit destination to <b>exit_ei</b>, and add a * hop to the cpath reflecting this. Don't send the next extend cell -- * the caller will do this if it wants to. */ @@ -2114,8 +2109,6 @@ circuit_extend_to_new_exit(origin_circuit_t *circ, extend_info_t *exit_ei) return -1; } - // XXX: Should cannibalized circuits be dirty or not? Not easy to say.. 
- return 0; } @@ -2261,8 +2254,14 @@ middle_node_must_be_vanguard(const or_options_t *options, return 0; } - /* If we have sticky L2 nodes, and this is an L2 pick, use vanguards */ - if (options->HSLayer2Nodes && cur_len == 1) { + /* Don't even bother if the feature is disabled */ + if (!vanguards_lite_is_enabled()) { + return 0; + } + + /* If we are a hidden service circuit, always use either vanguards-lite + * or HSLayer2Nodes for 2nd hop. */ + if (cur_len == 1) { return 1; } @@ -2286,7 +2285,8 @@ pick_vanguard_middle_node(const or_options_t *options, /* Pick the right routerset based on the current hop */ if (cur_len == 1) { - vanguard_routerset = options->HSLayer2Nodes; + vanguard_routerset = options->HSLayer2Nodes ? + options->HSLayer2Nodes : get_layer2_guards(); } else if (cur_len == 2) { vanguard_routerset = options->HSLayer3Nodes; } else { @@ -2295,6 +2295,10 @@ pick_vanguard_middle_node(const or_options_t *options, return NULL; } + if (BUG(!vanguard_routerset)) { + return NULL; + } + node = pick_restricted_middle_node(flags, vanguard_routerset, options->ExcludeNodes, excluded, cur_len+1); diff --git a/src/core/or/circuitlist.c b/src/core/or/circuitlist.c index 4f62284e29..4dbf4d4549 100644 --- a/src/core/or/circuitlist.c +++ b/src/core/or/circuitlist.c @@ -64,6 +64,7 @@ #include "core/or/circuitpadding.h" #include "core/or/crypt_path.h" #include "core/or/extendinfo.h" +#include "core/or/status.h" #include "core/or/trace_probes_circuit.h" #include "core/mainloop/connection.h" #include "app/config/config.h" @@ -100,6 +101,7 @@ #include "lib/compress/compress_zlib.h" #include "lib/compress/compress_zstd.h" #include "lib/buf/buffers.h" +#include "core/or/congestion_control_common.h" #include "core/or/ocirc_event.h" @@ -1143,6 +1145,8 @@ circuit_free_(circuit_t *circ) * hs identifier is freed. 
*/ hs_circ_cleanup_on_free(circ); + congestion_control_free(circ->ccontrol); + if (CIRCUIT_IS_ORIGIN(circ)) { origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ); mem = ocirc; @@ -2343,6 +2347,12 @@ circuit_about_to_free(circuit_t *circ) circuitmux_detach_circuit(or_circ->p_chan->cmux, circ); circuit_set_p_circid_chan(or_circ, 0, NULL); } + + if (or_circ->n_cells_discarded_at_end) { + time_t age = approx_time() - circ->timestamp_created.tv_sec; + note_circ_closed_for_unrecognized_cells( + age, or_circ->n_cells_discarded_at_end); + } } else { origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ); edge_connection_t *conn; @@ -2586,8 +2596,10 @@ conns_compare_by_buffer_age_(const void **a_, const void **b_) /** We're out of memory for cells, having allocated <b>current_allocation</b> * bytes' worth. Kill the 'worst' circuits until we're under - * FRACTION_OF_DATA_TO_RETAIN_ON_OOM of our maximum usage. */ -void + * FRACTION_OF_DATA_TO_RETAIN_ON_OOM of our maximum usage. + * + * Return the number of bytes removed. 
*/ +size_t circuits_handle_oom(size_t current_allocation) { smartlist_t *circlist; @@ -2597,6 +2609,7 @@ circuits_handle_oom(size_t current_allocation) size_t mem_recovered=0; int n_circuits_killed=0; int n_dirconns_killed=0; + int n_edgeconns_killed = 0; uint32_t now_ts; log_notice(LD_GENERAL, "We're low on memory (cell queues total alloc:" " %"TOR_PRIuSZ" buffer total alloc: %" TOR_PRIuSZ "," @@ -2613,12 +2626,11 @@ circuits_handle_oom(size_t current_allocation) tor_zstd_get_total_allocation(), tor_lzma_get_total_allocation(), hs_cache_get_total_allocation()); - { size_t mem_target = (size_t)(get_options()->MaxMemInQueues * FRACTION_OF_DATA_TO_RETAIN_ON_OOM); if (current_allocation <= mem_target) - return; + return 0; mem_to_recover = current_allocation - mem_target; } @@ -2664,12 +2676,19 @@ circuits_handle_oom(size_t current_allocation) if (conn_age < circ->age_tmp) { break; } - if (conn->type == CONN_TYPE_DIR && conn->linked_conn == NULL) { + /* Also consider edge connections so we don't accumulate bytes on the + * outbuf due to a malicious destination holding off the read on us. */ + if ((conn->type == CONN_TYPE_DIR && conn->linked_conn == NULL) || + CONN_IS_EDGE(conn)) { if (!conn->marked_for_close) connection_mark_for_close(conn); mem_recovered += single_conn_free_bytes(conn); - ++n_dirconns_killed; + if (conn->type == CONN_TYPE_DIR) { + ++n_dirconns_killed; + } else { + ++n_edgeconns_killed; + } if (mem_recovered >= mem_to_recover) goto done_recovering_mem; @@ -2697,14 +2716,16 @@ circuits_handle_oom(size_t current_allocation) } SMARTLIST_FOREACH_END(circ); done_recovering_mem: - log_notice(LD_GENERAL, "Removed %"TOR_PRIuSZ" bytes by killing %d circuits; " "%d circuits remain alive. Also killed %d non-linked directory " - "connections.", + "connections. 
Killed %d edge connections", mem_recovered, n_circuits_killed, smartlist_len(circlist) - n_circuits_killed, - n_dirconns_killed); + n_dirconns_killed, + n_edgeconns_killed); + + return mem_recovered; } /** Verify that circuit <b>c</b> has all of its invariants diff --git a/src/core/or/circuitlist.h b/src/core/or/circuitlist.h index f5791d7c12..147e2cb2f8 100644 --- a/src/core/or/circuitlist.h +++ b/src/core/or/circuitlist.h @@ -232,7 +232,7 @@ int circuit_count_pending_on_channel(channel_t *chan); MOCK_DECL(void, assert_circuit_ok,(const circuit_t *c)); void circuit_free_all(void); -void circuits_handle_oom(size_t current_allocation); +size_t circuits_handle_oom(size_t current_allocation); void circuit_clear_testing_cell_stats(circuit_t *circ); diff --git a/src/core/or/circuitmux_ewma.c b/src/core/or/circuitmux_ewma.c index 0382e62f75..adf256ab05 100644 --- a/src/core/or/circuitmux_ewma.c +++ b/src/core/or/circuitmux_ewma.c @@ -45,7 +45,10 @@ /*** EWMA parameter #defines ***/ /** How long does a tick last (seconds)? */ -#define EWMA_TICK_LEN 10 +#define EWMA_TICK_LEN_DEFAULT 10 +#define EWMA_TICK_LEN_MIN 1 +#define EWMA_TICK_LEN_MAX 600 +static int ewma_tick_len = EWMA_TICK_LEN_DEFAULT; /** The default per-tick scale factor, if it hasn't been overridden by a * consensus or a configuration setting. zero means "disabled". 
*/ @@ -148,7 +151,7 @@ cell_ewma_get_tick(void) monotime_coarse_get(&now); int32_t msec_diff = monotime_coarse_diff_msec32(&start_of_current_tick, &now); - return current_tick_num + msec_diff / (1000*EWMA_TICK_LEN); + return current_tick_num + msec_diff / (1000*ewma_tick_len); } /** @@ -527,15 +530,15 @@ cell_ewma_get_current_tick_and_fraction(double *remainder_out) monotime_coarse_get(&now); int32_t msec_diff = monotime_coarse_diff_msec32(&start_of_current_tick, &now); - if (msec_diff > (1000*EWMA_TICK_LEN)) { - unsigned ticks_difference = msec_diff / (1000*EWMA_TICK_LEN); + if (msec_diff > (1000*ewma_tick_len)) { + unsigned ticks_difference = msec_diff / (1000*ewma_tick_len); monotime_coarse_add_msec(&start_of_current_tick, &start_of_current_tick, - ticks_difference * 1000 * EWMA_TICK_LEN); + ticks_difference * 1000 * ewma_tick_len); current_tick_num += ticks_difference; - msec_diff %= 1000*EWMA_TICK_LEN; + msec_diff %= 1000*ewma_tick_len; } - *remainder_out = ((double)msec_diff) / (1.0e3 * EWMA_TICK_LEN); + *remainder_out = ((double)msec_diff) / (1.0e3 * ewma_tick_len); return current_tick_num; } @@ -605,15 +608,20 @@ cmux_ewma_set_options(const or_options_t *options, /* Both options and consensus can be NULL. This assures us to either get a * valid configured value or the default one. */ halflife = get_circuit_priority_halflife(options, consensus, &source); + ewma_tick_len = networkstatus_get_param(consensus, + "CircuitPriorityTickSecs", + EWMA_TICK_LEN_DEFAULT, + EWMA_TICK_LEN_MIN, + EWMA_TICK_LEN_MAX); /* convert halflife into halflife-per-tick. */ - halflife /= EWMA_TICK_LEN; + halflife /= ewma_tick_len; /* compute per-tick scale factor. 
*/ ewma_scale_factor = exp(LOG_ONEHALF / halflife); log_info(LD_OR, "Enabled cell_ewma algorithm because of value in %s; " "scale factor is %f per %d seconds", - source, ewma_scale_factor, EWMA_TICK_LEN); + source, ewma_scale_factor, ewma_tick_len); } /** Return the multiplier necessary to convert the value of a cell sent in diff --git a/src/core/or/circuitpadding.c b/src/core/or/circuitpadding.c index 6dfe94de01..99dc5f9d83 100644 --- a/src/core/or/circuitpadding.c +++ b/src/core/or/circuitpadding.c @@ -2967,6 +2967,8 @@ signed_error_t circpad_handle_padding_negotiate(circuit_t *circ, cell_t *cell) { int retval = 0; + /* Should we send back a STOP cell? */ + bool respond_with_stop = true; circpad_negotiate_t *negotiate; if (CIRCUIT_IS_ORIGIN(circ)) { @@ -2992,6 +2994,12 @@ circpad_handle_padding_negotiate(circuit_t *circ, cell_t *cell) negotiate->machine_type, negotiate->machine_ctr); goto done; } + + /* If we reached this point we received a STOP command from an old or + unknown machine. Don't reply with our own STOP since there is no one to + handle it on the other end */ + respond_with_stop = false; + if (negotiate->machine_ctr <= circ->padding_machine_ctr) { log_info(LD_CIRC, "Received STOP command for old machine %u, ctr %u", negotiate->machine_type, negotiate->machine_ctr); @@ -3023,10 +3031,13 @@ circpad_handle_padding_negotiate(circuit_t *circ, cell_t *cell) retval = -1; done: - circpad_padding_negotiated(circ, negotiate->machine_type, - negotiate->command, - (retval == 0) ? CIRCPAD_RESPONSE_OK : CIRCPAD_RESPONSE_ERR, - negotiate->machine_ctr); + if (respond_with_stop) { + circpad_padding_negotiated(circ, negotiate->machine_type, + negotiate->command, + (retval == 0) ? 
CIRCPAD_RESPONSE_OK : CIRCPAD_RESPONSE_ERR, + negotiate->machine_ctr); + } + circpad_negotiate_free(negotiate); return retval; diff --git a/src/core/or/circuituse.c b/src/core/or/circuituse.c index 044b30b8b3..2ec391eca0 100644 --- a/src/core/or/circuituse.c +++ b/src/core/or/circuituse.c @@ -1204,25 +1204,6 @@ needs_circuits_for_build(int num) return 0; } -/** - * Launch the appropriate type of predicted circuit for hidden - * services, depending on our options. - */ -static void -circuit_launch_predicted_hs_circ(int flags) -{ - /* K.I.S.S. implementation of bug #23101: If we are using - * vanguards or pinned middles, pre-build a specific purpose - * for HS circs. */ - if (circuit_should_use_vanguards(CIRCUIT_PURPOSE_HS_VANGUARDS)) { - circuit_launch(CIRCUIT_PURPOSE_HS_VANGUARDS, flags); - } else { - /* If no vanguards, then no HS-specific prebuilt circuits are needed. - * Normal GENERAL circs are fine */ - circuit_launch(CIRCUIT_PURPOSE_C_GENERAL, flags); - } -} - /** Determine how many circuits we have open that are clean, * Make sure it's enough for all the upcoming behaviors we predict we'll have. * But put an upper bound on the total number of circuits. @@ -1276,7 +1257,7 @@ circuit_predict_and_launch_new(void) "Have %d clean circs (%d internal), need another internal " "circ for my hidden service.", num, num_internal); - circuit_launch_predicted_hs_circ(flags); + circuit_launch(CIRCUIT_PURPOSE_HS_VANGUARDS, flags); return; } @@ -1295,7 +1276,10 @@ circuit_predict_and_launch_new(void) " another hidden service circ.", num, num_uptime_internal, num_internal); - circuit_launch_predicted_hs_circ(flags); + /* Always launch vanguards purpose circuits for HS clients, + * for vanguards-lite. This prevents us from cannibalizing + * to build these circuits (and thus not use vanguards). 
*/ + circuit_launch(CIRCUIT_PURPOSE_HS_VANGUARDS, flags); return; } @@ -2022,16 +2006,12 @@ circuit_is_hs_v3(const circuit_t *circ) int circuit_should_use_vanguards(uint8_t purpose) { - const or_options_t *options = get_options(); - - /* Only hidden service circuits use vanguards */ - if (!circuit_purpose_is_hidden_service(purpose)) - return 0; - - /* Pinned middles are effectively vanguards */ - if (options->HSLayer2Nodes || options->HSLayer3Nodes) + /* All hidden service circuits use either vanguards or + * vanguards-lite. */ + if (circuit_purpose_is_hidden_service(purpose)) return 1; + /* Everything else is a normal circuit */ return 0; } @@ -2069,13 +2049,11 @@ circuit_should_cannibalize_to_build(uint8_t purpose_to_build, return 0; } - /* For vanguards, the server-side intro circ is not cannibalized - * because we pre-build 4 hop HS circuits, and it only needs a 3 hop - * circuit. It is also long-lived, so it is more important that - * it have lower latency than get built fast. + /* The server-side intro circ is not cannibalized because it only + * needs a 3 hop circuit. It is also long-lived, so it is more + * important that it have lower latency than get built fast. */ - if (circuit_should_use_vanguards(purpose_to_build) && - purpose_to_build == CIRCUIT_PURPOSE_S_ESTABLISH_INTRO) { + if (purpose_to_build == CIRCUIT_PURPOSE_S_ESTABLISH_INTRO) { return 0; } diff --git a/src/core/or/command.c b/src/core/or/command.c index 622217a78e..40eb1554c0 100644 --- a/src/core/or/command.c +++ b/src/core/or/command.c @@ -563,7 +563,7 @@ command_process_relay_cell(cell_t *cell, channel_t *chan) } if ((reason = circuit_receive_relay_cell(cell, circ, direction)) < 0) { - log_fn(LOG_PROTOCOL_WARN,LD_PROTOCOL,"circuit_receive_relay_cell " + log_fn(LOG_DEBUG,LD_PROTOCOL,"circuit_receive_relay_cell " "(%s) failed. 
Closing.", direction==CELL_DIRECTION_OUT?"forward":"backward"); /* Always emit a bandwidth event for closed circs */ diff --git a/src/core/or/congestion_control_common.c b/src/core/or/congestion_control_common.c new file mode 100644 index 0000000000..0919f037db --- /dev/null +++ b/src/core/or/congestion_control_common.c @@ -0,0 +1,1038 @@ +/* Copyright (c) 2021, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file congestion_control_common.c + * \brief Common code used by all congestion control algorithms. + */ + +#define TOR_CONGESTION_CONTROL_COMMON_PRIVATE + +#include "core/or/or.h" + +#include "core/or/circuitlist.h" +#include "core/or/crypt_path.h" +#include "core/or/or_circuit_st.h" +#include "core/or/origin_circuit_st.h" +#include "core/or/channel.h" +#include "core/mainloop/connection.h" +#include "core/or/sendme.h" +#include "core/or/congestion_control_common.h" +#include "core/or/congestion_control_vegas.h" +#include "core/or/congestion_control_nola.h" +#include "core/or/congestion_control_westwood.h" +#include "core/or/congestion_control_st.h" +#include "core/or/trace_probes_cc.h" +#include "lib/time/compat_time.h" +#include "feature/nodelist/networkstatus.h" + +/* Consensus parameter defaults. + * + * More details for each of the parameters can be found in proposal 324, + * section 6.5 including tuning notes. */ +#define CIRCWINDOW_INIT (500) +#define SENDME_INC_DFLT (50) + +#define CWND_INC_DFLT (50) +#define CWND_INC_PCT_SS_DFLT (100) +#define CWND_INC_RATE_DFLT (1) +#define CWND_MAX_DFLT (INT32_MAX) +#define CWND_MIN_DFLT (MAX(100, SENDME_INC_DFLT)) + +#define BWE_SENDME_MIN_DFLT (5) +#define EWMA_CWND_COUNT_DFLT (2) + +/* BDP algorithms for each congestion control algorithms use the piecewise + * estimattor. See section 3.1.4 of proposal 324. 
*/ +#define WESTWOOD_BDP_ALG BDP_ALG_PIECEWISE +#define VEGAS_BDP_MIX_ALG BDP_ALG_PIECEWISE +#define NOLA_BDP_ALG BDP_ALG_PIECEWISE + +/* Indicate OR connection buffer limitations used to stop or start accepting + * cells in its outbuf. + * + * These watermarks are historical to tor in a sense that they've been used + * almost from the genesis point. And were likely defined to fit the bounds of + * TLS records of 16KB which would be around 32 cells. + * + * These are defaults of the consensus parameter "orconn_high" and "orconn_low" + * values. */ +#define OR_CONN_HIGHWATER_DFLT (32*1024) +#define OR_CONN_LOWWATER_DFLT (16*1024) + +/* Low and high values of circuit cell queue sizes. They are used to tell when + * to start or stop reading on the streams attached on the circuit. + * + * These are defaults of the consensus parameters "cellq_high" and "cellq_low". + */ +#define CELL_QUEUE_LOW_DFLT (10) +#define CELL_QUEUE_HIGH_DFLT (256) + +static uint64_t congestion_control_update_circuit_rtt(congestion_control_t *, + uint64_t); +static bool congestion_control_update_circuit_bdp(congestion_control_t *, + const circuit_t *, + const crypt_path_t *, + uint64_t, uint64_t); + +/* Consensus parameters cached. The non static ones are extern. */ +static uint32_t cwnd_max = CWND_MAX_DFLT; +int32_t cell_queue_high = CELL_QUEUE_HIGH_DFLT; +int32_t cell_queue_low = CELL_QUEUE_LOW_DFLT; +uint32_t or_conn_highwater = OR_CONN_HIGHWATER_DFLT; +uint32_t or_conn_lowwater = OR_CONN_LOWWATER_DFLT; + +/** + * Update global congestion control related consensus parameter values, + * every consensus update. 
+ */ +void +congestion_control_new_consensus_params(const networkstatus_t *ns) +{ +#define CELL_QUEUE_HIGH_MIN (1) +#define CELL_QUEUE_HIGH_MAX (1000) + cell_queue_high = networkstatus_get_param(ns, "cellq_high", + CELL_QUEUE_HIGH_DFLT, + CELL_QUEUE_HIGH_MIN, + CELL_QUEUE_HIGH_MAX); + +#define CELL_QUEUE_LOW_MIN (1) +#define CELL_QUEUE_LOW_MAX (1000) + cell_queue_low = networkstatus_get_param(ns, "cellq_low", + CELL_QUEUE_LOW_DFLT, + CELL_QUEUE_LOW_MIN, + CELL_QUEUE_LOW_MAX); + +#define OR_CONN_HIGHWATER_MIN (CELL_PAYLOAD_SIZE) +#define OR_CONN_HIGHWATER_MAX (INT32_MAX) + or_conn_highwater = + networkstatus_get_param(ns, "orconn_high", + OR_CONN_HIGHWATER_DFLT, + OR_CONN_HIGHWATER_MIN, + OR_CONN_HIGHWATER_MAX); + +#define OR_CONN_LOWWATER_MIN (CELL_PAYLOAD_SIZE) +#define OR_CONN_LOWWATER_MAX (INT32_MAX) + or_conn_lowwater = + networkstatus_get_param(ns, "orconn_low", + OR_CONN_LOWWATER_DFLT, + OR_CONN_LOWWATER_MIN, + OR_CONN_LOWWATER_MAX); + +#define CWND_MAX_MIN 500 +#define CWND_MAX_MAX (INT32_MAX) + cwnd_max = + networkstatus_get_param(NULL, "cc_cwnd_max", + CWND_MAX_DFLT, + CWND_MAX_MIN, + CWND_MAX_MAX); +} + +/** + * Set congestion control parameters on a circuit's congestion + * control object based on values from the consensus. + * + * cc_alg is the negotiated congestion control algorithm. + * + * sendme_inc is the number of packaged cells that a sendme cell + * acks. This parameter will come from circuit negotiation. 
+ */ +static void +congestion_control_init_params(congestion_control_t *cc, + cc_alg_t cc_alg, + int sendme_inc) +{ +#define CWND_INIT_MIN 100 +#define CWND_INIT_MAX (10000) + cc->cwnd = + networkstatus_get_param(NULL, "cc_cwnd_init", + CIRCWINDOW_INIT, + CWND_INIT_MIN, + CWND_INIT_MAX); + +#define CWND_INC_PCT_SS_MIN 1 +#define CWND_INC_PCT_SS_MAX (500) + cc->cwnd_inc_pct_ss = + networkstatus_get_param(NULL, "cc_cwnd_inc_pct_ss", + CWND_INC_PCT_SS_DFLT, + CWND_INC_PCT_SS_MIN, + CWND_INC_PCT_SS_MAX); + +#define CWND_INC_MIN 1 +#define CWND_INC_MAX (1000) + cc->cwnd_inc = + networkstatus_get_param(NULL, "cc_cwnd_inc", + CWND_INC_DFLT, + CWND_INC_MIN, + CWND_INC_MAX); + +#define CWND_INC_RATE_MIN 1 +#define CWND_INC_RATE_MAX (250) + cc->cwnd_inc_rate = + networkstatus_get_param(NULL, "cc_cwnd_inc_rate", + CWND_INC_RATE_DFLT, + CWND_INC_RATE_MIN, + CWND_INC_RATE_MAX); + +#define SENDME_INC_MIN 10 +#define SENDME_INC_MAX (1000) + cc->sendme_inc = + networkstatus_get_param(NULL, "cc_sendme_inc", + sendme_inc, + SENDME_INC_MIN, + SENDME_INC_MAX); + + // XXX: this min needs to abide by sendme_inc range rules somehow +#define CWND_MIN_MIN sendme_inc +#define CWND_MIN_MAX (1000) + cc->cwnd_min = + networkstatus_get_param(NULL, "cc_cwnd_min", + CWND_MIN_DFLT, + CWND_MIN_MIN, + CWND_MIN_MAX); + +#define EWMA_CWND_COUNT_MIN 1 +#define EWMA_CWND_COUNT_MAX (100) + cc->ewma_cwnd_cnt = + networkstatus_get_param(NULL, "cc_ewma_cwnd_cnt", + EWMA_CWND_COUNT_DFLT, + EWMA_CWND_COUNT_MIN, + EWMA_CWND_COUNT_MAX); + +#define BWE_SENDME_MIN_MIN 2 +#define BWE_SENDME_MIN_MAX (20) + cc->bwe_sendme_min = + networkstatus_get_param(NULL, "cc_bwe_min", + BWE_SENDME_MIN_DFLT, + BWE_SENDME_MIN_MIN, + BWE_SENDME_MIN_MAX); + +#define CC_ALG_MIN 0 +#define CC_ALG_MAX (NUM_CC_ALGS-1) + cc->cc_alg = + networkstatus_get_param(NULL, "cc_alg", + cc_alg, + CC_ALG_MIN, + CC_ALG_MAX); + + bdp_alg_t default_bdp_alg = 0; + + switch (cc->cc_alg) { + case CC_ALG_WESTWOOD: + default_bdp_alg = WESTWOOD_BDP_ALG; + 
break; + case CC_ALG_VEGAS: + default_bdp_alg = VEGAS_BDP_MIX_ALG; + break; + case CC_ALG_NOLA: + default_bdp_alg = NOLA_BDP_ALG; + break; + case CC_ALG_SENDME: + default: + tor_fragile_assert(); + return; // No alg-specific params + } + + cc->bdp_alg = + networkstatus_get_param(NULL, "cc_bdp_alg", + default_bdp_alg, + 0, + NUM_BDP_ALGS-1); + + /* Algorithm-specific parameters */ + if (cc->cc_alg == CC_ALG_WESTWOOD) { + congestion_control_westwood_set_params(cc); + } else if (cc->cc_alg == CC_ALG_VEGAS) { + congestion_control_vegas_set_params(cc); + } else if (cc->cc_alg == CC_ALG_NOLA) { + congestion_control_nola_set_params(cc); + } +} + +/** + * Allocate and initialize fields in congestion control object. + * + * cc_alg is the negotiated congestion control algorithm. + * + * sendme_inc is the number of packaged cells that a sendme cell + * acks. This parameter will come from circuit negotiation. + */ +static void +congestion_control_init(congestion_control_t *cc, cc_alg_t cc_alg, + int sendme_inc) +{ + cc->sendme_pending_timestamps = smartlist_new(); + cc->sendme_arrival_timestamps = smartlist_new(); + + cc->in_slow_start = 1; + congestion_control_init_params(cc, cc_alg, sendme_inc); + + cc->next_cc_event = CWND_UPDATE_RATE(cc); +} + +/** Allocate and initialize a new congestion control object */ +congestion_control_t * +congestion_control_new(void) +{ + congestion_control_t *cc = tor_malloc_zero(sizeof(congestion_control_t)); + + // XXX: the alg and the sendme_inc need to be negotiated during + // circuit handshake + congestion_control_init(cc, CC_ALG_VEGAS, SENDME_INC_DFLT); + + return cc; +} + +/** + * Free a congestion control object and its associated state. 
+ */ +void +congestion_control_free_(congestion_control_t *cc) +{ + if (!cc) + return; + + SMARTLIST_FOREACH(cc->sendme_pending_timestamps, uint64_t *, t, tor_free(t)); + SMARTLIST_FOREACH(cc->sendme_arrival_timestamps, uint64_t *, t, tor_free(t)); + smartlist_free(cc->sendme_pending_timestamps); + smartlist_free(cc->sendme_arrival_timestamps); + + tor_free(cc); +} + +/** + * Enqueue a u64 timestamp to the end of a queue of timestamps. + */ +static inline void +enqueue_timestamp(smartlist_t *timestamps_u64, uint64_t timestamp_usec) +{ + uint64_t *timestamp_ptr = tor_malloc(sizeof(uint64_t)); + *timestamp_ptr = timestamp_usec; + + smartlist_add(timestamps_u64, timestamp_ptr); +} + +/** + * Peek at the head of a smartlist queue of u64 timestamps. + */ +static inline uint64_t +peek_timestamp(const smartlist_t *timestamps_u64_usecs) +{ + uint64_t *timestamp_ptr = smartlist_get(timestamps_u64_usecs, 0); + + if (BUG(!timestamp_ptr)) { + log_err(LD_CIRC, "Congestion control timestamp list became empty!"); + return 0; + } + + return *timestamp_ptr; +} + +/** + * Dequeue a u64 monotime usec timestamp from the front of a + * smartlist of pointers to u64. + */ +static inline uint64_t +dequeue_timestamp(smartlist_t *timestamps_u64_usecs) +{ + uint64_t *timestamp_ptr = smartlist_get(timestamps_u64_usecs, 0); + uint64_t timestamp_u64; + + if (BUG(!timestamp_ptr)) { + log_err(LD_CIRC, "Congestion control timestamp list became empty!"); + return 0; + } + + timestamp_u64 = *timestamp_ptr; + smartlist_del_keeporder(timestamps_u64_usecs, 0); + tor_free(timestamp_ptr); + + return timestamp_u64; +} + +/** + * Returns the number of sendme acks that will be received in the + * current congestion window size, rounded to nearest int. 
+ */ +static inline uint64_t +sendme_acks_per_cwnd(const congestion_control_t *cc) +{ + /* We add half a sendme_inc to cwnd to round to the nearest int */ + return ((cc->cwnd + cc->sendme_inc/2)/cc->sendme_inc); +} + +/** + * Get a package window from either old sendme logic, or congestion control. + * + * A package window is how many cells you can still send. + */ +int +congestion_control_get_package_window(const circuit_t *circ, + const crypt_path_t *cpath) +{ + int package_window; + congestion_control_t *cc; + + tor_assert(circ); + + if (cpath) { + package_window = cpath->package_window; + cc = cpath->ccontrol; + } else { + package_window = circ->package_window; + cc = circ->ccontrol; + } + + if (!cc) { + return package_window; + } else { + /* Inflight can be above cwnd if cwnd was just reduced */ + if (cc->inflight > cc->cwnd) + return 0; + /* In the extremely unlikely event that cwnd-inflight is larger than + * INT32_MAX, just return that cap, so old code doesn't explode. */ + else if (cc->cwnd - cc->inflight > INT32_MAX) + return INT32_MAX; + else + return (int)(cc->cwnd - cc->inflight); + } +} + +/** + * Returns the number of cells that are acked by every sendme. + */ +int +sendme_get_inc_count(const circuit_t *circ, const crypt_path_t *layer_hint) +{ + int sendme_inc = CIRCWINDOW_INCREMENT; + congestion_control_t *cc = NULL; + + if (layer_hint) { + cc = layer_hint->ccontrol; + } else { + cc = circ->ccontrol; + } + + if (cc) { + sendme_inc = cc->sendme_inc; + } + + return sendme_inc; +} + +/** Return true iff the next cell we send will result in the other endpoint + * sending a SENDME. + * + * We are able to know that because the package or inflight window value minus + * one cell (the possible SENDME cell) should be a multiple of the + * cells-per-sendme increment value (set via consensus parameter, negotiated + * for the circuit, and passed in as sendme_inc). 
+ * + * This function is used when recording a cell digest and this is done quite + * low in the stack when decrypting or encrypting a cell. The window is only + * updated once the cell is actually put in the outbuf. + */ +bool +circuit_sent_cell_for_sendme(const circuit_t *circ, + const crypt_path_t *layer_hint) +{ + congestion_control_t *cc; + int window; + + tor_assert(circ); + + if (layer_hint) { + window = layer_hint->package_window; + cc = layer_hint->ccontrol; + } else { + window = circ->package_window; + cc = circ->ccontrol; + } + + /* If we are using congestion control and the alg is not + * old-school 'fixed', then use cc->inflight to determine + * when sendmes will be sent */ + if (cc) { + if (!cc->inflight) + return false; + + /* This check must be +1 because this function is called *before* + * inflight is incremented for the sent cell */ + if ((cc->inflight+1) % cc->sendme_inc != 0) + return false; + + return true; + } + + /* At the start of the window, no SENDME will be expected. */ + if (window == CIRCWINDOW_START) { + return false; + } + + /* Are we at the limit of the increment and if not, we don't expect next + * cell is a SENDME. + * + * We test against the window minus 1 because when we are looking if the + * next cell is a SENDME, the window (either package or deliver) hasn't been + * decremented just yet so when this is called, we are currently processing + * the "window - 1" cell. + */ + if (((window - 1) % CIRCWINDOW_INCREMENT) != 0) { + return false; + } + + /* Next cell is expected to be a SENDME. */ + return true; +} + +/** + * Call-in to tell congestion control code that this circuit sent a cell. + * + * This updates the 'inflight' counter, and if this is a cell that will + * cause the other end to send a SENDME, record the current time in a list + * of pending timestamps, so that we can later compute the circuit RTT when + * the SENDME comes back. 
*/ +void +congestion_control_note_cell_sent(congestion_control_t *cc, + const circuit_t *circ, + const crypt_path_t *cpath) +{ + tor_assert(circ); + tor_assert(cc); + + /* Is this the last cell before a SENDME? The idea is that if the + * package_window reaches a multiple of the increment, after this cell, we + * should expect a SENDME. Note that this function must be called *before* + * we account for the sent cell. */ + if (!circuit_sent_cell_for_sendme(circ, cpath)) { + cc->inflight++; + return; + } + + cc->inflight++; + + /* Record this cell time for RTT computation when SENDME arrives */ + enqueue_timestamp(cc->sendme_pending_timestamps, + monotime_absolute_usec()); +} + +/** + * Returns true if any edge connections are active. + * + * We need to know this so that we can stop computing BDP if the + * edges are not sending on the circuit. + */ +static int +circuit_has_active_streams(const circuit_t *circ, + const crypt_path_t *layer_hint) +{ + const edge_connection_t *streams; + + if (CIRCUIT_IS_ORIGIN(circ)) { + streams = CONST_TO_ORIGIN_CIRCUIT(circ)->p_streams; + } else { + streams = CONST_TO_OR_CIRCUIT(circ)->n_streams; + } + + /* Check linked list of streams */ + for (const edge_connection_t *conn = streams; conn != NULL; + conn = conn->next_stream) { + if (conn->base_.marked_for_close) + continue; + + if (!layer_hint || conn->cpath_layer == layer_hint) { + if (connection_get_inbuf_len(TO_CONN(conn)) > 0) { + log_info(LD_CIRC, "CC: More in edge inbuf..."); + return 1; + } + + /* If we did not reach EOF on this read, there's more */ + if (!TO_CONN(conn)->inbuf_reached_eof) { + log_info(LD_CIRC, "CC: More on edge conn..."); + return 1; + } + + if (TO_CONN(conn)->linked_conn) { + if (connection_get_inbuf_len(TO_CONN(conn)->linked_conn) > 0) { + log_info(LD_CIRC, "CC: More in linked inbuf..."); + return 1; + } + + /* If there is a linked conn, and *it* did not each EOF, + * there's more */ + if (!TO_CONN(conn)->linked_conn->inbuf_reached_eof) { + 
log_info(LD_CIRC, "CC: More on linked conn..."); + return 1; + } + } + } + } + + return 0; +} + +/** + * Upon receipt of a SENDME, pop the oldest timestamp off the timestamp + * list, and use this to update RTT. + * + * Returns true if circuit estimates were successfully updated, false + * otherwise. + */ +bool +congestion_control_update_circuit_estimates(congestion_control_t *cc, + const circuit_t *circ, + const crypt_path_t *layer_hint) +{ + uint64_t now_usec = monotime_absolute_usec(); + + /* Update RTT first, then BDP. BDP needs fresh RTT */ + uint64_t curr_rtt_usec = congestion_control_update_circuit_rtt(cc, now_usec); + return congestion_control_update_circuit_bdp(cc, circ, layer_hint, now_usec, + curr_rtt_usec); +} + +/** + * Returns true if we have enough time data to use heuristics + * to compare RTT to a baseline. + */ +static bool +time_delta_should_use_heuristics(const congestion_control_t *cc) +{ + + /* If we have exited slow start, we should have processed at least + * a cwnd worth of RTTs */ + if (!cc->in_slow_start) { + return true; + } + + /* If we managed to get enough acks to estimate a SENDME BDP, then + * we have enough to estimate clock jumps relative to a baseline, + * too. (This is at least 'cc_bwe_min' acks). */ + if (cc->bdp[BDP_ALG_SENDME_RATE]) { + return true; + } + + /* Not enough data to estimate clock jumps */ + return false; +} + +static bool is_monotime_clock_broken = false; + +/** + * Returns true if the monotime delta is 0, or is significantly + * different than the previous delta. Either case indicates + * that the monotime time source stalled or jumped. + * + * Also caches the clock state in the is_monotime_clock_broken flag, + * so we can also provide a is_monotime_clock_reliable() function, + * used by flow control rate timing. 
+ */ +static bool +time_delta_stalled_or_jumped(const congestion_control_t *cc, + uint64_t old_delta, uint64_t new_delta) +{ +#define DELTA_DISCREPENCY_RATIO_MAX 100 + /* If we have a 0 new_delta, that is definitely a monotime stall */ + if (new_delta == 0) { + static ratelim_t stall_info_limit = RATELIM_INIT(60); + log_fn_ratelim(&stall_info_limit, LOG_INFO, LD_CIRC, + "Congestion control cannot measure RTT due to monotime stall."); + + /* If delta is ever 0, the monotime clock has stalled, and we should + * not use it anywhere. */ + is_monotime_clock_broken = true; + + return is_monotime_clock_broken; + } + + /* If the old_delta is 0, we have no previous values on this circuit. + * + * So, return the global monotime status from other circuits, and + * do not update. + */ + if (old_delta == 0) { + return is_monotime_clock_broken; + } + + /* + * For the heuristic cases, we need at least a few timestamps, + * to average out any previous partial stalls or jumps. So until + * that point, let's just use the cached status from other circuits. + */ + if (!time_delta_should_use_heuristics(cc)) { + return is_monotime_clock_broken; + } + + /* If old_delta is significantly larger than new_delta, then + * this means that the monotime clock recently stopped moving + * forward. */ + if (old_delta > new_delta * DELTA_DISCREPENCY_RATIO_MAX) { + static ratelim_t dec_notice_limit = RATELIM_INIT(300); + log_fn_ratelim(&dec_notice_limit, LOG_NOTICE, LD_CIRC, + "Sudden decrease in circuit RTT (%"PRIu64" vs %"PRIu64 + "), likely due to clock jump.", + new_delta/1000, old_delta/1000); + + is_monotime_clock_broken = true; + + return is_monotime_clock_broken; + } + + /* If new_delta is significantly larger than old_delta, then + * this means that the monotime clock suddenly jumped forward. 
*/ + if (new_delta > old_delta * DELTA_DISCREPENCY_RATIO_MAX) { + static ratelim_t dec_notice_limit = RATELIM_INIT(300); + log_fn_ratelim(&dec_notice_limit, LOG_NOTICE, LD_CIRC, + "Sudden increase in circuit RTT (%"PRIu64" vs %"PRIu64 + "), likely due to clock jump.", + new_delta/1000, old_delta/1000); + + is_monotime_clock_broken = true; + + return is_monotime_clock_broken; + } + + /* All good! Update cached status, too */ + is_monotime_clock_broken = false; + + return is_monotime_clock_broken; +} + +/** + * Is the monotime clock stalled according to any circuits? + */ +bool +is_monotime_clock_reliable(void) +{ + return !is_monotime_clock_broken; +} + +/** + * Called when we get a SENDME. Updates circuit RTT by pulling off a + * timestamp of when we sent the CIRCWINDOW_INCREMENT-th cell from + * the queue of such timestamps, and comparing that to current time. + * + * Also updates min, max, and EWMA of RTT. + * + * Returns the current circuit RTT in usecs, or 0 if it could not be + * measured (due to clock jump, stall, etc). + */ +static uint64_t +congestion_control_update_circuit_rtt(congestion_control_t *cc, + uint64_t now_usec) +{ + uint64_t rtt, ewma_cnt; + uint64_t sent_at_timestamp; + + tor_assert(cc); + + /* Get the time that we sent the cell that resulted in the other + * end sending this sendme. Use this to calculate RTT */ + sent_at_timestamp = dequeue_timestamp(cc->sendme_pending_timestamps); + + rtt = now_usec - sent_at_timestamp; + + /* Do not update RTT at all if it looks fishy */ + if (time_delta_stalled_or_jumped(cc, cc->ewma_rtt_usec, rtt)) { + return 0; + } + + ewma_cnt = cc->ewma_cwnd_cnt*sendme_acks_per_cwnd(cc); + ewma_cnt = MAX(ewma_cnt, 2); // Use at least 2 + + cc->ewma_rtt_usec = n_count_ewma(rtt, cc->ewma_rtt_usec, ewma_cnt); + + if (rtt > cc->max_rtt_usec) { + cc->max_rtt_usec = rtt; + } + + if (cc->min_rtt_usec == 0 || rtt < cc->min_rtt_usec) { + cc->min_rtt_usec = rtt; + } + + return rtt; +} + +/** + * Called when we get a SENDME. 
Updates the bandwidth-delay-product (BDP) + * estimates of a circuit. Several methods of computing BDP are used, + * depending on scenario. While some congestion control algorithms only + * use one of these methods, we update them all because it's quick and easy. + * + * - now_usec is the current monotime in usecs. + * - curr_rtt_usec is the current circuit RTT in usecs. It may be 0 if no + * RTT could bemeasured. + * + * Returns true if we were able to update BDP, false otherwise. + */ +static bool +congestion_control_update_circuit_bdp(congestion_control_t *cc, + const circuit_t *circ, + const crypt_path_t *layer_hint, + uint64_t now_usec, + uint64_t curr_rtt_usec) +{ + int chan_q = 0; + unsigned int blocked_on_chan = 0; + uint64_t timestamp_usec; + uint64_t sendme_rate_bdp = 0; + + tor_assert(cc); + + if (CIRCUIT_IS_ORIGIN(circ)) { + /* origin circs use n_chan */ + chan_q = circ->n_chan_cells.n; + blocked_on_chan = circ->streams_blocked_on_n_chan; + } else { + /* Both onion services and exits use or_circuit and p_chan */ + chan_q = CONST_TO_OR_CIRCUIT(circ)->p_chan_cells.n; + blocked_on_chan = circ->streams_blocked_on_p_chan; + } + + /* If we have no EWMA RTT, it is because monotime has been stalled + * or messed up the entire time so far. Set our BDP estimates directly + * to current cwnd */ + if (!cc->ewma_rtt_usec) { + uint64_t cwnd = cc->cwnd; + + /* If the channel is blocked, keep subtracting off the chan_q + * until we hit the min cwnd. */ + if (blocked_on_chan) { + cwnd = MAX(cwnd - chan_q, cc->cwnd_min); + cc->blocked_chan = 1; + } else { + cc->blocked_chan = 0; + } + + cc->bdp[BDP_ALG_CWND_RTT] = cwnd; + cc->bdp[BDP_ALG_INFLIGHT_RTT] = cwnd; + cc->bdp[BDP_ALG_SENDME_RATE] = cwnd; + cc->bdp[BDP_ALG_PIECEWISE] = cwnd; + + static ratelim_t dec_notice_limit = RATELIM_INIT(300); + log_fn_ratelim(&dec_notice_limit, LOG_NOTICE, LD_CIRC, + "Our clock has been stalled for the entire lifetime of a circuit. 
" + "Performance may be sub-optimal."); + + return blocked_on_chan; + } + + /* Congestion window based BDP will respond to changes in RTT only, and is + * relative to cwnd growth. It is useful for correcting for BDP + * overestimation, but if BDP is higher than the current cwnd, it will + * underestimate it. + * + * We multiply here first to avoid precision issues from min_RTT being + * close to ewma RTT. Since all fields are u64, there is plenty of + * room here to multiply first. + */ + cc->bdp[BDP_ALG_CWND_RTT] = cc->cwnd*cc->min_rtt_usec/cc->ewma_rtt_usec; + + /* + * If we have no pending streams, we do not have enough data to fill + * the BDP, so preserve our old estimates but do not make any more. + */ + if (!blocked_on_chan && !circuit_has_active_streams(circ, layer_hint)) { + log_info(LD_CIRC, + "CC: Streams drained. Spare package window: %"PRIu64 + ", no BDP update", cc->cwnd - cc->inflight); + + /* Clear SENDME timestamps; they will be wrong with intermittent data */ + SMARTLIST_FOREACH(cc->sendme_arrival_timestamps, uint64_t *, t, + tor_free(t)); + smartlist_clear(cc->sendme_arrival_timestamps); + } else if (curr_rtt_usec && is_monotime_clock_reliable()) { + /* Sendme-based BDP will quickly measure BDP in much less than + * a cwnd worth of data when in use (in 2-10 SENDMEs). + * + * But if the link goes idle, it will be vastly lower than true BDP. Hence + * we only compute it if we have either pending stream data, or streams + * are still blocked on the channel queued data. + * + * We also do not compute it if we do not have a current RTT passed in, + * because that means that monotime is currently stalled or just jumped. + */ + enqueue_timestamp(cc->sendme_arrival_timestamps, now_usec); + + if (smartlist_len(cc->sendme_arrival_timestamps) >= cc->bwe_sendme_min) { + /* If we have more sendmes than fit in a cwnd, trim the list. 
+ * Those are not acurrately measuring throughput, if cwnd is + * currently smaller than BDP */ + while (smartlist_len(cc->sendme_arrival_timestamps) > + cc->bwe_sendme_min && + (uint64_t)smartlist_len(cc->sendme_arrival_timestamps) > + sendme_acks_per_cwnd(cc)) { + (void)dequeue_timestamp(cc->sendme_arrival_timestamps); + } + int sendme_cnt = smartlist_len(cc->sendme_arrival_timestamps); + + /* Calculate SENDME_BWE_COUNT pure average */ + timestamp_usec = peek_timestamp(cc->sendme_arrival_timestamps); + uint64_t delta = now_usec - timestamp_usec; + + /* The acked data is in sendme_cnt-1 chunks, because we are counting the + * data that is processed by the other endpoint *between* all of these + * sendmes. There's one less gap between the sendmes than the number + * of sendmes. */ + uint64_t cells = (sendme_cnt-1)*cc->sendme_inc; + + /* The bandwidth estimate is cells/delta, which when multiplied + * by min RTT obtains the BDP. However, we multiply first to + * avoid precision issues with the RTT being close to delta in size. */ + sendme_rate_bdp = cells*cc->min_rtt_usec/delta; + + /* Calculate BDP_EWMA_COUNT N-EWMA */ + cc->bdp[BDP_ALG_SENDME_RATE] = + n_count_ewma(sendme_rate_bdp, cc->bdp[BDP_ALG_SENDME_RATE], + cc->ewma_cwnd_cnt*sendme_acks_per_cwnd(cc)); + } + + /* In-flight BDP will cause the cwnd to drift down when underutilized. + * It is most useful when the local OR conn is blocked, so we only + * compute it if we're utilized. */ + cc->bdp[BDP_ALG_INFLIGHT_RTT] = + (cc->inflight - chan_q)*cc->min_rtt_usec/ + MAX(cc->ewma_rtt_usec, curr_rtt_usec); + } else { + /* We can still update inflight with just an EWMA RTT, but only + * if there is data flowing */ + cc->bdp[BDP_ALG_INFLIGHT_RTT] = + (cc->inflight - chan_q)*cc->min_rtt_usec/cc->ewma_rtt_usec; + } + + /* The orconn is blocked; use smaller of inflight vs SENDME */ + if (blocked_on_chan) { + log_info(LD_CIRC, "CC: Streams blocked on circ channel. 
Chanq: %d", + chan_q); + + /* A blocked channel is an immediate congestion signal, but it still + * happens only once per cwnd */ + if (!cc->blocked_chan) { + cc->next_cc_event = 0; + cc->blocked_chan = 1; + } + + if (cc->bdp[BDP_ALG_SENDME_RATE]) { + cc->bdp[BDP_ALG_PIECEWISE] = MIN(cc->bdp[BDP_ALG_INFLIGHT_RTT], + cc->bdp[BDP_ALG_SENDME_RATE]); + } else { + cc->bdp[BDP_ALG_PIECEWISE] = cc->bdp[BDP_ALG_INFLIGHT_RTT]; + } + } else { + /* If we were previously blocked, emit a new congestion event + * now that we are unblocked, to re-evaluate cwnd */ + if (cc->blocked_chan) { + cc->blocked_chan = 0; + cc->next_cc_event = 0; + log_info(LD_CIRC, "CC: Streams un-blocked on circ channel. Chanq: %d", + chan_q); + } + + cc->bdp[BDP_ALG_PIECEWISE] = MAX(cc->bdp[BDP_ALG_SENDME_RATE], + cc->bdp[BDP_ALG_CWND_RTT]); + } + + /* We can end up with no piecewise value if we didn't have either + * a SENDME estimate or enough data for an inflight estimate. + * It also happens on the very first sendme, since we need two + * to get a BDP. In these cases, use the cwnd method. */ + if (!cc->bdp[BDP_ALG_PIECEWISE]) { + cc->bdp[BDP_ALG_PIECEWISE] = cc->bdp[BDP_ALG_CWND_RTT]; + log_info(LD_CIRC, "CC: No piecewise BDP. Using %"PRIu64, + cc->bdp[BDP_ALG_PIECEWISE]); + } + + if (cc->next_cc_event == 0) { + if (CIRCUIT_IS_ORIGIN(circ)) { + log_info(LD_CIRC, + "CC: Circuit %d " + "SENDME RTT: %"PRIu64", %"PRIu64", %"PRIu64", %"PRIu64", " + "BDP estimates: " + "%"PRIu64", " + "%"PRIu64", " + "%"PRIu64", " + "%"PRIu64", " + "%"PRIu64". 
", + CONST_TO_ORIGIN_CIRCUIT(circ)->global_identifier, + cc->min_rtt_usec/1000, + curr_rtt_usec/1000, + cc->ewma_rtt_usec/1000, + cc->max_rtt_usec/1000, + cc->bdp[BDP_ALG_INFLIGHT_RTT], + cc->bdp[BDP_ALG_CWND_RTT], + sendme_rate_bdp, + cc->bdp[BDP_ALG_SENDME_RATE], + cc->bdp[BDP_ALG_PIECEWISE] + ); + } else { + log_info(LD_CIRC, + "CC: Circuit %"PRIu64":%d " + "SENDME RTT: %"PRIu64", %"PRIu64", %"PRIu64", %"PRIu64", " + "%"PRIu64", " + "%"PRIu64", " + "%"PRIu64", " + "%"PRIu64", " + "%"PRIu64". ", + // XXX: actually, is this p_chan here? This is + // an or_circuit (exit or onion) + circ->n_chan->global_identifier, circ->n_circ_id, + cc->min_rtt_usec/1000, + curr_rtt_usec/1000, + cc->ewma_rtt_usec/1000, + cc->max_rtt_usec/1000, + cc->bdp[BDP_ALG_INFLIGHT_RTT], + cc->bdp[BDP_ALG_CWND_RTT], + sendme_rate_bdp, + cc->bdp[BDP_ALG_SENDME_RATE], + cc->bdp[BDP_ALG_PIECEWISE] + ); + } + } + + /* We updated BDP this round if either we had a blocked channel, or + * the curr_rtt_usec was not 0. */ + bool ret = (blocked_on_chan || curr_rtt_usec != 0); + if (ret) { + tor_trace(TR_SUBSYS(cc), TR_EV(bdp_update), circ, cc, curr_rtt_usec, + sendme_rate_bdp); + } + return ret; +} + +/** + * Dispatch the sendme to the appropriate congestion control algorithm. 
+ */ +int +congestion_control_dispatch_cc_alg(congestion_control_t *cc, + const circuit_t *circ, + const crypt_path_t *layer_hint) +{ + int ret = -END_CIRC_REASON_INTERNAL; + switch (cc->cc_alg) { + case CC_ALG_WESTWOOD: + ret = congestion_control_westwood_process_sendme(cc, circ, layer_hint); + break; + + case CC_ALG_VEGAS: + ret = congestion_control_vegas_process_sendme(cc, circ, layer_hint); + break; + + case CC_ALG_NOLA: + ret = congestion_control_nola_process_sendme(cc, circ, layer_hint); + break; + + case CC_ALG_SENDME: + default: + tor_assert(0); + } + + if (cc->cwnd > cwnd_max) { + static ratelim_t cwnd_limit = RATELIM_INIT(60); + log_fn_ratelim(&cwnd_limit, LOG_NOTICE, LD_CIRC, + "Congestion control cwnd %"PRIu64" exceeds max %d, clamping.", + cc->cwnd, cwnd_max); + cc->cwnd = cwnd_max; + } + + return ret; +} diff --git a/src/core/or/congestion_control_common.h b/src/core/or/congestion_control_common.h new file mode 100644 index 0000000000..01dbc1ceb4 --- /dev/null +++ b/src/core/or/congestion_control_common.h @@ -0,0 +1,113 @@ +/* Copyright (c) 2019-2021, The Tor Project, Inc. 
*/ +/* See LICENSE for licensing information */ + +/** + * \file congestion_control_common.h + * \brief Public APIs for congestion control + **/ + +#ifndef TOR_CONGESTION_CONTROL_COMMON_H +#define TOR_CONGESTION_CONTROL_COMMON_H + +#include "core/or/crypt_path_st.h" +#include "core/or/circuit_st.h" + +typedef struct congestion_control_t congestion_control_t; + +/** Wrapper for the free function, set the CC pointer to NULL after free */ +#define congestion_control_free(cc) \ + FREE_AND_NULL(congestion_control_t, congestion_control_free_, cc) + +void congestion_control_free_(congestion_control_t *cc); + +congestion_control_t *congestion_control_new(void); + +int congestion_control_dispatch_cc_alg(congestion_control_t *cc, + const circuit_t *circ, + const crypt_path_t *layer_hint); + +void congestion_control_note_cell_sent(congestion_control_t *cc, + const circuit_t *circ, + const crypt_path_t *cpath); + +bool congestion_control_update_circuit_estimates(congestion_control_t *, + const circuit_t *, + const crypt_path_t *); + +int congestion_control_get_package_window(const circuit_t *, + const crypt_path_t *); + +int sendme_get_inc_count(const circuit_t *, const crypt_path_t *); +bool circuit_sent_cell_for_sendme(const circuit_t *, const crypt_path_t *); +bool is_monotime_clock_reliable(void); + +void congestion_control_new_consensus_params(const networkstatus_t *ns); + +/* Ugh, C.. these four are private. Use the getter instead, when + * external to the congestion control code. 
*/ +extern uint32_t or_conn_highwater; +extern uint32_t or_conn_lowwater; +extern int32_t cell_queue_high; +extern int32_t cell_queue_low; + +/** Stop writing on an orconn when its outbuf is this large */ +static inline uint32_t +or_conn_highwatermark(void) +{ + return or_conn_highwater; +} + +/** Resume writing on an orconn when its outbuf is less than this */ +static inline uint32_t +or_conn_lowwatermark(void) +{ + return or_conn_lowwater; +} + +/** Stop reading on edge connections when we have this many cells + * waiting on the appropriate queue. */ +static inline int32_t +cell_queue_highwatermark(void) +{ + return cell_queue_high; +} + +/** Start reading from edge connections again when we get down to this many + * cells. */ +static inline int32_t +cell_queue_lowwatermark(void) +{ + return cell_queue_low; +} + +/** + * Compute an N-count EWMA, aka N-EWMA. N-EWMA is defined as: + * EWMA = alpha*value + (1-alpha)*EWMA_prev + * with alpha = 2/(N+1). + * + * This works out to: + * EWMA = value*2/(N+1) + EMA_prev*(N-1)/(N+1) + * = (value*2 + EWMA_prev*(N-1))/(N+1) + */ +static inline uint64_t +n_count_ewma(uint64_t curr, uint64_t prev, uint64_t N) +{ + if (prev == 0) + return curr; + else + return (2*curr + (N-1)*prev)/(N+1); +} + +/* Private section starts. */ +#ifdef TOR_CONGESTION_CONTROL_PRIVATE + +/* + * Unit tests declaractions. + */ +#ifdef TOR_UNIT_TESTS + +#endif /* defined(TOR_UNIT_TESTS) */ + +#endif /* defined(TOR_CONGESTION_CONTROL_PRIVATE) */ + +#endif /* !defined(TOR_CONGESTION_CONTROL_COMMON_H) */ diff --git a/src/core/or/congestion_control_flow.c b/src/core/or/congestion_control_flow.c new file mode 100644 index 0000000000..805654664c --- /dev/null +++ b/src/core/or/congestion_control_flow.c @@ -0,0 +1,710 @@ +/* Copyright (c) 2019-2021, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file congestion_control_flow.c + * \brief Code that implements flow control for congestion controlled + * circuits. 
+ */ + +#define TOR_CONGESTION_CONTROL_FLOW_PRIVATE + +#include "core/or/or.h" + +#include "core/or/relay.h" +#include "core/mainloop/connection.h" +#include "core/or/connection_edge.h" +#include "core/mainloop/mainloop.h" +#include "core/or/congestion_control_common.h" +#include "core/or/congestion_control_flow.h" +#include "core/or/congestion_control_st.h" +#include "core/or/circuitlist.h" +#include "core/or/trace_probes_cc.h" +#include "feature/nodelist/networkstatus.h" +#include "trunnel/flow_control_cells.h" + +#include "core/or/connection_st.h" +#include "core/or/cell_st.h" +#include "app/config/config.h" + +/** Cache consensus parameters */ +static uint32_t xoff_client; +static uint32_t xoff_exit; + +static uint32_t xon_change_pct; +static uint32_t xon_ewma_cnt; +static uint32_t xon_rate_bytes; + +/* In normal operation, we can get a burst of up to 32 cells before returning + * to libevent to flush the outbuf. This is a heuristic from hardcoded values + * and strange logic in connection_bucket_get_share(). */ +#define MAX_EXPECTED_CELL_BURST 32 + +/* The following three are for dropmark rate limiting. They define when we + * scale down our XON, XOFF, and xmit byte counts. Early scaling is beneficial + * because it limits the ability of spurious XON/XOFF to be sent after large + * amounts of data without XON/XOFF. At these limits, after 10MB of data (or + * more), an adversary can only inject (log2(10MB)-log2(200*500))*100 ~= 1000 + * cells of fake XOFF/XON before the xmit byte count will be halved enough to + * triggering a limit. */ +#define XON_COUNT_SCALE_AT 200 +#define XOFF_COUNT_SCALE_AT 200 +#define ONE_MEGABYTE (UINT64_C(1) << 20) +#define TOTAL_XMIT_SCALE_AT (10 * ONE_MEGABYTE) + +/** + * Return the congestion control object of the given edge connection. + * + * Returns NULL if the edge connection doesn't have a cpath_layer or not + * attached to a circuit. But also if the cpath_layer or circuit doesn't have a + * congestion control object. 
+ */ +static inline const congestion_control_t * +edge_get_ccontrol(const edge_connection_t *edge) +{ + if (edge->cpath_layer) + return edge->cpath_layer->ccontrol; + else if (edge->on_circuit) + return edge->on_circuit->ccontrol; + else + return NULL; +} + +/** + * Update global congestion control related consensus parameter values, every + * consensus update. + * + * More details for each of the parameters can be found in proposal 324, + * section 6.5 including tuning notes. + */ +void +flow_control_new_consensus_params(const networkstatus_t *ns) +{ +#define CC_XOFF_CLIENT_DFLT 500 +#define CC_XOFF_CLIENT_MIN 1 +#define CC_XOFF_CLIENT_MAX 10000 + xoff_client = networkstatus_get_param(ns, "cc_xoff_client", + CC_XOFF_CLIENT_DFLT, + CC_XOFF_CLIENT_MIN, + CC_XOFF_CLIENT_MAX)*RELAY_PAYLOAD_SIZE; + +#define CC_XOFF_EXIT_DFLT 500 +#define CC_XOFF_EXIT_MIN 1 +#define CC_XOFF_EXIT_MAX 10000 + xoff_exit = networkstatus_get_param(ns, "cc_xoff_exit", + CC_XOFF_EXIT_DFLT, + CC_XOFF_EXIT_MIN, + CC_XOFF_EXIT_MAX)*RELAY_PAYLOAD_SIZE; + +#define CC_XON_CHANGE_PCT_DFLT 25 +#define CC_XON_CHANGE_PCT_MIN 1 +#define CC_XON_CHANGE_PCT_MAX 99 + xon_change_pct = networkstatus_get_param(ns, "cc_xon_change_pct", + CC_XON_CHANGE_PCT_DFLT, + CC_XON_CHANGE_PCT_MIN, + CC_XON_CHANGE_PCT_MAX); + +#define CC_XON_RATE_BYTES_DFLT (500) +#define CC_XON_RATE_BYTES_MIN (1) +#define CC_XON_RATE_BYTES_MAX (5000) + xon_rate_bytes = networkstatus_get_param(ns, "cc_xon_rate", + CC_XON_RATE_BYTES_DFLT, + CC_XON_RATE_BYTES_MIN, + CC_XON_RATE_BYTES_MAX)*RELAY_PAYLOAD_SIZE; + +#define CC_XON_EWMA_CNT_DFLT (2) +#define CC_XON_EWMA_CNT_MIN (1) +#define CC_XON_EWMA_CNT_MAX (100) + xon_ewma_cnt = networkstatus_get_param(ns, "cc_xon_ewma_cnt", + CC_XON_EWMA_CNT_DFLT, + CC_XON_EWMA_CNT_MIN, + CC_XON_EWMA_CNT_MAX); +} + +/** + * Send an XOFF for this stream, and note that we sent one + */ +static void +circuit_send_stream_xoff(edge_connection_t *stream) +{ + xoff_cell_t xoff; + uint8_t payload[CELL_PAYLOAD_SIZE]; + 
ssize_t xoff_size; + + memset(&xoff, 0, sizeof(xoff)); + memset(payload, 0, sizeof(payload)); + + xoff_cell_set_version(&xoff, 0); + + if ((xoff_size = xoff_cell_encode(payload, CELL_PAYLOAD_SIZE, &xoff)) < 0) { + log_warn(LD_BUG, "Failed to encode xoff cell"); + return; + } + + if (connection_edge_send_command(stream, RELAY_COMMAND_XOFF, + (char*)payload, (size_t)xoff_size) == 0) { + stream->xoff_sent = true; + } +} + +/** + * Compute the recent drain rate (write rate) for this edge + * connection and return it, in KB/sec (1000 bytes/sec). + * + * Returns 0 if the monotime clock is busted. + */ +static inline uint32_t +compute_drain_rate(const edge_connection_t *stream) +{ + if (BUG(!is_monotime_clock_reliable())) { + log_warn(LD_BUG, "Computing drain rate with stalled monotime clock"); + return 0; + } + + uint64_t delta = monotime_absolute_usec() - stream->drain_start_usec; + + if (delta == 0) { + log_warn(LD_BUG, "Computing stream drain rate with zero time delta"); + return 0; + } + + /* Overflow checks */ + if (stream->prev_drained_bytes > INT32_MAX/1000 || /* Intermediate */ + stream->prev_drained_bytes/delta > INT32_MAX/1000) { /* full value */ + return INT32_MAX; + } + + /* kb/sec = bytes/usec * 1000 usec/msec * 1000 msec/sec * kb/1000bytes */ + return MAX(1, (uint32_t)(stream->prev_drained_bytes * 1000)/delta); +} + +/** + * Send an XON for this stream, with appropriate advisory rate information. + * + * Reverts the xoff sent status, and stores the rate information we sent, + * in case it changes.
+ */ +static void +circuit_send_stream_xon(edge_connection_t *stream) +{ + xon_cell_t xon; + uint8_t payload[CELL_PAYLOAD_SIZE]; + ssize_t xon_size; + + memset(&xon, 0, sizeof(xon)); + memset(payload, 0, sizeof(payload)); + + xon_cell_set_version(&xon, 0); + xon_cell_set_kbps_ewma(&xon, stream->ewma_drain_rate); + + if ((xon_size = xon_cell_encode(payload, CELL_PAYLOAD_SIZE, &xon)) < 0) { + log_warn(LD_BUG, "Failed to encode xon cell"); + return; + } + + /* Store the advisory rate information, to send advisory updates if + * it changes */ + stream->ewma_rate_last_sent = stream->ewma_drain_rate; + + if (connection_edge_send_command(stream, RELAY_COMMAND_XON, (char*)payload, + (size_t)xon_size) == 0) { + /* Revert the xoff sent status, so we can send another one if need be */ + stream->xoff_sent = false; + } +} + +/** + * Process a stream XOFF, parsing it, and then stopping reading on + * the edge connection. + * + * Record that we have received an xoff, so we know not to resume + * reading on this edge conn until we get an XON. + * + * Returns false if the XOFF did not validate; true if it does.
+ */ +bool +circuit_process_stream_xoff(edge_connection_t *conn, + const crypt_path_t *layer_hint, + const cell_t *cell) +{ + (void)cell; + bool retval = true; + + if (BUG(!conn)) { + log_fn(LOG_PROTOCOL_WARN, LD_EDGE, + "Got XOFF on invalid stream?"); + return false; + } + + /* Make sure this XOFF came from the right hop */ + if (layer_hint && layer_hint != conn->cpath_layer) { + log_fn(LOG_PROTOCOL_WARN, LD_EDGE, + "Got XOFF from wrong hop."); + return false; + } + + if (edge_get_ccontrol(conn) == NULL) { + log_fn(LOG_PROTOCOL_WARN, LD_EDGE, + "Got XOFF for non-congestion control circuit"); + return false; + } + + if (conn->xoff_received) { + log_fn(LOG_PROTOCOL_WARN, LD_EDGE, + "Got multiple XOFF on connection"); + return false; + } + + /* If we are near the max, scale everything down */ + if (conn->num_xoff_recv == XOFF_COUNT_SCALE_AT) { + log_info(LD_EDGE, "Scaling down for XOFF count: %d %d %d", + conn->total_bytes_xmit, + conn->num_xoff_recv, + conn->num_xon_recv); + conn->total_bytes_xmit /= 2; + conn->num_xoff_recv /= 2; + conn->num_xon_recv /= 2; + } + + conn->num_xoff_recv++; + + /* Client-side check to make sure that XOFF is not sent too early, + * for dropmark attacks. The main sidechannel risk is early cells, + * but we also check to make sure that we have not received more XOFFs + * than could have been generated by the bytes we sent. + */ + if (TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL) { + uint32_t limit = 0; + + /* TODO: This limit technically needs to come from negotiation, + * and be bounds checked for sanity, because the other endpoint + * may have a different consensus */ + if (conn->hs_ident) + limit = xoff_client; + else + limit = xoff_exit; + + if (conn->total_bytes_xmit < limit*conn->num_xoff_recv) { + log_fn(LOG_PROTOCOL_WARN, LD_EDGE, + "Got extra XOFF for bytes sent. 
Got %d, expected max %d", + conn->num_xoff_recv, conn->total_bytes_xmit/limit); + /* We still process this, because the only dropmark defenses + * in C tor are via the vanguards addon's use of the read valid + * cells. So just signal that we think this is not valid protocol + * data and proceed. */ + retval = false; + } + } + + // TODO: Count how many xoffs we have; log if "too many", for shadow + // analysis of chatter. Possibly add to extra-info? + + log_info(LD_EDGE, "Got XOFF!"); + connection_stop_reading(TO_CONN(conn)); + conn->xoff_received = true; + + return retval; +} + +/** + * Process a stream XON, and if it validates, clear the xoff + * flag and resume reading on this edge connection. + * + * Also, use provided rate information to rate limit + * reading on this edge (or packagaing from it onto + * the circuit), to avoid XON/XOFF chatter. + * + * Returns true if the XON validates, false otherwise. + */ +bool +circuit_process_stream_xon(edge_connection_t *conn, + const crypt_path_t *layer_hint, + const cell_t *cell) +{ + xon_cell_t *xon; + bool retval = true; + + if (BUG(!conn)) { + log_fn(LOG_PROTOCOL_WARN, LD_EDGE, + "Got XON on invalid stream?"); + return false; + } + + /* Make sure this XON came from the right hop */ + if (layer_hint && layer_hint != conn->cpath_layer) { + log_fn(LOG_PROTOCOL_WARN, LD_EDGE, + "Got XON from wrong hop."); + return false; + } + + if (edge_get_ccontrol(conn) == NULL) { + log_fn(LOG_PROTOCOL_WARN, LD_EDGE, + "Got XON for non-congestion control circuit"); + return false; + } + + if (xon_cell_parse(&xon, cell->payload+RELAY_HEADER_SIZE, + CELL_PAYLOAD_SIZE-RELAY_HEADER_SIZE) < 0) { + log_fn(LOG_PROTOCOL_WARN, LD_EDGE, + "Received malformed XON cell."); + return false; + } + + /* If we are near the max, scale everything down */ + if (conn->num_xon_recv == XON_COUNT_SCALE_AT) { + log_info(LD_EDGE, "Scaling down for XON count: %d %d %d", + conn->total_bytes_xmit, + conn->num_xoff_recv, + conn->num_xon_recv); + 
conn->total_bytes_xmit /= 2; + conn->num_xoff_recv /= 2; + conn->num_xon_recv /= 2; + } + + conn->num_xon_recv++; + + /* Client-side check to make sure that XON is not sent too early, + * for dropmark attacks. The main sidechannel risk is early cells, + * but we also check to see that we did not get more XONs than make + * sense for the number of bytes we sent. + */ + if (TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL) { + uint32_t limit = 0; + + /* TODO: This limit technically needs to come from negotiation, + * and be bounds checked for sanity, because the other endpoint + * may have a different consensus */ + if (conn->hs_ident) + limit = MIN(xoff_client, xon_rate_bytes); + else + limit = MIN(xoff_exit, xon_rate_bytes); + + if (conn->total_bytes_xmit < limit*conn->num_xon_recv) { + log_fn(LOG_PROTOCOL_WARN, LD_EDGE, + "Got extra XON for bytes sent. Got %d, expected max %d", + conn->num_xon_recv, conn->total_bytes_xmit/limit); + + /* We still process this, because the only dropmark defenses + * in C tor are via the vanguards addon's use of the read valid + * cells. So just signal that we think this is not valid protocol + * data and proceed. */ + retval = false; + } + } + + log_info(LD_EDGE, "Got XON: %d", xon->kbps_ewma); + + /* Adjust the token bucket of this edge connection with the drain rate in + * the XON. Rate is in bytes from kilobit (kpbs). */ + uint64_t rate = ((uint64_t) xon_cell_get_kbps_ewma(xon) * 1000); + if (rate == 0 || INT32_MAX < rate) { + /* No rate. 
*/ + rate = INT32_MAX; + } + token_bucket_rw_adjust(&conn->bucket, (uint32_t) rate, (uint32_t) rate); + + if (conn->xoff_received) { + /* Clear the fact that we got an XOFF, so that this edge can + * start and stop reading normally */ + conn->xoff_received = false; + connection_start_reading(TO_CONN(conn)); + } + + xon_cell_free(xon); + + return retval; +} + +/** + * Called from sendme_stream_data_received(), when data arrives + * from a circuit to our edge's outbuf, to decide if we need to send + * an XOFF. + * + * Returns the amount of cells remaining until the buffer is full, at + * which point it sends an XOFF, and returns 0. + * + * Returns less than 0 if we have queued more than a congestion window + * worth of data and need to close the circuit. + */ +int +flow_control_decide_xoff(edge_connection_t *stream) +{ + size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream)); + uint32_t buffer_limit_xoff = 0; + + if (BUG(edge_get_ccontrol(stream) == NULL)) { + log_err(LD_BUG, "Flow control called for non-congestion control circuit"); + return -1; + } + + /* Onion services and clients are typically localhost edges, so they + * need different buffering limits than exits do */ + if (TO_CONN(stream)->type == CONN_TYPE_AP || stream->hs_ident != NULL) { + buffer_limit_xoff = xoff_client; + } else { + buffer_limit_xoff = xoff_exit; + } + + if (total_buffered > buffer_limit_xoff) { + if (!stream->xoff_sent) { + log_info(LD_EDGE, "Sending XOFF: %"TOR_PRIuSZ" %d", + total_buffered, buffer_limit_xoff); + tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xoff_sending), stream); + + circuit_send_stream_xoff(stream); + + /* Clear the drain rate. It is considered wrong if we + * got all the way to XOFF */ + stream->ewma_drain_rate = 0; + } + } + + /* If the outbuf has accumulated more than the expected burst limit of + * cells, then assume it is not draining, and call decide_xon. 
We must + * do this because writes only happen when the socket unblocks, so + * may not otherwise notice accumulation of data in the outbuf for + * advisory XONs. */ + if (total_buffered > MAX_EXPECTED_CELL_BURST*RELAY_PAYLOAD_SIZE) { + flow_control_decide_xon(stream, 0); + } + + /* Flow control always takes more data; we rely on the oomkiller to + * handle misbehavior. */ + return 0; +} + +/** + * Returns true if the stream's drain rate has changed significantly. + * + * Returns false if the monotime clock is stalled, or if we have + * no previous drain rate information. + */ +static bool +stream_drain_rate_changed(const edge_connection_t *stream) +{ + if (!is_monotime_clock_reliable()) { + return false; + } + + if (!stream->ewma_rate_last_sent) { + return false; + } + + if (stream->ewma_drain_rate > + (100+(uint64_t)xon_change_pct)*stream->ewma_rate_last_sent/100) { + return true; + } + + if (stream->ewma_drain_rate < + (100-(uint64_t)xon_change_pct)*stream->ewma_rate_last_sent/100) { + return true; + } + + return false; +} + +/** + * Called whenever we drain an edge connection outbuf by writing on + * its socket, to decide if it is time to send an xon. + * + * The n_written parameter tells us how many bytes we have written + * this time, which is used to compute the advisory drain rate fields. + */ +void +flow_control_decide_xon(edge_connection_t *stream, size_t n_written) +{ + size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream)); + + /* Bounds check the number of drained bytes, and scale */ + if (stream->drained_bytes >= UINT32_MAX - n_written) { + /* Cut the bytes in half, and move the start time up halfway to now + * (if we have one). 
*/ + stream->drained_bytes /= 2; + + if (stream->drain_start_usec) { + uint64_t now = monotime_absolute_usec(); + + stream->drain_start_usec = now - (now-stream->drain_start_usec)/2; + } + } + + /* Accumulate drained bytes since last rate computation */ + stream->drained_bytes += n_written; + + tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon), stream, n_written); + + /* Check for bad monotime clock and bytecount wrap */ + if (!is_monotime_clock_reliable()) { + /* If the monotime clock ever goes wrong, the safest thing to do + * is just clear our short-term rate info and wait for the clock to + * become reliable again.. */ + stream->drain_start_usec = 0; + stream->drained_bytes = 0; + } else { + /* If we have no drain start timestamp, and we still have + * remaining buffer, start the buffering counter */ + if (!stream->drain_start_usec && total_buffered > 0) { + log_debug(LD_EDGE, "Began edge buffering: %d %d %"TOR_PRIuSZ, + stream->ewma_rate_last_sent, + stream->ewma_drain_rate, + total_buffered); + tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_start), + stream); + stream->drain_start_usec = monotime_absolute_usec(); + stream->drained_bytes = 0; + } + } + + if (stream->drain_start_usec) { + /* If we have spent enough time in a queued state, update our drain + * rate. */ + if (stream->drained_bytes > xon_rate_bytes) { + /* No previous drained bytes means it is the first time we are computing + * it so use the value we just drained onto the socket as a baseline. It + * won't be accurate but it will be a start towards the right value. + * + * We have to do this in order to have a drain rate else we could be + * sending a drain rate of 0 in an XON which would be undesirable and + * basically like sending an XOFF. 
*/ + if (stream->prev_drained_bytes == 0) { + stream->prev_drained_bytes = stream->drained_bytes; + } + uint32_t drain_rate = compute_drain_rate(stream); + /* Once the drain rate has been computed, note how many bytes we just + * drained so it can be used at the next calculation. We do this here + * because it gets reset once the rate is changed. */ + stream->prev_drained_bytes = stream->drained_bytes; + + if (drain_rate) { + stream->ewma_drain_rate = + (uint32_t)n_count_ewma(drain_rate, + stream->ewma_drain_rate, + xon_ewma_cnt); + log_debug(LD_EDGE, "Updating drain rate: %d %d %"TOR_PRIuSZ, + drain_rate, + stream->ewma_drain_rate, + total_buffered); + tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_update), + stream, drain_rate); + /* Reset recent byte counts. This prevents us from sending advisory + * XONs more frequent than every xon_rate_bytes. */ + stream->drained_bytes = 0; + stream->drain_start_usec = 0; + } + } + } + + /* If we don't have an XOFF outstanding, consider updating an + * old rate */ + if (!stream->xoff_sent) { + if (stream_drain_rate_changed(stream)) { + /* If we are still buffering and the rate changed, update + * advisory XON */ + log_info(LD_EDGE, "Sending rate-change XON: %d %d %"TOR_PRIuSZ, + stream->ewma_rate_last_sent, + stream->ewma_drain_rate, + total_buffered); + tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_rate_change), stream); + circuit_send_stream_xon(stream); + } + } else if (total_buffered == 0) { + log_info(LD_EDGE, "Sending XON: %d %d %"TOR_PRIuSZ, + stream->ewma_rate_last_sent, + stream->ewma_drain_rate, + total_buffered); + tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_partial_drain), stream); + circuit_send_stream_xon(stream); + } + + /* If the buffer has fully emptied, clear the drain timestamp, + * so we can total only bytes drained while outbuf is 0. */ + if (total_buffered == 0) { + stream->drain_start_usec = 0; + + /* After we've spent 'xon_rate_bytes' with the queue fully drained, + * double any rate we sent. 
*/ + if (stream->drained_bytes >= xon_rate_bytes && + stream->ewma_rate_last_sent) { + stream->ewma_drain_rate = MIN(INT32_MAX, 2*stream->ewma_drain_rate); + + log_debug(LD_EDGE, + "Queue empty for xon_rate_limit bytes: %d %d", + stream->ewma_rate_last_sent, + stream->ewma_drain_rate); + tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_doubled), stream); + /* Resetting the drained bytes count. We need to keep its value as a + * previous so the drain rate calculation takes into account what was + * actually drain the last time. */ + stream->prev_drained_bytes = stream->drained_bytes; + stream->drained_bytes = 0; + } + } + + return; +} + +/** + * Note that we packaged some data on this stream. Used to enforce + * client-side dropmark limits + */ +void +flow_control_note_sent_data(edge_connection_t *stream, size_t len) +{ + /* If we are near the max, scale everything down */ + if (stream->total_bytes_xmit >= TOTAL_XMIT_SCALE_AT-len) { + log_info(LD_EDGE, "Scaling down for flow control xmit bytes:: %d %d %d", + stream->total_bytes_xmit, + stream->num_xoff_recv, + stream->num_xon_recv); + + stream->total_bytes_xmit /= 2; + stream->num_xoff_recv /= 2; + stream->num_xon_recv /= 2; + } + + stream->total_bytes_xmit += len; +} + +/** Returns true if an edge connection uses flow control */ +bool +edge_uses_flow_control(const edge_connection_t *stream) +{ + bool ret = (stream->on_circuit && stream->on_circuit->ccontrol) || + (stream->cpath_layer && stream->cpath_layer->ccontrol); + + /* All circuits with congestion control use flow control */ + return ret; +} + +/** + * Returns the max RTT for the circuit that carries this stream, + * as observed by congestion control. 
+ */ +uint64_t +edge_get_max_rtt(const edge_connection_t *stream) +{ + if (stream->on_circuit && stream->on_circuit->ccontrol) + return stream->on_circuit->ccontrol->max_rtt_usec; + else if (stream->cpath_layer && stream->cpath_layer->ccontrol) + return stream->cpath_layer->ccontrol->max_rtt_usec; + + return 0; +} + +/** Returns true if a connection is an edge conn that uses flow control */ +bool +conn_uses_flow_control(connection_t *conn) +{ + bool ret = false; + + if (CONN_IS_EDGE(conn)) { + edge_connection_t *edge = TO_EDGE_CONN(conn); + + if (edge_uses_flow_control(edge)) { + ret = true; + } + } + + return ret; +} + diff --git a/src/core/or/congestion_control_flow.h b/src/core/or/congestion_control_flow.h new file mode 100644 index 0000000000..6c318027ea --- /dev/null +++ b/src/core/or/congestion_control_flow.h @@ -0,0 +1,48 @@ +/* Copyright (c) 2019-2021, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file congestion_control_flow.h + * \brief APIs for stream flow control on congestion controlled circuits. + **/ + +#ifndef TOR_CONGESTION_CONTROL_FLOW_H +#define TOR_CONGESTION_CONTROL_FLOW_H + +#include "core/or/crypt_path_st.h" +#include "core/or/circuit_st.h" +#include "core/or/edge_connection_st.h" + +void flow_control_new_consensus_params(const struct networkstatus_t *); + +bool circuit_process_stream_xoff(edge_connection_t *conn, + const crypt_path_t *layer_hint, + const cell_t *cell); +bool circuit_process_stream_xon(edge_connection_t *conn, + const crypt_path_t *layer_hint, + const cell_t *cell); + +int flow_control_decide_xoff(edge_connection_t *stream); +void flow_control_decide_xon(edge_connection_t *stream, size_t n_written); + +void flow_control_note_sent_data(edge_connection_t *stream, size_t len); + +bool edge_uses_flow_control(const edge_connection_t *stream); + +bool conn_uses_flow_control(connection_t *stream); + +uint64_t edge_get_max_rtt(const edge_connection_t *); + +/* Private section starts. 
*/ +#ifdef TOR_CONGESTION_CONTROL_FLOW_PRIVATE + +/* + * Unit tests declaractions. + */ +#ifdef TOR_UNIT_TESTS + +#endif /* defined(TOR_UNIT_TESTS) */ + +#endif /* defined(TOR_CONGESTION_CONTROL_FLOW_PRIVATE) */ + +#endif /* !defined(TOR_CONGESTION_CONTROL_FLOW_H) */ diff --git a/src/core/or/congestion_control_nola.c b/src/core/or/congestion_control_nola.c new file mode 100644 index 0000000000..09f88d4699 --- /dev/null +++ b/src/core/or/congestion_control_nola.c @@ -0,0 +1,126 @@ +/* Copyright (c) 2019-2021, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file congestion_control_nola.c + * \brief Code that implements the TOR_NOLA congestion control algorithm + * from Proposal #324. + */ + +#define TOR_CONGESTION_CONTROL_NOLA_PRIVATE + +#include "core/or/or.h" + +#include "core/or/crypt_path.h" +#include "core/or/or_circuit_st.h" +#include "core/or/sendme.h" +#include "core/or/congestion_control_st.h" +#include "core/or/congestion_control_common.h" +#include "core/or/congestion_control_nola.h" +#include "core/or/circuituse.h" +#include "core/or/circuitlist.h" +#include "core/or/origin_circuit_st.h" +#include "core/or/channel.h" +#include "feature/nodelist/networkstatus.h" + +#define NOLA_BDP_OVERSHOOT 100 + +/** + * Cache NOLA consensus parameters. + */ +void +congestion_control_nola_set_params(congestion_control_t *cc) +{ + tor_assert(cc->cc_alg == CC_ALG_NOLA); + + cc->nola_params.bdp_overshoot = + networkstatus_get_param(NULL, "cc_nola_overshoot", + NOLA_BDP_OVERSHOOT, + 0, + 1000); +} + +/** +* Process a SENDME and update the congestion window according to the +* rules specified in TOR_NOLA of Proposal #324. +* +* TOR_NOLA updates the congestion window to match the current +* BDP estimate, every sendme. Because this can result in downward +* drift, a fixed overhead is added to the BDP estimate. This will +* cause some queuing, but ensures that the algorithm always uses +* the full BDP. 
+* +* To handle the case where the local orconn blocks, TOR_NOLA uses +* the 'piecewise' BDP estimate, which uses a more conservative BDP +* estimate method when blocking occurs, but a more aggressive BDP +* estimate when there is no local blocking. This minimizes local +* client queues. +*/ +int +congestion_control_nola_process_sendme(congestion_control_t *cc, + const circuit_t *circ, + const crypt_path_t *layer_hint) +{ + tor_assert(cc && cc->cc_alg == CC_ALG_NOLA); + tor_assert(circ); + + if (cc->next_cc_event) + cc->next_cc_event--; + + /* If we get a congestion event, the only thing NOLA + * does is note this as if we exited slow-start + * (which for NOLA just means we finished our ICW). */ + if (cc->next_cc_event == 0) + cc->in_slow_start = 0; + + /* If we did not successfully update BDP, we must return. Otherwise, + * NOLA can drift downwards */ + if (!congestion_control_update_circuit_estimates(cc, circ, layer_hint)) { + cc->inflight = cc->inflight - cc->sendme_inc; + return 0; + } + + /* We overshoot the BDP by the cwnd_inc param amount, because BDP + * may otherwise drift down. This helps us probe for more capacity. + * But there is no sense to do it if the local channel is blocked.
*/ + if (cc->blocked_chan) + cc->cwnd = cc->bdp[cc->bdp_alg]; + else + cc->cwnd = cc->bdp[cc->bdp_alg] + cc->nola_params.bdp_overshoot; + + /* cwnd can never fall below 1 increment */ + cc->cwnd = MAX(cc->cwnd, cc->cwnd_min); + + if (CIRCUIT_IS_ORIGIN(circ)) { + log_info(LD_CIRC, + "CC TOR_NOLA: Circuit %d " + "CWND: %"PRIu64", " + "INFL: %"PRIu64", " + "NCCE: %"PRIu64", " + "SS: %d", + CONST_TO_ORIGIN_CIRCUIT(circ)->global_identifier, + cc->cwnd, + cc->inflight, + cc->next_cc_event, + cc->in_slow_start + ); + } else { + log_info(LD_CIRC, + "CC TOR_NOLA: Circuit %"PRIu64":%d " + "CWND: %"PRIu64", " + "INFL: %"PRIu64", " + "NCCE: %"PRIu64", " + "SS: %d", + circ->n_chan->global_identifier, circ->n_circ_id, + cc->cwnd, + cc->inflight, + cc->next_cc_event, + cc->in_slow_start + ); + } + + /* Update inflight with ack */ + cc->inflight = cc->inflight - cc->sendme_inc; + + return 0; +} diff --git a/src/core/or/congestion_control_nola.h b/src/core/or/congestion_control_nola.h new file mode 100644 index 0000000000..9c7d6e0635 --- /dev/null +++ b/src/core/or/congestion_control_nola.h @@ -0,0 +1,33 @@ +/* Copyright (c) 2019-2021, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file congestion_control_nola.h + * \brief Private-ish APIs for the TOR_NOLA congestion control algorithm + **/ + +#ifndef TOR_CONGESTION_CONTROL_NOLA_H +#define TOR_CONGESTION_CONTROL_NOLA_H + +#include "core/or/crypt_path_st.h" +#include "core/or/circuit_st.h" + +/* Processing SENDME cell. */ +int congestion_control_nola_process_sendme(struct congestion_control_t *cc, + const circuit_t *circ, + const crypt_path_t *layer_hint); +void congestion_control_nola_set_params(struct congestion_control_t *cc); + +/* Private section starts. */ +#ifdef TOR_CONGESTION_CONTROL_NOLA_PRIVATE + +/* + * Unit tests declaractions. 
+ */ +#ifdef TOR_UNIT_TESTS + +#endif /* defined(TOR_UNIT_TESTS) */ + +#endif /* defined(TOR_CONGESTION_CONTROL_NOLA_PRIVATE) */ + +#endif /* !defined(TOR_CONGESTION_CONTROL_NOLA_H) */ diff --git a/src/core/or/congestion_control_st.h b/src/core/or/congestion_control_st.h new file mode 100644 index 0000000000..251ebd82e3 --- /dev/null +++ b/src/core/or/congestion_control_st.h @@ -0,0 +1,257 @@ +/* Copyright (c) 2019-2021, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file congestion_control_st.h + * \brief Structure definitions for congestion control. + **/ + +#ifndef CONGESTION_CONTROL_ST_H +#define CONGESTION_CONTROL_ST_H + +#include "core/or/crypt_path_st.h" +#include "core/or/circuit_st.h" + +/** Signifies which sendme algorithm to use */ +typedef enum { + /** OG Tor fixed-sized circ and stream windows. It sucks, but it is important + * to make sure that the new algs can compete with the old garbage. */ + CC_ALG_SENDME = 0, + + /** + * Prop#324 TOR_WESTWOOD - Deliberately aggressive. Westwood may not even + * converge to fairness in some cases because max RTT will also increase + * on congestion, which boosts the Westwood RTT congestion threshold. So it + * can cause runaway queue bloat, which may or may not lead to a robot + * uprising... Ok that's Westworld, not Westwood. Still, we need to test + * Vegas and NOLA against something more aggressive to ensure they do not + * starve in the presence of cheaters. We also need to make sure cheaters + * trigger the oomkiller in those cases. + */ + CC_ALG_WESTWOOD = 1, + + /** + * Prop#324 TOR_VEGAS - TCP Vegas-style BDP tracker. Because Vegas backs off + * whenever it detects queue delay, it can be beaten out by more aggressive + * algs. However, in live network testing, it seems to do just fine against + * current SENDMEs. It outperforms Westwood and does not stall.
*/ + CC_ALG_VEGAS = 2, + + /** + * Prop#324: TOR_NOLA - NOLA looks the BDP right in the eye and uses it + * immediately as CWND. No slow start, no other congestion signals, no delay, + * no bullshit. Like TOR_VEGAS, it also uses agressive BDP estimates, to + * avoid out-competition. It seems a bit better throughput than Vegas, + * but its agressive BDP and rapid updates may lead to more queue latency. */ + CC_ALG_NOLA = 3, +} cc_alg_t; + +/* Total number of CC algs in cc_alg_t enum */ +#define NUM_CC_ALGS (CC_ALG_NOLA+1) + +/** Signifies how we estimate circuit BDP */ +typedef enum { + /* CWND-based BDP will respond to changes in RTT only, and is relative + * to cwnd growth. So in slow-start, this will under-estimate BDP */ + BDP_ALG_CWND_RTT = 0, + + /* Sendme-based BDP will quickly measure BDP in less than + * a cwnd worth of data when in use. So it should be good for slow-start. + * But if the link goes idle, it will be vastly lower than true BDP. Thus, + * this estimate gets reset when the cwnd is not fully utilized. */ + BDP_ALG_SENDME_RATE = 1, + + /* Inflight BDP is similar to the cwnd estimator, except it uses + * packets inflight minus local circuit queues instead of current cwnd. + * Because it is strictly less than or equal to the cwnd, it will cause + * the cwnd to drift downward. It is only used if the local OR connection + * is blocked. */ + BDP_ALG_INFLIGHT_RTT = 2, + + /* The Piecewise BDP estimator uses the CWND estimator before there + * are sufficient SENDMEs to calculate the SENDME estimator. At that + * point, it uses the SENDME estimator, unless the local OR connection + * becomes blocked. In that case, it switches to the inflight estimator. 
*/ + BDP_ALG_PIECEWISE = 3, + +} bdp_alg_t; + +/** Total number of BDP algs in bdp_alg_t enum */ +#define NUM_BDP_ALGS (BDP_ALG_PIECEWISE+1) + +/** Westwood algorithm parameters */ +struct westwood_params_t { + /** Cwnd backoff multiplier upon congestion (as percent) */ + uint8_t cwnd_backoff_m; + /** Max RTT backoff multiplier upon congestion (as percent) */ + uint8_t rtt_backoff_m; + + /** Threshold between min and max RTT, to signal congestion (percent) */ + uint8_t rtt_thresh; + + /** + * If true, use minimum of BDP and backoff multiplication in backoff. + * If false, use maximum of BDP and backoff multiplication in backoff. */ + bool min_backoff; +}; + +/** Vegas algorithm parameters. */ +struct vegas_params_t { + /** The queue use allowed before we exit slow start */ + uint16_t gamma; + /** The queue use below which we increment cwnd */ + uint16_t alpha; + /** The queue use above which we decrement cwnd */ + uint16_t beta; + /** Weighted average (percent) between cwnd estimator and + * piecewise estimator. */ + uint8_t bdp_mix_pct; +}; + +/** NOLA consensus params */ +struct nola_params_t { + /** How many cells to add to BDP estimate to obtain cwnd */ + uint16_t bdp_overshoot; +}; + +/** Fields common to all congestion control algorithms */ +typedef struct congestion_control_t { + /** + * Smartlist of uint64_t monotime usec timestamps of when we sent a data + * cell that is pending a sendme. FIFO queue that is managed similar to + * sendme_last_digests. */ + smartlist_t *sendme_pending_timestamps; + + /** + * Smartlist of uint64_t monotime timestamp of when sendme's arrived. + * FIFO queue that is managed similar to sendme_last_digests. + * Used to estimate circuitbandwidth and BDP. */ + smartlist_t *sendme_arrival_timestamps; + + /** RTT time data for congestion control. 
*/ + uint64_t ewma_rtt_usec; + uint64_t min_rtt_usec; + uint64_t max_rtt_usec; + + /* BDP estimates by algorithm */ + uint64_t bdp[NUM_BDP_ALGS]; + + /** Congestion window */ + uint64_t cwnd; + + /** Number of cells in-flight (sent but awaiting SENDME ack). */ + uint64_t inflight; + + /** + * For steady-state: the number of sendme acks until we will acknowledge + * a congestion event again. It starts out as the number of sendme acks + * in a congestion window and is decremented each ack. When this reaches + * 0, it means we should examine our congestion algorithm conditions. + * In this way, we only react to one congestion event per congestion window. + * + * It is also reset to 0 immediately whenever the circuit's orconn is + * blocked, and when a previously blocked orconn is unblocked. + */ + uint64_t next_cc_event; + + /** Are we in slow start? */ + bool in_slow_start; + + /** Is the local channel blocked on us? That's a congestion signal */ + bool blocked_chan; + + /* The following parameters are cached from consensus values upon + * circuit setup. */ + + /** Percent of cwnd to increment by during slow start */ + uint16_t cwnd_inc_pct_ss; + + /** Number of cells to increment cwnd by during steady state */ + uint16_t cwnd_inc; + + /** Minimum congestion window (must be at least sendme_inc) */ + uint16_t cwnd_min; + + /** + * Number of times per congestion window to update based on congestion + * signals */ + uint8_t cwnd_inc_rate; + + /** + * Number of cwnd worth of sendme acks to smooth RTT and BDP with, + * using N_EWMA */ + uint8_t ewma_cwnd_cnt; + + /** + * Minimum number of sendmes before we begin BDP estimates + */ + uint8_t bwe_sendme_min; + + /** + * Number of cells to ack with every sendme. Taken from consensus parameter + * and negotiation during circuit setup. */ + uint8_t sendme_inc; + + /** Which congestion control algorithm to use. Taken from + * consensus parameter and negotiation during circuit setup. 
*/ + cc_alg_t cc_alg; + + /** Which algorithm to estimate circuit bandwidth with. Taken from + * consensus parameter during circuit setup. */ + bdp_alg_t bdp_alg; + + /** Algorithm-specific parameters. The specific struct that is used + * depends upon the algorithm selected by the cc_alg parameter. + * These should not be accessed anywhere other than the algorithm-specific + * files. */ + union { + struct westwood_params_t westwood_params; + struct vegas_params_t vegas_params; + struct nola_params_t nola_params; + }; +} congestion_control_t; + +/** + * Returns the number of sendme acks we will receive before we update cwnd. + * + * Congestion control literature recommends only one update of cwnd per + * cwnd worth of acks. However, we can also tune this to be more frequent + * by increasing the 'cc_cwnd_inc_rate' consensus parameter. + * + * If this returns 0 due to high cwnd_inc_rate, the calling code will + * update every sendme ack. + */ +static inline uint64_t CWND_UPDATE_RATE(const congestion_control_t *cc) +{ + /* We add cwnd_inc_rate*sendme_inc/2 to round to nearest integer number + * of acks */ + return ((cc->cwnd + cc->cwnd_inc_rate*cc->sendme_inc/2) + / (cc->cwnd_inc_rate*cc->sendme_inc)); +} + +/** + * Returns the amount to increment the congestion window each update, + * during slow start. + * + * Congestion control literature recommends either doubling the cwnd + * every cwnd during slow start, or some similar exponential growth + * (such as 50% more every cwnd, for Vegas). + * + * This is controlled by a consensus parameter 'cwnd_inc_pct_ss', which + * allows us to specify the percent of the current consensus window + * to update by. + */ +static inline uint64_t CWND_INC_SS(const congestion_control_t *cc) +{ + return (cc->cwnd_inc_pct_ss*cc->cwnd/100); +} + +/** + * Returns the amount to increment (and for Vegas, also decrement) the + * congestion window by, every update period. + * + * This is controlled by the cc_cwnd_inc consensus parameter. 
+ */ +#define CWND_INC(cc) ((cc)->cwnd_inc) + +#endif /* !defined(CONGESTION_CONTROL_ST_H) */ diff --git a/src/core/or/congestion_control_vegas.c b/src/core/or/congestion_control_vegas.c new file mode 100644 index 0000000000..3206821f4c --- /dev/null +++ b/src/core/or/congestion_control_vegas.c @@ -0,0 +1,200 @@ +/* Copyright (c) 2019-2021, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file congestion_control_vegas.c + * \brief Code that implements the TOR_VEGAS congestion control algorithm + * from Proposal #324. + */ + +#define TOR_CONGESTION_CONTROL_VEGAS_PRIVATE + +#include "core/or/or.h" + +#include "core/or/crypt_path.h" +#include "core/or/or_circuit_st.h" +#include "core/or/sendme.h" +#include "core/or/congestion_control_st.h" +#include "core/or/congestion_control_common.h" +#include "core/or/congestion_control_vegas.h" +#include "core/or/circuitlist.h" +#include "core/or/circuituse.h" +#include "core/or/origin_circuit_st.h" +#include "core/or/channel.h" +#include "feature/nodelist/networkstatus.h" + +#define VEGAS_GAMMA(cc) (6*(cc)->sendme_inc) +#define VEGAS_ALPHA(cc) (3*(cc)->sendme_inc) +#define VEGAS_BETA(cc) (6*(cc)->sendme_inc) + +#define VEGAS_BDP_MIX_PCT 0 + +/** + * The original TCP Vegas used only a congestion window BDP estimator. We + * believe that the piecewise estimator is likely to perform better, but + * for purposes of experimentation, we might as well have a way to blend + * them. It also lets us set Vegas to its original estimator while other + * algorithms on the same network use piecewise (by setting the + * 'vegas_bdp_mix_pct' consensus parameter to 100, while leaving the + * 'cc_bdp_alg' parameter set to piecewise). + * + * Returns a percentage weighted average between the CWND estimator and + * the specified consensus BDP estimator. 
+ */ +static inline uint64_t +vegas_bdp_mix(const congestion_control_t *cc) +{ + return cc->vegas_params.bdp_mix_pct*cc->bdp[BDP_ALG_CWND_RTT]/100 + + (100-cc->vegas_params.bdp_mix_pct)*cc->bdp[cc->bdp_alg]/100; +} + +/** + * Cache Vegas consensus parameters. + */ +void +congestion_control_vegas_set_params(congestion_control_t *cc) +{ + tor_assert(cc->cc_alg == CC_ALG_VEGAS); + + cc->vegas_params.gamma = + networkstatus_get_param(NULL, "cc_vegas_gamma", + VEGAS_GAMMA(cc), + 0, + 1000); + + cc->vegas_params.alpha = + networkstatus_get_param(NULL, "cc_vegas_alpha", + VEGAS_ALPHA(cc), + 0, + 1000); + + cc->vegas_params.beta = + networkstatus_get_param(NULL, "cc_vegas_beta", + VEGAS_BETA(cc), + 0, + 1000); + + cc->vegas_params.bdp_mix_pct = + networkstatus_get_param(NULL, "cc_vegas_bdp_mix", + VEGAS_BDP_MIX_PCT, + 0, + 100); +} + +/** + * Process a SENDME and update the congestion window according to the + * rules specified in TOR_VEGAS of Proposal #324. + * + * Essentially, this algorithm attempts to measure queue lengths on + * the circuit by subtracting the bandwidth-delay-product estimate + * from the current congestion window. + * + * If the congestion window is larger than the bandwidth-delay-product, + * then data is assumed to be queuing. We reduce the congestion window + * in that case. + * + * If the congestion window is smaller than the bandwidth-delay-product, + * then there is spare bandwidth capacity on the circuit. We increase the + * congestion window in that case. + * + * The congestion window is updated only once every congestion window worth of + * packets, even if the signal persists. It is also updated whenever the + * upstream orcon blocks, or unblocks. This minimizes local client queues. 
+ */ +int +congestion_control_vegas_process_sendme(congestion_control_t *cc, + const circuit_t *circ, + const crypt_path_t *layer_hint) +{ + uint64_t queue_use; + + tor_assert(cc && cc->cc_alg == CC_ALG_VEGAS); + tor_assert(circ); + + /* Update ack counter until next congestion signal event is allowed */ + if (cc->next_cc_event) + cc->next_cc_event--; + + /* Compute BDP and RTT. If we did not update, don't run the alg */ + if (!congestion_control_update_circuit_estimates(cc, circ, layer_hint)) { + cc->inflight = cc->inflight - cc->sendme_inc; + return 0; + } + + /* We only update anything once per window */ + if (cc->next_cc_event == 0) { + /* The queue use is the amount in which our cwnd is above BDP; + * if it is below, then 0 queue use. */ + if (vegas_bdp_mix(cc) > cc->cwnd) + queue_use = 0; + else + queue_use = cc->cwnd - vegas_bdp_mix(cc); + + if (cc->in_slow_start) { + if (queue_use < cc->vegas_params.gamma && !cc->blocked_chan) { + /* Grow to BDP immediately, then exponential growth until + * congestion signal */ + cc->cwnd = MAX(cc->cwnd + CWND_INC_SS(cc), + vegas_bdp_mix(cc)); + } else { + /* Congestion signal: Fall back to Vegas equilibrium (BDP) */ + cc->cwnd = vegas_bdp_mix(cc); + cc->in_slow_start = 0; + log_info(LD_CIRC, "CC: TOR_VEGAS exiting slow start"); + } + } else { + if (queue_use > cc->vegas_params.beta || cc->blocked_chan) { + cc->cwnd -= CWND_INC(cc); + } else if (queue_use < cc->vegas_params.alpha) { + cc->cwnd += CWND_INC(cc); + } + } + + /* cwnd can never fall below 1 increment */ + cc->cwnd = MAX(cc->cwnd, cc->cwnd_min); + + /* Schedule next update */ + cc->next_cc_event = CWND_UPDATE_RATE(cc); + + if (CIRCUIT_IS_ORIGIN(circ)) { + log_info(LD_CIRC, + "CC: TOR_VEGAS Circuit %d " + "CWND: %"PRIu64", " + "INFL: %"PRIu64", " + "VBDP: %"PRIu64", " + "QUSE: %"PRIu64", " + "NCCE: %"PRIu64", " + "SS: %d", + CONST_TO_ORIGIN_CIRCUIT(circ)->global_identifier, + cc->cwnd, + cc->inflight, + vegas_bdp_mix(cc), + queue_use, + cc->next_cc_event, + 
cc->in_slow_start + ); + } else { + log_info(LD_CIRC, + "CC: TOR_VEGAS Circuit %"PRIu64":%d " + "CWND: %"PRIu64", " + "INFL: %"PRIu64", " + "VBDP: %"PRIu64", " + "QUSE: %"PRIu64", " + "NCCE: %"PRIu64", " + "SS: %d", + circ->n_chan->global_identifier, circ->n_circ_id, + cc->cwnd, + cc->inflight, + vegas_bdp_mix(cc), + queue_use, + cc->next_cc_event, + cc->in_slow_start + ); + } + } + + /* Update inflight with ack */ + cc->inflight = cc->inflight - cc->sendme_inc; + + return 0; +} diff --git a/src/core/or/congestion_control_vegas.h b/src/core/or/congestion_control_vegas.h new file mode 100644 index 0000000000..111345081c --- /dev/null +++ b/src/core/or/congestion_control_vegas.h @@ -0,0 +1,33 @@ +/* Copyright (c) 2019-2021, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file congestion_control_vegas.h + * \brief Private-ish APIs for the TOR_VEGAS congestion control algorithm + **/ + +#ifndef TOR_CONGESTION_CONTROL_VEGAS_H +#define TOR_CONGESTION_CONTROL_VEGAS_H + +#include "core/or/crypt_path_st.h" +#include "core/or/circuit_st.h" + +/* Processing SENDME cell. */ +int congestion_control_vegas_process_sendme(struct congestion_control_t *cc, + const circuit_t *circ, + const crypt_path_t *layer_hint); +void congestion_control_vegas_set_params(struct congestion_control_t *cc); + +/* Private section starts. */ +#ifdef TOR_CONGESTION_CONTROL_VEGAS_PRIVATE + +/* + * Unit tests declaractions. + */ +#ifdef TOR_UNIT_TESTS + +#endif /* defined(TOR_UNIT_TESTS) */ + +#endif /* defined(TOR_CONGESTION_CONTROL_VEGAS_PRIVATE) */ + +#endif /* !defined(TOR_CONGESTION_CONTROL_VEGAS_H) */ diff --git a/src/core/or/congestion_control_westwood.c b/src/core/or/congestion_control_westwood.c new file mode 100644 index 0000000000..4b24234212 --- /dev/null +++ b/src/core/or/congestion_control_westwood.c @@ -0,0 +1,231 @@ +/* Copyright (c) 2019-2021, The Tor Project, Inc. 
*/ +/* See LICENSE for licensing information */ + +/** + * \file congestion_control_westwood.c + * \brief Code that implements the TOR_WESTWOOD congestion control algorithm + * from Proposal #324. + */ + +#define TOR_CONGESTION_CONTROL_WESTWOOD_PRIVATE + +#include "core/or/or.h" + +#include "core/or/crypt_path.h" +#include "core/or/or_circuit_st.h" +#include "core/or/sendme.h" +#include "core/or/congestion_control_st.h" +#include "core/or/congestion_control_common.h" +#include "core/or/congestion_control_westwood.h" +#include "core/or/circuitlist.h" +#include "core/or/circuituse.h" +#include "core/or/origin_circuit_st.h" +#include "core/or/channel.h" +#include "feature/nodelist/networkstatus.h" + +#define USEC_ONE_MS (1000) + +#define WESTWOOD_CWND_BACKOFF_M 75 +#define WESTWOOD_RTT_BACKOFF_M 100 +#define WESTWOOD_RTT_THRESH 33 +#define WESTWOOD_MIN_BACKOFF 0 + +/** + * Cache westwood consensus parameters. + */ +void +congestion_control_westwood_set_params(congestion_control_t *cc) +{ + tor_assert(cc->cc_alg == CC_ALG_WESTWOOD); + + cc->westwood_params.cwnd_backoff_m = + networkstatus_get_param(NULL, "cc_westwood_cwnd_m", + WESTWOOD_CWND_BACKOFF_M, + 0, + 100); + + cc->westwood_params.rtt_backoff_m = + networkstatus_get_param(NULL, "cc_westwood_rtt_m", + WESTWOOD_RTT_BACKOFF_M, + 50, + 100); + + cc->westwood_params.rtt_thresh = + networkstatus_get_param(NULL, "cc_westwood_rtt_thresh", + WESTWOOD_RTT_THRESH, + 0, + 100); + + cc->westwood_params.min_backoff = + networkstatus_get_param(NULL, "cc_westwood_min_backoff", + WESTWOOD_MIN_BACKOFF, + 0, + 1); +} + +/** + * Return the RTT threshold that signals congestion. + * + * Computed from the threshold parameter that specifies a + * percent between the min and max RTT observed so far. 
+ */ +static inline uint64_t +westwood_rtt_signal(const congestion_control_t *cc) +{ + return ((100 - cc->westwood_params.rtt_thresh)*cc->min_rtt_usec + + cc->westwood_params.rtt_thresh*(cc)->max_rtt_usec)/100; +} + +/** + * Compute a backoff to reduce the max RTT. + * + * This may be necessary to ensure that westwood does not have + * a runaway condition where congestion inflates the max RTT, which + * inflates the congestion threshold. That cannot happen with one + * Westwood instance, but it may happen in aggregate. Hence, this is + * a safety parameter, in case we need it. + */ +static inline uint64_t +westwood_rtt_max_backoff(const congestion_control_t *cc) +{ + return cc->min_rtt_usec + + (cc->westwood_params.rtt_backoff_m * + (cc->max_rtt_usec - cc->min_rtt_usec))/100; +} + +/** + * Returns true if the circuit is experiencing congestion, as per + * TOR_WESTWOOD rules. + */ +static inline bool +westwood_is_congested(const congestion_control_t *cc) +{ + /* If the local channel is blocked, that is always congestion */ + if (cc->blocked_chan) + return true; + + /* If the min RTT is within 1ms of the signal, then there is not enough + * range in RTTs to signify congestion. Treat that as not congested. */ + if (westwood_rtt_signal(cc) < cc->min_rtt_usec || + westwood_rtt_signal(cc) - cc->min_rtt_usec < USEC_ONE_MS) + return false; + + /* If the EWMA-smoothed RTT exceeds the westwood RTT threshold, + * then it is congestion. */ + if (cc->ewma_rtt_usec > westwood_rtt_signal(cc)) + return true; + + return false; +} + +/** + * Process a SENDME and update the congestion window according to the + * rules specified in TOR_WESTWOOD of Proposal #324. + * + * Essentially, this algorithm uses a threshold of 'rtt_thresh', which + * is a midpoint between the min and max RTT. If the RTT exceeds this + * threshold, then queue delay due to congestion is assumed to be present, + * and the algorithm reduces the congestion window. 
If the RTT is below the + * threshhold, the circuit is not congested (ie: queue delay is low), and we + * increase the congestion window. + * + * The congestion window is updated only once every congestion window worth of + * packets, even if the signal persists. It is also updated whenever the + * upstream orcon blocks, or unblocks. This minimizes local client queues. + */ +int +congestion_control_westwood_process_sendme(congestion_control_t *cc, + const circuit_t *circ, + const crypt_path_t *layer_hint) +{ + tor_assert(cc && cc->cc_alg == CC_ALG_WESTWOOD); + tor_assert(circ); + + /* Update ack counter until next congestion signal event is allowed */ + if (cc->next_cc_event) + cc->next_cc_event--; + + /* If we were unable to update our circuit estimates, Westwood must + * *not* update its cwnd, otherwise it could run to infinity, or to 0. + * Just update inflight from the sendme and return. */ + if (!congestion_control_update_circuit_estimates(cc, circ, layer_hint)) { + cc->inflight = cc->inflight - cc->sendme_inc; + return 0; + } + + /* We only update anything once per window */ + if (cc->next_cc_event == 0) { + if (!westwood_is_congested(cc)) { + if (cc->in_slow_start) { + cc->cwnd = MAX(cc->cwnd + CWND_INC_SS(cc), + cc->bdp[cc->bdp_alg]); + } else { + cc->cwnd = cc->cwnd + CWND_INC(cc); + } + } else { + if (cc->westwood_params.min_backoff) + cc->cwnd = MIN(cc->cwnd*cc->westwood_params.cwnd_backoff_m/100, + cc->bdp[cc->bdp_alg]); + else + cc->cwnd = MAX(cc->cwnd*cc->westwood_params.cwnd_backoff_m/100, + cc->bdp[cc->bdp_alg]); + + cc->in_slow_start = 0; + + // Because Westwood's congestion can runaway and boost max rtt, + // which increases its congestion signal, we backoff the max rtt + // too. + cc->max_rtt_usec = westwood_rtt_max_backoff(cc); + + log_info(LD_CIRC, "CC: TOR_WESTWOOD congestion. 
New max RTT: %"PRIu64, + cc->max_rtt_usec/1000); + } + + /* cwnd can never fall below 1 increment */ + cc->cwnd = MAX(cc->cwnd, cc->cwnd_min); + + /* Schedule next update */ + cc->next_cc_event = CWND_UPDATE_RATE(cc); + + if (CIRCUIT_IS_ORIGIN(circ)) { + log_info(LD_CIRC, + "CC: TOR_WESTWOOD Circuit %d " + "CWND: %"PRIu64", " + "INFL: %"PRIu64", " + "NCCE: %"PRIu64", " + "WRTT: %"PRIu64", " + "WSIG: %"PRIu64", " + "SS: %d", + CONST_TO_ORIGIN_CIRCUIT(circ)->global_identifier, + cc->cwnd, + cc->inflight, + cc->next_cc_event, + cc->ewma_rtt_usec/1000, + westwood_rtt_signal(cc)/1000, + cc->in_slow_start + ); + } else { + log_info(LD_CIRC, + "CC: TOR_WESTWOOD Circuit %"PRIu64":%d " + "CWND: %"PRIu64", " + "INFL: %"PRIu64", " + "NCCE: %"PRIu64", " + "WRTT: %"PRIu64", " + "WSIG: %"PRIu64", " + "SS: %d", + circ->n_chan->global_identifier, circ->n_circ_id, + cc->cwnd, + cc->inflight, + cc->next_cc_event, + cc->ewma_rtt_usec/1000, + westwood_rtt_signal(cc)/1000, + cc->in_slow_start + ); + } + } + + /* Update inflight with ack */ + cc->inflight = cc->inflight - cc->sendme_inc; + + return 0; +} diff --git a/src/core/or/congestion_control_westwood.h b/src/core/or/congestion_control_westwood.h new file mode 100644 index 0000000000..c6fd596df4 --- /dev/null +++ b/src/core/or/congestion_control_westwood.h @@ -0,0 +1,33 @@ +/* Copyright (c) 2019-2021, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file congestion_control_westwood.h + * \brief Private-ish APIs for the TOR_WESTWOOD congestion control algorithm + **/ + +#ifndef TOR_CONGESTION_CONTROL_WESTWOOD_H +#define TOR_CONGESTION_CONTROL_WESTWOOD_H + +#include "core/or/crypt_path_st.h" +#include "core/or/circuit_st.h" + +/* Processing SENDME cell. */ +int congestion_control_westwood_process_sendme(struct congestion_control_t *cc, + const circuit_t *circ, + const crypt_path_t *layer_hint); +void congestion_control_westwood_set_params(struct congestion_control_t *cc); + +/* Private section starts. 
*/ +#ifdef TOR_CONGESTION_CONTROL_WESTWOOD_PRIVATE + +/* + * Unit tests declaractions. + */ +#ifdef TOR_UNIT_TESTS + +#endif /* defined(TOR_UNIT_TESTS) */ + +#endif /* defined(TOR_CONGESTION_CONTROL_WESTWOOD_PRIVATE) */ + +#endif /* !defined(TOR_CONGESTION_CONTROL_WESTWOOD_H) */ diff --git a/src/core/or/connection_edge.c b/src/core/or/connection_edge.c index d3979b3a7e..ea4bf00735 100644 --- a/src/core/or/connection_edge.c +++ b/src/core/or/connection_edge.c @@ -69,6 +69,8 @@ #include "core/or/circuituse.h" #include "core/or/circuitpadding.h" #include "core/or/connection_edge.h" +#include "core/or/congestion_control_flow.h" +#include "core/or/circuitstats.h" #include "core/or/connection_or.h" #include "core/or/extendinfo.h" #include "core/or/policies.h" @@ -614,20 +616,39 @@ connection_half_edge_add(const edge_connection_t *conn, half_conn->stream_id = conn->stream_id; - // How many sendme's should I expect? - half_conn->sendmes_pending = - (STREAMWINDOW_START-conn->package_window)/STREAMWINDOW_INCREMENT; - // Is there a connected cell pending? half_conn->connected_pending = conn->base_.state == AP_CONN_STATE_CONNECT_WAIT; - /* Data should only arrive if we're not waiting on a resolved cell. - * It can arrive after waiting on connected, because of optimistic - * data. */ - if (conn->base_.state != AP_CONN_STATE_RESOLVE_WAIT) { - // How many more data cells can arrive on this id? - half_conn->data_pending = conn->deliver_window; + if (edge_uses_flow_control(conn)) { + /* If the edge uses the new congestion control flow control, we must use + * time-based limits on half-edge activity. 
*/ + uint64_t timeout_usec = (uint64_t)(get_circuit_build_timeout_ms()*1000); + half_conn->used_ccontrol = 1; + + /* If this is an onion service circuit, double the CBT as an approximate + * value for the other half of the circuit */ + if (conn->hs_ident) { + timeout_usec *= 2; + } + + /* The stream should stop seeing any use after the larger of the circuit + * RTT and the overall circuit build timeout */ + half_conn->end_ack_expected_usec = MAX(timeout_usec, + edge_get_max_rtt(conn)) + + monotime_absolute_usec(); + } else { + // How many sendme's should I expect? + half_conn->sendmes_pending = + (STREAMWINDOW_START-conn->package_window)/STREAMWINDOW_INCREMENT; + + /* Data should only arrive if we're not waiting on a resolved cell. + * It can arrive after waiting on connected, because of optimistic + * data. */ + if (conn->base_.state != AP_CONN_STATE_RESOLVE_WAIT) { + // How many more data cells can arrive on this id? + half_conn->data_pending = conn->deliver_window; + } } insert_at = smartlist_bsearch_idx(circ->half_streams, &half_conn->stream_id, @@ -688,6 +709,12 @@ connection_half_edge_is_valid_data(const smartlist_t *half_conns, if (!half) return 0; + if (half->used_ccontrol) { + if (monotime_absolute_usec() > half->end_ack_expected_usec) + return 0; + return 1; + } + if (half->data_pending > 0) { half->data_pending--; return 1; @@ -740,6 +767,10 @@ connection_half_edge_is_valid_sendme(const smartlist_t *half_conns, if (!half) return 0; + /* congestion control edges don't use sendmes */ + if (half->used_ccontrol) + return 0; + if (half->sendmes_pending > 0) { half->sendmes_pending--; return 1; @@ -1269,15 +1300,6 @@ connection_ap_rescan_and_attach_pending(void) connection_ap_attach_pending(1); } -#ifdef DEBUGGING_17659 -#define UNMARK() do { \ - entry_conn->marked_pending_circ_line = 0; \ - entry_conn->marked_pending_circ_file = 0; \ - } while (0) -#else /* !defined(DEBUGGING_17659) */ -#define UNMARK() do { } while (0) -#endif /* defined(DEBUGGING_17659) */ 
- /** Tell any AP streams that are listed as waiting for a new circuit to try * again. If there is an available circuit for a stream, attach it. Otherwise, * launch a new circuit. @@ -1306,21 +1328,18 @@ connection_ap_attach_pending(int retry) connection_t *conn = ENTRY_TO_CONN(entry_conn); tor_assert(conn && entry_conn); if (conn->marked_for_close) { - UNMARK(); continue; } if (conn->magic != ENTRY_CONNECTION_MAGIC) { log_warn(LD_BUG, "%p has impossible magic value %u.", entry_conn, (unsigned)conn->magic); - UNMARK(); continue; } if (conn->state != AP_CONN_STATE_CIRCUIT_WAIT) { - log_warn(LD_BUG, "%p is no longer in circuit_wait. Its current state " - "is %s. Why is it on pending_entry_connections?", - entry_conn, - conn_state_to_string(conn->type, conn->state)); - UNMARK(); + /* The connection_ap_handshake_attach_circuit() call, for onion service, + * can lead to more than one connections in the "pending" list to change + * state and so it is OK to get here. Ignore it because this connection + * won't be in pending_entry_connections list after this point. */ continue; } @@ -1345,7 +1364,6 @@ connection_ap_attach_pending(int retry) /* If we got here, then we either closed the connection, or * we attached it. */ - UNMARK(); } SMARTLIST_FOREACH_END(entry_conn); smartlist_free(pending); @@ -1416,7 +1434,6 @@ connection_ap_mark_as_non_pending_circuit(entry_connection_t *entry_conn) { if (PREDICT_UNLIKELY(NULL == pending_entry_connections)) return; - UNMARK(); smartlist_remove(pending_entry_connections, entry_conn); } @@ -1612,23 +1629,6 @@ consider_plaintext_ports(entry_connection_t *conn, uint16_t port) return 0; } -/** Return true iff <b>query</b> is a syntactically valid service ID (as - * generated by rend_get_service_id). */ -static int -rend_valid_v2_service_id(const char *query) -{ - /** Length of 'y' portion of 'y.onion' URL. 
*/ -#define REND_SERVICE_ID_LEN_BASE32 16 - - if (strlen(query) != REND_SERVICE_ID_LEN_BASE32) - return 0; - - if (strspn(query, BASE32_CHARS) != REND_SERVICE_ID_LEN_BASE32) - return 0; - - return 1; -} - /** Parse the given hostname in address. Returns true if the parsing was * successful and type_out contains the type of the hostname. Else, false is * returned which means it was not recognized and type_out is set to @@ -1692,14 +1692,6 @@ parse_extended_hostname(char *address, hostname_type_t *type_out) if (q != address) { memmove(address, q, strlen(q) + 1 /* also get \0 */); } - /* v2 onion address check. */ - if (strlen(query) == REND_SERVICE_ID_LEN_BASE32) { - *type_out = ONION_V2_HOSTNAME; - if (rend_valid_v2_service_id(query)) { - goto success; - } - goto failed; - } /* v3 onion address check. */ if (strlen(query) == HS_SERVICE_ADDR_LEN_BASE32) { @@ -1719,8 +1711,7 @@ parse_extended_hostname(char *address, hostname_type_t *type_out) failed: /* otherwise, return to previous state and return 0 */ *s = '.'; - const bool is_onion = (*type_out == ONION_V2_HOSTNAME) || - (*type_out == ONION_V3_HOSTNAME); + const bool is_onion = (*type_out == ONION_V3_HOSTNAME); log_warn(LD_APP, "Invalid %shostname %s; rejecting", is_onion ? "onion " : "", safe_str_client(address)); @@ -2242,7 +2233,7 @@ connection_ap_handshake_rewrite_and_attach(entry_connection_t *conn, } /* Now, we handle everything that isn't a .onion address. */ - if (addresstype != ONION_V3_HOSTNAME && addresstype != ONION_V2_HOSTNAME) { + if (addresstype != ONION_V3_HOSTNAME) { /* Not a hidden-service request. It's either a hostname or an IP, * possibly with a .exit that we stripped off. We're going to check * if we're allowed to connect/resolve there, and then launch the @@ -2527,28 +2518,6 @@ connection_ap_handshake_rewrite_and_attach(entry_connection_t *conn, return 0; } else { /* If we get here, it's a request for a .onion address! */ - - /* We don't support v2 onions anymore. Log a warning and bail. 
*/ - if (addresstype == ONION_V2_HOSTNAME) { - static bool log_once = false; - if (!log_once) { - log_warn(LD_PROTOCOL, "Tried to connect to a v2 onion address, but " - "this version of Tor no longer supports them. Please " - "encourage the site operator to upgrade. For more " - "information see " - "https://blog.torproject.org/v2-deprecation-timeline."); - log_once = true; - } - control_event_client_status(LOG_WARN, "SOCKS_BAD_HOSTNAME HOSTNAME=%s", - escaped(socks->address)); - /* Send back the 0xF6 extended code indicating a bad hostname. This is - * mostly so Tor Browser can make a proper UX with regards to v2 - * addresses. */ - conn->socks_request->socks_extended_error_code = SOCKS5_HS_BAD_ADDRESS; - connection_mark_unattached_ap(conn, END_STREAM_REASON_TORPROTOCOL); - return -1; - } - tor_assert(addresstype == ONION_V3_HOSTNAME); tor_assert(!automap); return connection_ap_handle_onion(conn, socks, circ); diff --git a/src/core/or/connection_edge.h b/src/core/or/connection_edge.h index 72869f348b..966a9391d8 100644 --- a/src/core/or/connection_edge.h +++ b/src/core/or/connection_edge.h @@ -80,7 +80,6 @@ typedef enum hostname_type_t { BAD_HOSTNAME, EXIT_HOSTNAME, NORMAL_HOSTNAME, - ONION_V2_HOSTNAME, ONION_V3_HOSTNAME, } hostname_type_t; diff --git a/src/core/or/connection_or.c b/src/core/or/connection_or.c index dd31638eb3..db9f93e6f6 100644 --- a/src/core/or/connection_or.c +++ b/src/core/or/connection_or.c @@ -65,6 +65,7 @@ #include "core/or/scheduler.h" #include "feature/nodelist/torcert.h" #include "core/or/channelpadding.h" +#include "core/or/congestion_control_common.h" #include "feature/dirauth/authmode.h" #include "feature/hs/hs_service.h" @@ -636,7 +637,7 @@ connection_or_flushed_some(or_connection_t *conn) /* If we're under the low water mark, add cells until we're just over the * high water mark. 
*/ datalen = connection_get_outbuf_len(TO_CONN(conn)); - if (datalen < OR_CONN_LOWWATER) { + if (datalen < or_conn_lowwatermark()) { /* Let the scheduler know */ scheduler_channel_wants_writes(TLS_CHAN_TO_BASE(conn->chan)); } @@ -660,9 +661,9 @@ connection_or_num_cells_writeable(or_connection_t *conn) * used to trigger when to start writing after we've stopped. */ datalen = connection_get_outbuf_len(TO_CONN(conn)); - if (datalen < OR_CONN_HIGHWATER) { + if (datalen < or_conn_highwatermark()) { cell_network_size = get_cell_network_size(conn->wide_circ_ids); - n = CEIL_DIV(OR_CONN_HIGHWATER - datalen, cell_network_size); + n = CEIL_DIV(or_conn_highwatermark() - datalen, cell_network_size); } return n; diff --git a/src/core/or/crypt_path.c b/src/core/or/crypt_path.c index 29356d7c2a..7673bc306f 100644 --- a/src/core/or/crypt_path.c +++ b/src/core/or/crypt_path.c @@ -27,6 +27,7 @@ #include "core/or/circuitbuild.h" #include "core/or/circuitlist.h" #include "core/or/extendinfo.h" +#include "core/or/congestion_control_common.h" #include "lib/crypt_ops/crypto_dh.h" #include "lib/crypt_ops/crypto_util.h" @@ -165,6 +166,7 @@ cpath_free(crypt_path_t *victim) onion_handshake_state_release(&victim->handshake_state); crypto_dh_free(victim->rend_dh_handshake_state); extend_info_free(victim->extend_info); + congestion_control_free(victim->ccontrol); memwipe(victim, 0xBB, sizeof(crypt_path_t)); /* poison memory */ tor_free(victim); diff --git a/src/core/or/crypt_path_st.h b/src/core/or/crypt_path_st.h index 2529b6ee41..ddc85eec14 100644 --- a/src/core/or/crypt_path_st.h +++ b/src/core/or/crypt_path_st.h @@ -29,6 +29,8 @@ struct onion_handshake_state_t { } u; }; +struct congestion_control_t; + /** Macro to encapsulate private members of a struct. * * Renames 'x' to 'x_crypt_path_private_field'. @@ -80,6 +82,9 @@ struct crypt_path_t { int deliver_window; /**< How many cells are we willing to deliver originating * at this step? 
*/ + /** Congestion control info */ + struct congestion_control_t *ccontrol; + /*********************** Private members ****************************/ /** Private member: Cryptographic state used for encrypting and diff --git a/src/core/or/edge_connection_st.h b/src/core/or/edge_connection_st.h index 0120c3df25..dab32fc8d0 100644 --- a/src/core/or/edge_connection_st.h +++ b/src/core/or/edge_connection_st.h @@ -15,6 +15,7 @@ #include "core/or/or.h" #include "core/or/connection_st.h" +#include "lib/evloop/token_bucket.h" /** Subtype of connection_t for an "edge connection" -- that is, an entry (ap) * connection, or an exit. */ @@ -73,6 +74,60 @@ struct edge_connection_t { * that's going away and being used on channels instead. We still tag * edge connections with dirreq_id from circuits, so it's copied here. */ uint64_t dirreq_id; + + /* The following are flow control fields */ + + /** Used for rate limiting the read side of this edge connection when + * congestion control is enabled on its circuit. The XON cell ewma_drain_rate + * parameter is used to set the bucket limits. */ + token_bucket_rw_t bucket; + + /** + * Monotime timestamp of the last time we sent a flow control message + * for this edge, used to compute advisory rates */ + uint64_t drain_start_usec; + + /** + * Number of bytes written since we either emptied our buffers, + * or sent an advisory drate rate. Can wrap, buf if so, + * we must reset the usec timestamp above. (Or make this u64, idk). + */ + uint32_t drained_bytes; + uint32_t prev_drained_bytes; + + /** + * N_EWMA of the drain rate of writes on this edge conn + * while buffers were present. + */ + uint32_t ewma_drain_rate; + + /** + * The ewma drain rate the last time we sent an xon. 
+ */ + uint32_t ewma_rate_last_sent; + + /** + * The following fields are used to count the total bytes sent on this + * stream, and compare them to the number of XON and XOFFs received, so + * that clients can check rate limits of XOFF/XON to prevent dropmark + * attacks. */ + uint32_t total_bytes_xmit; + + /** Number of XOFFs received */ + uint8_t num_xoff_recv; + + /** Number of XONs received */ + uint8_t num_xon_recv; + + /** + * Flag that tells us if an XOFF has been sent; cleared when we send an XON. + * Used to avoid sending multiple XOFFs */ + uint8_t xoff_sent : 1; + + /** Flag that tells us if an XOFF has been received; cleared when we get + * an XON. Used to ensure that this edge keeps reads on its edge socket + * disabled. */ + uint8_t xoff_received : 1; }; #endif /* !defined(EDGE_CONNECTION_ST_H) */ diff --git a/src/core/or/half_edge_st.h b/src/core/or/half_edge_st.h index c956c7434a..ac97eb19f1 100644 --- a/src/core/or/half_edge_st.h +++ b/src/core/or/half_edge_st.h @@ -31,6 +31,18 @@ typedef struct half_edge_t { * our deliver window */ int data_pending; + + /** + * Monotime timestamp of when the other end should have successfully + * shut down the stream and stop sending data, based on the larger + * of circuit RTT and CBT. Used if 'used_ccontrol' is true, to expire + * the half_edge at this monotime timestamp. */ + uint64_t end_ack_expected_usec; + + /** + * Did this edge use congestion control? If so, use + * timer instead of pending data approach */ + int used_ccontrol : 1; + + /** Is there a connected cell pending? 
*/ int connected_pending : 1; } half_edge_t; diff --git a/src/core/or/include.am b/src/core/or/include.am index b578b75673..b08f8509cc 100644 --- a/src/core/or/include.am +++ b/src/core/or/include.am @@ -28,13 +28,17 @@ LIBTOR_APP_A_SOURCES += \ src/core/or/orconn_event.c \ src/core/or/policies.c \ src/core/or/protover.c \ - src/core/or/protover_rust.c \ src/core/or/reasons.c \ src/core/or/relay.c \ src/core/or/scheduler.c \ src/core/or/scheduler_kist.c \ src/core/or/scheduler_vanilla.c \ src/core/or/sendme.c \ + src/core/or/congestion_control_common.c \ + src/core/or/congestion_control_vegas.c \ + src/core/or/congestion_control_nola.c \ + src/core/or/congestion_control_westwood.c \ + src/core/or/congestion_control_flow.c \ src/core/or/status.c \ src/core/or/versions.c @@ -57,6 +61,7 @@ noinst_HEADERS += \ src/core/or/circuitpadding_machines.h \ src/core/or/circuituse.h \ src/core/or/command.h \ + src/core/or/congestion_control_st.h \ src/core/or/connection_edge.h \ src/core/or/connection_or.h \ src/core/or/connection_st.h \ @@ -77,6 +82,7 @@ noinst_HEADERS += \ src/core/or/entry_port_cfg_st.h \ src/core/or/extend_info_st.h \ src/core/or/listener_connection_st.h \ + src/core/or/lttng_cc.inc \ src/core/or/lttng_circuit.inc \ src/core/or/onion.h \ src/core/or/or.h \ @@ -97,6 +103,11 @@ noinst_HEADERS += \ src/core/or/relay_crypto_st.h \ src/core/or/scheduler.h \ src/core/or/sendme.h \ + src/core/or/congestion_control_flow.h \ + src/core/or/congestion_control_common.h \ + src/core/or/congestion_control_vegas.h \ + src/core/or/congestion_control_nola.h \ + src/core/or/congestion_control_westwood.h \ src/core/or/server_port_cfg_st.h \ src/core/or/socks_request_st.h \ src/core/or/status.h \ @@ -106,7 +117,9 @@ noinst_HEADERS += \ if USE_TRACING_INSTRUMENTATION_LTTNG LIBTOR_APP_A_SOURCES += \ + src/core/or/trace_probes_cc.c \ src/core/or/trace_probes_circuit.c noinst_HEADERS += \ + src/core/or/trace_probes_cc.h \ src/core/or/trace_probes_circuit.h endif diff --git 
a/src/core/or/lttng_cc.inc b/src/core/or/lttng_cc.inc new file mode 100644 index 0000000000..b7bf58e196 --- /dev/null +++ b/src/core/or/lttng_cc.inc @@ -0,0 +1,166 @@ +/* Copyright (c) 2021, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file lttng_cc.inc + * \brief LTTng tracing probe declaration for the congestion control subsystem. + * It is in this .inc file due to the non C standard syntax and the way + * we guard the header with the LTTng specific + * TRACEPOINT_HEADER_MULTI_READ. + **/ + +#include "orconfig.h" + +/* We only build the following if LTTng instrumentation has been enabled. */ +#ifdef USE_TRACING_INSTRUMENTATION_LTTNG + +/* The following defines are LTTng-UST specific. */ +#undef TRACEPOINT_PROVIDER +#define TRACEPOINT_PROVIDER tor_cc + +#undef TRACEPOINT_INCLUDE +#define TRACEPOINT_INCLUDE "./src/core/or/lttng_cc.inc" + +#if !defined(LTTNG_CC_INC) || defined(TRACEPOINT_HEADER_MULTI_READ) +#define LTTNG_CC_INC + +#include <lttng/tracepoint.h> + +/* + * Flow Control + */ + +/* Emitted everytime the flow_control_decide_xon() function is called. */ +TRACEPOINT_EVENT(tor_cc, flow_decide_xon, + TP_ARGS(const edge_connection_t *, stream, size_t, n_written), + TP_FIELDS( + ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier) + ctf_integer(size_t, written_bytes, n_written) + ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes) + ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes) + ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent) + ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate) + ctf_integer(size_t, outbuf_len, + connection_get_outbuf_len(TO_CONN(stream))) + ) +) + +/* Emitted when flow control starts measuring the drain rate. 
*/ +TRACEPOINT_EVENT(tor_cc, flow_decide_xon_drain_start, + TP_ARGS(const edge_connection_t *, stream), + TP_FIELDS( + ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier) + ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes) + ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes) + ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent) + ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate) + ctf_integer(size_t, outbuf_len, + connection_get_outbuf_len(TO_CONN(stream))) + ) +) + +/* Emitted when the drain rate is updated. The new_drain_rate value is what was + * just computed. */ +TRACEPOINT_EVENT(tor_cc, flow_decide_xon_drain_update, + TP_ARGS(const edge_connection_t *, stream, uint32_t, drain_rate), + TP_FIELDS( + ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier) + ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes) + ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes) + ctf_integer(uint32_t, new_drain_rate, drain_rate) + ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent) + ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate) + ctf_integer(size_t, outbuf_len, + connection_get_outbuf_len(TO_CONN(stream))) + ) +) + +/* Emitted when an XON cell is sent due to a notice in a drain rate change. 
*/ +TRACEPOINT_EVENT(tor_cc, flow_decide_xon_rate_change, + TP_ARGS(const edge_connection_t *, stream), + TP_FIELDS( + ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier) + ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes) + ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes) + ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent) + ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate) + ctf_integer(size_t, outbuf_len, + connection_get_outbuf_len(TO_CONN(stream))) + ) +) + +/* Emitted when an XON cell is sent because we partially or fully drained the + * edge connection buffer. */ +TRACEPOINT_EVENT(tor_cc, flow_decide_xon_partial_drain, + TP_ARGS(const edge_connection_t *, stream), + TP_FIELDS( + ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier) + ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes) + ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes) + ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent) + ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate) + ctf_integer(size_t, outbuf_len, + connection_get_outbuf_len(TO_CONN(stream))) + ) +) + +/* Emitted when we double the drain rate which is an attempt to see if we can + * speed things up. */ +TRACEPOINT_EVENT(tor_cc, flow_decide_xon_drain_doubled, + TP_ARGS(const edge_connection_t *, stream), + TP_FIELDS( + ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier) + ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes) + ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes) + ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent) + ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate) + ctf_integer(size_t, outbuf_len, + connection_get_outbuf_len(TO_CONN(stream))) + ) +) + +/* XOFF */ + +/* Emitted when we send an XOFF cell. 
*/ +TRACEPOINT_EVENT(tor_cc, flow_decide_xoff_sending, + TP_ARGS(const edge_connection_t *, stream), + TP_FIELDS( + ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier) + ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes) + ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes) + ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent) + ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate) + ctf_integer(size_t, outbuf_len, + connection_get_outbuf_len(TO_CONN(stream))) + ) +) + +/* + * Congestion Control + */ + +/* Emitted when the BDP value has been updated. */ +TRACEPOINT_EVENT(tor_cc, bdp_update, + TP_ARGS(const circuit_t *, circ, const congestion_control_t *, cc, + uint64_t, curr_rtt_usec, uint64_t, sendme_rate_bdp), + TP_FIELDS( + ctf_integer(uint64_t, circuit_ptr, circ) + ctf_integer(uint32_t, n_circ_id, circ->n_circ_id) + ctf_integer(uint64_t, min_rtt_usec, cc->min_rtt_usec) + ctf_integer(uint64_t, curr_rtt_usec, curr_rtt_usec) + ctf_integer(uint64_t, ewma_rtt_usec, cc->ewma_rtt_usec) + ctf_integer(uint64_t, max_rtt_usec, cc->max_rtt_usec) + ctf_integer(uint64_t, bdp_inflight_rtt, cc->bdp[BDP_ALG_INFLIGHT_RTT]) + ctf_integer(uint64_t, bdp_cwnd_rtt, cc->bdp[BDP_ALG_CWND_RTT]) + ctf_integer(uint64_t, bdp_sendme_rate, cc->bdp[BDP_ALG_SENDME_RATE]) + ctf_integer(uint64_t, bdp_piecewise, cc->bdp[BDP_ALG_PIECEWISE]) + ctf_integer(uint64_t, sendme_rate_bdp, sendme_rate_bdp) + ) +) + +#endif /* LTTNG_CC_INC || TRACEPOINT_HEADER_MULTI_READ */ + +/* Must be included after the probes declaration. 
*/ +#include <lttng/tracepoint-event.h> + +#endif /* USE_TRACING_INSTRUMENTATION_LTTNG */ diff --git a/src/core/or/or.h b/src/core/or/or.h index 99948f26e2..392a848ee7 100644 --- a/src/core/or/or.h +++ b/src/core/or/or.h @@ -210,6 +210,9 @@ struct curve25519_public_key_t; #define RELAY_COMMAND_PADDING_NEGOTIATE 41 #define RELAY_COMMAND_PADDING_NEGOTIATED 42 +#define RELAY_COMMAND_XOFF 43 +#define RELAY_COMMAND_XON 44 + /* Reasons why an OR connection is closed. */ #define END_OR_CONN_REASON_DONE 1 #define END_OR_CONN_REASON_REFUSED 2 /* connection refused */ @@ -591,18 +594,6 @@ typedef struct or_handshake_state_t or_handshake_state_t; /** Length of Extended ORPort connection identifier. */ #define EXT_OR_CONN_ID_LEN DIGEST_LEN /* 20 */ -/* - * OR_CONN_HIGHWATER and OR_CONN_LOWWATER moved from connection_or.c so - * channeltls.c can see them too. - */ - -/** When adding cells to an OR connection's outbuf, keep adding until the - * outbuf is at least this long, or we run out of cells. */ -#define OR_CONN_HIGHWATER (32*1024) - -/** Add cells to an OR connection's outbuf whenever the outbuf's data length - * drops below this size. */ -#define OR_CONN_LOWWATER (16*1024) typedef struct connection_t connection_t; typedef struct control_connection_t control_connection_t; diff --git a/src/core/or/or_circuit_st.h b/src/core/or/or_circuit_st.h index b8fbf9658e..11695ec301 100644 --- a/src/core/or/or_circuit_st.h +++ b/src/core/or/or_circuit_st.h @@ -52,6 +52,10 @@ struct or_circuit_t { /** Stores KH for the handshake. */ char rend_circ_nonce[DIGEST_LEN];/* KH in tor-spec.txt */ + /** Number of cells which we have discarded because of having no next hop, + * despite not recognizing the cell. */ + uint32_t n_cells_discarded_at_end; + /** How many more relay_early cells can we send on this circuit, according * to the specification? 
*/ unsigned int remaining_relay_early_cells : 4; @@ -93,4 +97,3 @@ struct or_circuit_t { }; #endif /* !defined(OR_CIRCUIT_ST_H) */ - diff --git a/src/core/or/origin_circuit_st.h b/src/core/or/origin_circuit_st.h index 9264077c50..6c86a56000 100644 --- a/src/core/or/origin_circuit_st.h +++ b/src/core/or/origin_circuit_st.h @@ -180,6 +180,12 @@ struct origin_circuit_t { unsigned first_hop_from_controller : 1; /** + * If true, this circuit's path has been chosen, in full or in part, + * by the controller API, and it's okay to ignore checks that we'd + * usually do on the path as whole. */ + unsigned int any_hop_from_controller : 1; + + /** * Tristate variable to guard against pathbias miscounting * due to circuit purpose transitions changing the decision * of pathbias_should_count(). This variable is informational diff --git a/src/core/or/policies.c b/src/core/or/policies.c index f91c23ad31..a53849b4d0 100644 --- a/src/core/or/policies.c +++ b/src/core/or/policies.c @@ -59,6 +59,9 @@ static smartlist_t *authdir_invalid_policy = NULL; /** Policy that addresses for incoming router descriptors must <b>not</b> * match in order to not be marked as BadExit. */ static smartlist_t *authdir_badexit_policy = NULL; +/** Policy that addresses for incoming router descriptors must <b>not</b> + * match in order to not be marked as MiddleOnly. */ +static smartlist_t *authdir_middleonly_policy = NULL; /** Parsed addr_policy_t describing which addresses we believe we can start * circuits at. */ @@ -1119,6 +1122,17 @@ authdir_policy_badexit_address(const tor_addr_t *addr, uint16_t port) return addr_is_in_cc_list(addr, get_options()->AuthDirBadExitCCs); } +/** Return 1 if <b>addr</b>:<b>port</b> should be marked as MiddleOnly, + * based on <b>authdir_middleonly_policy</b>. Else return 0. 
+ */ +int +authdir_policy_middleonly_address(const tor_addr_t *addr, uint16_t port) +{ + if (!addr_policy_permits_tor_addr(addr, port, authdir_middleonly_policy)) + return 1; + return addr_is_in_cc_list(addr, get_options()->AuthDirMiddleOnlyCCs); +} + #define REJECT(arg) \ STMT_BEGIN *msg = tor_strdup(arg); goto err; STMT_END @@ -1173,6 +1187,9 @@ validate_addr_policies(const or_options_t *options, char **msg) if (parse_addr_policy(options->AuthDirBadExit, &addr_policy, ADDR_POLICY_REJECT)) REJECT("Error in AuthDirBadExit entry."); + if (parse_addr_policy(options->AuthDirMiddleOnly, &addr_policy, + ADDR_POLICY_REJECT)) + REJECT("Error in AuthDirMiddleOnly entry."); if (parse_addr_policy(options->ReachableAddresses, &addr_policy, ADDR_POLICY_ACCEPT)) @@ -1266,6 +1283,9 @@ policies_parse_from_options(const or_options_t *options) if (load_policy_from_option(options->AuthDirBadExit, "AuthDirBadExit", &authdir_badexit_policy, ADDR_POLICY_REJECT) < 0) ret = -1; + if (load_policy_from_option(options->AuthDirMiddleOnly, "AuthDirMiddleOnly", + &authdir_middleonly_policy, ADDR_POLICY_REJECT) < 0) + ret = -1; if (parse_metrics_port_policy(options) < 0) { ret = -1; } @@ -3112,6 +3132,8 @@ policies_free_all(void) authdir_invalid_policy = NULL; addr_policy_list_free(authdir_badexit_policy); authdir_badexit_policy = NULL; + addr_policy_list_free(authdir_middleonly_policy); + authdir_middleonly_policy = NULL; if (!HT_EMPTY(&policy_root)) { policy_map_ent_t **ent; diff --git a/src/core/or/policies.h b/src/core/or/policies.h index a32f50ab1d..e11e1d0ff5 100644 --- a/src/core/or/policies.h +++ b/src/core/or/policies.h @@ -106,6 +106,7 @@ int metrics_policy_permits_address(const tor_addr_t *addr); int authdir_policy_permits_address(const tor_addr_t *addr, uint16_t port); int authdir_policy_valid_address(const tor_addr_t *addr, uint16_t port); int authdir_policy_badexit_address(const tor_addr_t *addr, uint16_t port); +int authdir_policy_middleonly_address(const tor_addr_t *addr, 
uint16_t port); int validate_addr_policies(const or_options_t *options, char **msg); void policy_expand_private(smartlist_t **policy); diff --git a/src/core/or/protover.c b/src/core/or/protover.c index 04df8aeeb8..199fc830a0 100644 --- a/src/core/or/protover.c +++ b/src/core/or/protover.c @@ -28,8 +28,6 @@ #include "core/or/versions.h" #include "lib/tls/tortls.h" -#ifndef HAVE_RUST - static const smartlist_t *get_supported_protocol_list(void); static int protocol_list_contains(const smartlist_t *protos, protocol_type_t pr, uint32_t ver); @@ -855,5 +853,3 @@ protover_free_all(void) supported_protocol_list = NULL; } } - -#endif /* !defined(HAVE_RUST) */ diff --git a/src/core/or/protover.h b/src/core/or/protover.h index c0739a092e..ae258d74a5 100644 --- a/src/core/or/protover.h +++ b/src/core/or/protover.h @@ -103,13 +103,13 @@ typedef struct proto_entry_t { uint64_t bitmask; } proto_entry_t; -#if !defined(HAVE_RUST) && defined(TOR_UNIT_TESTS) +#if defined(TOR_UNIT_TESTS) STATIC struct smartlist_t *parse_protocol_list(const char *s); STATIC char *encode_protocol_list(const struct smartlist_t *sl); STATIC const char *protocol_type_to_str(protocol_type_t pr); STATIC int str_to_protocol_type(const char *s, protocol_type_t *pr_out); STATIC void proto_entry_free_(proto_entry_t *entry); -#endif /* !defined(HAVE_RUST) && defined(TOR_UNIT_TESTS) */ +#endif /* defined(TOR_UNIT_TESTS) */ #define proto_entry_free(entry) \ FREE_AND_NULL(proto_entry_t, proto_entry_free_, (entry)) diff --git a/src/core/or/protover_rust.c b/src/core/or/protover_rust.c deleted file mode 100644 index 31ddfa1bdf..0000000000 --- a/src/core/or/protover_rust.c +++ /dev/null @@ -1,34 +0,0 @@ -/* Copyright (c) 2016-2021, The Tor Project, Inc. */ -/* See LICENSE for licensing information */ - -/* - * \file protover_rust.c - * \brief Provide a C wrapper for functions exposed in /src/rust/protover, - * and safe translation/handling between the Rust/C boundary. 
- */ - -#include "core/or/or.h" -#include "core/or/protover.h" - -#ifdef HAVE_RUST - -/* Define for compatibility, used in main.c */ -void -protover_free_all(void) -{ -} - -int protover_contains_long_protocol_names_(const char *s); - -/** - * Return true if the unparsed protover in <b>s</b> would contain a protocol - * name longer than MAX_PROTOCOL_NAME_LENGTH, and false otherwise. - */ -bool -protover_list_is_invalid(const char *s) -{ - return protover_contains_long_protocol_names_(s) != 0; -} - -#endif /* defined(HAVE_RUST) */ - diff --git a/src/core/or/relay.c b/src/core/or/relay.c index f5a9e73856..68fddd1ae7 100644 --- a/src/core/or/relay.c +++ b/src/core/or/relay.c @@ -97,6 +97,8 @@ #include "feature/nodelist/routerinfo_st.h" #include "core/or/socks_request_st.h" #include "core/or/sendme.h" +#include "core/or/congestion_control_common.h" +#include "core/or/congestion_control_flow.h" static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell, cell_direction_t cell_direction, @@ -115,13 +117,6 @@ static void adjust_exit_policy_from_exitpolicy_failure(origin_circuit_t *circ, node_t *node, const tor_addr_t *addr); -/** Stop reading on edge connections when we have this many cells - * waiting on the appropriate queue. */ -#define CELL_QUEUE_HIGHWATER_SIZE 256 -/** Start reading from edge connections again when we get down to this many - * cells. */ -#define CELL_QUEUE_LOWWATER_SIZE 64 - /** Stats: how many relay cells have originated at this hop, or have * been relayed onward (not recognized at this hop)? */ @@ -338,8 +333,17 @@ circuit_receive_relay_cell(cell_t *cell, circuit_t *circ, } return 0; } - log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL, - "Didn't recognize cell, but circ stops here! Closing circ."); + if (BUG(CIRCUIT_IS_ORIGIN(circ))) { + /* Should be impossible at this point. 
*/ + return -END_CIRC_REASON_TORPROTOCOL; + } + or_circuit_t *or_circ = TO_OR_CIRCUIT(circ); + if (++or_circ->n_cells_discarded_at_end == 1) { + time_t seconds_open = approx_time() - circ->timestamp_created.tv_sec; + log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL, + "Didn't recognize a cell, but circ stops here! Closing circuit. " + "It was created %ld seconds ago.", (long)seconds_open); + } return -END_CIRC_REASON_TORPROTOCOL; } @@ -1574,6 +1578,7 @@ process_sendme_cell(const relay_header_t *rh, const cell_t *cell, } /* Stream level SENDME cell. */ + // TODO: Turn this off for cc_alg=1,2,3; use XON/XOFF instead ret = sendme_process_stream_level(conn, circ, rh->length); if (ret < 0) { /* Means we need to close the circuit with reason ret. */ @@ -1738,6 +1743,44 @@ handle_relay_cell_command(cell_t *cell, circuit_t *circ, } return 0; + case RELAY_COMMAND_XOFF: + if (!conn) { + if (CIRCUIT_IS_ORIGIN(circ)) { + origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ); + if (relay_crypt_from_last_hop(ocirc, layer_hint) && + connection_half_edge_is_valid_data(ocirc->half_streams, + rh->stream_id)) { + circuit_read_valid_data(ocirc, rh->length); + } + } + return 0; + } + + if (circuit_process_stream_xoff(conn, layer_hint, cell)) { + if (CIRCUIT_IS_ORIGIN(circ)) { + circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh->length); + } + } + return 0; + case RELAY_COMMAND_XON: + if (!conn) { + if (CIRCUIT_IS_ORIGIN(circ)) { + origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ); + if (relay_crypt_from_last_hop(ocirc, layer_hint) && + connection_half_edge_is_valid_data(ocirc->half_streams, + rh->stream_id)) { + circuit_read_valid_data(ocirc, rh->length); + } + } + return 0; + } + + if (circuit_process_stream_xon(conn, layer_hint, cell)) { + if (CIRCUIT_IS_ORIGIN(circ)) { + circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh->length); + } + } + return 0; case RELAY_COMMAND_END: reason = rh->length > 0 ? 
get_uint8(cell->payload+RELAY_HEADER_SIZE) : END_STREAM_REASON_MISC; @@ -2091,6 +2134,7 @@ void circuit_reset_sendme_randomness(circuit_t *circ) { circ->have_sent_sufficiently_random_cell = 0; + // XXX: do we need to change this check for congestion control? circ->send_randomness_after_n_cells = CIRCWINDOW_INCREMENT / 2 + crypto_fast_rng_get_uint(get_thread_fast_rng(), CIRCWINDOW_INCREMENT / 2); } @@ -2284,7 +2328,7 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial, } /* Handle the stream-level SENDME package window. */ - if (sendme_note_stream_data_packaged(conn) < 0) { + if (sendme_note_stream_data_packaged(conn, length) < 0) { connection_stop_reading(TO_CONN(conn)); log_debug(domain,"conn->package_window reached 0."); circuit_consider_stop_edge_reading(circ, cpath_layer); @@ -2350,15 +2394,16 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, /* How many cells do we have space for? It will be the minimum of * the number needed to exhaust the package window, and the minimum * needed to fill the cell queue. */ - max_to_package = circ->package_window; + + max_to_package = congestion_control_get_package_window(circ, layer_hint); if (CIRCUIT_IS_ORIGIN(circ)) { cells_on_queue = circ->n_chan_cells.n; } else { or_circuit_t *or_circ = TO_OR_CIRCUIT(circ); cells_on_queue = or_circ->p_chan_cells.n; } - if (CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue < max_to_package) - max_to_package = CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue; + if (cell_queue_highwatermark() - cells_on_queue < max_to_package) + max_to_package = cell_queue_highwatermark() - cells_on_queue; /* Once we used to start listening on the streams in the order they * appeared in the linked list. 
That leads to starvation on the @@ -2398,7 +2443,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, /* Activate reading starting from the chosen stream */ for (conn=chosen_stream; conn; conn = conn->next_stream) { /* Start reading for the streams starting from here */ - if (conn->base_.marked_for_close || conn->package_window <= 0) + if (conn->base_.marked_for_close || conn->package_window <= 0 || + conn->xoff_received) continue; if (!layer_hint || conn->cpath_layer == layer_hint) { connection_start_reading(TO_CONN(conn)); @@ -2409,7 +2455,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, } /* Go back and do the ones we skipped, circular-style */ for (conn = first_conn; conn != chosen_stream; conn = conn->next_stream) { - if (conn->base_.marked_for_close || conn->package_window <= 0) + if (conn->base_.marked_for_close || conn->package_window <= 0 || + conn->xoff_received) continue; if (!layer_hint || conn->cpath_layer == layer_hint) { connection_start_reading(TO_CONN(conn)); @@ -2495,7 +2542,7 @@ circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint) or_circuit_t *or_circ = TO_OR_CIRCUIT(circ); log_debug(domain,"considering circ->package_window %d", circ->package_window); - if (circ->package_window <= 0) { + if (congestion_control_get_package_window(circ, layer_hint) <= 0) { log_debug(domain,"yes, not-at-origin. stopped."); for (conn = or_circ->n_streams; conn; conn=conn->next_stream) connection_stop_reading(TO_CONN(conn)); @@ -2506,7 +2553,7 @@ circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint) /* else, layer hint is defined, use it */ log_debug(domain,"considering layer_hint->package_window %d", layer_hint->package_window); - if (layer_hint->package_window <= 0) { + if (congestion_control_get_package_window(circ, layer_hint) <= 0) { log_debug(domain,"yes, at-origin. 
stopped."); for (conn = TO_ORIGIN_CIRCUIT(circ)->p_streams; conn; conn=conn->next_stream) { @@ -2722,11 +2769,18 @@ cell_queues_get_total_allocation(void) /** The time at which we were last low on memory. */ static time_t last_time_under_memory_pressure = 0; +/** Statistics on how many bytes were removed by the OOM per type. */ +uint64_t oom_stats_n_bytes_removed_dns = 0; +uint64_t oom_stats_n_bytes_removed_cell = 0; +uint64_t oom_stats_n_bytes_removed_geoip = 0; +uint64_t oom_stats_n_bytes_removed_hsdir = 0; + /** Check whether we've got too much space used for cells. If so, * call the OOM handler and return 1. Otherwise, return 0. */ STATIC int cell_queues_check_size(void) { + size_t removed = 0; time_t now = time(NULL); size_t alloc = cell_queues_get_total_allocation(); alloc += half_streams_get_total_allocation(); @@ -2751,20 +2805,27 @@ cell_queues_check_size(void) if (hs_cache_total > get_options()->MaxMemInQueues / 5) { const size_t bytes_to_remove = hs_cache_total - (size_t)(get_options()->MaxMemInQueues / 10); - alloc -= hs_cache_handle_oom(now, bytes_to_remove); + removed = hs_cache_handle_oom(now, bytes_to_remove); + oom_stats_n_bytes_removed_hsdir += removed; + alloc -= removed; } if (geoip_client_cache_total > get_options()->MaxMemInQueues / 5) { const size_t bytes_to_remove = geoip_client_cache_total - (size_t)(get_options()->MaxMemInQueues / 10); - alloc -= geoip_client_cache_handle_oom(now, bytes_to_remove); + removed = geoip_client_cache_handle_oom(now, bytes_to_remove); + oom_stats_n_bytes_removed_geoip += removed; + alloc -= removed; } if (dns_cache_total > get_options()->MaxMemInQueues / 5) { const size_t bytes_to_remove = dns_cache_total - (size_t)(get_options()->MaxMemInQueues / 10); - alloc -= dns_cache_handle_oom(now, bytes_to_remove); + removed = dns_cache_handle_oom(now, bytes_to_remove); + oom_stats_n_bytes_removed_dns += removed; + alloc -= removed; } - circuits_handle_oom(alloc); + removed = circuits_handle_oom(alloc); + 
oom_stats_n_bytes_removed_cell += removed; return 1; } } @@ -3062,7 +3123,7 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max)) /* Is the cell queue low enough to unblock all the streams that are waiting * to write to this circuit? */ - if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE) + if (streams_blocked && queue->n <= cell_queue_lowwatermark()) set_streams_blocked_on_circ(circ, chan, 0, 0); /* unblock streams */ /* If n_flushed < max still, loop around and pick another circuit */ @@ -3180,7 +3241,7 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, /* If we have too many cells on the circuit, we should stop reading from * the edge streams for a while. */ - if (!streams_blocked && queue->n >= CELL_QUEUE_HIGHWATER_SIZE) + if (!streams_blocked && queue->n >= cell_queue_highwatermark()) set_streams_blocked_on_circ(circ, chan, 1, 0); /* block streams */ if (streams_blocked && fromstream) { diff --git a/src/core/or/relay.h b/src/core/or/relay.h index 2f337d5d16..eac920f491 100644 --- a/src/core/or/relay.h +++ b/src/core/or/relay.h @@ -49,6 +49,11 @@ extern uint64_t stats_n_data_bytes_packaged; extern uint64_t stats_n_data_cells_received; extern uint64_t stats_n_data_bytes_received; +extern uint64_t oom_stats_n_bytes_removed_dns; +extern uint64_t oom_stats_n_bytes_removed_cell; +extern uint64_t oom_stats_n_bytes_removed_geoip; +extern uint64_t oom_stats_n_bytes_removed_hsdir; + void dump_cell_pool_usage(int severity); size_t packed_cell_mem_cost(void); diff --git a/src/core/or/sendme.c b/src/core/or/sendme.c index ce3385ae98..ee670f9d51 100644 --- a/src/core/or/sendme.c +++ b/src/core/or/sendme.c @@ -21,6 +21,8 @@ #include "core/or/or_circuit_st.h" #include "core/or/relay.h" #include "core/or/sendme.h" +#include "core/or/congestion_control_common.h" +#include "core/or/congestion_control_flow.h" #include "feature/nodelist/networkstatus.h" #include "lib/ctime/di_ops.h" #include "trunnel/sendme_cell.h" @@ -64,13 +66,6 @@ 
pop_first_cell_digest(const circuit_t *circ) return NULL; } - /* More cell digest than the SENDME window is never suppose to happen. The - * cell should have been rejected before reaching this point due to its - * package_window down to 0 leading to a circuit close. Scream loudly but - * still pop the element so we don't memory leak. */ - tor_assert_nonfatal(smartlist_len(circ->sendme_last_digests) <= - CIRCWINDOW_START_MAX / CIRCWINDOW_INCREMENT); - circ_digest = smartlist_get(circ->sendme_last_digests, 0); smartlist_del_keeporder(circ->sendme_last_digests, 0); return circ_digest; @@ -334,17 +329,18 @@ record_cell_digest_on_circ(circuit_t *circ, const uint8_t *sendme_digest) /** Return true iff the next cell for the given cell window is expected to be * a SENDME. * - * We are able to know that because the package or deliver window value minus - * one cell (the possible SENDME cell) should be a multiple of the increment - * window value. */ + * We are able to know that because the package or inflight window value minus + * one cell (the possible SENDME cell) should be a multiple of the + * cells-per-sendme increment value (set via consensus parameter, negotiated + * for the circuit, and passed in as sendme_inc). + * + * This function is used when recording a cell digest and this is done quite + * low in the stack when decrypting or encrypting a cell. The window is only + * updated once the cell is actually put in the outbuf. + */ static bool -circuit_sendme_cell_is_next(int window) +circuit_sendme_cell_is_next(int window, int sendme_inc) { - /* At the start of the window, no SENDME will be expected. */ - if (window == CIRCWINDOW_START) { - return false; - } - /* Are we at the limit of the increment and if not, we don't expect next * cell is a SENDME. 
* @@ -352,11 +348,8 @@ circuit_sendme_cell_is_next(int window) * next cell is a SENDME, the window (either package or deliver) hasn't been * decremented just yet so when this is called, we are currently processing * the "window - 1" cell. - * - * This function is used when recording a cell digest and this is done quite - * low in the stack when decrypting or encrypting a cell. The window is only - * updated once the cell is actually put in the outbuf. */ - if (((window - 1) % CIRCWINDOW_INCREMENT) != 0) { + */ + if (((window - 1) % sendme_inc) != 0) { return false; } @@ -378,6 +371,10 @@ sendme_connection_edge_consider_sending(edge_connection_t *conn) int log_domain = TO_CONN(conn)->type == CONN_TYPE_AP ? LD_APP : LD_EXIT; + /* If we use flow control, we do not send stream sendmes */ + if (edge_uses_flow_control(conn)) + goto end; + /* Don't send it if we still have data to deliver. */ if (connection_outbuf_too_full(TO_CONN(conn))) { goto end; @@ -419,15 +416,16 @@ sendme_circuit_consider_sending(circuit_t *circ, crypt_path_t *layer_hint) { bool sent_one_sendme = false; const uint8_t *digest; + int sendme_inc = sendme_get_inc_count(circ, layer_hint); while ((layer_hint ? layer_hint->deliver_window : circ->deliver_window) <= - CIRCWINDOW_START - CIRCWINDOW_INCREMENT) { + CIRCWINDOW_START - sendme_inc) { log_debug(LD_CIRC,"Queuing circuit sendme."); if (layer_hint) { - layer_hint->deliver_window += CIRCWINDOW_INCREMENT; + layer_hint->deliver_window += sendme_inc; digest = cpath_get_sendme_digest(layer_hint); } else { - circ->deliver_window += CIRCWINDOW_INCREMENT; + circ->deliver_window += sendme_inc; digest = relay_crypto_get_sendme_digest(&TO_OR_CIRCUIT(circ)->crypto); } if (send_circuit_level_sendme(circ, layer_hint, digest) < 0) { @@ -448,6 +446,9 @@ sendme_circuit_consider_sending(circuit_t *circ, crypt_path_t *layer_hint) * the length of the SENDME cell payload (excluding the header). The * cell_payload is the payload. 
* + * This function validates the SENDME's digest, and then dispatches to + * the appropriate congestion control algorithm in use on the circuit. + * * Return 0 on success (the SENDME is valid and the package window has * been updated properly). * @@ -460,6 +461,7 @@ sendme_process_circuit_level(crypt_path_t *layer_hint, { tor_assert(circ); tor_assert(cell_payload); + congestion_control_t *cc; /* Validate the SENDME cell. Depending on the version, different validation * can be done. An invalid SENDME requires us to close the circuit. */ @@ -467,6 +469,34 @@ sendme_process_circuit_level(crypt_path_t *layer_hint, return -END_CIRC_REASON_TORPROTOCOL; } + // Get CC + if (layer_hint) { + cc = layer_hint->ccontrol; + + /* origin circuits need to count valid sendmes as valid protocol data */ + circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), cell_payload_len); + } else { + cc = circ->ccontrol; + } + + /* If there is no CC object, assume fixed alg */ + if (!cc) { + return sendme_process_circuit_level_impl(layer_hint, circ); + } + + return congestion_control_dispatch_cc_alg(cc, circ, layer_hint); +} + +/** + * Process a SENDME for Tor's original fixed window circuit-level flow control. + * Updates the package_window and ensures that it does not exceed the max. + * + * Returns -END_CIRC_REASON_TORPROTOCOL if the max is exceeded, otherwise + * returns 0. + */ +int +sendme_process_circuit_level_impl(crypt_path_t *layer_hint, circuit_t *circ) +{ /* If we are the origin of the circuit, we are the Client so we use the * layer hint (the Exit hop) for the package window tracking. */ if (CIRCUIT_IS_ORIGIN(circ)) { @@ -486,10 +516,6 @@ sendme_process_circuit_level(crypt_path_t *layer_hint, layer_hint->package_window += CIRCWINDOW_INCREMENT; log_debug(LD_APP, "circ-level sendme at origin, packagewindow %d.", layer_hint->package_window); - - /* We count circuit-level sendme's as valid delivered data because they - * are rate limited. 
*/ - circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), cell_payload_len); } else { /* We aren't the origin of this circuit so we are the Exit and thus we * track the package window with the circuit object. */ @@ -525,6 +551,12 @@ sendme_process_stream_level(edge_connection_t *conn, circuit_t *circ, tor_assert(conn); tor_assert(circ); + if (edge_uses_flow_control(conn)) { + log_fn(LOG_PROTOCOL_WARN, LD_EDGE, + "Congestion control got stream sendme"); + return -END_CIRC_REASON_TORPROTOCOL; + } + /* Don't allow the other endpoint to request more than our maximum (i.e. * initial) stream SENDME window worth of data. Well-behaved stock clients * will not request more than this max (as per the check in the while loop @@ -582,7 +614,12 @@ int sendme_stream_data_received(edge_connection_t *conn) { tor_assert(conn); - return --conn->deliver_window; + + if (edge_uses_flow_control(conn)) { + return flow_control_decide_xoff(conn); + } else { + return --conn->deliver_window; + } } /* Called when a relay DATA cell is packaged on the given circuit. If @@ -592,34 +629,56 @@ int sendme_note_circuit_data_packaged(circuit_t *circ, crypt_path_t *layer_hint) { int package_window, domain; + congestion_control_t *cc; tor_assert(circ); - if (CIRCUIT_IS_ORIGIN(circ)) { - /* Client side. */ - tor_assert(layer_hint); - --layer_hint->package_window; - package_window = layer_hint->package_window; + if (layer_hint) { + cc = layer_hint->ccontrol; domain = LD_APP; } else { - /* Exit side. */ - tor_assert(!layer_hint); - --circ->package_window; - package_window = circ->package_window; + cc = circ->ccontrol; domain = LD_EXIT; } - log_debug(domain, "Circuit package_window now %d.", package_window); - return package_window; + if (cc) { + congestion_control_note_cell_sent(cc, circ, layer_hint); + } else { + /* Fixed alg uses package_window and must update it */ + + if (CIRCUIT_IS_ORIGIN(circ)) { + /* Client side. 
*/ + tor_assert(layer_hint); + --layer_hint->package_window; + package_window = layer_hint->package_window; + } else { + /* Exit side. */ + tor_assert(!layer_hint); + --circ->package_window; + package_window = circ->package_window; + } + log_debug(domain, "Circuit package_window now %d.", package_window); + } + + /* Return appropriate number designating how many cells can still be sent */ + return congestion_control_get_package_window(circ, layer_hint); } /* Called when a relay DATA cell is packaged for the given edge connection * conn. Update the package window and return its new value. */ int -sendme_note_stream_data_packaged(edge_connection_t *conn) +sendme_note_stream_data_packaged(edge_connection_t *conn, size_t len) { tor_assert(conn); + if (edge_uses_flow_control(conn)) { + flow_control_note_sent_data(conn, len); + if (conn->xoff_received) + return -1; + else + return 1; + } + --conn->package_window; log_debug(LD_APP, "Stream package_window now %d.", conn->package_window); return conn->package_window; @@ -631,20 +690,14 @@ sendme_note_stream_data_packaged(edge_connection_t *conn) void sendme_record_cell_digest_on_circ(circuit_t *circ, crypt_path_t *cpath) { - int package_window; uint8_t *sendme_digest; tor_assert(circ); - package_window = circ->package_window; - if (cpath) { - package_window = cpath->package_window; - } - /* Is this the last cell before a SENDME? The idea is that if the * package_window reaches a multiple of the increment, after this cell, we * should expect a SENDME. */ - if (!circuit_sendme_cell_is_next(package_window)) { + if (!circuit_sent_cell_for_sendme(circ, cpath)) { return; } @@ -670,7 +723,8 @@ sendme_record_received_cell_digest(circuit_t *circ, crypt_path_t *cpath) /* Only record if the next cell is expected to be a SENDME. */ if (!circuit_sendme_cell_is_next(cpath ? 
cpath->deliver_window : - circ->deliver_window)) { + circ->deliver_window, + sendme_get_inc_count(circ, cpath))) { return; } @@ -692,8 +746,7 @@ sendme_record_sending_cell_digest(circuit_t *circ, crypt_path_t *cpath) tor_assert(circ); /* Only record if the next cell is expected to be a SENDME. */ - if (!circuit_sendme_cell_is_next(cpath ? cpath->package_window : - circ->package_window)) { + if (!circuit_sent_cell_for_sendme(circ, cpath)) { goto end; } diff --git a/src/core/or/sendme.h b/src/core/or/sendme.h index a008940905..2abec91a91 100644 --- a/src/core/or/sendme.h +++ b/src/core/or/sendme.h @@ -22,6 +22,7 @@ void sendme_circuit_consider_sending(circuit_t *circ, int sendme_process_circuit_level(crypt_path_t *layer_hint, circuit_t *circ, const uint8_t *cell_payload, uint16_t cell_payload_len); +int sendme_process_circuit_level_impl(crypt_path_t *, circuit_t *); int sendme_process_stream_level(edge_connection_t *conn, circuit_t *circ, uint16_t cell_body_len); @@ -32,7 +33,7 @@ int sendme_circuit_data_received(circuit_t *circ, crypt_path_t *layer_hint); /* Update package window functions. */ int sendme_note_circuit_data_packaged(circuit_t *circ, crypt_path_t *layer_hint); -int sendme_note_stream_data_packaged(edge_connection_t *conn); +int sendme_note_stream_data_packaged(edge_connection_t *conn, size_t len); /* Record cell digest on circuit. */ void sendme_record_cell_digest_on_circ(circuit_t *circ, crypt_path_t *cpath); diff --git a/src/core/or/status.c b/src/core/or/status.c index 9e7ae70535..1e599aafb3 100644 --- a/src/core/or/status.c +++ b/src/core/or/status.c @@ -147,6 +147,32 @@ note_connection(bool inbound, int family) } } +/** + * @name Counters for unrecognized cells + * + * Track cells that we drop because they are unrecognized and we have + * nobody to send them to. 
+ **/ +/**@{*/ +static unsigned n_circs_closed_for_unrecognized_cells; +static uint64_t n_unrecognized_cells_discarded; +static uint64_t n_secs_on_circs_with_unrecognized_cells; +/**@}*/ + +/** + * Note that a circuit has closed @a n_seconds after having been created, + * because of one or more unrecognized cells. Also note the number of + * unrecognized cells @a n_cells. + */ +void +note_circ_closed_for_unrecognized_cells(time_t n_seconds, uint32_t n_cells) +{ + ++n_circs_closed_for_unrecognized_cells; + n_unrecognized_cells_discarded += n_cells; + if (n_seconds >= 0) + n_secs_on_circs_with_unrecognized_cells += (uint64_t) n_seconds; +} + /** Log a "heartbeat" message describing Tor's status and history so that the * user can know that there is indeed a running Tor. Return 0 on success and * -1 on failure. */ @@ -240,6 +266,23 @@ log_heartbeat(time_t now) (main_loop_idle_count)); } + if (n_circs_closed_for_unrecognized_cells) { + double avg_time_alive = ((double) n_secs_on_circs_with_unrecognized_cells) + / n_circs_closed_for_unrecognized_cells; + double avg_cells = ((double) n_unrecognized_cells_discarded) + / n_circs_closed_for_unrecognized_cells; + log_fn(LOG_NOTICE, LD_HEARTBEAT, + "Since our last heartbeat, %u circuits were closed because of " + "unrecognized cells while we were the last hop. 
On average, each " + "one was alive for %lf seconds, and had %lf unrecognized cells.", + n_circs_closed_for_unrecognized_cells, + avg_time_alive, + avg_cells); + n_circs_closed_for_unrecognized_cells = 0; + n_unrecognized_cells_discarded = 0; + n_secs_on_circs_with_unrecognized_cells = 0; + } + /** Now, if we are an HS service, log some stats about our usage */ log_onion_service_stats(); diff --git a/src/core/or/status.h b/src/core/or/status.h index 927df9a192..57e28002fc 100644 --- a/src/core/or/status.h +++ b/src/core/or/status.h @@ -12,6 +12,9 @@ #include "lib/testsupport/testsupport.h" void note_connection(bool inbound, int family); +void note_circ_closed_for_unrecognized_cells(time_t n_seconds, + uint32_t n_cells); + int log_heartbeat(time_t now); #ifdef STATUS_PRIVATE diff --git a/src/core/or/trace_probes_cc.c b/src/core/or/trace_probes_cc.c new file mode 100644 index 0000000000..d52646da4f --- /dev/null +++ b/src/core/or/trace_probes_cc.c @@ -0,0 +1,33 @@ +/* Copyright (c) 2021, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file trace_probes_cc.c + * \brief Tracepoint provider source file for the cc subsystem. Probes + * are generated within this C file for LTTng-UST + **/ + +#include "orconfig.h" + +/* + * Following section is specific to LTTng-UST. + */ +#ifdef USE_TRACING_INSTRUMENTATION_LTTNG + +/* Header files that the probes need. 
*/ +#include "core/or/or.h" +#include "core/or/channel.h" +#include "core/or/circuit_st.h" +#include "core/or/circuitlist.h" +#include "core/or/congestion_control_st.h" +#include "core/or/connection_st.h" +#include "core/or/edge_connection_st.h" +#include "core/or/or_circuit_st.h" +#include "core/or/origin_circuit_st.h" + +#define TRACEPOINT_DEFINE +#define TRACEPOINT_CREATE_PROBES + +#include "core/or/trace_probes_cc.h" + +#endif /* defined(USE_TRACING_INSTRUMENTATION_LTTNG) */ diff --git a/src/core/or/trace_probes_cc.h b/src/core/or/trace_probes_cc.h new file mode 100644 index 0000000000..1f87528723 --- /dev/null +++ b/src/core/or/trace_probes_cc.h @@ -0,0 +1,22 @@ +/* Copyright (c) 2021, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file trace_probes_cc.h + * \brief The tracing probes for the congestion control subsystem. + * Currently, only LTTng-UST probes are available. + **/ + +#ifndef TOR_TRACE_PROBES_CC_H +#define TOR_TRACE_PROBES_CC_H + +#include "lib/trace/events.h" + +/* We only build the following if LTTng instrumentation has been enabled. */ +#ifdef USE_TRACING_INSTRUMENTATION_LTTNG + +#include "core/or/lttng_cc.inc" + +#endif /* USE_TRACING_INSTRUMENTATION_LTTNG */ + +#endif /* !defined(TOR_TRACE_PROBES_CC_H) */ |