diff options
Diffstat (limited to 'src/or/channel.c')
-rw-r--r-- | src/or/channel.c | 1974 |
1 files changed, 287 insertions, 1687 deletions
diff --git a/src/or/channel.c b/src/or/channel.c index 3c0025aff6..a4740dd752 100644 --- a/src/or/channel.c +++ b/src/or/channel.c @@ -1,3 +1,4 @@ + /* * Copyright (c) 2012-2017, The Tor Project, Inc. */ /* See LICENSE for licensing information */ @@ -5,9 +6,8 @@ * \file channel.c * * \brief OR/OP-to-OR channel abstraction layer. A channel's job is to - * transfer cells from Tor instance to Tor instance. - * Currently, there is only one implementation of the channel abstraction: in - * channeltls.c. + * transfer cells from Tor instance to Tor instance. Currently, there is only + * one implementation of the channel abstraction: in channeltls.c. * * Channels are a higher-level abstraction than or_connection_t: In general, * any means that two Tor relays use to exchange cells, or any means that a @@ -24,23 +24,34 @@ * connection. * * Every channel implementation is responsible for being able to transmit - * cells that are added to it with channel_write_cell() and related functions, - * and to receive incoming cells with the channel_queue_cell() and related - * functions. See the channel_t documentation for more information. - * - * When new cells arrive on a channel, they are passed to cell handler - * functions, which can be set by channel_set_cell_handlers() - * functions. (Tor's cell handlers are in command.c.) - * - * Tor flushes cells to channels from relay.c in - * channel_flush_from_first_active_circuit(). + * cells that are passed to it + * + * For *inbound* cells, the entry point is: channel_process_cell(). It takes a + * cell and will pass it to the cell handler set by + * channel_set_cell_handlers(). Currently, this is passed back to the command + * subsystem which is command_process_cell(). + * + * NOTE: For now, the separation between channels and specialized channels + * (like channeltls) is not that well defined. So the channeltls layer calls + * channel_process_cell() which originally comes from the connection subsytem. + * This should be hopefully be fixed with #23993. + * + * For *outbound* cells, the entry point is: channel_write_packed_cell(). + * Only packed cells are dequeued from the circuit queue by the scheduler + * which uses channel_flush_from_first_active_circuit() to decide which cells + * to flush from which circuit on the channel. They are then passed down to + * the channel subsystem. This calls the low layer with the function pointer + * .write_packed_cell(). + * + * Each specialized channel (currently only channeltls_t) MUST implement a + * series of function found in channel_t. See channel.h for more + * documentation. **/ /* * Define this so channel.h gives us things only channel_t subclasses * should touch. */ - #define TOR_CHANNEL_INTERNAL_ /* This one's for stuff only channel.c and the test suite should see */ @@ -112,59 +123,6 @@ HANDLE_IMPL(channel, channel_s,) /* Counter for ID numbers */ static uint64_t n_channels_allocated = 0; -/* - * Channel global byte/cell counters, for statistics and for scheduler high - * /low-water marks. - */ - -/* - * Total number of cells ever given to any channel with the - * channel_write_*_cell() functions. - */ - -static uint64_t n_channel_cells_queued = 0; - -/* - * Total number of cells ever passed to a channel lower layer with the - * write_*_cell() methods. - */ - -static uint64_t n_channel_cells_passed_to_lower_layer = 0; - -/* - * Current number of cells in all channel queues; should be - * n_channel_cells_queued - n_channel_cells_passed_to_lower_layer. - */ - -static uint64_t n_channel_cells_in_queues = 0; - -/* - * Total number of bytes for all cells ever queued to a channel and - * counted in n_channel_cells_queued. - */ - -static uint64_t n_channel_bytes_queued = 0; - -/* - * Total number of bytes for all cells ever passed to a channel lower layer - * and counted in n_channel_cells_passed_to_lower_layer. - */ - -static uint64_t n_channel_bytes_passed_to_lower_layer = 0; - -/* - * Current number of bytes in all channel queues; should be - * n_channel_bytes_queued - n_channel_bytes_passed_to_lower_layer. - */ - -static uint64_t n_channel_bytes_in_queues = 0; - -/* - * Current total estimated queue size *including lower layer queues and - * transmit overhead* - */ - -STATIC uint64_t estimated_total_queue_size = 0; /* Digest->channel map * @@ -201,49 +159,23 @@ HT_PROTOTYPE(channel_idmap, channel_idmap_entry_s, node, channel_idmap_hash, HT_GENERATE2(channel_idmap, channel_idmap_entry_s, node, channel_idmap_hash, channel_idmap_eq, 0.5, tor_reallocarray_, tor_free_) -static cell_queue_entry_t * cell_queue_entry_dup(cell_queue_entry_t *q); -#if 0 -static int cell_queue_entry_is_padding(cell_queue_entry_t *q); -#endif -static cell_queue_entry_t * -cell_queue_entry_new_fixed(cell_t *cell); -static cell_queue_entry_t * -cell_queue_entry_new_var(var_cell_t *var_cell); -static int is_destroy_cell(channel_t *chan, - const cell_queue_entry_t *q, circid_t *circid_out); - -static void channel_assert_counter_consistency(void); - /* Functions to maintain the digest map */ -static void channel_add_to_digest_map(channel_t *chan); static void channel_remove_from_digest_map(channel_t *chan); -/* - * Flush cells from just the outgoing queue without trying to get them - * from circuits; used internall by channel_flush_some_cells(). - */ -static ssize_t -channel_flush_some_cells_from_outgoing_queue(channel_t *chan, - ssize_t num_cells); -static void channel_force_free(channel_t *chan); -static void -channel_free_list(smartlist_t *channels, int mark_for_close); -static void -channel_listener_free_list(smartlist_t *channels, int mark_for_close); -static void channel_listener_force_free(channel_listener_t *chan_l); -static size_t channel_get_cell_queue_entry_size(channel_t *chan, - cell_queue_entry_t *q); -static void -channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q); +static void channel_force_xfree(channel_t *chan); +static void channel_free_list(smartlist_t *channels, + int mark_for_close); +static void channel_listener_free_list(smartlist_t *channels, + int mark_for_close); +static void channel_listener_force_xfree(channel_listener_t *chan_l); /*********************************** * Channel state utility functions * **********************************/ /** - * Indicate whether a given channel state is valid + * Indicate whether a given channel state is valid. */ - int channel_state_is_valid(channel_state_t state) { @@ -267,9 +199,8 @@ channel_state_is_valid(channel_state_t state) } /** - * Indicate whether a given channel listener state is valid + * Indicate whether a given channel listener state is valid. */ - int channel_listener_state_is_valid(channel_listener_state_t state) { @@ -291,13 +222,12 @@ channel_listener_state_is_valid(channel_listener_state_t state) } /** - * Indicate whether a channel state transition is valid + * Indicate whether a channel state transition is valid. * * This function takes two channel states and indicates whether a * transition between them is permitted (see the state definitions and * transition table in or.h at the channel_state_t typedef). */ - int channel_state_can_transition(channel_state_t from, channel_state_t to) { @@ -338,13 +268,12 @@ channel_state_can_transition(channel_state_t from, channel_state_t to) } /** - * Indicate whether a channel listener state transition is valid + * Indicate whether a channel listener state transition is valid. * * This function takes two channel listener states and indicates whether a * transition between them is permitted (see the state definitions and * transition table in or.h at the channel_listener_state_t typedef). */ - int channel_listener_state_can_transition(channel_listener_state_t from, channel_listener_state_t to) @@ -375,9 +304,8 @@ channel_listener_state_can_transition(channel_listener_state_t from, } /** - * Return a human-readable description for a channel state + * Return a human-readable description for a channel state. */ - const char * channel_state_to_string(channel_state_t state) { @@ -411,9 +339,8 @@ channel_state_to_string(channel_state_t state) } /** - * Return a human-readable description for a channel listenier state + * Return a human-readable description for a channel listener state. */ - const char * channel_listener_state_to_string(channel_listener_state_t state) { @@ -445,12 +372,11 @@ channel_listener_state_to_string(channel_listener_state_t state) ***************************************/ /** - * Register a channel + * Register a channel. * * This function registers a newly created channel in the global lists/maps * of active channels. */ - void channel_register(channel_t *chan) { @@ -503,12 +429,11 @@ channel_register(channel_t *chan) } /** - * Unregister a channel + * Unregister a channel. * * This function removes a channel from the global lists and maps and is used * when freeing a closed/errored channel. */ - void channel_unregister(channel_t *chan) { @@ -543,12 +468,11 @@ channel_unregister(channel_t *chan) } /** - * Register a channel listener + * Register a channel listener. * - * This function registers a newly created channel listner in the global + * This function registers a newly created channel listener in the global * lists/maps of active channel listeners. */ - void channel_listener_register(channel_listener_t *chan_l) { @@ -585,12 +509,11 @@ channel_listener_register(channel_listener_t *chan_l) } /** - * Unregister a channel listener + * Unregister a channel listener. * * This function removes a channel listener from the global lists and maps * and is used when freeing a closed/errored channel listener. */ - void channel_listener_unregister(channel_listener_t *chan_l) { @@ -621,14 +544,13 @@ channel_listener_unregister(channel_listener_t *chan_l) *********************************/ /** - * Add a channel to the digest map + * Add a channel to the digest map. * * This function adds a channel to the digest map and inserts it into the * correct linked list if channels with that remote endpoint identity digest * already exist. */ - -static void +STATIC void channel_add_to_digest_map(channel_t *chan) { channel_idmap_entry_t *ent, search; @@ -660,12 +582,11 @@ channel_add_to_digest_map(channel_t *chan) } /** - * Remove a channel from the digest map + * Remove a channel from the digest map. * * This function removes a channel from the digest map and the linked list of * channels for that digest if more than one exists. */ - static void channel_remove_from_digest_map(channel_t *chan) { @@ -676,33 +597,6 @@ channel_remove_from_digest_map(channel_t *chan) /* Assert that there is a digest */ tor_assert(!tor_digest_is_zero(chan->identity_digest)); -#if 0 - /* Make sure we have a map */ - if (!channel_identity_map) { - /* - * No identity map, so we can't find it by definition. This - * case is similar to digestmap_get() failing below. - */ - log_warn(LD_BUG, - "Trying to remove channel %p (global ID " U64_FORMAT ") " - "with digest %s from identity map, but didn't have any identity " - "map", - chan, U64_PRINTF_ARG(chan->global_identifier), - hex_str(chan->identity_digest, DIGEST_LEN)); - /* Clear out its next/prev pointers */ - if (chan->next_with_same_id) { - chan->next_with_same_id->prev_with_same_id = chan->prev_with_same_id; - } - if (chan->prev_with_same_id) { - chan->prev_with_same_id->next_with_same_id = chan->next_with_same_id; - } - chan->next_with_same_id = NULL; - chan->prev_with_same_id = NULL; - - return; - } -#endif /* 0 */ - /* Pull it out of its list, wherever that list is */ TOR_LIST_REMOVE(chan, next_with_same_id); @@ -740,13 +634,12 @@ channel_remove_from_digest_map(channel_t *chan) ***************************/ /** - * Find channel by global ID + * Find channel by global ID. * * This function searches for a channel by the global_identifier assigned * at initialization time. This identifier is unique for the lifetime of the * Tor process. */ - channel_t * channel_find_by_global_id(uint64_t global_identifier) { @@ -784,7 +677,7 @@ channel_remote_identity_matches(const channel_t *chan, } /** - * Find channel by RSA/Ed25519 identity of of the remote endpoint + * Find channel by RSA/Ed25519 identity of of the remote endpoint. * * This function looks up a channel by the digest of its remote endpoint's RSA * identity key. If <b>ed_id</b> is provided and nonzero, only a channel @@ -823,12 +716,11 @@ channel_find_by_remote_identity(const char *rsa_id_digest, } /** - * Get next channel with digest + * Get next channel with digest. * * This function takes a channel and finds the next channel in the list * with the same digest. */ - channel_t * channel_next_with_rsa_identity(channel_t *chan) { @@ -915,13 +807,12 @@ channel_check_for_duplicates(void) } /** - * Initialize a channel + * Initialize a channel. * * This function should be called by subclasses to set up some per-channel * variables. I.e., this is the superclass constructor. Before this, the * channel should be allocated with tor_malloc_zero(). */ - void channel_init(channel_t *chan) { @@ -936,10 +827,6 @@ channel_init(channel_t *chan) /* Warn about exhausted circuit IDs no more than hourly. */ chan->last_warned_circ_ids_exhausted.rate = 3600; - /* Initialize queues. */ - TOR_SIMPLEQ_INIT(&chan->incoming_queue); - TOR_SIMPLEQ_INIT(&chan->outgoing_queue); - /* Initialize list entries. */ memset(&chan->next_with_same_id, 0, sizeof(chan->next_with_same_id)); @@ -957,13 +844,12 @@ channel_init(channel_t *chan) } /** - * Initialize a channel listener + * Initialize a channel listener. * * This function should be called by subclasses to set up some per-channel * variables. I.e., this is the superclass constructor. Before this, the * channel listener should be allocated with tor_malloc_zero(). */ - void channel_init_listener(channel_listener_t *chan_l) { @@ -980,9 +866,8 @@ channel_init_listener(channel_listener_t *chan_l) * Free a channel; nothing outside of channel.c and subclasses should call * this - it frees channels after they have closed and been unregistered. */ - void -channel_free(channel_t *chan) +channel_free_(channel_t *chan) { if (!chan) return; @@ -1025,8 +910,6 @@ channel_free(channel_t *chan) chan->cmux = NULL; } - /* We're in CLOSED or ERROR, so the cell queue is already empty */ - tor_free(chan); } @@ -1035,9 +918,8 @@ channel_free(channel_t *chan) * should call this - it frees channel listeners after they have closed and * been unregistered. */ - void -channel_listener_free(channel_listener_t *chan_l) +channel_listener_free_(channel_listener_t *chan_l) { if (!chan_l) return; @@ -1055,11 +937,6 @@ channel_listener_free(channel_listener_t *chan_l) /* Call a free method if there is one */ if (chan_l->free_fn) chan_l->free_fn(chan_l); - /* - * We're in CLOSED or ERROR, so the incoming channel queue is already - * empty. - */ - tor_free(chan_l); } @@ -1068,11 +945,9 @@ channel_listener_free(channel_listener_t *chan_l) * use-only function should be called only from channel_free_all() when * shutting down the Tor process. */ - static void -channel_force_free(channel_t *chan) +channel_force_xfree(channel_t *chan) { - cell_queue_entry_t *cell, *cell_tmp; tor_assert(chan); log_debug(LD_CHANNEL, @@ -1106,29 +981,16 @@ channel_force_free(channel_t *chan) chan->cmux = NULL; } - /* We might still have a cell queue; kill it */ - TOR_SIMPLEQ_FOREACH_SAFE(cell, &chan->incoming_queue, next, cell_tmp) { - cell_queue_entry_free(cell, 0); - } - TOR_SIMPLEQ_INIT(&chan->incoming_queue); - - /* Outgoing cell queue is similar, but we can have to free packed cells */ - TOR_SIMPLEQ_FOREACH_SAFE(cell, &chan->outgoing_queue, next, cell_tmp) { - cell_queue_entry_free(cell, 0); - } - TOR_SIMPLEQ_INIT(&chan->outgoing_queue); - tor_free(chan); } /** - * Free a channel listener and skip the state/reigstration asserts; this + * Free a channel listener and skip the state/registration asserts; this * internal-use-only function should be called only from channel_free_all() * when shutting down the Tor process. */ - static void -channel_listener_force_free(channel_listener_t *chan_l) +channel_listener_force_xfree(channel_listener_t *chan_l) { tor_assert(chan_l); @@ -1159,30 +1021,11 @@ channel_listener_force_free(channel_listener_t *chan_l) } /** - * Return the current registered listener for a channel listener - * - * This function returns a function pointer to the current registered - * handler for new incoming channels on a channel listener. - */ - -channel_listener_fn_ptr -channel_listener_get_listener_fn(channel_listener_t *chan_l) -{ - tor_assert(chan_l); - - if (chan_l->state == CHANNEL_LISTENER_STATE_LISTENING) - return chan_l->listener; - - return NULL; -} - -/** - * Set the listener for a channel listener + * Set the listener for a channel listener. * * This function sets the handler for new incoming channels on a channel * listener. */ - void channel_listener_set_listener_fn(channel_listener_t *chan_l, channel_listener_fn_ptr listener) @@ -1201,12 +1044,11 @@ channel_listener_set_listener_fn(channel_listener_t *chan_l, } /** - * Return the fixed-length cell handler for a channel + * Return the fixed-length cell handler for a channel. * * This function gets the handler for incoming fixed-length cells installed * on a channel. */ - channel_cell_handler_fn_ptr channel_get_cell_handler(channel_t *chan) { @@ -1219,12 +1061,11 @@ channel_get_cell_handler(channel_t *chan) } /** - * Return the variable-length cell handler for a channel + * Return the variable-length cell handler for a channel. * * This function gets the handler for incoming variable-length cells * installed on a channel. */ - channel_var_cell_handler_fn_ptr channel_get_var_cell_handler(channel_t *chan) { @@ -1237,21 +1078,17 @@ channel_get_var_cell_handler(channel_t *chan) } /** - * Set both cell handlers for a channel + * Set both cell handlers for a channel. * * This function sets both the fixed-length and variable length cell handlers - * for a channel and processes any incoming cells that had been blocked in the - * queue because none were available. + * for a channel. */ - void channel_set_cell_handlers(channel_t *chan, channel_cell_handler_fn_ptr cell_handler, channel_var_cell_handler_fn_ptr var_cell_handler) { - int try_again = 0; - tor_assert(chan); tor_assert(CHANNEL_CAN_HANDLE_CELLS(chan)); @@ -1262,21 +1099,9 @@ channel_set_cell_handlers(channel_t *chan, "Setting var_cell_handler callback for channel %p to %p", chan, var_cell_handler); - /* Should we try the queue? */ - if (cell_handler && - cell_handler != chan->cell_handler) try_again = 1; - if (var_cell_handler && - var_cell_handler != chan->var_cell_handler) try_again = 1; - /* Change them */ chan->cell_handler = cell_handler; chan->var_cell_handler = var_cell_handler; - - /* Re-run the queue if we have one and there's any reason to */ - if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue) && - try_again && - (chan->cell_handler || - chan->var_cell_handler)) channel_process_cells(chan); } /* @@ -1293,13 +1118,12 @@ channel_set_cell_handlers(channel_t *chan, */ /** - * Mark a channel for closure + * Mark a channel for closure. * * This function tries to close a channel_t; it will go into the CLOSING * state, and eventually the lower layer should put it into the CLOSED or * ERROR state. Then, channel_run_cleanup() will eventually free it. */ - void channel_mark_for_close(channel_t *chan) { @@ -1333,13 +1157,12 @@ channel_mark_for_close(channel_t *chan) } /** - * Mark a channel listener for closure + * Mark a channel listener for closure. * * This function tries to close a channel_listener_t; it will go into the * CLOSING state, and eventually the lower layer should put it into the CLOSED * or ERROR state. Then, channel_run_cleanup() will eventually free it. */ - void channel_listener_mark_for_close(channel_listener_t *chan_l) { @@ -1374,13 +1197,12 @@ channel_listener_mark_for_close(channel_listener_t *chan_l) } /** - * Close a channel from the lower layer + * Close a channel from the lower layer. * * Notify the channel code that the channel is being closed due to a non-error * condition in the lower layer. This does not call the close() method, since * the lower layer already knows. */ - void channel_close_from_lower_layer(channel_t *chan) { @@ -1403,43 +1225,12 @@ channel_close_from_lower_layer(channel_t *chan) } /** - * Close a channel listener from the lower layer - * - * Notify the channel code that the channel listener is being closed due to a - * non-error condition in the lower layer. This does not call the close() - * method, since the lower layer already knows. - */ - -void -channel_listener_close_from_lower_layer(channel_listener_t *chan_l) -{ - tor_assert(chan_l != NULL); - - /* If it's already in CLOSING, CLOSED or ERROR, this is a no-op */ - if (chan_l->state == CHANNEL_LISTENER_STATE_CLOSING || - chan_l->state == CHANNEL_LISTENER_STATE_CLOSED || - chan_l->state == CHANNEL_LISTENER_STATE_ERROR) return; - - log_debug(LD_CHANNEL, - "Closing channel listener %p (global ID " U64_FORMAT ") " - "due to lower-layer event", - chan_l, U64_PRINTF_ARG(chan_l->global_identifier)); - - /* Note closing by event from below */ - chan_l->reason_for_closing = CHANNEL_LISTENER_CLOSE_FROM_BELOW; - - /* Change state to CLOSING */ - channel_listener_change_state(chan_l, CHANNEL_LISTENER_STATE_CLOSING); -} - -/** - * Notify that the channel is being closed due to an error condition + * Notify that the channel is being closed due to an error condition. * * This function is called by the lower layer implementing the transport * when a channel must be closed due to an error condition. This does not * call the channel's close method, since the lower layer already knows. */ - void channel_close_for_error(channel_t *chan) { @@ -1461,44 +1252,12 @@ channel_close_for_error(channel_t *chan) } /** - * Notify that the channel listener is being closed due to an error condition - * - * This function is called by the lower layer implementing the transport - * when a channel listener must be closed due to an error condition. This - * does not call the channel listener's close method, since the lower layer - * already knows. - */ - -void -channel_listener_close_for_error(channel_listener_t *chan_l) -{ - tor_assert(chan_l != NULL); - - /* If it's already in CLOSING, CLOSED or ERROR, this is a no-op */ - if (chan_l->state == CHANNEL_LISTENER_STATE_CLOSING || - chan_l->state == CHANNEL_LISTENER_STATE_CLOSED || - chan_l->state == CHANNEL_LISTENER_STATE_ERROR) return; - - log_debug(LD_CHANNEL, - "Closing channel listener %p (global ID " U64_FORMAT ") " - "due to lower-layer error", - chan_l, U64_PRINTF_ARG(chan_l->global_identifier)); - - /* Note closing by event from below */ - chan_l->reason_for_closing = CHANNEL_LISTENER_CLOSE_FOR_ERROR; - - /* Change state to CLOSING */ - channel_listener_change_state(chan_l, CHANNEL_LISTENER_STATE_CLOSING); -} - -/** - * Notify that the lower layer is finished closing the channel + * Notify that the lower layer is finished closing the channel. * * This function should be called by the lower layer when a channel * is finished closing and it should be regarded as inactive and * freed by the channel code. */ - void channel_closed(channel_t *chan) { @@ -1525,39 +1284,11 @@ channel_closed(channel_t *chan) } /** - * Notify that the lower layer is finished closing the channel listener - * - * This function should be called by the lower layer when a channel listener - * is finished closing and it should be regarded as inactive and - * freed by the channel code. - */ - -void -channel_listener_closed(channel_listener_t *chan_l) -{ - tor_assert(chan_l); - tor_assert(chan_l->state == CHANNEL_LISTENER_STATE_CLOSING || - chan_l->state == CHANNEL_LISTENER_STATE_CLOSED || - chan_l->state == CHANNEL_LISTENER_STATE_ERROR); - - /* No-op if already inactive */ - if (chan_l->state == CHANNEL_LISTENER_STATE_CLOSED || - chan_l->state == CHANNEL_LISTENER_STATE_ERROR) return; - - if (chan_l->reason_for_closing != CHANNEL_LISTENER_CLOSE_FOR_ERROR) { - channel_listener_change_state(chan_l, CHANNEL_LISTENER_STATE_CLOSED); - } else { - channel_listener_change_state(chan_l, CHANNEL_LISTENER_STATE_ERROR); - } -} - -/** - * Clear the identity_digest of a channel + * Clear the identity_digest of a channel. * * This function clears the identity digest of the remote endpoint for a * channel; this is intended for use by the lower layer. */ - void channel_clear_identity_digest(channel_t *chan) { @@ -1582,7 +1313,7 @@ channel_clear_identity_digest(channel_t *chan) } /** - * Set the identity_digest of a channel + * Set the identity_digest of a channel. * * This function sets the identity digest of the remote endpoint for a * channel; this is intended for use by the lower layer. @@ -1641,12 +1372,11 @@ channel_set_identity_digest(channel_t *chan, } /** - * Clear the remote end metadata (identity_digest/nickname) of a channel + * Clear the remote end metadata (identity_digest) of a channel. * * This function clears all the remote end info from a channel; this is * intended for use by the lower layer. */ - void channel_clear_remote_end(channel_t *chan) { @@ -1668,429 +1398,103 @@ channel_clear_remote_end(channel_t *chan) memset(chan->identity_digest, 0, sizeof(chan->identity_digest)); - tor_free(chan->nickname); } /** - * Set the remote end metadata (identity_digest/nickname) of a channel + * Write to a channel the given packed cell. * - * This function sets new remote end info on a channel; this is intended - * for use by the lower layer. - */ - -void -channel_set_remote_end(channel_t *chan, - const char *identity_digest, - const char *nickname) -{ - int was_in_digest_map, should_be_in_digest_map, state_not_in_map; - - tor_assert(chan); - - log_debug(LD_CHANNEL, - "Setting remote endpoint identity on channel %p with " - "global ID " U64_FORMAT " to nickname %s, digest %s", - chan, U64_PRINTF_ARG(chan->global_identifier), - nickname ? nickname : "(null)", - identity_digest ? - hex_str(identity_digest, DIGEST_LEN) : "(null)"); - - state_not_in_map = CHANNEL_CONDEMNED(chan); - - was_in_digest_map = - !state_not_in_map && - chan->registered && - !tor_digest_is_zero(chan->identity_digest); - should_be_in_digest_map = - !state_not_in_map && - chan->registered && - (identity_digest && - !tor_digest_is_zero(identity_digest)); - - if (was_in_digest_map) - /* We should always remove it; we'll add it back if we're writing - * in a new digest. - */ - channel_remove_from_digest_map(chan); - - if (identity_digest) { - memcpy(chan->identity_digest, - identity_digest, - sizeof(chan->identity_digest)); - - } else { - memset(chan->identity_digest, 0, - sizeof(chan->identity_digest)); - } - - tor_free(chan->nickname); - if (nickname) - chan->nickname = tor_strdup(nickname); - - /* Put it in the digest map if we should */ - if (should_be_in_digest_map) - channel_add_to_digest_map(chan); -} - -/** - * Duplicate a cell queue entry; this is a shallow copy intended for use - * in channel_write_cell_queue_entry(). - */ - -static cell_queue_entry_t * -cell_queue_entry_dup(cell_queue_entry_t *q) -{ - cell_queue_entry_t *rv = NULL; - - tor_assert(q); - - rv = tor_malloc(sizeof(*rv)); - memcpy(rv, q, sizeof(*rv)); - - return rv; -} - -/** - * Free a cell_queue_entry_t; the handed_off parameter indicates whether - * the contents were passed to the lower layer (it is responsible for - * them) or not (we should free). + * Two possible errors can happen. Either the channel is not opened or the + * lower layer (specialized channel) failed to write it. In both cases, it is + * the caller responsibility to free the cell. */ - -STATIC void -cell_queue_entry_free(cell_queue_entry_t *q, int handed_off) -{ - if (!q) return; - - if (!handed_off) { - /* - * If we handed it off, the recipient becomes responsible (or - * with packed cells the channel_t subclass calls packed_cell - * free after writing out its contents; see, e.g., - * channel_tls_write_packed_cell_method(). Otherwise, we have - * to take care of it here if possible. - */ - switch (q->type) { - case CELL_QUEUE_FIXED: - if (q->u.fixed.cell) { - /* - * There doesn't seem to be a cell_free() function anywhere in the - * pre-channel code; just use tor_free() - */ - tor_free(q->u.fixed.cell); - } - break; - case CELL_QUEUE_PACKED: - if (q->u.packed.packed_cell) { - packed_cell_free(q->u.packed.packed_cell); - } - break; - case CELL_QUEUE_VAR: - if (q->u.var.var_cell) { - /* - * This one's in connection_or.c; it'd be nice to figure out the - * whole flow of cells from one end to the other and factor the - * cell memory management functions like this out of the specific - * TLS lower layer. - */ - var_cell_free(q->u.var.var_cell); - } - break; - default: - /* - * Nothing we can do if we don't know the type; this will - * have been warned about elsewhere. - */ - break; - } - } - tor_free(q); -} - -#if 0 -/** - * Check whether a cell queue entry is padding; this is a helper function - * for channel_write_cell_queue_entry() - */ - static int -cell_queue_entry_is_padding(cell_queue_entry_t *q) -{ - tor_assert(q); - - if (q->type == CELL_QUEUE_FIXED) { - if (q->u.fixed.cell) { - if (q->u.fixed.cell->command == CELL_PADDING || - q->u.fixed.cell->command == CELL_VPADDING) { - return 1; - } - } - } else if (q->type == CELL_QUEUE_VAR) { - if (q->u.var.var_cell) { - if (q->u.var.var_cell->command == CELL_PADDING || - q->u.var.var_cell->command == CELL_VPADDING) { - return 1; - } - } - } - - return 0; -} -#endif /* 0 */ - -/** - * Allocate a new cell queue entry for a fixed-size cell - */ - -static cell_queue_entry_t * -cell_queue_entry_new_fixed(cell_t *cell) +write_packed_cell(channel_t *chan, packed_cell_t *cell) { - cell_queue_entry_t *q = NULL; - - tor_assert(cell); - - q = tor_malloc(sizeof(*q)); - q->type = CELL_QUEUE_FIXED; - q->u.fixed.cell = cell; - - return q; -} - -/** - * Allocate a new cell queue entry for a variable-size cell - */ - -static cell_queue_entry_t * -cell_queue_entry_new_var(var_cell_t *var_cell) -{ - cell_queue_entry_t *q = NULL; - - tor_assert(var_cell); - - q = tor_malloc(sizeof(*q)); - q->type = CELL_QUEUE_VAR; - q->u.var.var_cell = var_cell; - - return q; -} - -/** - * Ask how big the cell contained in a cell_queue_entry_t is - */ - -static size_t -channel_get_cell_queue_entry_size(channel_t *chan, cell_queue_entry_t *q) -{ - size_t rv = 0; - - tor_assert(chan); - tor_assert(q); - - switch (q->type) { - case CELL_QUEUE_FIXED: - rv = get_cell_network_size(chan->wide_circ_ids); - break; - case CELL_QUEUE_VAR: - rv = get_var_cell_header_size(chan->wide_circ_ids) + - (q->u.var.var_cell ? q->u.var.var_cell->payload_len : 0); - break; - case CELL_QUEUE_PACKED: - rv = get_cell_network_size(chan->wide_circ_ids); - break; - default: - tor_assert_nonfatal_unreached_once(); - } - - return rv; -} - -/** - * Write to a channel based on a cell_queue_entry_t - * - * Given a cell_queue_entry_t filled out by the caller, try to send the cell - * and queue it if we can't. - */ - -static void -channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q) -{ - int result = 0, sent = 0; - cell_queue_entry_t *tmp = NULL; + int ret = -1; size_t cell_bytes; tor_assert(chan); - tor_assert(q); + tor_assert(cell); /* Assert that the state makes sense for a cell write */ tor_assert(CHANNEL_CAN_HANDLE_CELLS(chan)); { circid_t circ_id; - if (is_destroy_cell(chan, q, &circ_id)) { + if (packed_cell_is_destroy(chan, cell, &circ_id)) { channel_note_destroy_not_pending(chan, circ_id); } } /* For statistical purposes, figure out how big this cell is */ - cell_bytes = channel_get_cell_queue_entry_size(chan, q); + cell_bytes = get_cell_network_size(chan->wide_circ_ids); /* Can we send it right out? If so, try */ - if (TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue) && - CHANNEL_IS_OPEN(chan)) { - /* Pick the right write function for this cell type and save the result */ - switch (q->type) { - case CELL_QUEUE_FIXED: - tor_assert(chan->write_cell); - tor_assert(q->u.fixed.cell); - result = chan->write_cell(chan, q->u.fixed.cell); - break; - case CELL_QUEUE_PACKED: - tor_assert(chan->write_packed_cell); - tor_assert(q->u.packed.packed_cell); - result = chan->write_packed_cell(chan, q->u.packed.packed_cell); - break; - case CELL_QUEUE_VAR: - tor_assert(chan->write_var_cell); - tor_assert(q->u.var.var_cell); - result = chan->write_var_cell(chan, q->u.var.var_cell); - break; - default: - tor_assert(1); - } - - /* Check if we got it out */ - if (result > 0) { - sent = 1; - /* Timestamp for transmission */ - channel_timestamp_xmit(chan); - /* If we're here the queue is empty, so it's drained too */ - channel_timestamp_drained(chan); - /* Update the counter */ - ++(chan->n_cells_xmitted); - chan->n_bytes_xmitted += cell_bytes; - /* Update global counters */ - ++n_channel_cells_queued; - ++n_channel_cells_passed_to_lower_layer; - n_channel_bytes_queued += cell_bytes; - n_channel_bytes_passed_to_lower_layer += cell_bytes; - channel_assert_counter_consistency(); - } + if (!CHANNEL_IS_OPEN(chan)) { + goto done; } - if (!sent) { - /* Not sent, queue it */ - /* - * We have to copy the queue entry passed in, since the caller probably - * used the stack. - */ - tmp = cell_queue_entry_dup(q); - TOR_SIMPLEQ_INSERT_TAIL(&chan->outgoing_queue, tmp, next); - /* Update global counters */ - ++n_channel_cells_queued; - ++n_channel_cells_in_queues; - n_channel_bytes_queued += cell_bytes; - n_channel_bytes_in_queues += cell_bytes; - channel_assert_counter_consistency(); - /* Update channel queue size */ - chan->bytes_in_queue += cell_bytes; - /* Try to process the queue? */ - if (CHANNEL_IS_OPEN(chan)) channel_flush_cells(chan); + /* Write the cell on the connection's outbuf. */ + if (chan->write_packed_cell(chan, cell) < 0) { + goto done; } + /* Timestamp for transmission */ + channel_timestamp_xmit(chan); + /* Update the counter */ + ++(chan->n_cells_xmitted); + chan->n_bytes_xmitted += cell_bytes; + /* Successfully sent the cell. */ + ret = 0; + + done: + return ret; } -/** Write a generic cell type to a channel +/** + * Write a packed cell to a channel. + * + * Write a packed cell to a channel using the write_cell() method. This is + * called by the transport-independent code to deliver a packed cell to a + * channel for transmission. * - * Write a generic cell to a channel. It is called by channel_write_cell(), - * channel_write_var_cell() and channel_write_packed_cell() in order to reduce - * code duplication. Notice that it takes cell as pointer of type void, - * this can be dangerous because no type check is performed. + * Return 0 on success else a negative value. In both cases, the caller should + * not access the cell anymore, it is freed both on success and error. */ - -void -channel_write_cell_generic_(channel_t *chan, const char *cell_type, - void *cell, cell_queue_entry_t *q) +int +channel_write_packed_cell(channel_t *chan, packed_cell_t *cell) { + int ret = -1; tor_assert(chan); tor_assert(cell); if (CHANNEL_IS_CLOSING(chan)) { - log_debug(LD_CHANNEL, "Discarding %c %p on closing channel %p with " - "global ID "U64_FORMAT, *cell_type, cell, chan, + log_debug(LD_CHANNEL, "Discarding %p on closing channel %p with " + "global ID "U64_FORMAT, cell, chan, U64_PRINTF_ARG(chan->global_identifier)); - tor_free(cell); - return; + goto end; } log_debug(LD_CHANNEL, - "Writing %c %p to channel %p with global ID " - U64_FORMAT, *cell_type, - cell, chan, U64_PRINTF_ARG(chan->global_identifier)); - - channel_write_cell_queue_entry(chan, q); - /* Update the queue size estimate */ - channel_update_xmit_queue_size(chan); -} - -/** - * Write a cell to a channel - * - * Write a fixed-length cell to a channel using the write_cell() method. - * This is equivalent to the pre-channels connection_or_write_cell_to_buf(); - * it is called by the transport-independent code to deliver a cell to a - * channel for transmission. - */ - -void -channel_write_cell(channel_t *chan, cell_t *cell) -{ - cell_queue_entry_t q; - q.type = CELL_QUEUE_FIXED; - q.u.fixed.cell = cell; - channel_write_cell_generic_(chan, "cell_t", cell, &q); -} + "Writing %p to channel %p with global ID " + U64_FORMAT, cell, chan, U64_PRINTF_ARG(chan->global_identifier)); -/** - * Write a packed cell to a channel - * - * Write a packed cell to a channel using the write_cell() method. This is - * called by the transport-independent code to deliver a packed cell to a - * channel for transmission. - */ + ret = write_packed_cell(chan, cell); -void -channel_write_packed_cell(channel_t *chan, packed_cell_t *packed_cell) -{ - cell_queue_entry_t q; - q.type = CELL_QUEUE_PACKED; - q.u.packed.packed_cell = packed_cell; - channel_write_cell_generic_(chan, "packed_cell_t", packed_cell, &q); + end: + /* Whatever happens, we free the cell. Either an error occurred or the cell + * was put on the connection outbuf, both cases we have ownership of the + * cell and we free it. */ + packed_cell_free(cell); + return ret; } /** - * Write a variable-length cell to a channel - * - * Write a variable-length cell to a channel using the write_cell() method. - * This is equivalent to the pre-channels - * connection_or_write_var_cell_to_buf(); it's called by the transport- - * independent code to deliver a var_cell to a channel for transmission. - */ - -void -channel_write_var_cell(channel_t *chan, var_cell_t *var_cell) -{ - cell_queue_entry_t q; - q.type = CELL_QUEUE_VAR; - q.u.var.var_cell = var_cell; - channel_write_cell_generic_(chan, "var_cell_t", var_cell, &q); -} - -/** - * Change channel state + * Change channel state. * * This internal and subclass use only function is used to change channel * state, performing all transition validity checks and whatever actions * are appropriate to the state transition in question. */ - static void channel_change_state_(channel_t *chan, channel_state_t to_state) { @@ -2122,15 +1526,6 @@ channel_change_state_(channel_t *chan, channel_state_t to_state) tor_assert(chan->reason_for_closing != CHANNEL_NOT_CLOSING); } - /* - * We need to maintain the queues here for some transitions: - * when we enter CHANNEL_STATE_OPEN (especially from CHANNEL_STATE_MAINT) - * we may have a backlog of cells to transmit, so drain the queues in - * that case, and when going to CHANNEL_STATE_CLOSED the subclass - * should have made sure to finish sending things (or gone to - * CHANNEL_STATE_ERROR if not possible), so we assert for that here. - */ - log_debug(LD_CHANNEL, "Changing state of channel %p (global ID " U64_FORMAT ") from \"%s\" to \"%s\"", @@ -2187,36 +1582,6 @@ channel_change_state_(channel_t *chan, channel_state_t to_state) } else if (to_state == CHANNEL_STATE_MAINT) { scheduler_channel_doesnt_want_writes(chan); } - - /* - * If we're closing, this channel no longer counts toward the global - * estimated queue size; if we're open, it now does. - */ - if ((to_state == CHANNEL_STATE_CLOSING || - to_state == CHANNEL_STATE_CLOSED || - to_state == CHANNEL_STATE_ERROR) && - (from_state == CHANNEL_STATE_OPEN || - from_state == CHANNEL_STATE_MAINT)) { - estimated_total_queue_size -= chan->bytes_in_queue; - } - - /* - * If we're opening, this channel now does count toward the global - * estimated queue size. - */ - if ((to_state == CHANNEL_STATE_OPEN || - to_state == CHANNEL_STATE_MAINT) && - !(from_state == CHANNEL_STATE_OPEN || - from_state == CHANNEL_STATE_MAINT)) { - estimated_total_queue_size += chan->bytes_in_queue; - } - - if (to_state == CHANNEL_STATE_CLOSED || - to_state == CHANNEL_STATE_ERROR) { - /* Assert that all queues are empty */ - tor_assert(TOR_SIMPLEQ_EMPTY(&chan->incoming_queue)); - tor_assert(TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue)); - } } /** @@ -2240,22 +1605,15 @@ channel_change_state_open(channel_t *chan) /* Tell circuits if we opened and stuff */ channel_do_open_actions(chan); chan->has_been_open = 1; - - /* Check for queued cells to process */ - if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue)) - channel_process_cells(chan); - if (! TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue)) - channel_flush_cells(chan); } /** - * Change channel listener state + * Change channel listener state. * * This internal and subclass use only function is used to change channel * listener state, performing all transition validity checks and whatever * actions are appropriate to the state transition in question. */ - void channel_listener_change_state(channel_listener_t *chan_l, channel_listener_state_t to_state) @@ -2287,15 +1645,6 @@ channel_listener_change_state(channel_listener_t *chan_l, tor_assert(chan_l->reason_for_closing != CHANNEL_LISTENER_NOT_CLOSING); } - /* - * We need to maintain the queues here for some transitions: - * when we enter CHANNEL_STATE_OPEN (especially from CHANNEL_STATE_MAINT) - * we may have a backlog of cells to transmit, so drain the queues in - * that case, and when going to CHANNEL_STATE_CLOSED the subclass - * should have made sure to finish sending things (or gone to - * CHANNEL_STATE_ERROR if not possible), so we assert for that here. - */ - log_debug(LD_CHANNEL, "Changing state of channel listener %p (global ID " U64_FORMAT "from \"%s\" to \"%s\"", @@ -2328,30 +1677,39 @@ channel_listener_change_state(channel_listener_t *chan_l, if (to_state == CHANNEL_LISTENER_STATE_CLOSED || to_state == CHANNEL_LISTENER_STATE_ERROR) { - /* Assert that the queue is empty */ tor_assert(!(chan_l->incoming_list) || smartlist_len(chan_l->incoming_list) == 0); } } -/** - * Try to flush cells to the lower layer - * - * this is called by the lower layer to indicate that it wants more cells; - * it will try to write up to num_cells cells from the channel's cell queue or - * from circuits active on that channel, or as many as it has available if - * num_cells == -1. - */ - +/* Maximum number of cells that is allowed to flush at once within + * channel_flush_some_cells(). */ #define MAX_CELLS_TO_GET_FROM_CIRCUITS_FOR_UNLIMITED 256 +/** + * Try to flush cells of the given channel chan up to a maximum of num_cells. + * + * This is called by the scheduler when it wants to flush cells from the + * channel's circuit queue(s) to the connection outbuf (not yet on the wire). + * + * If the channel is not in state CHANNEL_STATE_OPEN, this does nothing and + * will return 0 meaning no cells were flushed. + * + * If num_cells is -1, we'll try to flush up to the maximum cells allowed + * defined in MAX_CELLS_TO_GET_FROM_CIRCUITS_FOR_UNLIMITED. + * + * On success, the number of flushed cells are returned and it can never be + * above num_cells. If 0 is returned, no cells were flushed either because the + * channel was not opened or we had no cells on the channel. A negative number + * can NOT be sent back. + * + * This function is part of the fast path. */ MOCK_IMPL(ssize_t, channel_flush_some_cells, (channel_t *chan, ssize_t num_cells)) { unsigned int unlimited = 0; ssize_t flushed = 0; - int num_cells_from_circs, clamped_num_cells; - int q_len_before, q_len_after; + int clamped_num_cells; tor_assert(chan); @@ -2360,11 +1718,6 @@ channel_flush_some_cells, (channel_t *chan, ssize_t num_cells)) /* If we aren't in CHANNEL_STATE_OPEN, nothing goes through */ if (CHANNEL_IS_OPEN(chan)) { - /* Try to flush as much as we can that's already queued */ - flushed += channel_flush_some_cells_from_outgoing_queue(chan, - (unlimited ? -1 : num_cells - flushed)); - if (!unlimited && num_cells <= flushed) goto done; - if (circuitmux_num_cells(chan->cmux) > 0) { /* Calculate number of cells, including clamp */ if (unlimited) { @@ -2378,45 +1731,9 @@ channel_flush_some_cells, (channel_t *chan, ssize_t num_cells)) } } - /* - * Keep track of the change in queue size; we have to count cells - * channel_flush_from_first_active_circuit() writes out directly, - * but not double-count ones we might get later in - * channel_flush_some_cells_from_outgoing_queue() - */ - q_len_before = chan_cell_queue_len(&(chan->outgoing_queue)); - /* Try to get more cells from any active circuits */ - num_cells_from_circs = channel_flush_from_first_active_circuit( + flushed = channel_flush_from_first_active_circuit( chan, clamped_num_cells); - - q_len_after = chan_cell_queue_len(&(chan->outgoing_queue)); - - /* - * If it claims we got some, adjust the flushed counter and consider - * processing the queue again - */ - if (num_cells_from_circs > 0) { - /* - * Adjust flushed by the number of cells counted in - * num_cells_from_circs that didn't go to the cell queue. - */ - - if (q_len_after > q_len_before) { - num_cells_from_circs -= (q_len_after - q_len_before); - if (num_cells_from_circs < 0) num_cells_from_circs = 0; - } - - flushed += num_cells_from_circs; - - /* Now process the queue if necessary */ - - if ((q_len_after > q_len_before) && - (unlimited || (flushed < num_cells))) { - flushed += channel_flush_some_cells_from_outgoing_queue(chan, - (unlimited ? -1 : num_cells - flushed)); - } - } } } @@ -2425,197 +1742,16 @@ channel_flush_some_cells, (channel_t *chan, ssize_t num_cells)) } /** - * Flush cells from just the channel's outgoing cell queue + * Check if any cells are available. * - * This gets called from channel_flush_some_cells() above to flush cells - * just from the queue without trying for active_circuits. + * This is used by the scheduler to know if the channel has more to flush + * after a scheduling round. */ - -static ssize_t -channel_flush_some_cells_from_outgoing_queue(channel_t *chan, - ssize_t num_cells) -{ - unsigned int unlimited = 0; - ssize_t flushed = 0; - cell_queue_entry_t *q = NULL; - size_t cell_size; - int free_q = 0, handed_off = 0; - - tor_assert(chan); - tor_assert(chan->write_cell); - tor_assert(chan->write_packed_cell); - tor_assert(chan->write_var_cell); - - if (num_cells < 0) unlimited = 1; - if (!unlimited && num_cells <= flushed) return 0; - - /* If we aren't in CHANNEL_STATE_OPEN, nothing goes through */ - if (CHANNEL_IS_OPEN(chan)) { - while ((unlimited || num_cells > flushed) && - NULL != (q = TOR_SIMPLEQ_FIRST(&chan->outgoing_queue))) { - free_q = 0; - handed_off = 0; - - /* Figure out how big it is for statistical purposes */ - cell_size = channel_get_cell_queue_entry_size(chan, q); - /* - * Okay, we have a good queue entry, try to give it to the lower - * layer. - */ - switch (q->type) { - case CELL_QUEUE_FIXED: - if (q->u.fixed.cell) { - if (chan->write_cell(chan, - q->u.fixed.cell)) { - ++flushed; - channel_timestamp_xmit(chan); - ++(chan->n_cells_xmitted); - chan->n_bytes_xmitted += cell_size; - free_q = 1; - handed_off = 1; - } - /* Else couldn't write it; leave it on the queue */ - } else { - /* This shouldn't happen */ - log_info(LD_CHANNEL, - "Saw broken cell queue entry of type CELL_QUEUE_FIXED " - "with no cell on channel %p " - "(global ID " U64_FORMAT ").", - chan, U64_PRINTF_ARG(chan->global_identifier)); - /* Throw it away */ - free_q = 1; - handed_off = 0; - } - break; - case CELL_QUEUE_PACKED: - if (q->u.packed.packed_cell) { - if (chan->write_packed_cell(chan, - q->u.packed.packed_cell)) { - ++flushed; - channel_timestamp_xmit(chan); - ++(chan->n_cells_xmitted); - chan->n_bytes_xmitted += cell_size; - free_q = 1; - handed_off = 1; - } - /* Else couldn't write it; leave it on the queue */ - } else { - /* This shouldn't happen */ - log_info(LD_CHANNEL, - "Saw broken cell queue entry of type CELL_QUEUE_PACKED " - "with no cell on channel %p " - "(global ID " U64_FORMAT ").", - chan, U64_PRINTF_ARG(chan->global_identifier)); - /* Throw it away */ - free_q = 1; - handed_off = 0; - } - break; - case CELL_QUEUE_VAR: - if (q->u.var.var_cell) { - if (chan->write_var_cell(chan, - q->u.var.var_cell)) { - ++flushed; - channel_timestamp_xmit(chan); - ++(chan->n_cells_xmitted); - chan->n_bytes_xmitted += cell_size; - free_q = 1; - handed_off = 1; - } - /* Else couldn't write it; leave it on the queue */ - } else { - /* This shouldn't happen */ - log_info(LD_CHANNEL, - "Saw broken cell queue entry of type CELL_QUEUE_VAR " - "with no cell on channel %p " - "(global ID " U64_FORMAT ").", - chan, U64_PRINTF_ARG(chan->global_identifier)); - /* Throw it away */ - free_q = 1; - handed_off = 0; - } - break; - default: - /* Unknown type, log and free it */ - log_info(LD_CHANNEL, - "Saw an unknown cell queue entry type %d on channel %p " - "(global ID " U64_FORMAT "; ignoring it." - " Someone should fix this.", - q->type, chan, U64_PRINTF_ARG(chan->global_identifier)); - free_q = 1; - handed_off = 0; - } - - /* - * if free_q is set, we used it and should remove the queue entry; - * we have to do the free down here so TOR_SIMPLEQ_REMOVE_HEAD isn't - * accessing freed memory - */ - if (free_q) { - TOR_SIMPLEQ_REMOVE_HEAD(&chan->outgoing_queue, next); - /* - * ...and we handed a cell off to the lower layer, so we should - * update the counters. - */ - ++n_channel_cells_passed_to_lower_layer; - --n_channel_cells_in_queues; - n_channel_bytes_passed_to_lower_layer += cell_size; - n_channel_bytes_in_queues -= cell_size; - channel_assert_counter_consistency(); - /* Update the channel's queue size too */ - chan->bytes_in_queue -= cell_size; - /* Finally, free q */ - cell_queue_entry_free(q, handed_off); - q = NULL; - } else { - /* No cell removed from list, so we can't go on any further */ - break; - } - } - } - - /* Did we drain the queue? */ - if (TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue)) { - channel_timestamp_drained(chan); - } - - /* Update the estimate queue size */ - channel_update_xmit_queue_size(chan); - - return flushed; -} - -/** - * Flush as many cells as we possibly can from the queue - * - * This tries to flush as many cells from the queue as the lower layer - * will take. It just calls channel_flush_some_cells_from_outgoing_queue() - * in unlimited mode. - */ - -void -channel_flush_cells(channel_t *chan) -{ - channel_flush_some_cells_from_outgoing_queue(chan, -1); -} - -/** - * Check if any cells are available - * - * This gets used from the lower layer to check if any more cells are - * available. - */ - MOCK_IMPL(int, channel_more_to_flush, (channel_t *chan)) { tor_assert(chan); - /* Check if we have any queued */ - if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue)) - return 1; - - /* Check if any circuits would like to queue some */ if (circuitmux_num_cells(chan->cmux) > 0) return 1; /* Else no */ @@ -2623,12 +1759,11 @@ channel_more_to_flush, (channel_t *chan)) } /** - * Notify the channel we're done flushing the output in the lower layer + * Notify the channel we're done flushing the output in the lower layer. * * Connection.c will call this when we've flushed the output; there's some * dirreq-related maintenance to do. */ - void channel_notify_flushed(channel_t *chan) { @@ -2641,12 +1776,11 @@ channel_notify_flushed(channel_t *chan) } /** - * Process the queue of incoming channels on a listener + * Process the queue of incoming channels on a listener. * * Use a listener's registered callback to process as many entries in the * queue of incoming channels as possible. */ - void channel_listener_process_incoming(channel_listener_t *listener) { @@ -2688,18 +1822,17 @@ channel_listener_process_incoming(channel_listener_t *listener) } /** - * Take actions required when a channel becomes open + * Take actions required when a channel becomes open. * * Handle actions we should do when we know a channel is open; a lot of * this comes from the old connection_or_set_state_open() of connection_or.c. * * Because of this mechanism, future channel_t subclasses should take care - * not to change a channel to from CHANNEL_STATE_OPENING to CHANNEL_STATE_OPEN + * not to change a channel from CHANNEL_STATE_OPENING to CHANNEL_STATE_OPEN * until there is positive confirmation that the network is operational. * In particular, anything UDP-based should not make this transition until a * packet is received from the other side. */ - void channel_do_open_actions(channel_t *chan) { @@ -2714,11 +1847,10 @@ channel_do_open_actions(channel_t *chan) if (started_here) { circuit_build_times_network_is_live(get_circuit_build_times_mutable()); - rep_hist_note_connect_succeeded(chan->identity_digest, now); router_set_status(chan->identity_digest, 1); } else { - /* only report it to the geoip module if it's not a known router */ - if (!connection_or_digest_is_known_relay(chan->identity_digest)) { + /* only report it to the geoip module if it's a client */ + if (channel_is_client(chan)) { if (channel_get_addr_if_possible(chan, &remote_addr)) { char *transport_name = NULL; channel_tls_t *tlschan = BASE_CHAN_TO_TLS(chan); @@ -2755,7 +1887,7 @@ channel_do_open_actions(channel_t *chan) CHANNELPADDING_SOS_PARAM, CHANNELPADDING_SOS_DEFAULT, 0, 1)) { /* Disable if we're using RSOS and the consensus disabled padding - * for RSOS*/ + * for RSOS */ channelpadding_disable_padding_on_channel(chan); } else if (get_options()->ReducedConnectionPadding) { /* Padding can be forced and/or reduced by clients, regardless of if @@ -2768,13 +1900,12 @@ channel_do_open_actions(channel_t *chan) } /** - * Queue an incoming channel on a listener + * Queue an incoming channel on a listener. * * Internal and subclass use only function to queue an incoming channel from * a listener. A subclass of channel_listener_t should call this when a new * incoming channel is created. */ - void channel_listener_queue_incoming(channel_listener_t *listener, channel_t *incoming) @@ -2824,207 +1955,31 @@ channel_listener_queue_incoming(channel_listener_t *listener, } /** - * Process queued incoming cells - * - * Process as many queued cells as we can from the incoming - * cell queue. + * Process a cell from the given channel. */ - void -channel_process_cells(channel_t *chan) +channel_process_cell(channel_t *chan, cell_t *cell) { - cell_queue_entry_t *q; tor_assert(chan); tor_assert(CHANNEL_IS_CLOSING(chan) || CHANNEL_IS_MAINT(chan) || CHANNEL_IS_OPEN(chan)); - - log_debug(LD_CHANNEL, - "Processing as many incoming cells as we can for channel %p", - chan); - - /* Nothing we can do if we have no registered cell handlers */ - if (!(chan->cell_handler || - chan->var_cell_handler)) return; - /* Nothing we can do if we have no cells */ - if (TOR_SIMPLEQ_EMPTY(&chan->incoming_queue)) return; - - /* - * Process cells until we're done or find one we have no current handler - * for. - * - * We must free the cells here after calling the handler, since custody - * of the buffer was given to the channel layer when they were queued; - * see comments on memory management in channel_queue_cell() and in - * channel_queue_var_cell() below. - */ - while (NULL != (q = TOR_SIMPLEQ_FIRST(&chan->incoming_queue))) { - tor_assert(q); - tor_assert(q->type == CELL_QUEUE_FIXED || - q->type == CELL_QUEUE_VAR); - - if (q->type == CELL_QUEUE_FIXED && - chan->cell_handler) { - /* Handle a fixed-length cell */ - TOR_SIMPLEQ_REMOVE_HEAD(&chan->incoming_queue, next); - tor_assert(q->u.fixed.cell); - log_debug(LD_CHANNEL, - "Processing incoming cell_t %p for channel %p (global ID " - U64_FORMAT ")", - q->u.fixed.cell, chan, - U64_PRINTF_ARG(chan->global_identifier)); - chan->cell_handler(chan, q->u.fixed.cell); - tor_free(q->u.fixed.cell); - tor_free(q); - } else if (q->type == CELL_QUEUE_VAR && - chan->var_cell_handler) { - /* Handle a variable-length cell */ - TOR_SIMPLEQ_REMOVE_HEAD(&chan->incoming_queue, next); - tor_assert(q->u.var.var_cell); - log_debug(LD_CHANNEL, - "Processing incoming var_cell_t %p for channel %p (global ID " - U64_FORMAT ")", - q->u.var.var_cell, chan, - U64_PRINTF_ARG(chan->global_identifier)); - chan->var_cell_handler(chan, q->u.var.var_cell); - tor_free(q->u.var.var_cell); - tor_free(q); - } else { - /* Can't handle this one */ - break; - } - } -} - -/** - * Queue incoming cell - * - * This should be called by a channel_t subclass to queue an incoming fixed- - * length cell for processing, and process it if possible. - */ - -void -channel_queue_cell(channel_t *chan, cell_t *cell) -{ - int need_to_queue = 0; - cell_queue_entry_t *q; - cell_t *cell_copy = NULL; - - tor_assert(chan); tor_assert(cell); - tor_assert(CHANNEL_IS_OPEN(chan)); - /* Do we need to queue it, or can we just call the handler right away? */ - if (!(chan->cell_handler)) need_to_queue = 1; - if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue)) - need_to_queue = 1; + /* Nothing we can do if we have no registered cell handlers */ + if (!chan->cell_handler) + return; /* Timestamp for receiving */ channel_timestamp_recv(chan); - - /* Update the counters */ + /* Update received counter. */ ++(chan->n_cells_recved); chan->n_bytes_recved += get_cell_network_size(chan->wide_circ_ids); - /* If we don't need to queue we can just call cell_handler */ - if (!need_to_queue) { - tor_assert(chan->cell_handler); - log_debug(LD_CHANNEL, - "Directly handling incoming cell_t %p for channel %p " - "(global ID " U64_FORMAT ")", - cell, chan, - U64_PRINTF_ARG(chan->global_identifier)); - chan->cell_handler(chan, cell); - } else { - /* - * Otherwise queue it and then process the queue if possible. - * - * We queue a copy, not the original pointer - it might have been on the - * stack in connection_or_process_cells_from_inbuf() (or another caller - * if we ever have a subclass other than channel_tls_t), or be freed - * there after we return. This is the uncommon case; the non-copying - * fast path occurs in the if (!need_to_queue) case above when the - * upper layer has installed cell handlers. - */ - cell_copy = tor_malloc_zero(sizeof(cell_t)); - memcpy(cell_copy, cell, sizeof(cell_t)); - q = cell_queue_entry_new_fixed(cell_copy); - log_debug(LD_CHANNEL, - "Queueing incoming cell_t %p for channel %p " - "(global ID " U64_FORMAT ")", - cell, chan, - U64_PRINTF_ARG(chan->global_identifier)); - TOR_SIMPLEQ_INSERT_TAIL(&chan->incoming_queue, q, next); - if (chan->cell_handler || - chan->var_cell_handler) { - channel_process_cells(chan); - } - } -} - -/** - * Queue incoming variable-length cell - * - * This should be called by a channel_t subclass to queue an incoming - * variable-length cell for processing, and process it if possible. - */ - -void -channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell) -{ - int need_to_queue = 0; - cell_queue_entry_t *q; - var_cell_t *cell_copy = NULL; - - tor_assert(chan); - tor_assert(var_cell); - tor_assert(CHANNEL_IS_OPEN(chan)); - - /* Do we need to queue it, or can we just call the handler right away? */ - if (!(chan->var_cell_handler)) need_to_queue = 1; - if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue)) - need_to_queue = 1; - - /* Timestamp for receiving */ - channel_timestamp_recv(chan); - - /* Update the counter */ - ++(chan->n_cells_recved); - chan->n_bytes_recved += get_var_cell_header_size(chan->wide_circ_ids) + - var_cell->payload_len; - - /* If we don't need to queue we can just call cell_handler */ - if (!need_to_queue) { - tor_assert(chan->var_cell_handler); - log_debug(LD_CHANNEL, - "Directly handling incoming var_cell_t %p for channel %p " - "(global ID " U64_FORMAT ")", - var_cell, chan, - U64_PRINTF_ARG(chan->global_identifier)); - chan->var_cell_handler(chan, var_cell); - } else { - /* - * Otherwise queue it and then process the queue if possible. - * - * We queue a copy, not the original pointer - it might have been on the - * stack in connection_or_process_cells_from_inbuf() (or another caller - * if we ever have a subclass other than channel_tls_t), or be freed - * there after we return. This is the uncommon case; the non-copying - * fast path occurs in the if (!need_to_queue) case above when the - * upper layer has installed cell handlers. - */ - cell_copy = var_cell_copy(var_cell); - q = cell_queue_entry_new_var(cell_copy); - log_debug(LD_CHANNEL, - "Queueing incoming var_cell_t %p for channel %p " - "(global ID " U64_FORMAT ")", - var_cell, chan, - U64_PRINTF_ARG(chan->global_identifier)); - TOR_SIMPLEQ_INSERT_TAIL(&chan->incoming_queue, q, next); - if (chan->cell_handler || - chan->var_cell_handler) { - channel_process_cells(chan); - } - } + log_debug(LD_CHANNEL, + "Processing incoming cell_t %p for channel %p (global ID " + U64_FORMAT ")", cell, chan, + U64_PRINTF_ARG(chan->global_identifier)); + chan->cell_handler(chan, cell); } /** If <b>packed_cell</b> on <b>chan</b> is a destroy cell, then set @@ -3051,51 +2006,12 @@ packed_cell_is_destroy(channel_t *chan, } /** - * Assert that the global channel stats counters are internally consistent - */ - -static void -channel_assert_counter_consistency(void) -{ - tor_assert(n_channel_cells_queued == - (n_channel_cells_in_queues + n_channel_cells_passed_to_lower_layer)); - tor_assert(n_channel_bytes_queued == - (n_channel_bytes_in_queues + n_channel_bytes_passed_to_lower_layer)); -} - -/* DOCDOC */ -static int -is_destroy_cell(channel_t *chan, - const cell_queue_entry_t *q, circid_t *circid_out) -{ - *circid_out = 0; - switch (q->type) { - case CELL_QUEUE_FIXED: - if (q->u.fixed.cell->command == CELL_DESTROY) { - *circid_out = q->u.fixed.cell->circ_id; - return 1; - } - break; - case CELL_QUEUE_VAR: - if (q->u.var.var_cell->command == CELL_DESTROY) { - *circid_out = q->u.var.var_cell->circ_id; - return 1; - } - break; - case CELL_QUEUE_PACKED: - return packed_cell_is_destroy(chan, q->u.packed.packed_cell, circid_out); - } - return 0; -} - -/** - * Send destroy cell on a channel + * Send destroy cell on a channel. * * Write a destroy cell with circ ID <b>circ_id</b> and reason <b>reason</b> * onto channel <b>chan</b>. Don't perform range-checking on reason: * we may want to propagate reasons from other cells. */ - int channel_send_destroy(circid_t circ_id, channel_t *chan, int reason) { @@ -3131,30 +2047,16 @@ channel_send_destroy(circid_t circ_id, channel_t *chan, int reason) } /** - * Dump channel statistics to the log + * Dump channel statistics to the log. * * This is called from dumpstats() in main.c and spams the log with * statistics on channels. */ - void channel_dumpstats(int severity) { if (all_channels && smartlist_len(all_channels) > 0) { tor_log(severity, LD_GENERAL, - "Channels have queued " U64_FORMAT " bytes in " U64_FORMAT " cells, " - "and handed " U64_FORMAT " bytes in " U64_FORMAT " cells to the lower" - " layer.", - U64_PRINTF_ARG(n_channel_bytes_queued), - U64_PRINTF_ARG(n_channel_cells_queued), - U64_PRINTF_ARG(n_channel_bytes_passed_to_lower_layer), - U64_PRINTF_ARG(n_channel_cells_passed_to_lower_layer)); - tor_log(severity, LD_GENERAL, - "There are currently " U64_FORMAT " bytes in " U64_FORMAT " cells " - "in channel queues.", - U64_PRINTF_ARG(n_channel_bytes_in_queues), - U64_PRINTF_ARG(n_channel_cells_in_queues)); - tor_log(severity, LD_GENERAL, "Dumping statistics about %d channels:", smartlist_len(all_channels)); tor_log(severity, LD_GENERAL, @@ -3176,12 +2078,11 @@ channel_dumpstats(int severity) } /** - * Dump channel listener statistics to the log + * Dump channel listener statistics to the log. * * This is called from dumpstats() in main.c and spams the log with * statistics on channel listeners. */ - void channel_listener_dumpstats(int severity) { @@ -3208,9 +2109,8 @@ channel_listener_dumpstats(int severity) } /** - * Set the cmux policy on all active channels + * Set the cmux policy on all active channels. */ - void channel_set_cmux_policy_everywhere(circuitmux_policy_t *pol) { @@ -3224,12 +2124,11 @@ channel_set_cmux_policy_everywhere(circuitmux_policy_t *pol) } /** - * Clean up channels + * Clean up channels. * * This gets called periodically from run_scheduled_events() in main.c; * it cleans up after closed channels. */ - void channel_run_cleanup(void) { @@ -3251,12 +2150,11 @@ channel_run_cleanup(void) } /** - * Clean up channel listeners + * Clean up channel listeners. * * This gets called periodically from run_scheduled_events() in main.c; * it cleans up after closed channel listeners. */ - void channel_listener_run_cleanup(void) { @@ -3278,9 +2176,8 @@ channel_listener_run_cleanup(void) } /** - * Free a list of channels for channel_free_all() + * Free a list of channels for channel_free_all(). */ - static void channel_free_list(smartlist_t *channels, int mark_for_close) { @@ -3304,15 +2201,14 @@ channel_free_list(smartlist_t *channels, int mark_for_close) if (!CHANNEL_CONDEMNED(curr)) { channel_mark_for_close(curr); } - channel_force_free(curr); + channel_force_xfree(curr); } else channel_free(curr); } SMARTLIST_FOREACH_END(curr); } /** - * Free a list of channel listeners for channel_free_all() + * Free a list of channel listeners for channel_free_all(). */ - static void channel_listener_free_list(smartlist_t *listeners, int mark_for_close) { @@ -3333,20 +2229,19 @@ channel_listener_free_list(smartlist_t *listeners, int mark_for_close) curr->state == CHANNEL_LISTENER_STATE_ERROR)) { channel_listener_mark_for_close(curr); } - channel_listener_force_free(curr); + channel_listener_force_xfree(curr); } else channel_listener_free(curr); } SMARTLIST_FOREACH_END(curr); } /** - * Close all channels and free everything + * Close all channels and free everything. * * This gets called from tor_free_all() in main.c to clean up on exit. * It will close all registered channels and free associated storage, * then free the all_channels, active_channels, listening_channels and * finished_channels lists and also channel_identity_map. */ - void channel_free_all(void) { @@ -3411,7 +2306,7 @@ channel_free_all(void) } /** - * Connect to a given addr/port/digest + * Connect to a given addr/port/digest. * * This sets up a new outgoing channel; in the future if multiple * channel_t subclasses are available, this is where the selection policy @@ -3420,7 +2315,6 @@ channel_free_all(void) * single abstract object encapsulating all the protocol details of * how to contact an OR. */ - channel_t * channel_connect(const tor_addr_t *addr, uint16_t port, const char *id_digest, @@ -3430,7 +2324,7 @@ channel_connect(const tor_addr_t *addr, uint16_t port, } /** - * Decide which of two channels to prefer for extending a circuit + * Decide which of two channels to prefer for extending a circuit. * * This function is called while extending a circuit and returns true iff * a is 'better' than b. The most important criterion here is that a @@ -3439,7 +2333,6 @@ channel_connect(const tor_addr_t *addr, uint16_t port, * * This is based on the former connection_or_is_better() of connection_or.c */ - int channel_is_better(channel_t *a, channel_t *b) { @@ -3491,7 +2384,7 @@ channel_is_better(channel_t *a, channel_t *b) } /** - * Get a channel to extend a circuit + * Get a channel to extend a circuit. * * Pick a suitable channel to extend a circuit to given the desired digest * the address we believe is correct for that digest; this tries to see @@ -3500,7 +2393,6 @@ channel_is_better(channel_t *a, channel_t *b) * and our next action, and set *launch_out to a boolean indicated whether * the caller should try to launch a new channel with channel_connect(). */ - channel_t * channel_get_for_extend(const char *rsa_id_digest, const ed25519_public_key_t *ed_id, @@ -3605,12 +2497,11 @@ channel_get_for_extend(const char *rsa_id_digest, } /** - * Describe the transport subclass for a channel + * Describe the transport subclass for a channel. * * Invoke a method to get a string description of the lower-layer * transport for this channel. */ - const char * channel_describe_transport(channel_t *chan) { @@ -3621,12 +2512,11 @@ channel_describe_transport(channel_t *chan) } /** - * Describe the transport subclass for a channel listener + * Describe the transport subclass for a channel listener. * * Invoke a method to get a string description of the lower-layer * transport for this channel listener. */ - const char * channel_listener_describe_transport(channel_listener_t *chan_l) { @@ -3637,24 +2527,10 @@ channel_listener_describe_transport(channel_listener_t *chan_l) } /** - * Return the number of entries in <b>queue</b> - */ -STATIC int -chan_cell_queue_len(const chan_cell_queue_t *queue) -{ - int r = 0; - cell_queue_entry_t *cell; - TOR_SIMPLEQ_FOREACH(cell, queue, next) - ++r; - return r; -} - -/** - * Dump channel statistics + * Dump channel statistics. * - * Dump statistics for one channel to the log + * Dump statistics for one channel to the log. */ - MOCK_IMPL(void, channel_dump_statistics, (channel_t *chan, int severity)) { @@ -3684,35 +2560,18 @@ channel_dump_statistics, (channel_t *chan, int severity)) U64_PRINTF_ARG(chan->timestamp_active), U64_PRINTF_ARG(now - chan->timestamp_active)); - /* Handle digest and nickname */ + /* Handle digest. */ if (!tor_digest_is_zero(chan->identity_digest)) { - if (chan->nickname) { - tor_log(severity, LD_GENERAL, - " * Channel " U64_FORMAT " says it is connected " - "to an OR with digest %s and nickname %s", - U64_PRINTF_ARG(chan->global_identifier), - hex_str(chan->identity_digest, DIGEST_LEN), - chan->nickname); - } else { - tor_log(severity, LD_GENERAL, - " * Channel " U64_FORMAT " says it is connected " - "to an OR with digest %s and no known nickname", - U64_PRINTF_ARG(chan->global_identifier), - hex_str(chan->identity_digest, DIGEST_LEN)); - } + tor_log(severity, LD_GENERAL, + " * Channel " U64_FORMAT " says it is connected " + "to an OR with digest %s", + U64_PRINTF_ARG(chan->global_identifier), + hex_str(chan->identity_digest, DIGEST_LEN)); } else { - if (chan->nickname) { - tor_log(severity, LD_GENERAL, - " * Channel " U64_FORMAT " does not know the digest" - " of the OR it is connected to, but reports its nickname is %s", - U64_PRINTF_ARG(chan->global_identifier), - chan->nickname); - } else { - tor_log(severity, LD_GENERAL, - " * Channel " U64_FORMAT " does not know the digest" - " or the nickname of the OR it is connected to", - U64_PRINTF_ARG(chan->global_identifier)); - } + tor_log(severity, LD_GENERAL, + " * Channel " U64_FORMAT " does not know the digest" + " of the OR it is connected to", + U64_PRINTF_ARG(chan->global_identifier)); } /* Handle remote address and descriptions */ @@ -3761,14 +2620,6 @@ channel_dump_statistics, (channel_t *chan, int severity)) channel_is_incoming(chan) ? "incoming" : "outgoing"); - /* Describe queues */ - tor_log(severity, LD_GENERAL, - " * Channel " U64_FORMAT " has %d queued incoming cells" - " and %d queued outgoing cells", - U64_PRINTF_ARG(chan->global_identifier), - chan_cell_queue_len(&chan->incoming_queue), - chan_cell_queue_len(&chan->outgoing_queue)); - /* Describe circuits */ tor_log(severity, LD_GENERAL, " * Channel " U64_FORMAT " has %d active circuits out of" @@ -3787,12 +2638,6 @@ channel_dump_statistics, (channel_t *chan, int severity)) U64_PRINTF_ARG(chan->timestamp_client), U64_PRINTF_ARG(now - chan->timestamp_client)); tor_log(severity, LD_GENERAL, - " * Channel " U64_FORMAT " was last drained at " - U64_FORMAT " (" U64_FORMAT " seconds ago)", - U64_PRINTF_ARG(chan->global_identifier), - U64_PRINTF_ARG(chan->timestamp_drained), - U64_PRINTF_ARG(now - chan->timestamp_drained)); - tor_log(severity, LD_GENERAL, " * Channel " U64_FORMAT " last received a cell " "at " U64_FORMAT " (" U64_FORMAT " seconds ago)", U64_PRINTF_ARG(chan->global_identifier), @@ -3868,11 +2713,10 @@ channel_dump_statistics, (channel_t *chan, int severity)) } /** - * Dump channel listener statistics + * Dump channel listener statistics. * - * Dump statistics for one channel listener to the log + * Dump statistics for one channel listener to the log. */ - void channel_listener_dump_statistics(channel_listener_t *chan_l, int severity) { @@ -3935,11 +2779,10 @@ channel_listener_dump_statistics(channel_listener_t *chan_l, int severity) } /** - * Invoke transport-specific stats dump for channel + * Invoke transport-specific stats dump for channel. * - * If there is a lower-layer statistics dump method, invoke it + * If there is a lower-layer statistics dump method, invoke it. */ - void channel_dump_transport_statistics(channel_t *chan, int severity) { @@ -3949,11 +2792,10 @@ channel_dump_transport_statistics(channel_t *chan, int severity) } /** - * Invoke transport-specific stats dump for channel listener + * Invoke transport-specific stats dump for channel listener. * - * If there is a lower-layer statistics dump method, invoke it + * If there is a lower-layer statistics dump method, invoke it. */ - void channel_listener_dump_transport_statistics(channel_listener_t *chan_l, int severity) @@ -3964,7 +2806,7 @@ channel_listener_dump_transport_statistics(channel_listener_t *chan_l, } /** - * Return text description of the remote endpoint + * Return text description of the remote endpoint. * * This function return a test provided by the lower layer of the remote * endpoint for this channel; it should specify the actual address connected @@ -3997,7 +2839,7 @@ channel_get_actual_remote_address(channel_t *chan) } /** - * Return text description of the remote endpoint canonical address + * Return text description of the remote endpoint canonical address. * * This function return a test provided by the lower layer of the remote * endpoint for this channel; it should use the known canonical address for @@ -4036,37 +2878,25 @@ channel_get_addr_if_possible,(channel_t *chan, tor_addr_t *addr_out)) } /** - * Check if there are outgoing queue writes on this channel - * - * Indicate if either we have queued cells, or if not, whether the underlying - * lower-layer transport thinks it has an output queue. + * Return true iff the channel has any cells on the connection outbuf waiting + * to be sent onto the network. */ - int channel_has_queued_writes(channel_t *chan) { - int has_writes = 0; - tor_assert(chan); tor_assert(chan->has_queued_writes); - if (! TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue)) { - has_writes = 1; - } else { - /* Check with the lower layer */ - has_writes = chan->has_queued_writes(chan); - } - - return has_writes; + /* Check with the lower layer */ + return chan->has_queued_writes(chan); } /** - * Check the is_bad_for_new_circs flag + * Check the is_bad_for_new_circs flag. * * This function returns the is_bad_for_new_circs flag of the specified * channel. */ - int channel_is_bad_for_new_circs(channel_t *chan) { @@ -4076,11 +2906,10 @@ channel_is_bad_for_new_circs(channel_t *chan) } /** - * Mark a channel as bad for new circuits + * Mark a channel as bad for new circuits. * * Set the is_bad_for_new_circs_flag on chan. */ - void channel_mark_bad_for_new_circs(channel_t *chan) { @@ -4090,13 +2919,12 @@ channel_mark_bad_for_new_circs(channel_t *chan) } /** - * Get the client flag + * Get the client flag. * * This returns the client flag of a channel, which will be set if * command_process_create_cell() in command.c thinks this is a connection * from a client. */ - int channel_is_client(const channel_t *chan) { @@ -4106,11 +2934,10 @@ channel_is_client(const channel_t *chan) } /** - * Set the client flag + * Set the client flag. * - * Mark a channel as being from a client + * Mark a channel as being from a client. */ - void channel_mark_client(channel_t *chan) { @@ -4120,11 +2947,10 @@ channel_mark_client(channel_t *chan) } /** - * Clear the client flag + * Clear the client flag. * - * Mark a channel as being _not_ from a client + * Mark a channel as being _not_ from a client. */ - void channel_clear_client(channel_t *chan) { @@ -4134,12 +2960,11 @@ channel_clear_client(channel_t *chan) } /** - * Get the canonical flag for a channel + * Get the canonical flag for a channel. * * This returns the is_canonical for a channel; this flag is determined by * the lower layer and can't be set in a transport-independent way. */ - int channel_is_canonical(channel_t *chan) { @@ -4150,12 +2975,11 @@ channel_is_canonical(channel_t *chan) } /** - * Test if the canonical flag is reliable + * Test if the canonical flag is reliable. * * This function asks if the lower layer thinks it's safe to trust the - * result of channel_is_canonical() + * result of channel_is_canonical(). */ - int channel_is_canonical_is_reliable(channel_t *chan) { @@ -4166,12 +2990,11 @@ channel_is_canonical_is_reliable(channel_t *chan) } /** - * Test incoming flag + * Test incoming flag. * * This function gets the incoming flag; this is set when a listener spawns * a channel. If this returns true the channel was remotely initiated. */ - int channel_is_incoming(channel_t *chan) { @@ -4181,12 +3004,11 @@ channel_is_incoming(channel_t *chan) } /** - * Set the incoming flag + * Set the incoming flag. * * This function is called when a channel arrives on a listening channel * to mark it as incoming. */ - void channel_mark_incoming(channel_t *chan) { @@ -4196,7 +3018,7 @@ channel_mark_incoming(channel_t *chan) } /** - * Test local flag + * Test local flag. * * This function gets the local flag; the lower layer should set this when * setting up the channel if is_local_addr() is true for all of the @@ -4204,7 +3026,6 @@ channel_mark_incoming(channel_t *chan) * used to decide whether to declare the network reachable when seeing incoming * traffic on the channel. */ - int channel_is_local(channel_t *chan) { @@ -4214,13 +3035,12 @@ channel_is_local(channel_t *chan) } /** - * Set the local flag + * Set the local flag. * * This internal-only function should be called by the lower layer if the * channel is to a local address. See channel_is_local() above or the - * description of the is_local bit in channel.h + * description of the is_local bit in channel.h. */ - void channel_mark_local(channel_t *chan) { @@ -4230,14 +3050,13 @@ channel_mark_local(channel_t *chan) } /** - * Mark a channel as remote + * Mark a channel as remote. * * This internal-only function should be called by the lower layer if the * channel is not to a local address but has previously been marked local. * See channel_is_local() above or the description of the is_local bit in * channel.h */ - void channel_mark_remote(channel_t *chan) { @@ -4247,13 +3066,12 @@ channel_mark_remote(channel_t *chan) } /** - * Test outgoing flag + * Test outgoing flag. * * This function gets the outgoing flag; this is the inverse of the incoming * bit set when a listener spawns a channel. If this returns true the channel * was locally initiated. */ - int channel_is_outgoing(channel_t *chan) { @@ -4263,12 +3081,11 @@ channel_is_outgoing(channel_t *chan) } /** - * Mark a channel as outgoing + * Mark a channel as outgoing. * * This function clears the incoming flag and thus marks a channel as * outgoing. */ - void channel_mark_outgoing(channel_t *chan) { @@ -4281,24 +3098,11 @@ channel_mark_outgoing(channel_t *chan) * Flow control queries * ***********************/ -/* - * Get the latest estimate for the total queue size of all open channels - */ - -uint64_t -channel_get_global_queue_estimate(void) -{ - return estimated_total_queue_size; -} - -/* - * Estimate the number of writeable cells +/** + * Estimate the number of writeable cells. * - * Ask the lower layer for an estimate of how many cells it can accept, and - * then subtract the length of our outgoing_queue, if any, to produce an - * estimate of the number of cells this channel can accept for writes. + * Ask the lower layer for an estimate of how many cells it can accept. */ - int channel_num_cells_writeable(channel_t *chan) { @@ -4310,8 +3114,6 @@ channel_num_cells_writeable(channel_t *chan) if (chan->state == CHANNEL_STATE_OPEN) { /* Query lower layer */ result = chan->num_cells_writeable(chan); - /* Subtract cell queue length, if any */ - result -= chan_cell_queue_len(&chan->outgoing_queue); if (result < 0) result = 0; } else { /* No cells are writeable in any other state */ @@ -4326,12 +3128,11 @@ channel_num_cells_writeable(channel_t *chan) ********************/ /** - * Update the created timestamp for a channel + * Update the created timestamp for a channel. * * This updates the channel's created timestamp and should only be called * from channel_init(). */ - void channel_timestamp_created(channel_t *chan) { @@ -4343,12 +3144,11 @@ channel_timestamp_created(channel_t *chan) } /** - * Update the created timestamp for a channel listener + * Update the created timestamp for a channel listener. * * This updates the channel listener's created timestamp and should only be * called from channel_init_listener(). */ - void channel_listener_timestamp_created(channel_listener_t *chan_l) { @@ -4360,7 +3160,7 @@ channel_listener_timestamp_created(channel_listener_t *chan_l) } /** - * Update the last active timestamp for a channel + * Update the last active timestamp for a channel. * * This function updates the channel's last active timestamp; it should be * called by the lower layer whenever there is activity on the channel which @@ -4369,25 +3169,23 @@ channel_listener_timestamp_created(channel_listener_t *chan_l) * but it should be updated for things like the v3 handshake and stuff that * produce activity only visible to the lower layer. */ - void channel_timestamp_active(channel_t *chan) { time_t now = time(NULL); tor_assert(chan); - chan->timestamp_xfer_ms = monotime_coarse_absolute_msec(); + monotime_coarse_get(&chan->timestamp_xfer); chan->timestamp_active = now; /* Clear any potential netflow padding timer. We're active */ - chan->next_padding_time_ms = 0; + monotime_coarse_zero(&chan->next_padding_time); } /** - * Update the last active timestamp for a channel listener + * Update the last active timestamp for a channel listener. */ - void channel_listener_timestamp_active(channel_listener_t *chan_l) { @@ -4405,7 +3203,6 @@ channel_listener_timestamp_active(channel_listener_t *chan_l) * should be called whenever a new incoming channel is accepted on a * listener. */ - void channel_listener_timestamp_accepted(channel_listener_t *chan_l) { @@ -4418,12 +3215,11 @@ channel_listener_timestamp_accepted(channel_listener_t *chan_l) } /** - * Update client timestamp + * Update client timestamp. * * This function is called by relay.c to timestamp a channel that appears to * be used as a client. */ - void channel_timestamp_client(channel_t *chan) { @@ -4435,64 +3231,44 @@ channel_timestamp_client(channel_t *chan) } /** - * Update the last drained timestamp - * - * This is called whenever we transmit a cell which leaves the outgoing cell - * queue completely empty. It also updates the xmit time and the active time. - */ - -void -channel_timestamp_drained(channel_t *chan) -{ - time_t now = time(NULL); - - tor_assert(chan); - - chan->timestamp_active = now; - chan->timestamp_drained = now; - chan->timestamp_xmit = now; -} - -/** - * Update the recv timestamp + * Update the recv timestamp. * * This is called whenever we get an incoming cell from the lower layer. * This also updates the active timestamp. */ - void channel_timestamp_recv(channel_t *chan) { time_t now = time(NULL); tor_assert(chan); - chan->timestamp_xfer_ms = monotime_coarse_absolute_msec(); + monotime_coarse_get(&chan->timestamp_xfer); chan->timestamp_active = now; chan->timestamp_recv = now; /* Clear any potential netflow padding timer. We're active */ - chan->next_padding_time_ms = 0; + monotime_coarse_zero(&chan->next_padding_time); } /** - * Update the xmit timestamp + * Update the xmit timestamp. + * * This is called whenever we pass an outgoing cell to the lower layer. This * also updates the active timestamp. */ - void channel_timestamp_xmit(channel_t *chan) { time_t now = time(NULL); tor_assert(chan); - chan->timestamp_xfer_ms = monotime_coarse_absolute_msec(); + monotime_coarse_get(&chan->timestamp_xfer); chan->timestamp_active = now; chan->timestamp_xmit = now; /* Clear any potential netflow padding timer. We're active */ - chan->next_padding_time_ms = 0; + monotime_coarse_zero(&chan->next_padding_time); } /*************************************************************** @@ -4500,9 +3276,8 @@ channel_timestamp_xmit(channel_t *chan) **************************************************************/ /** - * Query created timestamp for a channel + * Query created timestamp for a channel. */ - time_t channel_when_created(channel_t *chan) { @@ -4512,57 +3287,8 @@ channel_when_created(channel_t *chan) } /** - * Query created timestamp for a channel listener - */ - -time_t -channel_listener_when_created(channel_listener_t *chan_l) -{ - tor_assert(chan_l); - - return chan_l->timestamp_created; -} - -/** - * Query last active timestamp for a channel - */ - -time_t -channel_when_last_active(channel_t *chan) -{ - tor_assert(chan); - - return chan->timestamp_active; -} - -/** - * Query last active timestamp for a channel listener - */ - -time_t -channel_listener_when_last_active(channel_listener_t *chan_l) -{ - tor_assert(chan_l); - - return chan_l->timestamp_active; -} - -/** - * Query last accepted timestamp for a channel listener - */ - -time_t -channel_listener_when_last_accepted(channel_listener_t *chan_l) -{ - tor_assert(chan_l); - - return chan_l->timestamp_accepted; -} - -/** - * Query client timestamp + * Query client timestamp. */ - time_t channel_when_last_client(channel_t *chan) { @@ -4572,33 +3298,8 @@ channel_when_last_client(channel_t *chan) } /** - * Query drained timestamp - */ - -time_t -channel_when_last_drained(channel_t *chan) -{ - tor_assert(chan); - - return chan->timestamp_drained; -} - -/** - * Query recv timestamp - */ - -time_t -channel_when_last_recv(channel_t *chan) -{ - tor_assert(chan); - - return chan->timestamp_recv; -} - -/** - * Query xmit timestamp + * Query xmit timestamp. */ - time_t channel_when_last_xmit(channel_t *chan) { @@ -4608,48 +3309,11 @@ channel_when_last_xmit(channel_t *chan) } /** - * Query accepted counter - */ - -uint64_t -channel_listener_count_accepted(channel_listener_t *chan_l) -{ - tor_assert(chan_l); - - return chan_l->n_accepted; -} - -/** - * Query received cell counter - */ - -uint64_t -channel_count_recved(channel_t *chan) -{ - tor_assert(chan); - - return chan->n_cells_recved; -} - -/** - * Query transmitted cell counter - */ - -uint64_t -channel_count_xmitted(channel_t *chan) -{ - tor_assert(chan); - - return chan->n_cells_xmitted; -} - -/** - * Check if a channel matches an extend_info_t + * Check if a channel matches an extend_info_t. * * This function calls the lower layer and asks if this channel matches a * given extend_info_t. */ - int channel_matches_extend_info(channel_t *chan, extend_info_t *extend_info) { @@ -4666,7 +3330,6 @@ channel_matches_extend_info(channel_t *chan, extend_info_t *extend_info) * This function calls into the lower layer and asks if this channel thinks * it matches a given target address for circuit extension purposes. */ - int channel_matches_target_addr_for_extend(channel_t *chan, const tor_addr_t *target) @@ -4679,12 +3342,11 @@ channel_matches_target_addr_for_extend(channel_t *chan, } /** - * Return the total number of circuits used by a channel + * Return the total number of circuits used by a channel. * * @param chan Channel to query * @return Number of circuits using this as n_chan or p_chan */ - unsigned int channel_num_circuits(channel_t *chan) { @@ -4695,10 +3357,10 @@ channel_num_circuits(channel_t *chan) } /** - * Set up circuit ID generation + * Set up circuit ID generation. * * This is called when setting up a channel and replaces the old - * connection_or_set_circid_type() + * connection_or_set_circid_type(). */ MOCK_IMPL(void, channel_set_circid_type,(channel_t *chan, @@ -4734,6 +3396,16 @@ channel_set_circid_type,(channel_t *chan, } } +static int +channel_sort_by_ed25519_identity(const void **a_, const void **b_) +{ + const channel_t *a = *a_, + *b = *b_; + return fast_memcmp(&a->ed25519_identity.pubkey, + &b->ed25519_identity.pubkey, + sizeof(a->ed25519_identity.pubkey)); +} + /** Helper for channel_update_bad_for_new_circs(): Perform the * channel_update_bad_for_new_circs operation on all channels in <b>lst</b>, * all of which MUST have the same RSA ID. (They MAY have different @@ -4742,44 +3414,52 @@ static void channel_rsa_id_group_set_badness(struct channel_list_s *lst, int force) { /*XXXX This function should really be about channels. 15056 */ - channel_t *chan; + channel_t *chan = TOR_LIST_FIRST(lst); + + if (!chan) + return; + + /* if there is only one channel, don't bother looping */ + if (PREDICT_LIKELY(!TOR_LIST_NEXT(chan, next_with_same_id))) { + connection_or_single_set_badness_( + time(NULL), BASE_CHAN_TO_TLS(chan)->conn, force); + return; + } + + smartlist_t *channels = smartlist_new(); - /* First, get a minimal list of the ed25519 identites */ - smartlist_t *ed_identities = smartlist_new(); TOR_LIST_FOREACH(chan, lst, next_with_same_id) { - uint8_t *id_copy = - tor_memdup(&chan->ed25519_identity.pubkey, DIGEST256_LEN); - smartlist_add(ed_identities, id_copy); + if (BASE_CHAN_TO_TLS(chan)->conn) { + smartlist_add(channels, chan); + } } - smartlist_sort_digests256(ed_identities); - smartlist_uniq_digests256(ed_identities); - /* Now, for each Ed identity, build a smartlist and find the best entry on - * it. */ + smartlist_sort(channels, channel_sort_by_ed25519_identity); + + const ed25519_public_key_t *common_ed25519_identity = NULL; + /* it would be more efficient to do a slice, but this case is rare */ smartlist_t *or_conns = smartlist_new(); - SMARTLIST_FOREACH_BEGIN(ed_identities, const uint8_t *, ed_id) { - TOR_LIST_FOREACH(chan, lst, next_with_same_id) { - channel_tls_t *chantls = BASE_CHAN_TO_TLS(chan); - if (tor_memneq(ed_id, &chan->ed25519_identity.pubkey, DIGEST256_LEN)) - continue; - or_connection_t *orconn = chantls->conn; - if (orconn) { - tor_assert(orconn->chan == chantls); - smartlist_add(or_conns, orconn); - } + SMARTLIST_FOREACH_BEGIN(channels, channel_t *, channel) { + if (!common_ed25519_identity) + common_ed25519_identity = &channel->ed25519_identity; + + if (! ed25519_pubkey_eq(&channel->ed25519_identity, + common_ed25519_identity)) { + connection_or_group_set_badness_(or_conns, force); + smartlist_clear(or_conns); + common_ed25519_identity = &channel->ed25519_identity; } - connection_or_group_set_badness_(or_conns, force); - smartlist_clear(or_conns); - } SMARTLIST_FOREACH_END(ed_id); + smartlist_add(or_conns, BASE_CHAN_TO_TLS(channel)->conn); + } SMARTLIST_FOREACH_END(channel); + + connection_or_group_set_badness_(or_conns, force); /* XXXX 15056 we may want to do something special with connections that have * no set Ed25519 identity! */ smartlist_free(or_conns); - - SMARTLIST_FOREACH(ed_identities, uint8_t *, ed_id, tor_free(ed_id)); - smartlist_free(ed_identities); + smartlist_free(channels); } /** Go through all the channels (or if <b>digest</b> is non-NULL, just @@ -4809,83 +3489,3 @@ channel_update_bad_for_new_circs(const char *digest, int force) } } -/** - * Update the estimated number of bytes queued to transmit for this channel, - * and notify the scheduler. The estimate includes both the channel queue and - * the queue size reported by the lower layer, and an overhead estimate - * optionally provided by the lower layer. - */ - -void -channel_update_xmit_queue_size(channel_t *chan) -{ - uint64_t queued, adj; - double overhead; - - tor_assert(chan); - tor_assert(chan->num_bytes_queued); - - /* - * First, get the number of bytes we have queued without factoring in - * lower-layer overhead. - */ - queued = chan->num_bytes_queued(chan) + chan->bytes_in_queue; - /* Next, adjust by the overhead factor, if any is available */ - if (chan->get_overhead_estimate) { - overhead = chan->get_overhead_estimate(chan); - if (overhead >= 1.0) { - queued = (uint64_t)(queued * overhead); - } else { - /* Ignore silly overhead factors */ - log_notice(LD_CHANNEL, "Ignoring silly overhead factor %f", overhead); - } - } - - /* Now, compare to the previous estimate */ - if (queued > chan->bytes_queued_for_xmit) { - adj = queued - chan->bytes_queued_for_xmit; - log_debug(LD_CHANNEL, - "Increasing queue size for channel " U64_FORMAT " by " U64_FORMAT - " from " U64_FORMAT " to " U64_FORMAT, - U64_PRINTF_ARG(chan->global_identifier), - U64_PRINTF_ARG(adj), - U64_PRINTF_ARG(chan->bytes_queued_for_xmit), - U64_PRINTF_ARG(queued)); - /* Update the channel's estimate */ - chan->bytes_queued_for_xmit = queued; - - /* Update the global queue size estimate if appropriate */ - if (chan->state == CHANNEL_STATE_OPEN || - chan->state == CHANNEL_STATE_MAINT) { - estimated_total_queue_size += adj; - log_debug(LD_CHANNEL, - "Increasing global queue size by " U64_FORMAT " for channel " - U64_FORMAT ", new size is " U64_FORMAT, - U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier), - U64_PRINTF_ARG(estimated_total_queue_size)); - } - } else if (queued < chan->bytes_queued_for_xmit) { - adj = chan->bytes_queued_for_xmit - queued; - log_debug(LD_CHANNEL, - "Decreasing queue size for channel " U64_FORMAT " by " U64_FORMAT - " from " U64_FORMAT " to " U64_FORMAT, - U64_PRINTF_ARG(chan->global_identifier), - U64_PRINTF_ARG(adj), - U64_PRINTF_ARG(chan->bytes_queued_for_xmit), - U64_PRINTF_ARG(queued)); - /* Update the channel's estimate */ - chan->bytes_queued_for_xmit = queued; - - /* Update the global queue size estimate if appropriate */ - if (chan->state == CHANNEL_STATE_OPEN || - chan->state == CHANNEL_STATE_MAINT) { - estimated_total_queue_size -= adj; - log_debug(LD_CHANNEL, - "Decreasing global queue size by " U64_FORMAT " for channel " - U64_FORMAT ", new size is " U64_FORMAT, - U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier), - U64_PRINTF_ARG(estimated_total_queue_size)); - } - } -} - |