diff options
Diffstat (limited to 'src/or')
-rw-r--r-- | src/or/channel.c | 1586 | ||||
-rw-r--r-- | src/or/channel.h | 91 | ||||
-rw-r--r-- | src/or/channeltls.c | 13 | ||||
-rw-r--r-- | src/or/circuitbuild.c | 3 | ||||
-rw-r--r-- | src/or/circuitlist.c | 3 | ||||
-rw-r--r-- | src/or/connection_or.c | 3 | ||||
-rw-r--r-- | src/or/relay.c | 129 |
7 files changed, 161 insertions, 1667 deletions
diff --git a/src/or/channel.c b/src/or/channel.c index 7590ba8a5b..5b9d860baf 100644 --- a/src/or/channel.c +++ b/src/or/channel.c @@ -5,9 +5,8 @@ * \file channel.c * * \brief OR/OP-to-OR channel abstraction layer. A channel's job is to - * transfer cells from Tor instance to Tor instance. - * Currently, there is only one implementation of the channel abstraction: in - * channeltls.c. + * transfer cells from Tor instance to Tor instance. Currently, there is only + * one implementation of the channel abstraction: in channeltls.c. * * Channels are a higher-level abstraction than or_connection_t: In general, * any means that two Tor relays use to exchange cells, or any means that a @@ -24,16 +23,28 @@ * connection. * * Every channel implementation is responsible for being able to transmit - * cells that are added to it with channel_write_cell() and related functions, - * and to receive incoming cells with the channel_queue_cell() and related - * functions. See the channel_t documentation for more information. - * - * When new cells arrive on a channel, they are passed to cell handler - * functions, which can be set by channel_set_cell_handlers() - * functions. (Tor's cell handlers are in command.c.) - * - * Tor flushes cells to channels from relay.c in - * channel_flush_from_first_active_circuit(). + * cells that are passed to it + * + * For *inbound* cells, the entry point is: channel_process_cell(). It takes a + * cell and will pass it to the cell handler set by + * channel_set_cell_handlers(). Currently, this is passed back to the command + * subsystem which is command_process_cell(). + * + * NOTE: For now, the seperation between channels and specialized channels + * (like channeltls) is not that well defined. So the channeltls layer calls + * channel_process_cell() which originally comes from the connection subsytem. + * This should be hopefully be fixed with #23993. + * + * For *outbound* cells, the entry point is: channel_write_packed_cell(). + * Only packed cells are dequeued from the circuit queue by the scheduler + * which uses channel_flush_from_first_active_circuit() to decide which cells + * to flush from which circuit on the channel. They are then passed down to + * the channel subsystem. This calls the low layer with the function pointer + * .write_packed_cell(). + * + * Each specialized channel (currently only channeltls_t) MUST implement a + * series of function found in channel_t. See channel.h for more + * documentation. **/ /* @@ -112,59 +123,6 @@ HANDLE_IMPL(channel, channel_s,) /* Counter for ID numbers */ static uint64_t n_channels_allocated = 0; -/* - * Channel global byte/cell counters, for statistics and for scheduler high - * /low-water marks. - */ - -/* - * Total number of cells ever given to any channel with the - * channel_write_*_cell() functions. - */ - -static uint64_t n_channel_cells_queued = 0; - -/* - * Total number of cells ever passed to a channel lower layer with the - * write_*_cell() methods. - */ - -static uint64_t n_channel_cells_passed_to_lower_layer = 0; - -/* - * Current number of cells in all channel queues; should be - * n_channel_cells_queued - n_channel_cells_passed_to_lower_layer. - */ - -static uint64_t n_channel_cells_in_queues = 0; - -/* - * Total number of bytes for all cells ever queued to a channel and - * counted in n_channel_cells_queued. - */ - -static uint64_t n_channel_bytes_queued = 0; - -/* - * Total number of bytes for all cells ever passed to a channel lower layer - * and counted in n_channel_cells_passed_to_lower_layer. - */ - -static uint64_t n_channel_bytes_passed_to_lower_layer = 0; - -/* - * Current number of bytes in all channel queues; should be - * n_channel_bytes_queued - n_channel_bytes_passed_to_lower_layer. - */ - -static uint64_t n_channel_bytes_in_queues = 0; - -/* - * Current total estimated queue size *including lower layer queues and - * transmit overhead* - */ - -STATIC uint64_t estimated_total_queue_size = 0; /* Digest->channel map * @@ -201,40 +159,15 @@ HT_PROTOTYPE(channel_idmap, channel_idmap_entry_s, node, channel_idmap_hash, HT_GENERATE2(channel_idmap, channel_idmap_entry_s, node, channel_idmap_hash, channel_idmap_eq, 0.5, tor_reallocarray_, tor_free_) -static cell_queue_entry_t * cell_queue_entry_dup(cell_queue_entry_t *q); -#if 0 -static int cell_queue_entry_is_padding(cell_queue_entry_t *q); -#endif -static cell_queue_entry_t * -cell_queue_entry_new_fixed(cell_t *cell); -static cell_queue_entry_t * -cell_queue_entry_new_var(var_cell_t *var_cell); -static int is_destroy_cell(channel_t *chan, - const cell_queue_entry_t *q, circid_t *circid_out); - -static void channel_assert_counter_consistency(void); - /* Functions to maintain the digest map */ -static void channel_add_to_digest_map(channel_t *chan); static void channel_remove_from_digest_map(channel_t *chan); -/* - * Flush cells from just the outgoing queue without trying to get them - * from circuits; used internall by channel_flush_some_cells(). - */ -static ssize_t -channel_flush_some_cells_from_outgoing_queue(channel_t *chan, - ssize_t num_cells); static void channel_force_free(channel_t *chan); static void channel_free_list(smartlist_t *channels, int mark_for_close); static void channel_listener_free_list(smartlist_t *channels, int mark_for_close); static void channel_listener_force_free(channel_listener_t *chan_l); -static size_t channel_get_cell_queue_entry_size(channel_t *chan, - cell_queue_entry_t *q); -static void -channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q); /*********************************** * Channel state utility functions * @@ -628,7 +561,7 @@ channel_listener_unregister(channel_listener_t *chan_l) * already exist. */ -static void +STATIC void channel_add_to_digest_map(channel_t *chan) { channel_idmap_entry_t *ent, search; @@ -676,33 +609,6 @@ channel_remove_from_digest_map(channel_t *chan) /* Assert that there is a digest */ tor_assert(!tor_digest_is_zero(chan->identity_digest)); -#if 0 - /* Make sure we have a map */ - if (!channel_identity_map) { - /* - * No identity map, so we can't find it by definition. This - * case is similar to digestmap_get() failing below. - */ - log_warn(LD_BUG, - "Trying to remove channel %p (global ID " U64_FORMAT ") " - "with digest %s from identity map, but didn't have any identity " - "map", - chan, U64_PRINTF_ARG(chan->global_identifier), - hex_str(chan->identity_digest, DIGEST_LEN)); - /* Clear out its next/prev pointers */ - if (chan->next_with_same_id) { - chan->next_with_same_id->prev_with_same_id = chan->prev_with_same_id; - } - if (chan->prev_with_same_id) { - chan->prev_with_same_id->next_with_same_id = chan->next_with_same_id; - } - chan->next_with_same_id = NULL; - chan->prev_with_same_id = NULL; - - return; - } -#endif /* 0 */ - /* Pull it out of its list, wherever that list is */ TOR_LIST_REMOVE(chan, next_with_same_id); @@ -936,10 +842,6 @@ channel_init(channel_t *chan) /* Warn about exhausted circuit IDs no more than hourly. */ chan->last_warned_circ_ids_exhausted.rate = 3600; - /* Initialize queues. */ - TOR_SIMPLEQ_INIT(&chan->incoming_queue); - TOR_SIMPLEQ_INIT(&chan->outgoing_queue); - /* Initialize list entries. */ memset(&chan->next_with_same_id, 0, sizeof(chan->next_with_same_id)); @@ -1022,8 +924,6 @@ channel_free(channel_t *chan) chan->cmux = NULL; } - /* We're in CLOSED or ERROR, so the cell queue is already empty */ - tor_free(chan); } @@ -1052,11 +952,6 @@ channel_listener_free(channel_listener_t *chan_l) /* Call a free method if there is one */ if (chan_l->free_fn) chan_l->free_fn(chan_l); - /* - * We're in CLOSED or ERROR, so the incoming channel queue is already - * empty. - */ - tor_free(chan_l); } @@ -1069,7 +964,6 @@ channel_listener_free(channel_listener_t *chan_l) static void channel_force_free(channel_t *chan) { - cell_queue_entry_t *cell, *cell_tmp; tor_assert(chan); log_debug(LD_CHANNEL, @@ -1103,18 +997,6 @@ channel_force_free(channel_t *chan) chan->cmux = NULL; } - /* We might still have a cell queue; kill it */ - TOR_SIMPLEQ_FOREACH_SAFE(cell, &chan->incoming_queue, next, cell_tmp) { - cell_queue_entry_free(cell, 0); - } - TOR_SIMPLEQ_INIT(&chan->incoming_queue); - - /* Outgoing cell queue is similar, but we can have to free packed cells */ - TOR_SIMPLEQ_FOREACH_SAFE(cell, &chan->outgoing_queue, next, cell_tmp) { - cell_queue_entry_free(cell, 0); - } - TOR_SIMPLEQ_INIT(&chan->outgoing_queue); - tor_free(chan); } @@ -1156,24 +1038,6 @@ channel_listener_force_free(channel_listener_t *chan_l) } /** - * Return the current registered listener for a channel listener - * - * This function returns a function pointer to the current registered - * handler for new incoming channels on a channel listener. - */ - -channel_listener_fn_ptr -channel_listener_get_listener_fn(channel_listener_t *chan_l) -{ - tor_assert(chan_l); - - if (chan_l->state == CHANNEL_LISTENER_STATE_LISTENING) - return chan_l->listener; - - return NULL; -} - -/** * Set the listener for a channel listener * * This function sets the handler for new incoming channels on a channel @@ -1237,8 +1101,7 @@ channel_get_var_cell_handler(channel_t *chan) * Set both cell handlers for a channel * * This function sets both the fixed-length and variable length cell handlers - * for a channel and processes any incoming cells that had been blocked in the - * queue because none were available. + * for a channel. */ void @@ -1247,8 +1110,6 @@ channel_set_cell_handlers(channel_t *chan, channel_var_cell_handler_fn_ptr var_cell_handler) { - int try_again = 0; - tor_assert(chan); tor_assert(CHANNEL_CAN_HANDLE_CELLS(chan)); @@ -1259,21 +1120,9 @@ channel_set_cell_handlers(channel_t *chan, "Setting var_cell_handler callback for channel %p to %p", chan, var_cell_handler); - /* Should we try the queue? */ - if (cell_handler && - cell_handler != chan->cell_handler) try_again = 1; - if (var_cell_handler && - var_cell_handler != chan->var_cell_handler) try_again = 1; - /* Change them */ chan->cell_handler = cell_handler; chan->var_cell_handler = var_cell_handler; - - /* Re-run the queue if we have one and there's any reason to */ - if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue) && - try_again && - (chan->cell_handler || - chan->var_cell_handler)) channel_process_cells(chan); } /* @@ -1400,36 +1249,6 @@ channel_close_from_lower_layer(channel_t *chan) } /** - * Close a channel listener from the lower layer - * - * Notify the channel code that the channel listener is being closed due to a - * non-error condition in the lower layer. This does not call the close() - * method, since the lower layer already knows. - */ - -void -channel_listener_close_from_lower_layer(channel_listener_t *chan_l) -{ - tor_assert(chan_l != NULL); - - /* If it's already in CLOSING, CLOSED or ERROR, this is a no-op */ - if (chan_l->state == CHANNEL_LISTENER_STATE_CLOSING || - chan_l->state == CHANNEL_LISTENER_STATE_CLOSED || - chan_l->state == CHANNEL_LISTENER_STATE_ERROR) return; - - log_debug(LD_CHANNEL, - "Closing channel listener %p (global ID " U64_FORMAT ") " - "due to lower-layer event", - chan_l, U64_PRINTF_ARG(chan_l->global_identifier)); - - /* Note closing by event from below */ - chan_l->reason_for_closing = CHANNEL_LISTENER_CLOSE_FROM_BELOW; - - /* Change state to CLOSING */ - channel_listener_change_state(chan_l, CHANNEL_LISTENER_STATE_CLOSING); -} - -/** * Notify that the channel is being closed due to an error condition * * This function is called by the lower layer implementing the transport @@ -1458,37 +1277,6 @@ channel_close_for_error(channel_t *chan) } /** - * Notify that the channel listener is being closed due to an error condition - * - * This function is called by the lower layer implementing the transport - * when a channel listener must be closed due to an error condition. This - * does not call the channel listener's close method, since the lower layer - * already knows. - */ - -void -channel_listener_close_for_error(channel_listener_t *chan_l) -{ - tor_assert(chan_l != NULL); - - /* If it's already in CLOSING, CLOSED or ERROR, this is a no-op */ - if (chan_l->state == CHANNEL_LISTENER_STATE_CLOSING || - chan_l->state == CHANNEL_LISTENER_STATE_CLOSED || - chan_l->state == CHANNEL_LISTENER_STATE_ERROR) return; - - log_debug(LD_CHANNEL, - "Closing channel listener %p (global ID " U64_FORMAT ") " - "due to lower-layer error", - chan_l, U64_PRINTF_ARG(chan_l->global_identifier)); - - /* Note closing by event from below */ - chan_l->reason_for_closing = CHANNEL_LISTENER_CLOSE_FOR_ERROR; - - /* Change state to CLOSING */ - channel_listener_change_state(chan_l, CHANNEL_LISTENER_STATE_CLOSING); -} - -/** * Notify that the lower layer is finished closing the channel * * This function should be called by the lower layer when a channel @@ -1522,33 +1310,6 @@ channel_closed(channel_t *chan) } /** - * Notify that the lower layer is finished closing the channel listener - * - * This function should be called by the lower layer when a channel listener - * is finished closing and it should be regarded as inactive and - * freed by the channel code. - */ - -void -channel_listener_closed(channel_listener_t *chan_l) -{ - tor_assert(chan_l); - tor_assert(chan_l->state == CHANNEL_LISTENER_STATE_CLOSING || - chan_l->state == CHANNEL_LISTENER_STATE_CLOSED || - chan_l->state == CHANNEL_LISTENER_STATE_ERROR); - - /* No-op if already inactive */ - if (chan_l->state == CHANNEL_LISTENER_STATE_CLOSED || - chan_l->state == CHANNEL_LISTENER_STATE_ERROR) return; - - if (chan_l->reason_for_closing != CHANNEL_LISTENER_CLOSE_FOR_ERROR) { - channel_listener_change_state(chan_l, CHANNEL_LISTENER_STATE_CLOSED); - } else { - channel_listener_change_state(chan_l, CHANNEL_LISTENER_STATE_ERROR); - } -} - -/** * Clear the identity_digest of a channel * * This function clears the identity digest of the remote endpoint for a @@ -1638,7 +1399,7 @@ channel_set_identity_digest(channel_t *chan, } /** - * Clear the remote end metadata (identity_digest/nickname) of a channel + * Clear the remote end metadata (identity_digest) of a channel * * This function clears all the remote end info from a channel; this is * intended for use by the lower layer. @@ -1665,419 +1426,96 @@ channel_clear_remote_end(channel_t *chan) memset(chan->identity_digest, 0, sizeof(chan->identity_digest)); - tor_free(chan->nickname); } /** - * Set the remote end metadata (identity_digest/nickname) of a channel + * Write to a channel the given packed cell. * - * This function sets new remote end info on a channel; this is intended - * for use by the lower layer. - */ - -void -channel_set_remote_end(channel_t *chan, - const char *identity_digest, - const char *nickname) -{ - int was_in_digest_map, should_be_in_digest_map, state_not_in_map; - - tor_assert(chan); - - log_debug(LD_CHANNEL, - "Setting remote endpoint identity on channel %p with " - "global ID " U64_FORMAT " to nickname %s, digest %s", - chan, U64_PRINTF_ARG(chan->global_identifier), - nickname ? nickname : "(null)", - identity_digest ? - hex_str(identity_digest, DIGEST_LEN) : "(null)"); - - state_not_in_map = CHANNEL_CONDEMNED(chan); - - was_in_digest_map = - !state_not_in_map && - chan->registered && - !tor_digest_is_zero(chan->identity_digest); - should_be_in_digest_map = - !state_not_in_map && - chan->registered && - (identity_digest && - !tor_digest_is_zero(identity_digest)); - - if (was_in_digest_map) - /* We should always remove it; we'll add it back if we're writing - * in a new digest. - */ - channel_remove_from_digest_map(chan); - - if (identity_digest) { - memcpy(chan->identity_digest, - identity_digest, - sizeof(chan->identity_digest)); - - } else { - memset(chan->identity_digest, 0, - sizeof(chan->identity_digest)); - } - - tor_free(chan->nickname); - if (nickname) - chan->nickname = tor_strdup(nickname); - - /* Put it in the digest map if we should */ - if (should_be_in_digest_map) - channel_add_to_digest_map(chan); -} - -/** - * Duplicate a cell queue entry; this is a shallow copy intended for use - * in channel_write_cell_queue_entry(). - */ - -static cell_queue_entry_t * -cell_queue_entry_dup(cell_queue_entry_t *q) -{ - cell_queue_entry_t *rv = NULL; - - tor_assert(q); - - rv = tor_malloc(sizeof(*rv)); - memcpy(rv, q, sizeof(*rv)); - - return rv; -} - -/** - * Free a cell_queue_entry_t; the handed_off parameter indicates whether - * the contents were passed to the lower layer (it is responsible for - * them) or not (we should free). - */ - -STATIC void -cell_queue_entry_free(cell_queue_entry_t *q, int handed_off) -{ - if (!q) return; - - if (!handed_off) { - /* - * If we handed it off, the recipient becomes responsible (or - * with packed cells the channel_t subclass calls packed_cell - * free after writing out its contents; see, e.g., - * channel_tls_write_packed_cell_method(). Otherwise, we have - * to take care of it here if possible. - */ - switch (q->type) { - case CELL_QUEUE_FIXED: - if (q->u.fixed.cell) { - /* - * There doesn't seem to be a cell_free() function anywhere in the - * pre-channel code; just use tor_free() - */ - tor_free(q->u.fixed.cell); - } - break; - case CELL_QUEUE_PACKED: - if (q->u.packed.packed_cell) { - packed_cell_free(q->u.packed.packed_cell); - } - break; - case CELL_QUEUE_VAR: - if (q->u.var.var_cell) { - /* - * This one's in connection_or.c; it'd be nice to figure out the - * whole flow of cells from one end to the other and factor the - * cell memory management functions like this out of the specific - * TLS lower layer. - */ - var_cell_free(q->u.var.var_cell); - } - break; - default: - /* - * Nothing we can do if we don't know the type; this will - * have been warned about elsewhere. - */ - break; - } - } - tor_free(q); -} - -#if 0 -/** - * Check whether a cell queue entry is padding; this is a helper function - * for channel_write_cell_queue_entry() - */ - -static int -cell_queue_entry_is_padding(cell_queue_entry_t *q) -{ - tor_assert(q); - - if (q->type == CELL_QUEUE_FIXED) { - if (q->u.fixed.cell) { - if (q->u.fixed.cell->command == CELL_PADDING || - q->u.fixed.cell->command == CELL_VPADDING) { - return 1; - } - } - } else if (q->type == CELL_QUEUE_VAR) { - if (q->u.var.var_cell) { - if (q->u.var.var_cell->command == CELL_PADDING || - q->u.var.var_cell->command == CELL_VPADDING) { - return 1; - } - } - } - - return 0; -} -#endif /* 0 */ - -/** - * Allocate a new cell queue entry for a fixed-size cell - */ - -static cell_queue_entry_t * -cell_queue_entry_new_fixed(cell_t *cell) -{ - cell_queue_entry_t *q = NULL; - - tor_assert(cell); - - q = tor_malloc(sizeof(*q)); - q->type = CELL_QUEUE_FIXED; - q->u.fixed.cell = cell; - - return q; -} - -/** - * Allocate a new cell queue entry for a variable-size cell - */ - -static cell_queue_entry_t * -cell_queue_entry_new_var(var_cell_t *var_cell) -{ - cell_queue_entry_t *q = NULL; - - tor_assert(var_cell); - - q = tor_malloc(sizeof(*q)); - q->type = CELL_QUEUE_VAR; - q->u.var.var_cell = var_cell; - - return q; -} - -/** - * Ask how big the cell contained in a cell_queue_entry_t is - */ - -static size_t -channel_get_cell_queue_entry_size(channel_t *chan, cell_queue_entry_t *q) -{ - size_t rv = 0; - - tor_assert(chan); - tor_assert(q); - - switch (q->type) { - case CELL_QUEUE_FIXED: - rv = get_cell_network_size(chan->wide_circ_ids); - break; - case CELL_QUEUE_VAR: - rv = get_var_cell_header_size(chan->wide_circ_ids) + - (q->u.var.var_cell ? q->u.var.var_cell->payload_len : 0); - break; - case CELL_QUEUE_PACKED: - rv = get_cell_network_size(chan->wide_circ_ids); - break; - default: - tor_assert_nonfatal_unreached_once(); - } - - return rv; -} - -/** - * Write to a channel based on a cell_queue_entry_t + * Return 0 on success or -1 on error. * - * Given a cell_queue_entry_t filled out by the caller, try to send the cell - * and queue it if we can't. + * Two possible errors can happen. Either the channel is not opened or the + * lower layer (specialized channel) failed to write it. In both cases, it is + * the caller responsability to free the cell. */ - -static void -channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q) +static int +write_packed_cell(channel_t *chan, packed_cell_t *cell) { - int result = 0, sent = 0; - cell_queue_entry_t *tmp = NULL; + int ret = -1; size_t cell_bytes; tor_assert(chan); - tor_assert(q); + tor_assert(cell); /* Assert that the state makes sense for a cell write */ tor_assert(CHANNEL_CAN_HANDLE_CELLS(chan)); { circid_t circ_id; - if (is_destroy_cell(chan, q, &circ_id)) { + if (packed_cell_is_destroy(chan, cell, &circ_id)) { channel_note_destroy_not_pending(chan, circ_id); } } /* For statistical purposes, figure out how big this cell is */ - cell_bytes = channel_get_cell_queue_entry_size(chan, q); + cell_bytes = get_cell_network_size(chan->wide_circ_ids); /* Can we send it right out? If so, try */ - if (TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue) && - CHANNEL_IS_OPEN(chan)) { - /* Pick the right write function for this cell type and save the result */ - switch (q->type) { - case CELL_QUEUE_FIXED: - tor_assert(chan->write_cell); - tor_assert(q->u.fixed.cell); - result = chan->write_cell(chan, q->u.fixed.cell); - break; - case CELL_QUEUE_PACKED: - tor_assert(chan->write_packed_cell); - tor_assert(q->u.packed.packed_cell); - result = chan->write_packed_cell(chan, q->u.packed.packed_cell); - break; - case CELL_QUEUE_VAR: - tor_assert(chan->write_var_cell); - tor_assert(q->u.var.var_cell); - result = chan->write_var_cell(chan, q->u.var.var_cell); - break; - default: - tor_assert(1); - } - - /* Check if we got it out */ - if (result > 0) { - sent = 1; - /* Timestamp for transmission */ - channel_timestamp_xmit(chan); - /* If we're here the queue is empty, so it's drained too */ - channel_timestamp_drained(chan); - /* Update the counter */ - ++(chan->n_cells_xmitted); - chan->n_bytes_xmitted += cell_bytes; - /* Update global counters */ - ++n_channel_cells_queued; - ++n_channel_cells_passed_to_lower_layer; - n_channel_bytes_queued += cell_bytes; - n_channel_bytes_passed_to_lower_layer += cell_bytes; - channel_assert_counter_consistency(); - } + if (!CHANNEL_IS_OPEN(chan)) { + goto done; } - if (!sent) { - /* Not sent, queue it */ - /* - * We have to copy the queue entry passed in, since the caller probably - * used the stack. - */ - tmp = cell_queue_entry_dup(q); - TOR_SIMPLEQ_INSERT_TAIL(&chan->outgoing_queue, tmp, next); - /* Update global counters */ - ++n_channel_cells_queued; - ++n_channel_cells_in_queues; - n_channel_bytes_queued += cell_bytes; - n_channel_bytes_in_queues += cell_bytes; - channel_assert_counter_consistency(); - /* Update channel queue size */ - chan->bytes_in_queue += cell_bytes; - /* Try to process the queue? */ - if (CHANNEL_IS_OPEN(chan)) channel_flush_cells(chan); + /* Write the cell on the connection's outbuf. */ + if (chan->write_packed_cell(chan, cell) < 0) { + goto done; } + /* Timestamp for transmission */ + channel_timestamp_xmit(chan); + /* Update the counter */ + ++(chan->n_cells_xmitted); + chan->n_bytes_xmitted += cell_bytes; + /* Successfully sent the cell. */ + ret = 0; + + done: + return ret; } -/** Write a generic cell type to a channel +/** + * Write a packed cell to a channel + * + * Write a packed cell to a channel using the write_cell() method. This is + * called by the transport-independent code to deliver a packed cell to a + * channel for transmission. * - * Write a generic cell to a channel. It is called by channel_write_cell(), - * channel_write_var_cell() and channel_write_packed_cell() in order to reduce - * code duplication. Notice that it takes cell as pointer of type void, - * this can be dangerous because no type check is performed. + * Return 0 on success else a negative value. In both cases, the caller should + * not access the cell anymore, it is freed both on success and error. */ - -void -channel_write_cell_generic_(channel_t *chan, const char *cell_type, - void *cell, cell_queue_entry_t *q) +int +channel_write_packed_cell(channel_t *chan, packed_cell_t *cell) { + int ret = -1; tor_assert(chan); tor_assert(cell); if (CHANNEL_IS_CLOSING(chan)) { - log_debug(LD_CHANNEL, "Discarding %c %p on closing channel %p with " - "global ID "U64_FORMAT, *cell_type, cell, chan, + log_debug(LD_CHANNEL, "Discarding %p on closing channel %p with " + "global ID "U64_FORMAT, cell, chan, U64_PRINTF_ARG(chan->global_identifier)); - tor_free(cell); - return; + goto end; } log_debug(LD_CHANNEL, - "Writing %c %p to channel %p with global ID " - U64_FORMAT, *cell_type, - cell, chan, U64_PRINTF_ARG(chan->global_identifier)); + "Writing %p to channel %p with global ID " + U64_FORMAT, cell, chan, U64_PRINTF_ARG(chan->global_identifier)); - channel_write_cell_queue_entry(chan, q); - /* Update the queue size estimate */ - channel_update_xmit_queue_size(chan); -} - -/** - * Write a cell to a channel - * - * Write a fixed-length cell to a channel using the write_cell() method. - * This is equivalent to the pre-channels connection_or_write_cell_to_buf(); - * it is called by the transport-independent code to deliver a cell to a - * channel for transmission. - */ + ret = write_packed_cell(chan, cell); -void -channel_write_cell(channel_t *chan, cell_t *cell) -{ - cell_queue_entry_t q; - q.type = CELL_QUEUE_FIXED; - q.u.fixed.cell = cell; - channel_write_cell_generic_(chan, "cell_t", cell, &q); -} - -/** - * Write a packed cell to a channel - * - * Write a packed cell to a channel using the write_cell() method. This is - * called by the transport-independent code to deliver a packed cell to a - * channel for transmission. - */ - -void -channel_write_packed_cell(channel_t *chan, packed_cell_t *packed_cell) -{ - cell_queue_entry_t q; - q.type = CELL_QUEUE_PACKED; - q.u.packed.packed_cell = packed_cell; - channel_write_cell_generic_(chan, "packed_cell_t", packed_cell, &q); -} - -/** - * Write a variable-length cell to a channel - * - * Write a variable-length cell to a channel using the write_cell() method. - * This is equivalent to the pre-channels - * connection_or_write_var_cell_to_buf(); it's called by the transport- - * independent code to deliver a var_cell to a channel for transmission. - */ - -void -channel_write_var_cell(channel_t *chan, var_cell_t *var_cell) -{ - cell_queue_entry_t q; - q.type = CELL_QUEUE_VAR; - q.u.var.var_cell = var_cell; - channel_write_cell_generic_(chan, "var_cell_t", var_cell, &q); + end: + /* Whatever happens, we free the cell. Either an error occured or the cell + * was put on the connection outbuf, both cases we have ownership of the + * cell and we free it. */ + packed_cell_free(cell); + return ret; } /** @@ -2119,15 +1557,6 @@ channel_change_state_(channel_t *chan, channel_state_t to_state) tor_assert(chan->reason_for_closing != CHANNEL_NOT_CLOSING); } - /* - * We need to maintain the queues here for some transitions: - * when we enter CHANNEL_STATE_OPEN (especially from CHANNEL_STATE_MAINT) - * we may have a backlog of cells to transmit, so drain the queues in - * that case, and when going to CHANNEL_STATE_CLOSED the subclass - * should have made sure to finish sending things (or gone to - * CHANNEL_STATE_ERROR if not possible), so we assert for that here. - */ - log_debug(LD_CHANNEL, "Changing state of channel %p (global ID " U64_FORMAT ") from \"%s\" to \"%s\"", @@ -2184,36 +1613,6 @@ channel_change_state_(channel_t *chan, channel_state_t to_state) } else if (to_state == CHANNEL_STATE_MAINT) { scheduler_channel_doesnt_want_writes(chan); } - - /* - * If we're closing, this channel no longer counts toward the global - * estimated queue size; if we're open, it now does. - */ - if ((to_state == CHANNEL_STATE_CLOSING || - to_state == CHANNEL_STATE_CLOSED || - to_state == CHANNEL_STATE_ERROR) && - (from_state == CHANNEL_STATE_OPEN || - from_state == CHANNEL_STATE_MAINT)) { - estimated_total_queue_size -= chan->bytes_in_queue; - } - - /* - * If we're opening, this channel now does count toward the global - * estimated queue size. - */ - if ((to_state == CHANNEL_STATE_OPEN || - to_state == CHANNEL_STATE_MAINT) && - !(from_state == CHANNEL_STATE_OPEN || - from_state == CHANNEL_STATE_MAINT)) { - estimated_total_queue_size += chan->bytes_in_queue; - } - - if (to_state == CHANNEL_STATE_CLOSED || - to_state == CHANNEL_STATE_ERROR) { - /* Assert that all queues are empty */ - tor_assert(TOR_SIMPLEQ_EMPTY(&chan->incoming_queue)); - tor_assert(TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue)); - } } /** @@ -2237,12 +1636,6 @@ channel_change_state_open(channel_t *chan) /* Tell circuits if we opened and stuff */ channel_do_open_actions(chan); chan->has_been_open = 1; - - /* Check for queued cells to process */ - if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue)) - channel_process_cells(chan); - if (! TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue)) - channel_flush_cells(chan); } /** @@ -2284,15 +1677,6 @@ channel_listener_change_state(channel_listener_t *chan_l, tor_assert(chan_l->reason_for_closing != CHANNEL_LISTENER_NOT_CLOSING); } - /* - * We need to maintain the queues here for some transitions: - * when we enter CHANNEL_STATE_OPEN (especially from CHANNEL_STATE_MAINT) - * we may have a backlog of cells to transmit, so drain the queues in - * that case, and when going to CHANNEL_STATE_CLOSED the subclass - * should have made sure to finish sending things (or gone to - * CHANNEL_STATE_ERROR if not possible), so we assert for that here. - */ - log_debug(LD_CHANNEL, "Changing state of channel listener %p (global ID " U64_FORMAT "from \"%s\" to \"%s\"", @@ -2325,30 +1709,38 @@ channel_listener_change_state(channel_listener_t *chan_l, if (to_state == CHANNEL_LISTENER_STATE_CLOSED || to_state == CHANNEL_LISTENER_STATE_ERROR) { - /* Assert that the queue is empty */ tor_assert(!(chan_l->incoming_list) || smartlist_len(chan_l->incoming_list) == 0); } } -/** - * Try to flush cells to the lower layer - * - * this is called by the lower layer to indicate that it wants more cells; - * it will try to write up to num_cells cells from the channel's cell queue or - * from circuits active on that channel, or as many as it has available if - * num_cells == -1. - */ - +/* Maximum number of cells that is allowed to flush at once withing + * channel_flush_some_cells(). */ #define MAX_CELLS_TO_GET_FROM_CIRCUITS_FOR_UNLIMITED 256 +/* Try to flush cells of the given channel chan up to a maximum of num_cells. + * + * This is called by the scheduler when it wants to flush cells from the + * channel's circuit queue(s) to the connection outbuf (not yet on the wire). + * + * If the channel is not in state CHANNEL_STATE_OPEN, this does nothing and + * will return 0 meaning no cells were flushed. + * + * If num_cells is -1, we'll try to flush up to the maximum cells allowed + * defined in MAX_CELLS_TO_GET_FROM_CIRCUITS_FOR_UNLIMITED. + * + * On success, the number of flushed cells are returned and it can never be + * above num_cells. If 0 is returned, no cells were flushed either because the + * channel was not opened or we had no cells on the channel. A negative number + * can NOT be sent back. + * + * This function is part of the fast path. */ MOCK_IMPL(ssize_t, channel_flush_some_cells, (channel_t *chan, ssize_t num_cells)) { unsigned int unlimited = 0; ssize_t flushed = 0; - int num_cells_from_circs, clamped_num_cells; - int q_len_before, q_len_after; + int clamped_num_cells; tor_assert(chan); @@ -2357,11 +1749,6 @@ channel_flush_some_cells, (channel_t *chan, ssize_t num_cells)) /* If we aren't in CHANNEL_STATE_OPEN, nothing goes through */ if (CHANNEL_IS_OPEN(chan)) { - /* Try to flush as much as we can that's already queued */ - flushed += channel_flush_some_cells_from_outgoing_queue(chan, - (unlimited ? -1 : num_cells - flushed)); - if (!unlimited && num_cells <= flushed) goto done; - if (circuitmux_num_cells(chan->cmux) > 0) { /* Calculate number of cells, including clamp */ if (unlimited) { @@ -2375,45 +1762,9 @@ channel_flush_some_cells, (channel_t *chan, ssize_t num_cells)) } } - /* - * Keep track of the change in queue size; we have to count cells - * channel_flush_from_first_active_circuit() writes out directly, - * but not double-count ones we might get later in - * channel_flush_some_cells_from_outgoing_queue() - */ - q_len_before = chan_cell_queue_len(&(chan->outgoing_queue)); - /* Try to get more cells from any active circuits */ - num_cells_from_circs = channel_flush_from_first_active_circuit( + flushed = channel_flush_from_first_active_circuit( chan, clamped_num_cells); - - q_len_after = chan_cell_queue_len(&(chan->outgoing_queue)); - - /* - * If it claims we got some, adjust the flushed counter and consider - * processing the queue again - */ - if (num_cells_from_circs > 0) { - /* - * Adjust flushed by the number of cells counted in - * num_cells_from_circs that didn't go to the cell queue. - */ - - if (q_len_after > q_len_before) { - num_cells_from_circs -= (q_len_after - q_len_before); - if (num_cells_from_circs < 0) num_cells_from_circs = 0; - } - - flushed += num_cells_from_circs; - - /* Now process the queue if necessary */ - - if ((q_len_after > q_len_before) && - (unlimited || (flushed < num_cells))) { - flushed += channel_flush_some_cells_from_outgoing_queue(chan, - (unlimited ? -1 : num_cells - flushed)); - } - } } } @@ -2422,197 +1773,16 @@ channel_flush_some_cells, (channel_t *chan, ssize_t num_cells)) } /** - * Flush cells from just the channel's outgoing cell queue - * - * This gets called from channel_flush_some_cells() above to flush cells - * just from the queue without trying for active_circuits. - */ - -static ssize_t -channel_flush_some_cells_from_outgoing_queue(channel_t *chan, - ssize_t num_cells) -{ - unsigned int unlimited = 0; - ssize_t flushed = 0; - cell_queue_entry_t *q = NULL; - size_t cell_size; - int free_q = 0, handed_off = 0; - - tor_assert(chan); - tor_assert(chan->write_cell); - tor_assert(chan->write_packed_cell); - tor_assert(chan->write_var_cell); - - if (num_cells < 0) unlimited = 1; - if (!unlimited && num_cells <= flushed) return 0; - - /* If we aren't in CHANNEL_STATE_OPEN, nothing goes through */ - if (CHANNEL_IS_OPEN(chan)) { - while ((unlimited || num_cells > flushed) && - NULL != (q = TOR_SIMPLEQ_FIRST(&chan->outgoing_queue))) { - free_q = 0; - handed_off = 0; - - /* Figure out how big it is for statistical purposes */ - cell_size = channel_get_cell_queue_entry_size(chan, q); - /* - * Okay, we have a good queue entry, try to give it to the lower - * layer. - */ - switch (q->type) { - case CELL_QUEUE_FIXED: - if (q->u.fixed.cell) { - if (chan->write_cell(chan, - q->u.fixed.cell)) { - ++flushed; - channel_timestamp_xmit(chan); - ++(chan->n_cells_xmitted); - chan->n_bytes_xmitted += cell_size; - free_q = 1; - handed_off = 1; - } - /* Else couldn't write it; leave it on the queue */ - } else { - /* This shouldn't happen */ - log_info(LD_CHANNEL, - "Saw broken cell queue entry of type CELL_QUEUE_FIXED " - "with no cell on channel %p " - "(global ID " U64_FORMAT ").", - chan, U64_PRINTF_ARG(chan->global_identifier)); - /* Throw it away */ - free_q = 1; - handed_off = 0; - } - break; - case CELL_QUEUE_PACKED: - if (q->u.packed.packed_cell) { - if (chan->write_packed_cell(chan, - q->u.packed.packed_cell)) { - ++flushed; - channel_timestamp_xmit(chan); - ++(chan->n_cells_xmitted); - chan->n_bytes_xmitted += cell_size; - free_q = 1; - handed_off = 1; - } - /* Else couldn't write it; leave it on the queue */ - } else { - /* This shouldn't happen */ - log_info(LD_CHANNEL, - "Saw broken cell queue entry of type CELL_QUEUE_PACKED " - "with no cell on channel %p " - "(global ID " U64_FORMAT ").", - chan, U64_PRINTF_ARG(chan->global_identifier)); - /* Throw it away */ - free_q = 1; - handed_off = 0; - } - break; - case CELL_QUEUE_VAR: - if (q->u.var.var_cell) { - if (chan->write_var_cell(chan, - q->u.var.var_cell)) { - ++flushed; - channel_timestamp_xmit(chan); - ++(chan->n_cells_xmitted); - chan->n_bytes_xmitted += cell_size; - free_q = 1; - handed_off = 1; - } - /* Else couldn't write it; leave it on the queue */ - } else { - /* This shouldn't happen */ - log_info(LD_CHANNEL, - "Saw broken cell queue entry of type CELL_QUEUE_VAR " - "with no cell on channel %p " - "(global ID " U64_FORMAT ").", - chan, U64_PRINTF_ARG(chan->global_identifier)); - /* Throw it away */ - free_q = 1; - handed_off = 0; - } - break; - default: - /* Unknown type, log and free it */ - log_info(LD_CHANNEL, - "Saw an unknown cell queue entry type %d on channel %p " - "(global ID " U64_FORMAT "; ignoring it." - " Someone should fix this.", - q->type, chan, U64_PRINTF_ARG(chan->global_identifier)); - free_q = 1; - handed_off = 0; - } - - /* - * if free_q is set, we used it and should remove the queue entry; - * we have to do the free down here so TOR_SIMPLEQ_REMOVE_HEAD isn't - * accessing freed memory - */ - if (free_q) { - TOR_SIMPLEQ_REMOVE_HEAD(&chan->outgoing_queue, next); - /* - * ...and we handed a cell off to the lower layer, so we should - * update the counters. - */ - ++n_channel_cells_passed_to_lower_layer; - --n_channel_cells_in_queues; - n_channel_bytes_passed_to_lower_layer += cell_size; - n_channel_bytes_in_queues -= cell_size; - channel_assert_counter_consistency(); - /* Update the channel's queue size too */ - chan->bytes_in_queue -= cell_size; - /* Finally, free q */ - cell_queue_entry_free(q, handed_off); - q = NULL; - } else { - /* No cell removed from list, so we can't go on any further */ - break; - } - } - } - - /* Did we drain the queue? */ - if (TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue)) { - channel_timestamp_drained(chan); - } - - /* Update the estimate queue size */ - channel_update_xmit_queue_size(chan); - - return flushed; -} - -/** - * Flush as many cells as we possibly can from the queue - * - * This tries to flush as many cells from the queue as the lower layer - * will take. It just calls channel_flush_some_cells_from_outgoing_queue() - * in unlimited mode. - */ - -void -channel_flush_cells(channel_t *chan) -{ - channel_flush_some_cells_from_outgoing_queue(chan, -1); -} - -/** * Check if any cells are available * - * This gets used from the lower layer to check if any more cells are - * available. + * This is used by the scheduler to know if the channel has more to flush + * after a scheduling round. */ - MOCK_IMPL(int, channel_more_to_flush, (channel_t *chan)) { tor_assert(chan); - /* Check if we have any queued */ - if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue)) - return 1; - - /* Check if any circuits would like to queue some */ if (circuitmux_num_cells(chan->cmux) > 0) return 1; /* Else no */ @@ -2816,207 +1986,31 @@ channel_listener_queue_incoming(channel_listener_t *listener, } /** - * Process queued incoming cells - * - * Process as many queued cells as we can from the incoming - * cell queue. + * Process a cell from the given channel. */ - void -channel_process_cells(channel_t *chan) +channel_process_cell(channel_t *chan, cell_t *cell) { - cell_queue_entry_t *q; tor_assert(chan); tor_assert(CHANNEL_IS_CLOSING(chan) || CHANNEL_IS_MAINT(chan) || CHANNEL_IS_OPEN(chan)); - - log_debug(LD_CHANNEL, - "Processing as many incoming cells as we can for channel %p", - chan); - - /* Nothing we can do if we have no registered cell handlers */ - if (!(chan->cell_handler || - chan->var_cell_handler)) return; - /* Nothing we can do if we have no cells */ - if (TOR_SIMPLEQ_EMPTY(&chan->incoming_queue)) return; - - /* - * Process cells until we're done or find one we have no current handler - * for. - * - * We must free the cells here after calling the handler, since custody - * of the buffer was given to the channel layer when they were queued; - * see comments on memory management in channel_queue_cell() and in - * channel_queue_var_cell() below. - */ - while (NULL != (q = TOR_SIMPLEQ_FIRST(&chan->incoming_queue))) { - tor_assert(q); - tor_assert(q->type == CELL_QUEUE_FIXED || - q->type == CELL_QUEUE_VAR); - - if (q->type == CELL_QUEUE_FIXED && - chan->cell_handler) { - /* Handle a fixed-length cell */ - TOR_SIMPLEQ_REMOVE_HEAD(&chan->incoming_queue, next); - tor_assert(q->u.fixed.cell); - log_debug(LD_CHANNEL, - "Processing incoming cell_t %p for channel %p (global ID " - U64_FORMAT ")", - q->u.fixed.cell, chan, - U64_PRINTF_ARG(chan->global_identifier)); - chan->cell_handler(chan, q->u.fixed.cell); - tor_free(q->u.fixed.cell); - tor_free(q); - } else if (q->type == CELL_QUEUE_VAR && - chan->var_cell_handler) { - /* Handle a variable-length cell */ - TOR_SIMPLEQ_REMOVE_HEAD(&chan->incoming_queue, next); - tor_assert(q->u.var.var_cell); - log_debug(LD_CHANNEL, - "Processing incoming var_cell_t %p for channel %p (global ID " - U64_FORMAT ")", - q->u.var.var_cell, chan, - U64_PRINTF_ARG(chan->global_identifier)); - chan->var_cell_handler(chan, q->u.var.var_cell); - tor_free(q->u.var.var_cell); - tor_free(q); - } else { - /* Can't handle this one */ - break; - } - } -} - -/** - * Queue incoming cell - * - * This should be called by a channel_t subclass to queue an incoming fixed- - * length cell for processing, and process it if possible. - */ - -void -channel_queue_cell(channel_t *chan, cell_t *cell) -{ - int need_to_queue = 0; - cell_queue_entry_t *q; - cell_t *cell_copy = NULL; - - tor_assert(chan); tor_assert(cell); - tor_assert(CHANNEL_IS_OPEN(chan)); - /* Do we need to queue it, or can we just call the handler right away? */ - if (!(chan->cell_handler)) need_to_queue = 1; - if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue)) - need_to_queue = 1; + /* Nothing we can do if we have no registered cell handlers */ + if (!chan->cell_handler) + return; /* Timestamp for receiving */ channel_timestamp_recv(chan); - - /* Update the counters */ + /* Update received counter. */ ++(chan->n_cells_recved); chan->n_bytes_recved += get_cell_network_size(chan->wide_circ_ids); - /* If we don't need to queue we can just call cell_handler */ - if (!need_to_queue) { - tor_assert(chan->cell_handler); - log_debug(LD_CHANNEL, - "Directly handling incoming cell_t %p for channel %p " - "(global ID " U64_FORMAT ")", - cell, chan, - U64_PRINTF_ARG(chan->global_identifier)); - chan->cell_handler(chan, cell); - } else { - /* - * Otherwise queue it and then process the queue if possible. - * - * We queue a copy, not the original pointer - it might have been on the - * stack in connection_or_process_cells_from_inbuf() (or another caller - * if we ever have a subclass other than channel_tls_t), or be freed - * there after we return. This is the uncommon case; the non-copying - * fast path occurs in the if (!need_to_queue) case above when the - * upper layer has installed cell handlers. - */ - cell_copy = tor_malloc_zero(sizeof(cell_t)); - memcpy(cell_copy, cell, sizeof(cell_t)); - q = cell_queue_entry_new_fixed(cell_copy); - log_debug(LD_CHANNEL, - "Queueing incoming cell_t %p for channel %p " - "(global ID " U64_FORMAT ")", - cell, chan, - U64_PRINTF_ARG(chan->global_identifier)); - TOR_SIMPLEQ_INSERT_TAIL(&chan->incoming_queue, q, next); - if (chan->cell_handler || - chan->var_cell_handler) { - channel_process_cells(chan); - } - } -} - -/** - * Queue incoming variable-length cell - * - * This should be called by a channel_t subclass to queue an incoming - * variable-length cell for processing, and process it if possible. - */ - -void -channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell) -{ - int need_to_queue = 0; - cell_queue_entry_t *q; - var_cell_t *cell_copy = NULL; - - tor_assert(chan); - tor_assert(var_cell); - tor_assert(CHANNEL_IS_OPEN(chan)); - - /* Do we need to queue it, or can we just call the handler right away? */ - if (!(chan->var_cell_handler)) need_to_queue = 1; - if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue)) - need_to_queue = 1; - - /* Timestamp for receiving */ - channel_timestamp_recv(chan); - - /* Update the counter */ - ++(chan->n_cells_recved); - chan->n_bytes_recved += get_var_cell_header_size(chan->wide_circ_ids) + - var_cell->payload_len; - - /* If we don't need to queue we can just call cell_handler */ - if (!need_to_queue) { - tor_assert(chan->var_cell_handler); - log_debug(LD_CHANNEL, - "Directly handling incoming var_cell_t %p for channel %p " - "(global ID " U64_FORMAT ")", - var_cell, chan, - U64_PRINTF_ARG(chan->global_identifier)); - chan->var_cell_handler(chan, var_cell); - } else { - /* - * Otherwise queue it and then process the queue if possible. - * - * We queue a copy, not the original pointer - it might have been on the - * stack in connection_or_process_cells_from_inbuf() (or another caller - * if we ever have a subclass other than channel_tls_t), or be freed - * there after we return. This is the uncommon case; the non-copying - * fast path occurs in the if (!need_to_queue) case above when the - * upper layer has installed cell handlers. - */ - cell_copy = var_cell_copy(var_cell); - q = cell_queue_entry_new_var(cell_copy); - log_debug(LD_CHANNEL, - "Queueing incoming var_cell_t %p for channel %p " - "(global ID " U64_FORMAT ")", - var_cell, chan, - U64_PRINTF_ARG(chan->global_identifier)); - TOR_SIMPLEQ_INSERT_TAIL(&chan->incoming_queue, q, next); - if (chan->cell_handler || - chan->var_cell_handler) { - channel_process_cells(chan); - } - } + log_debug(LD_CHANNEL, + "Processing incoming cell_t %p for channel %p (global ID " + U64_FORMAT ")", cell, chan, + U64_PRINTF_ARG(chan->global_identifier)); + chan->cell_handler(chan, cell); } /** If <b>packed_cell</b> on <b>chan</b> is a destroy cell, then set @@ -3043,44 +2037,6 @@ packed_cell_is_destroy(channel_t *chan, } /** - * Assert that the global channel stats counters are internally consistent - */ - -static void -channel_assert_counter_consistency(void) -{ - tor_assert(n_channel_cells_queued == - (n_channel_cells_in_queues + n_channel_cells_passed_to_lower_layer)); - tor_assert(n_channel_bytes_queued == - (n_channel_bytes_in_queues + n_channel_bytes_passed_to_lower_layer)); -} - -/* DOCDOC */ -static int -is_destroy_cell(channel_t *chan, - const cell_queue_entry_t *q, circid_t *circid_out) -{ - *circid_out = 0; - switch (q->type) { - case CELL_QUEUE_FIXED: - if (q->u.fixed.cell->command == CELL_DESTROY) { - *circid_out = q->u.fixed.cell->circ_id; - return 1; - } - break; - case CELL_QUEUE_VAR: - if (q->u.var.var_cell->command == CELL_DESTROY) { - *circid_out = q->u.var.var_cell->circ_id; - return 1; - } - break; - case CELL_QUEUE_PACKED: - return packed_cell_is_destroy(chan, q->u.packed.packed_cell, circid_out); - } - return 0; -} - -/** * Send destroy cell on a channel * * Write a destroy cell with circ ID <b>circ_id</b> and reason <b>reason</b> @@ -3134,19 +2090,6 @@ channel_dumpstats(int severity) { if (all_channels && smartlist_len(all_channels) > 0) { tor_log(severity, LD_GENERAL, - "Channels have queued " U64_FORMAT " bytes in " U64_FORMAT " cells, " - "and handed " U64_FORMAT " bytes in " U64_FORMAT " cells to the lower" - " layer.", - U64_PRINTF_ARG(n_channel_bytes_queued), - U64_PRINTF_ARG(n_channel_cells_queued), - U64_PRINTF_ARG(n_channel_bytes_passed_to_lower_layer), - U64_PRINTF_ARG(n_channel_cells_passed_to_lower_layer)); - tor_log(severity, LD_GENERAL, - "There are currently " U64_FORMAT " bytes in " U64_FORMAT " cells " - "in channel queues.", - U64_PRINTF_ARG(n_channel_bytes_in_queues), - U64_PRINTF_ARG(n_channel_cells_in_queues)); - tor_log(severity, LD_GENERAL, "Dumping statistics about %d channels:", smartlist_len(all_channels)); tor_log(severity, LD_GENERAL, @@ -3629,19 +2572,6 @@ channel_listener_describe_transport(channel_listener_t *chan_l) } /** - * Return the number of entries in <b>queue</b> - */ -STATIC int -chan_cell_queue_len(const chan_cell_queue_t *queue) -{ - int r = 0; - cell_queue_entry_t *cell; - TOR_SIMPLEQ_FOREACH(cell, queue, next) - ++r; - return r; -} - -/** * Dump channel statistics * * Dump statistics for one channel to the log @@ -3676,35 +2606,18 @@ channel_dump_statistics, (channel_t *chan, int severity)) U64_PRINTF_ARG(chan->timestamp_active), U64_PRINTF_ARG(now - chan->timestamp_active)); - /* Handle digest and nickname */ + /* Handle digest. */ if (!tor_digest_is_zero(chan->identity_digest)) { - if (chan->nickname) { - tor_log(severity, LD_GENERAL, - " * Channel " U64_FORMAT " says it is connected " - "to an OR with digest %s and nickname %s", - U64_PRINTF_ARG(chan->global_identifier), - hex_str(chan->identity_digest, DIGEST_LEN), - chan->nickname); - } else { - tor_log(severity, LD_GENERAL, - " * Channel " U64_FORMAT " says it is connected " - "to an OR with digest %s and no known nickname", - U64_PRINTF_ARG(chan->global_identifier), - hex_str(chan->identity_digest, DIGEST_LEN)); - } + tor_log(severity, LD_GENERAL, + " * Channel " U64_FORMAT " says it is connected " + "to an OR with digest %s", + U64_PRINTF_ARG(chan->global_identifier), + hex_str(chan->identity_digest, DIGEST_LEN)); } else { - if (chan->nickname) { - tor_log(severity, LD_GENERAL, - " * Channel " U64_FORMAT " does not know the digest" - " of the OR it is connected to, but reports its nickname is %s", - U64_PRINTF_ARG(chan->global_identifier), - chan->nickname); - } else { - tor_log(severity, LD_GENERAL, - " * Channel " U64_FORMAT " does not know the digest" - " or the nickname of the OR it is connected to", - U64_PRINTF_ARG(chan->global_identifier)); - } + tor_log(severity, LD_GENERAL, + " * Channel " U64_FORMAT " does not know the digest" + " of the OR it is connected to", + U64_PRINTF_ARG(chan->global_identifier)); } /* Handle remote address and descriptions */ @@ -3753,14 +2666,6 @@ channel_dump_statistics, (channel_t *chan, int severity)) channel_is_incoming(chan) ? "incoming" : "outgoing"); - /* Describe queues */ - tor_log(severity, LD_GENERAL, - " * Channel " U64_FORMAT " has %d queued incoming cells" - " and %d queued outgoing cells", - U64_PRINTF_ARG(chan->global_identifier), - chan_cell_queue_len(&chan->incoming_queue), - chan_cell_queue_len(&chan->outgoing_queue)); - /* Describe circuits */ tor_log(severity, LD_GENERAL, " * Channel " U64_FORMAT " has %d active circuits out of" @@ -3779,12 +2684,6 @@ channel_dump_statistics, (channel_t *chan, int severity)) U64_PRINTF_ARG(chan->timestamp_client), U64_PRINTF_ARG(now - chan->timestamp_client)); tor_log(severity, LD_GENERAL, - " * Channel " U64_FORMAT " was last drained at " - U64_FORMAT " (" U64_FORMAT " seconds ago)", - U64_PRINTF_ARG(chan->global_identifier), - U64_PRINTF_ARG(chan->timestamp_drained), - U64_PRINTF_ARG(now - chan->timestamp_drained)); - tor_log(severity, LD_GENERAL, " * Channel " U64_FORMAT " last received a cell " "at " U64_FORMAT " (" U64_FORMAT " seconds ago)", U64_PRINTF_ARG(chan->global_identifier), @@ -4027,29 +2926,18 @@ channel_get_addr_if_possible(channel_t *chan, tor_addr_t *addr_out) else return 0; } -/** - * Check if there are outgoing queue writes on this channel - * - * Indicate if either we have queued cells, or if not, whether the underlying - * lower-layer transport thinks it has an output queue. +/* + * Return true iff the channel has any cells on the connection outbuf waiting + * to be sent onto the network. */ - int channel_has_queued_writes(channel_t *chan) { - int has_writes = 0; - tor_assert(chan); tor_assert(chan->has_queued_writes); - if (! TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue)) { - has_writes = 1; - } else { - /* Check with the lower layer */ - has_writes = chan->has_queued_writes(chan); - } - - return has_writes; + /* Check with the lower layer */ + return chan->has_queued_writes(chan); } /** @@ -4274,23 +3162,10 @@ channel_mark_outgoing(channel_t *chan) ***********************/ /* - * Get the latest estimate for the total queue size of all open channels - */ - -uint64_t -channel_get_global_queue_estimate(void) -{ - return estimated_total_queue_size; -} - -/* * Estimate the number of writeable cells * - * Ask the lower layer for an estimate of how many cells it can accept, and - * then subtract the length of our outgoing_queue, if any, to produce an - * estimate of the number of cells this channel can accept for writes. + * Ask the lower layer for an estimate of how many cells it can accept. */ - int channel_num_cells_writeable(channel_t *chan) { @@ -4302,8 +3177,6 @@ channel_num_cells_writeable(channel_t *chan) if (chan->state == CHANNEL_STATE_OPEN) { /* Query lower layer */ result = chan->num_cells_writeable(chan); - /* Subtract cell queue length, if any */ - result -= chan_cell_queue_len(&chan->outgoing_queue); if (result < 0) result = 0; } else { /* No cells are writeable in any other state */ @@ -4427,25 +3300,6 @@ channel_timestamp_client(channel_t *chan) } /** - * Update the last drained timestamp - * - * This is called whenever we transmit a cell which leaves the outgoing cell - * queue completely empty. It also updates the xmit time and the active time. - */ - -void -channel_timestamp_drained(channel_t *chan) -{ - time_t now = time(NULL); - - tor_assert(chan); - - chan->timestamp_active = now; - chan->timestamp_drained = now; - chan->timestamp_xmit = now; -} - -/** * Update the recv timestamp * * This is called whenever we get an incoming cell from the lower layer. @@ -4504,54 +3358,6 @@ channel_when_created(channel_t *chan) } /** - * Query created timestamp for a channel listener - */ - -time_t -channel_listener_when_created(channel_listener_t *chan_l) -{ - tor_assert(chan_l); - - return chan_l->timestamp_created; -} - -/** - * Query last active timestamp for a channel - */ - -time_t -channel_when_last_active(channel_t *chan) -{ - tor_assert(chan); - - return chan->timestamp_active; -} - -/** - * Query last active timestamp for a channel listener - */ - -time_t -channel_listener_when_last_active(channel_listener_t *chan_l) -{ - tor_assert(chan_l); - - return chan_l->timestamp_active; -} - -/** - * Query last accepted timestamp for a channel listener - */ - -time_t -channel_listener_when_last_accepted(channel_listener_t *chan_l) -{ - tor_assert(chan_l); - - return chan_l->timestamp_accepted; -} - -/** * Query client timestamp */ @@ -4564,30 +3370,6 @@ channel_when_last_client(channel_t *chan) } /** - * Query drained timestamp - */ - -time_t -channel_when_last_drained(channel_t *chan) -{ - tor_assert(chan); - - return chan->timestamp_drained; -} - -/** - * Query recv timestamp - */ - -time_t -channel_when_last_recv(channel_t *chan) -{ - tor_assert(chan); - - return chan->timestamp_recv; -} - -/** * Query xmit timestamp */ @@ -4600,42 +3382,6 @@ channel_when_last_xmit(channel_t *chan) } /** - * Query accepted counter - */ - -uint64_t -channel_listener_count_accepted(channel_listener_t *chan_l) -{ - tor_assert(chan_l); - - return chan_l->n_accepted; -} - -/** - * Query received cell counter - */ - -uint64_t -channel_count_recved(channel_t *chan) -{ - tor_assert(chan); - - return chan->n_cells_recved; -} - -/** - * Query transmitted cell counter - */ - -uint64_t -channel_count_xmitted(channel_t *chan) -{ - tor_assert(chan); - - return chan->n_cells_xmitted; -} - -/** * Check if a channel matches an extend_info_t * * This function calls the lower layer and asks if this channel matches a @@ -4819,83 +3565,3 @@ channel_update_bad_for_new_circs(const char *digest, int force) } } -/** - * Update the estimated number of bytes queued to transmit for this channel, - * and notify the scheduler. The estimate includes both the channel queue and - * the queue size reported by the lower layer, and an overhead estimate - * optionally provided by the lower layer. - */ - -void -channel_update_xmit_queue_size(channel_t *chan) -{ - uint64_t queued, adj; - double overhead; - - tor_assert(chan); - tor_assert(chan->num_bytes_queued); - - /* - * First, get the number of bytes we have queued without factoring in - * lower-layer overhead. - */ - queued = chan->num_bytes_queued(chan) + chan->bytes_in_queue; - /* Next, adjust by the overhead factor, if any is available */ - if (chan->get_overhead_estimate) { - overhead = chan->get_overhead_estimate(chan); - if (overhead >= 1.0) { - queued = (uint64_t)(queued * overhead); - } else { - /* Ignore silly overhead factors */ - log_notice(LD_CHANNEL, "Ignoring silly overhead factor %f", overhead); - } - } - - /* Now, compare to the previous estimate */ - if (queued > chan->bytes_queued_for_xmit) { - adj = queued - chan->bytes_queued_for_xmit; - log_debug(LD_CHANNEL, - "Increasing queue size for channel " U64_FORMAT " by " U64_FORMAT - " from " U64_FORMAT " to " U64_FORMAT, - U64_PRINTF_ARG(chan->global_identifier), - U64_PRINTF_ARG(adj), - U64_PRINTF_ARG(chan->bytes_queued_for_xmit), - U64_PRINTF_ARG(queued)); - /* Update the channel's estimate */ - chan->bytes_queued_for_xmit = queued; - - /* Update the global queue size estimate if appropriate */ - if (chan->state == CHANNEL_STATE_OPEN || - chan->state == CHANNEL_STATE_MAINT) { - estimated_total_queue_size += adj; - log_debug(LD_CHANNEL, - "Increasing global queue size by " U64_FORMAT " for channel " - U64_FORMAT ", new size is " U64_FORMAT, - U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier), - U64_PRINTF_ARG(estimated_total_queue_size)); - } - } else if (queued < chan->bytes_queued_for_xmit) { - adj = chan->bytes_queued_for_xmit - queued; - log_debug(LD_CHANNEL, - "Decreasing queue size for channel " U64_FORMAT " by " U64_FORMAT - " from " U64_FORMAT " to " U64_FORMAT, - U64_PRINTF_ARG(chan->global_identifier), - U64_PRINTF_ARG(adj), - U64_PRINTF_ARG(chan->bytes_queued_for_xmit), - U64_PRINTF_ARG(queued)); - /* Update the channel's estimate */ - chan->bytes_queued_for_xmit = queued; - - /* Update the global queue size estimate if appropriate */ - if (chan->state == CHANNEL_STATE_OPEN || - chan->state == CHANNEL_STATE_MAINT) { - estimated_total_queue_size -= adj; - log_debug(LD_CHANNEL, - "Decreasing global queue size by " U64_FORMAT " for channel " - U64_FORMAT ", new size is " U64_FORMAT, - U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier), - U64_PRINTF_ARG(estimated_total_queue_size)); - } - } -} - diff --git a/src/or/channel.h b/src/or/channel.h index 32336fe1d2..d88a77c9ae 100644 --- a/src/or/channel.h +++ b/src/or/channel.h @@ -19,10 +19,6 @@ typedef void (*channel_listener_fn_ptr)(channel_listener_t *, channel_t *); typedef void (*channel_cell_handler_fn_ptr)(channel_t *, cell_t *); typedef void (*channel_var_cell_handler_fn_ptr)(channel_t *, var_cell_t *); -struct cell_queue_entry_s; -TOR_SIMPLEQ_HEAD(chan_cell_queue, cell_queue_entry_s); -typedef struct chan_cell_queue chan_cell_queue_t; - /** * This enum is used by channelpadding to decide when to pad channels. * Don't add values to it without updating the checks in @@ -259,21 +255,12 @@ struct channel_s { */ ed25519_public_key_t ed25519_identity; - /** Nickname of the OR on the other side, or NULL if none. */ - char *nickname; - /** * Linked list of channels with the same RSA identity digest, for use with * the digest->channel map */ TOR_LIST_ENTRY(channel_s) next_with_same_id; - /** List of incoming cells to handle */ - chan_cell_queue_t incoming_queue; - - /** List of queued outgoing cells */ - chan_cell_queue_t outgoing_queue; - /** Circuit mux for circuits sending on this channel */ circuitmux_t *cmux; @@ -320,7 +307,6 @@ struct channel_s { /** Channel timestamps for cell channels */ time_t timestamp_client; /* Client used this, according to relay.c */ - time_t timestamp_drained; /* Output queue empty */ time_t timestamp_recv; /* Cell received from lower layer */ time_t timestamp_xmit; /* Cell sent to lower layer */ @@ -337,14 +323,6 @@ struct channel_s { /** Channel counters for cell channels */ uint64_t n_cells_recved, n_bytes_recved; uint64_t n_cells_xmitted, n_bytes_xmitted; - - /** Our current contribution to the scheduler's total xmit queue */ - uint64_t bytes_queued_for_xmit; - - /** Number of bytes in this channel's cell queue; does not include - * lower-layer queueing. - */ - uint64_t bytes_in_queue; }; struct channel_listener_s { @@ -412,18 +390,13 @@ channel_listener_state_to_string(channel_listener_state_t state); /* Abstract channel operations */ void channel_mark_for_close(channel_t *chan); -void channel_write_cell(channel_t *chan, cell_t *cell); -void channel_write_packed_cell(channel_t *chan, packed_cell_t *cell); -void channel_write_var_cell(channel_t *chan, var_cell_t *cell); +int channel_write_packed_cell(channel_t *chan, packed_cell_t *cell); void channel_listener_mark_for_close(channel_listener_t *chan_l); /* Channel callback registrations */ /* Listener callback */ -channel_listener_fn_ptr -channel_listener_get_listener_fn(channel_listener_t *chan); - void channel_listener_set_listener_fn(channel_listener_t *chan, channel_listener_fn_ptr listener); @@ -457,36 +430,9 @@ void channel_set_cmux_policy_everywhere(circuitmux_policy_t *pol); #ifdef TOR_CHANNEL_INTERNAL_ #ifdef CHANNEL_PRIVATE_ -/* Cell queue structure (here rather than channel.c for test suite use) */ - -typedef struct cell_queue_entry_s cell_queue_entry_t; -struct cell_queue_entry_s { - TOR_SIMPLEQ_ENTRY(cell_queue_entry_s) next; - enum { - CELL_QUEUE_FIXED, - CELL_QUEUE_VAR, - CELL_QUEUE_PACKED - } type; - union { - struct { - cell_t *cell; - } fixed; - struct { - var_cell_t *var_cell; - } var; - struct { - packed_cell_t *packed_cell; - } packed; - } u; -}; - -/* Cell queue functions for benefit of test suite */ -STATIC int chan_cell_queue_len(const chan_cell_queue_t *queue); -STATIC void cell_queue_entry_free(cell_queue_entry_t *q, int handed_off); +STATIC void channel_add_to_digest_map(channel_t *chan); -void channel_write_cell_generic_(channel_t *chan, const char *cell_type, - void *cell, cell_queue_entry_t *q); #endif /* defined(CHANNEL_PRIVATE_) */ /* Channel operations for subclasses and internal use only */ @@ -511,10 +457,6 @@ void channel_close_from_lower_layer(channel_t *chan); void channel_close_for_error(channel_t *chan); void channel_closed(channel_t *chan); -void channel_listener_close_from_lower_layer(channel_listener_t *chan_l); -void channel_listener_close_for_error(channel_listener_t *chan_l); -void channel_listener_closed(channel_listener_t *chan_l); - /* Free a channel */ void channel_free(channel_t *chan); void channel_listener_free(channel_listener_t *chan_l); @@ -532,9 +474,6 @@ void channel_mark_remote(channel_t *chan); void channel_set_identity_digest(channel_t *chan, const char *identity_digest, const ed25519_public_key_t *ed_identity); -void channel_set_remote_end(channel_t *chan, - const char *identity_digest, - const char *nickname); void channel_listener_change_state(channel_listener_t *chan_l, channel_listener_state_t to_state); @@ -542,7 +481,6 @@ void channel_listener_change_state(channel_listener_t *chan_l, /* Timestamp updates */ void channel_timestamp_created(channel_t *chan); void channel_timestamp_active(channel_t *chan); -void channel_timestamp_drained(channel_t *chan); void channel_timestamp_recv(channel_t *chan); void channel_timestamp_xmit(channel_t *chan); @@ -556,12 +494,7 @@ void channel_listener_queue_incoming(channel_listener_t *listener, channel_t *incoming); /* Incoming cell handling */ -void channel_process_cells(channel_t *chan); -void channel_queue_cell(channel_t *chan, cell_t *cell); -void channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell); - -/* Outgoing cell handling */ -void channel_flush_cells(channel_t *chan); +void channel_process_cell(channel_t *chan, cell_t *cell); /* Request from lower layer for more cells if available */ MOCK_DECL(ssize_t, channel_flush_some_cells, @@ -576,10 +509,6 @@ void channel_notify_flushed(channel_t *chan); /* Handle stuff we need to do on open like notifying circuits */ void channel_do_open_actions(channel_t *chan); -#ifdef TOR_UNIT_TESTS -extern uint64_t estimated_total_queue_size; -#endif - #endif /* defined(TOR_CHANNEL_INTERNAL_) */ /* Helper functions to perform operations on channels */ @@ -680,7 +609,6 @@ MOCK_DECL(void,channel_set_circid_type,(channel_t *chan, crypto_pk_t *identity_rcvd, int consider_identity)); void channel_timestamp_client(channel_t *chan); -void channel_update_xmit_queue_size(channel_t *chan); const char * channel_listener_describe_transport(channel_listener_t *chan_l); void channel_listener_dump_statistics(channel_listener_t *chan_l, @@ -692,27 +620,14 @@ void channel_check_for_duplicates(void); void channel_update_bad_for_new_circs(const char *digest, int force); /* Flow control queries */ -uint64_t channel_get_global_queue_estimate(void); int channel_num_cells_writeable(channel_t *chan); /* Timestamp queries */ time_t channel_when_created(channel_t *chan); -time_t channel_when_last_active(channel_t *chan); time_t channel_when_last_client(channel_t *chan); -time_t channel_when_last_drained(channel_t *chan); -time_t channel_when_last_recv(channel_t *chan); time_t channel_when_last_xmit(channel_t *chan); -time_t channel_listener_when_created(channel_listener_t *chan_l); -time_t channel_listener_when_last_active(channel_listener_t *chan_l); -time_t channel_listener_when_last_accepted(channel_listener_t *chan_l); - /* Counter queries */ -uint64_t channel_count_recved(channel_t *chan); -uint64_t channel_count_xmitted(channel_t *chan); - -uint64_t channel_listener_count_accepted(channel_listener_t *chan_l); - int packed_cell_is_destroy(channel_t *chan, const packed_cell_t *packed_cell, circid_t *circid_out); diff --git a/src/or/channeltls.c b/src/or/channeltls.c index 8277813186..023ccdefd3 100644 --- a/src/or/channeltls.c +++ b/src/or/channeltls.c @@ -832,6 +832,9 @@ channel_tls_write_cell_method(channel_t *chan, cell_t *cell) * * This implements the write_packed_cell method for channel_tls_t; given a * channel_tls_t and a packed_cell_t, transmit the packed_cell_t. + * + * Return 0 on success or negative value on error. The caller must free the + * packed cell. */ static int @@ -841,7 +844,6 @@ channel_tls_write_packed_cell_method(channel_t *chan, tor_assert(chan); channel_tls_t *tlschan = BASE_CHAN_TO_TLS(chan); size_t cell_network_size = get_cell_network_size(chan->wide_circ_ids); - int written = 0; tor_assert(tlschan); tor_assert(packed_cell); @@ -849,18 +851,15 @@ channel_tls_write_packed_cell_method(channel_t *chan, if (tlschan->conn) { connection_buf_add(packed_cell->body, cell_network_size, TO_CONN(tlschan->conn)); - - /* This is where the cell is finished; used to be done from relay.c */ - packed_cell_free(packed_cell); - ++written; } else { log_info(LD_CHANNEL, "something called write_packed_cell on a tlschan " "(%p with ID " U64_FORMAT " but no conn", chan, U64_PRINTF_ARG(chan->global_identifier)); + return -1; } - return written; + return 0; } /** @@ -1149,7 +1148,7 @@ channel_tls_handle_cell(cell_t *cell, or_connection_t *conn) * These are all transport independent and we pass them up through the * channel_t mechanism. They are ultimately handled in command.c. */ - channel_queue_cell(TLS_CHAN_TO_BASE(chan), cell); + channel_process_cell(TLS_CHAN_TO_BASE(chan), cell); break; default: log_fn(LOG_INFO, LD_PROTOCOL, diff --git a/src/or/circuitbuild.c b/src/or/circuitbuild.c index 2e6b63b4d6..4e9d2457c4 100644 --- a/src/or/circuitbuild.c +++ b/src/or/circuitbuild.c @@ -631,8 +631,7 @@ circuit_n_chan_done(channel_t *chan, int status, int close_origin_circuits) tor_assert(chan); - log_debug(LD_CIRC,"chan to %s/%s, status=%d", - chan->nickname ? chan->nickname : "NULL", + log_debug(LD_CIRC,"chan to %s, status=%d", channel_get_canonical_remote_descr(chan), status); pending_circs = smartlist_new(); diff --git a/src/or/circuitlist.c b/src/or/circuitlist.c index fa19c0afd0..6157bf68b7 100644 --- a/src/or/circuitlist.c +++ b/src/or/circuitlist.c @@ -505,8 +505,7 @@ circuit_count_pending_on_channel(channel_t *chan) circuit_get_all_pending_on_channel(sl, chan); cnt = smartlist_len(sl); smartlist_free(sl); - log_debug(LD_CIRC,"or_conn to %s at %s, %d pending circs", - chan->nickname ? chan->nickname : "NULL", + log_debug(LD_CIRC,"or_conn to %s, %d pending circs", channel_get_canonical_remote_descr(chan), cnt); return cnt; diff --git a/src/or/connection_or.c b/src/or/connection_or.c index fdf1b2ebb1..c680c5b218 100644 --- a/src/or/connection_or.c +++ b/src/or/connection_or.c @@ -592,9 +592,6 @@ connection_or_flushed_some(or_connection_t *conn) { size_t datalen; - /* The channel will want to update its estimated queue size */ - channel_update_xmit_queue_size(TLS_CHAN_TO_BASE(conn->chan)); - /* If we're under the low water mark, add cells until we're just over the * high water mark. */ datalen = connection_get_outbuf_len(TO_CONN(conn)); diff --git a/src/or/relay.c b/src/or/relay.c index bbaf4ee785..0ad064417d 100644 --- a/src/or/relay.c +++ b/src/or/relay.c @@ -2746,7 +2746,13 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max)) /* this code is duplicated from some of the logic below. Ugly! XXXX */ tor_assert(destroy_queue->n > 0); cell = cell_queue_pop(destroy_queue); - channel_write_packed_cell(chan, cell); + /* Send the DESTROY cell. It is very unlikely that this fails but just + * in case, get rid of the channel. */ + if (channel_write_packed_cell(chan, cell) < 0) { + /* The cell has been freed. */ + channel_mark_for_close(chan); + continue; + } /* Update the cmux destroy counter */ circuitmux_notify_xmit_destroy(cmux); cell = NULL; @@ -2823,8 +2829,13 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max)) DIRREQ_TUNNELED, DIRREQ_CIRC_QUEUE_FLUSHED); - /* Now send the cell */ - channel_write_packed_cell(chan, cell); + /* Now send the cell. It is very unlikely that this fails but just in + * case, get rid of the channel. */ + if (channel_write_packed_cell(chan, cell) < 0) { + /* The cell has been freed at this point. */ + channel_mark_for_close(chan); + continue; + } cell = NULL; /* @@ -2859,22 +2870,13 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max)) return n_flushed; } -#if 0 -/** Indicate the current preferred cap for middle circuits; zero disables - * the cap. Right now it's just a constant, ORCIRC_MAX_MIDDLE_CELLS, but - * the logic in append_cell_to_circuit_queue() is written to be correct - * if we want to base it on a consensus param or something that might change - * in the future. - */ -static int -get_max_middle_cells(void) -{ - return ORCIRC_MAX_MIDDLE_CELLS; -} -#endif /* 0 */ - /** Add <b>cell</b> to the queue of <b>circ</b> writing to <b>chan</b> - * transmitting in <b>direction</b>. */ + * transmitting in <b>direction</b>. + * + * The given <b>cell</b> is copied over the circuit queue so the caller must + * cleanup the memory. + * + * This function is part of the fast path. */ void append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, cell_t *cell, cell_direction_t direction, @@ -2883,10 +2885,6 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, or_circuit_t *orcirc = NULL; cell_queue_t *queue; int streams_blocked; -#if 0 - uint32_t tgt_max_middle_cells, p_len, n_len, tmp, hard_max_middle_cells; -#endif - int exitward; if (circ->marked_for_close) return; @@ -2901,93 +2899,14 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, streams_blocked = circ->streams_blocked_on_p_chan; } - /* - * Disabling this for now because of a possible guard discovery attack - */ -#if 0 - /* Are we a middle circuit about to exceed ORCIRC_MAX_MIDDLE_CELLS? */ - if ((circ->n_chan != NULL) && CIRCUIT_IS_ORCIRC(circ)) { - orcirc = TO_OR_CIRCUIT(circ); - if (orcirc->p_chan) { - /* We are a middle circuit if we have both n_chan and p_chan */ - /* We'll need to know the current preferred maximum */ - tgt_max_middle_cells = get_max_middle_cells(); - if (tgt_max_middle_cells > 0) { - /* Do we need to initialize middle_max_cells? */ - if (orcirc->max_middle_cells == 0) { - orcirc->max_middle_cells = tgt_max_middle_cells; - } else { - if (tgt_max_middle_cells > orcirc->max_middle_cells) { - /* If we want to increase the cap, we can do so right away */ - orcirc->max_middle_cells = tgt_max_middle_cells; - } else if (tgt_max_middle_cells < orcirc->max_middle_cells) { - /* - * If we're shrinking the cap, we can't shrink past either queue; - * compare tgt_max_middle_cells rather than tgt_max_middle_cells * - * ORCIRC_MAX_MIDDLE_KILL_THRESH so the queues don't shrink enough - * to generate spurious warnings, either. - */ - n_len = circ->n_chan_cells.n; - p_len = orcirc->p_chan_cells.n; - tmp = tgt_max_middle_cells; - if (tmp < n_len) tmp = n_len; - if (tmp < p_len) tmp = p_len; - orcirc->max_middle_cells = tmp; - } - /* else no change */ - } - } else { - /* tgt_max_middle_cells == 0 indicates we should disable the cap */ - orcirc->max_middle_cells = 0; - } - - /* Now we know orcirc->max_middle_cells is set correctly */ - if (orcirc->max_middle_cells > 0) { - hard_max_middle_cells = - (uint32_t)(((double)orcirc->max_middle_cells) * - ORCIRC_MAX_MIDDLE_KILL_THRESH); - - if ((unsigned)queue->n + 1 >= hard_max_middle_cells) { - /* Queueing this cell would put queue over the kill theshold */ - log_warn(LD_CIRC, - "Got a cell exceeding the hard cap of %u in the " - "%s direction on middle circ ID %u on chan ID " - U64_FORMAT "; killing the circuit.", - hard_max_middle_cells, - (direction == CELL_DIRECTION_OUT) ? "n" : "p", - (direction == CELL_DIRECTION_OUT) ? - circ->n_circ_id : orcirc->p_circ_id, - U64_PRINTF_ARG( - (direction == CELL_DIRECTION_OUT) ? - circ->n_chan->global_identifier : - orcirc->p_chan->global_identifier)); - circuit_mark_for_close(circ, END_CIRC_REASON_RESOURCELIMIT); - return; - } else if ((unsigned)queue->n + 1 == orcirc->max_middle_cells) { - /* Only use ==, not >= for this test so we don't spam the log */ - log_warn(LD_CIRC, - "While trying to queue a cell, reached the soft cap of %u " - "in the %s direction on middle circ ID %u " - "on chan ID " U64_FORMAT ".", - orcirc->max_middle_cells, - (direction == CELL_DIRECTION_OUT) ? "n" : "p", - (direction == CELL_DIRECTION_OUT) ? - circ->n_circ_id : orcirc->p_circ_id, - U64_PRINTF_ARG( - (direction == CELL_DIRECTION_OUT) ? - circ->n_chan->global_identifier : - orcirc->p_chan->global_identifier)); - } - } - } - } -#endif /* 0 */ - + /* Very important that we copy to the circuit queue because all calls to + * this function use the stack for the cell memory. */ cell_queue_append_packed_copy(circ, queue, exitward, cell, chan->wide_circ_ids, 1); + /* Check and run the OOM if needed. */ if (PREDICT_UNLIKELY(cell_queues_check_size())) { - /* We ran the OOM handler */ + /* We ran the OOM handler which might have closed this circuit. */ if (circ->marked_for_close) return; } |