diff options
-rw-r--r-- | changes/ticket23709 | 11 | ||||
-rw-r--r-- | src/or/channel.c | 1586 | ||||
-rw-r--r-- | src/or/channel.h | 91 | ||||
-rw-r--r-- | src/or/channeltls.c | 13 | ||||
-rw-r--r-- | src/or/circuitbuild.c | 3 | ||||
-rw-r--r-- | src/or/circuitlist.c | 3 | ||||
-rw-r--r-- | src/or/connection_or.c | 3 | ||||
-rw-r--r-- | src/or/relay.c | 129 | ||||
-rw-r--r-- | src/test/fakechans.h | 1 | ||||
-rw-r--r-- | src/test/test_channel.c | 1765 | ||||
-rw-r--r-- | src/test/test_relay.c | 4 | ||||
-rw-r--r-- | src/test/test_scheduler.c | 12 |
12 files changed, 884 insertions, 2737 deletions
diff --git a/changes/ticket23709 b/changes/ticket23709 new file mode 100644 index 0000000000..7948f9ae03 --- /dev/null +++ b/changes/ticket23709 @@ -0,0 +1,11 @@ + o Major feature (channel): + - Remove the incoming and outgoing channel queues. The reason to do so was + due to the fact that they were always empty meaning never used but still + looked at in our fast path. Bottom line, it was an unused code path. + - We've simplify a lot the channel subsystem by removing those queues but + also by removing a lot of unused code or dead code around it. Overall + this is a cleanup removing more than 1500 lines of code overall and + adding very little except for unit test. + - The majority ot the channel unit tests have been rewritten and the code + coverage has now been raised to 83.6% for channel.c. + Closes ticket 23709. diff --git a/src/or/channel.c b/src/or/channel.c index 7590ba8a5b..5b9d860baf 100644 --- a/src/or/channel.c +++ b/src/or/channel.c @@ -5,9 +5,8 @@ * \file channel.c * * \brief OR/OP-to-OR channel abstraction layer. A channel's job is to - * transfer cells from Tor instance to Tor instance. - * Currently, there is only one implementation of the channel abstraction: in - * channeltls.c. + * transfer cells from Tor instance to Tor instance. Currently, there is only + * one implementation of the channel abstraction: in channeltls.c. * * Channels are a higher-level abstraction than or_connection_t: In general, * any means that two Tor relays use to exchange cells, or any means that a @@ -24,16 +23,28 @@ * connection. * * Every channel implementation is responsible for being able to transmit - * cells that are added to it with channel_write_cell() and related functions, - * and to receive incoming cells with the channel_queue_cell() and related - * functions. See the channel_t documentation for more information. - * - * When new cells arrive on a channel, they are passed to cell handler - * functions, which can be set by channel_set_cell_handlers() - * functions. (Tor's cell handlers are in command.c.) - * - * Tor flushes cells to channels from relay.c in - * channel_flush_from_first_active_circuit(). + * cells that are passed to it + * + * For *inbound* cells, the entry point is: channel_process_cell(). It takes a + * cell and will pass it to the cell handler set by + * channel_set_cell_handlers(). Currently, this is passed back to the command + * subsystem which is command_process_cell(). + * + * NOTE: For now, the seperation between channels and specialized channels + * (like channeltls) is not that well defined. So the channeltls layer calls + * channel_process_cell() which originally comes from the connection subsytem. + * This should be hopefully be fixed with #23993. + * + * For *outbound* cells, the entry point is: channel_write_packed_cell(). + * Only packed cells are dequeued from the circuit queue by the scheduler + * which uses channel_flush_from_first_active_circuit() to decide which cells + * to flush from which circuit on the channel. They are then passed down to + * the channel subsystem. This calls the low layer with the function pointer + * .write_packed_cell(). + * + * Each specialized channel (currently only channeltls_t) MUST implement a + * series of function found in channel_t. See channel.h for more + * documentation. **/ /* @@ -112,59 +123,6 @@ HANDLE_IMPL(channel, channel_s,) /* Counter for ID numbers */ static uint64_t n_channels_allocated = 0; -/* - * Channel global byte/cell counters, for statistics and for scheduler high - * /low-water marks. - */ - -/* - * Total number of cells ever given to any channel with the - * channel_write_*_cell() functions. - */ - -static uint64_t n_channel_cells_queued = 0; - -/* - * Total number of cells ever passed to a channel lower layer with the - * write_*_cell() methods. - */ - -static uint64_t n_channel_cells_passed_to_lower_layer = 0; - -/* - * Current number of cells in all channel queues; should be - * n_channel_cells_queued - n_channel_cells_passed_to_lower_layer. - */ - -static uint64_t n_channel_cells_in_queues = 0; - -/* - * Total number of bytes for all cells ever queued to a channel and - * counted in n_channel_cells_queued. - */ - -static uint64_t n_channel_bytes_queued = 0; - -/* - * Total number of bytes for all cells ever passed to a channel lower layer - * and counted in n_channel_cells_passed_to_lower_layer. - */ - -static uint64_t n_channel_bytes_passed_to_lower_layer = 0; - -/* - * Current number of bytes in all channel queues; should be - * n_channel_bytes_queued - n_channel_bytes_passed_to_lower_layer. - */ - -static uint64_t n_channel_bytes_in_queues = 0; - -/* - * Current total estimated queue size *including lower layer queues and - * transmit overhead* - */ - -STATIC uint64_t estimated_total_queue_size = 0; /* Digest->channel map * @@ -201,40 +159,15 @@ HT_PROTOTYPE(channel_idmap, channel_idmap_entry_s, node, channel_idmap_hash, HT_GENERATE2(channel_idmap, channel_idmap_entry_s, node, channel_idmap_hash, channel_idmap_eq, 0.5, tor_reallocarray_, tor_free_) -static cell_queue_entry_t * cell_queue_entry_dup(cell_queue_entry_t *q); -#if 0 -static int cell_queue_entry_is_padding(cell_queue_entry_t *q); -#endif -static cell_queue_entry_t * -cell_queue_entry_new_fixed(cell_t *cell); -static cell_queue_entry_t * -cell_queue_entry_new_var(var_cell_t *var_cell); -static int is_destroy_cell(channel_t *chan, - const cell_queue_entry_t *q, circid_t *circid_out); - -static void channel_assert_counter_consistency(void); - /* Functions to maintain the digest map */ -static void channel_add_to_digest_map(channel_t *chan); static void channel_remove_from_digest_map(channel_t *chan); -/* - * Flush cells from just the outgoing queue without trying to get them - * from circuits; used internall by channel_flush_some_cells(). - */ -static ssize_t -channel_flush_some_cells_from_outgoing_queue(channel_t *chan, - ssize_t num_cells); static void channel_force_free(channel_t *chan); static void channel_free_list(smartlist_t *channels, int mark_for_close); static void channel_listener_free_list(smartlist_t *channels, int mark_for_close); static void channel_listener_force_free(channel_listener_t *chan_l); -static size_t channel_get_cell_queue_entry_size(channel_t *chan, - cell_queue_entry_t *q); -static void -channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q); /*********************************** * Channel state utility functions * @@ -628,7 +561,7 @@ channel_listener_unregister(channel_listener_t *chan_l) * already exist. */ -static void +STATIC void channel_add_to_digest_map(channel_t *chan) { channel_idmap_entry_t *ent, search; @@ -676,33 +609,6 @@ channel_remove_from_digest_map(channel_t *chan) /* Assert that there is a digest */ tor_assert(!tor_digest_is_zero(chan->identity_digest)); -#if 0 - /* Make sure we have a map */ - if (!channel_identity_map) { - /* - * No identity map, so we can't find it by definition. This - * case is similar to digestmap_get() failing below. - */ - log_warn(LD_BUG, - "Trying to remove channel %p (global ID " U64_FORMAT ") " - "with digest %s from identity map, but didn't have any identity " - "map", - chan, U64_PRINTF_ARG(chan->global_identifier), - hex_str(chan->identity_digest, DIGEST_LEN)); - /* Clear out its next/prev pointers */ - if (chan->next_with_same_id) { - chan->next_with_same_id->prev_with_same_id = chan->prev_with_same_id; - } - if (chan->prev_with_same_id) { - chan->prev_with_same_id->next_with_same_id = chan->next_with_same_id; - } - chan->next_with_same_id = NULL; - chan->prev_with_same_id = NULL; - - return; - } -#endif /* 0 */ - /* Pull it out of its list, wherever that list is */ TOR_LIST_REMOVE(chan, next_with_same_id); @@ -936,10 +842,6 @@ channel_init(channel_t *chan) /* Warn about exhausted circuit IDs no more than hourly. */ chan->last_warned_circ_ids_exhausted.rate = 3600; - /* Initialize queues. */ - TOR_SIMPLEQ_INIT(&chan->incoming_queue); - TOR_SIMPLEQ_INIT(&chan->outgoing_queue); - /* Initialize list entries. */ memset(&chan->next_with_same_id, 0, sizeof(chan->next_with_same_id)); @@ -1022,8 +924,6 @@ channel_free(channel_t *chan) chan->cmux = NULL; } - /* We're in CLOSED or ERROR, so the cell queue is already empty */ - tor_free(chan); } @@ -1052,11 +952,6 @@ channel_listener_free(channel_listener_t *chan_l) /* Call a free method if there is one */ if (chan_l->free_fn) chan_l->free_fn(chan_l); - /* - * We're in CLOSED or ERROR, so the incoming channel queue is already - * empty. - */ - tor_free(chan_l); } @@ -1069,7 +964,6 @@ channel_listener_free(channel_listener_t *chan_l) static void channel_force_free(channel_t *chan) { - cell_queue_entry_t *cell, *cell_tmp; tor_assert(chan); log_debug(LD_CHANNEL, @@ -1103,18 +997,6 @@ channel_force_free(channel_t *chan) chan->cmux = NULL; } - /* We might still have a cell queue; kill it */ - TOR_SIMPLEQ_FOREACH_SAFE(cell, &chan->incoming_queue, next, cell_tmp) { - cell_queue_entry_free(cell, 0); - } - TOR_SIMPLEQ_INIT(&chan->incoming_queue); - - /* Outgoing cell queue is similar, but we can have to free packed cells */ - TOR_SIMPLEQ_FOREACH_SAFE(cell, &chan->outgoing_queue, next, cell_tmp) { - cell_queue_entry_free(cell, 0); - } - TOR_SIMPLEQ_INIT(&chan->outgoing_queue); - tor_free(chan); } @@ -1156,24 +1038,6 @@ channel_listener_force_free(channel_listener_t *chan_l) } /** - * Return the current registered listener for a channel listener - * - * This function returns a function pointer to the current registered - * handler for new incoming channels on a channel listener. - */ - -channel_listener_fn_ptr -channel_listener_get_listener_fn(channel_listener_t *chan_l) -{ - tor_assert(chan_l); - - if (chan_l->state == CHANNEL_LISTENER_STATE_LISTENING) - return chan_l->listener; - - return NULL; -} - -/** * Set the listener for a channel listener * * This function sets the handler for new incoming channels on a channel @@ -1237,8 +1101,7 @@ channel_get_var_cell_handler(channel_t *chan) * Set both cell handlers for a channel * * This function sets both the fixed-length and variable length cell handlers - * for a channel and processes any incoming cells that had been blocked in the - * queue because none were available. + * for a channel. */ void @@ -1247,8 +1110,6 @@ channel_set_cell_handlers(channel_t *chan, channel_var_cell_handler_fn_ptr var_cell_handler) { - int try_again = 0; - tor_assert(chan); tor_assert(CHANNEL_CAN_HANDLE_CELLS(chan)); @@ -1259,21 +1120,9 @@ channel_set_cell_handlers(channel_t *chan, "Setting var_cell_handler callback for channel %p to %p", chan, var_cell_handler); - /* Should we try the queue? */ - if (cell_handler && - cell_handler != chan->cell_handler) try_again = 1; - if (var_cell_handler && - var_cell_handler != chan->var_cell_handler) try_again = 1; - /* Change them */ chan->cell_handler = cell_handler; chan->var_cell_handler = var_cell_handler; - - /* Re-run the queue if we have one and there's any reason to */ - if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue) && - try_again && - (chan->cell_handler || - chan->var_cell_handler)) channel_process_cells(chan); } /* @@ -1400,36 +1249,6 @@ channel_close_from_lower_layer(channel_t *chan) } /** - * Close a channel listener from the lower layer - * - * Notify the channel code that the channel listener is being closed due to a - * non-error condition in the lower layer. This does not call the close() - * method, since the lower layer already knows. - */ - -void -channel_listener_close_from_lower_layer(channel_listener_t *chan_l) -{ - tor_assert(chan_l != NULL); - - /* If it's already in CLOSING, CLOSED or ERROR, this is a no-op */ - if (chan_l->state == CHANNEL_LISTENER_STATE_CLOSING || - chan_l->state == CHANNEL_LISTENER_STATE_CLOSED || - chan_l->state == CHANNEL_LISTENER_STATE_ERROR) return; - - log_debug(LD_CHANNEL, - "Closing channel listener %p (global ID " U64_FORMAT ") " - "due to lower-layer event", - chan_l, U64_PRINTF_ARG(chan_l->global_identifier)); - - /* Note closing by event from below */ - chan_l->reason_for_closing = CHANNEL_LISTENER_CLOSE_FROM_BELOW; - - /* Change state to CLOSING */ - channel_listener_change_state(chan_l, CHANNEL_LISTENER_STATE_CLOSING); -} - -/** * Notify that the channel is being closed due to an error condition * * This function is called by the lower layer implementing the transport @@ -1458,37 +1277,6 @@ channel_close_for_error(channel_t *chan) } /** - * Notify that the channel listener is being closed due to an error condition - * - * This function is called by the lower layer implementing the transport - * when a channel listener must be closed due to an error condition. This - * does not call the channel listener's close method, since the lower layer - * already knows. - */ - -void -channel_listener_close_for_error(channel_listener_t *chan_l) -{ - tor_assert(chan_l != NULL); - - /* If it's already in CLOSING, CLOSED or ERROR, this is a no-op */ - if (chan_l->state == CHANNEL_LISTENER_STATE_CLOSING || - chan_l->state == CHANNEL_LISTENER_STATE_CLOSED || - chan_l->state == CHANNEL_LISTENER_STATE_ERROR) return; - - log_debug(LD_CHANNEL, - "Closing channel listener %p (global ID " U64_FORMAT ") " - "due to lower-layer error", - chan_l, U64_PRINTF_ARG(chan_l->global_identifier)); - - /* Note closing by event from below */ - chan_l->reason_for_closing = CHANNEL_LISTENER_CLOSE_FOR_ERROR; - - /* Change state to CLOSING */ - channel_listener_change_state(chan_l, CHANNEL_LISTENER_STATE_CLOSING); -} - -/** * Notify that the lower layer is finished closing the channel * * This function should be called by the lower layer when a channel @@ -1522,33 +1310,6 @@ channel_closed(channel_t *chan) } /** - * Notify that the lower layer is finished closing the channel listener - * - * This function should be called by the lower layer when a channel listener - * is finished closing and it should be regarded as inactive and - * freed by the channel code. - */ - -void -channel_listener_closed(channel_listener_t *chan_l) -{ - tor_assert(chan_l); - tor_assert(chan_l->state == CHANNEL_LISTENER_STATE_CLOSING || - chan_l->state == CHANNEL_LISTENER_STATE_CLOSED || - chan_l->state == CHANNEL_LISTENER_STATE_ERROR); - - /* No-op if already inactive */ - if (chan_l->state == CHANNEL_LISTENER_STATE_CLOSED || - chan_l->state == CHANNEL_LISTENER_STATE_ERROR) return; - - if (chan_l->reason_for_closing != CHANNEL_LISTENER_CLOSE_FOR_ERROR) { - channel_listener_change_state(chan_l, CHANNEL_LISTENER_STATE_CLOSED); - } else { - channel_listener_change_state(chan_l, CHANNEL_LISTENER_STATE_ERROR); - } -} - -/** * Clear the identity_digest of a channel * * This function clears the identity digest of the remote endpoint for a @@ -1638,7 +1399,7 @@ channel_set_identity_digest(channel_t *chan, } /** - * Clear the remote end metadata (identity_digest/nickname) of a channel + * Clear the remote end metadata (identity_digest) of a channel * * This function clears all the remote end info from a channel; this is * intended for use by the lower layer. @@ -1665,419 +1426,96 @@ channel_clear_remote_end(channel_t *chan) memset(chan->identity_digest, 0, sizeof(chan->identity_digest)); - tor_free(chan->nickname); } /** - * Set the remote end metadata (identity_digest/nickname) of a channel + * Write to a channel the given packed cell. * - * This function sets new remote end info on a channel; this is intended - * for use by the lower layer. - */ - -void -channel_set_remote_end(channel_t *chan, - const char *identity_digest, - const char *nickname) -{ - int was_in_digest_map, should_be_in_digest_map, state_not_in_map; - - tor_assert(chan); - - log_debug(LD_CHANNEL, - "Setting remote endpoint identity on channel %p with " - "global ID " U64_FORMAT " to nickname %s, digest %s", - chan, U64_PRINTF_ARG(chan->global_identifier), - nickname ? nickname : "(null)", - identity_digest ? - hex_str(identity_digest, DIGEST_LEN) : "(null)"); - - state_not_in_map = CHANNEL_CONDEMNED(chan); - - was_in_digest_map = - !state_not_in_map && - chan->registered && - !tor_digest_is_zero(chan->identity_digest); - should_be_in_digest_map = - !state_not_in_map && - chan->registered && - (identity_digest && - !tor_digest_is_zero(identity_digest)); - - if (was_in_digest_map) - /* We should always remove it; we'll add it back if we're writing - * in a new digest. - */ - channel_remove_from_digest_map(chan); - - if (identity_digest) { - memcpy(chan->identity_digest, - identity_digest, - sizeof(chan->identity_digest)); - - } else { - memset(chan->identity_digest, 0, - sizeof(chan->identity_digest)); - } - - tor_free(chan->nickname); - if (nickname) - chan->nickname = tor_strdup(nickname); - - /* Put it in the digest map if we should */ - if (should_be_in_digest_map) - channel_add_to_digest_map(chan); -} - -/** - * Duplicate a cell queue entry; this is a shallow copy intended for use - * in channel_write_cell_queue_entry(). - */ - -static cell_queue_entry_t * -cell_queue_entry_dup(cell_queue_entry_t *q) -{ - cell_queue_entry_t *rv = NULL; - - tor_assert(q); - - rv = tor_malloc(sizeof(*rv)); - memcpy(rv, q, sizeof(*rv)); - - return rv; -} - -/** - * Free a cell_queue_entry_t; the handed_off parameter indicates whether - * the contents were passed to the lower layer (it is responsible for - * them) or not (we should free). - */ - -STATIC void -cell_queue_entry_free(cell_queue_entry_t *q, int handed_off) -{ - if (!q) return; - - if (!handed_off) { - /* - * If we handed it off, the recipient becomes responsible (or - * with packed cells the channel_t subclass calls packed_cell - * free after writing out its contents; see, e.g., - * channel_tls_write_packed_cell_method(). Otherwise, we have - * to take care of it here if possible. - */ - switch (q->type) { - case CELL_QUEUE_FIXED: - if (q->u.fixed.cell) { - /* - * There doesn't seem to be a cell_free() function anywhere in the - * pre-channel code; just use tor_free() - */ - tor_free(q->u.fixed.cell); - } - break; - case CELL_QUEUE_PACKED: - if (q->u.packed.packed_cell) { - packed_cell_free(q->u.packed.packed_cell); - } - break; - case CELL_QUEUE_VAR: - if (q->u.var.var_cell) { - /* - * This one's in connection_or.c; it'd be nice to figure out the - * whole flow of cells from one end to the other and factor the - * cell memory management functions like this out of the specific - * TLS lower layer. - */ - var_cell_free(q->u.var.var_cell); - } - break; - default: - /* - * Nothing we can do if we don't know the type; this will - * have been warned about elsewhere. - */ - break; - } - } - tor_free(q); -} - -#if 0 -/** - * Check whether a cell queue entry is padding; this is a helper function - * for channel_write_cell_queue_entry() - */ - -static int -cell_queue_entry_is_padding(cell_queue_entry_t *q) -{ - tor_assert(q); - - if (q->type == CELL_QUEUE_FIXED) { - if (q->u.fixed.cell) { - if (q->u.fixed.cell->command == CELL_PADDING || - q->u.fixed.cell->command == CELL_VPADDING) { - return 1; - } - } - } else if (q->type == CELL_QUEUE_VAR) { - if (q->u.var.var_cell) { - if (q->u.var.var_cell->command == CELL_PADDING || - q->u.var.var_cell->command == CELL_VPADDING) { - return 1; - } - } - } - - return 0; -} -#endif /* 0 */ - -/** - * Allocate a new cell queue entry for a fixed-size cell - */ - -static cell_queue_entry_t * -cell_queue_entry_new_fixed(cell_t *cell) -{ - cell_queue_entry_t *q = NULL; - - tor_assert(cell); - - q = tor_malloc(sizeof(*q)); - q->type = CELL_QUEUE_FIXED; - q->u.fixed.cell = cell; - - return q; -} - -/** - * Allocate a new cell queue entry for a variable-size cell - */ - -static cell_queue_entry_t * -cell_queue_entry_new_var(var_cell_t *var_cell) -{ - cell_queue_entry_t *q = NULL; - - tor_assert(var_cell); - - q = tor_malloc(sizeof(*q)); - q->type = CELL_QUEUE_VAR; - q->u.var.var_cell = var_cell; - - return q; -} - -/** - * Ask how big the cell contained in a cell_queue_entry_t is - */ - -static size_t -channel_get_cell_queue_entry_size(channel_t *chan, cell_queue_entry_t *q) -{ - size_t rv = 0; - - tor_assert(chan); - tor_assert(q); - - switch (q->type) { - case CELL_QUEUE_FIXED: - rv = get_cell_network_size(chan->wide_circ_ids); - break; - case CELL_QUEUE_VAR: - rv = get_var_cell_header_size(chan->wide_circ_ids) + - (q->u.var.var_cell ? q->u.var.var_cell->payload_len : 0); - break; - case CELL_QUEUE_PACKED: - rv = get_cell_network_size(chan->wide_circ_ids); - break; - default: - tor_assert_nonfatal_unreached_once(); - } - - return rv; -} - -/** - * Write to a channel based on a cell_queue_entry_t + * Return 0 on success or -1 on error. * - * Given a cell_queue_entry_t filled out by the caller, try to send the cell - * and queue it if we can't. + * Two possible errors can happen. Either the channel is not opened or the + * lower layer (specialized channel) failed to write it. In both cases, it is + * the caller responsability to free the cell. */ - -static void -channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q) +static int +write_packed_cell(channel_t *chan, packed_cell_t *cell) { - int result = 0, sent = 0; - cell_queue_entry_t *tmp = NULL; + int ret = -1; size_t cell_bytes; tor_assert(chan); - tor_assert(q); + tor_assert(cell); /* Assert that the state makes sense for a cell write */ tor_assert(CHANNEL_CAN_HANDLE_CELLS(chan)); { circid_t circ_id; - if (is_destroy_cell(chan, q, &circ_id)) { + if (packed_cell_is_destroy(chan, cell, &circ_id)) { channel_note_destroy_not_pending(chan, circ_id); } } /* For statistical purposes, figure out how big this cell is */ - cell_bytes = channel_get_cell_queue_entry_size(chan, q); + cell_bytes = get_cell_network_size(chan->wide_circ_ids); /* Can we send it right out? If so, try */ - if (TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue) && - CHANNEL_IS_OPEN(chan)) { - /* Pick the right write function for this cell type and save the result */ - switch (q->type) { - case CELL_QUEUE_FIXED: - tor_assert(chan->write_cell); - tor_assert(q->u.fixed.cell); - result = chan->write_cell(chan, q->u.fixed.cell); - break; - case CELL_QUEUE_PACKED: - tor_assert(chan->write_packed_cell); - tor_assert(q->u.packed.packed_cell); - result = chan->write_packed_cell(chan, q->u.packed.packed_cell); - break; - case CELL_QUEUE_VAR: - tor_assert(chan->write_var_cell); - tor_assert(q->u.var.var_cell); - result = chan->write_var_cell(chan, q->u.var.var_cell); - break; - default: - tor_assert(1); - } - - /* Check if we got it out */ - if (result > 0) { - sent = 1; - /* Timestamp for transmission */ - channel_timestamp_xmit(chan); - /* If we're here the queue is empty, so it's drained too */ - channel_timestamp_drained(chan); - /* Update the counter */ - ++(chan->n_cells_xmitted); - chan->n_bytes_xmitted += cell_bytes; - /* Update global counters */ - ++n_channel_cells_queued; - ++n_channel_cells_passed_to_lower_layer; - n_channel_bytes_queued += cell_bytes; - n_channel_bytes_passed_to_lower_layer += cell_bytes; - channel_assert_counter_consistency(); - } + if (!CHANNEL_IS_OPEN(chan)) { + goto done; } - if (!sent) { - /* Not sent, queue it */ - /* - * We have to copy the queue entry passed in, since the caller probably - * used the stack. - */ - tmp = cell_queue_entry_dup(q); - TOR_SIMPLEQ_INSERT_TAIL(&chan->outgoing_queue, tmp, next); - /* Update global counters */ - ++n_channel_cells_queued; - ++n_channel_cells_in_queues; - n_channel_bytes_queued += cell_bytes; - n_channel_bytes_in_queues += cell_bytes; - channel_assert_counter_consistency(); - /* Update channel queue size */ - chan->bytes_in_queue += cell_bytes; - /* Try to process the queue? */ - if (CHANNEL_IS_OPEN(chan)) channel_flush_cells(chan); + /* Write the cell on the connection's outbuf. */ + if (chan->write_packed_cell(chan, cell) < 0) { + goto done; } + /* Timestamp for transmission */ + channel_timestamp_xmit(chan); + /* Update the counter */ + ++(chan->n_cells_xmitted); + chan->n_bytes_xmitted += cell_bytes; + /* Successfully sent the cell. */ + ret = 0; + + done: + return ret; } -/** Write a generic cell type to a channel +/** + * Write a packed cell to a channel + * + * Write a packed cell to a channel using the write_cell() method. This is + * called by the transport-independent code to deliver a packed cell to a + * channel for transmission. * - * Write a generic cell to a channel. It is called by channel_write_cell(), - * channel_write_var_cell() and channel_write_packed_cell() in order to reduce - * code duplication. Notice that it takes cell as pointer of type void, - * this can be dangerous because no type check is performed. + * Return 0 on success else a negative value. In both cases, the caller should + * not access the cell anymore, it is freed both on success and error. */ - -void -channel_write_cell_generic_(channel_t *chan, const char *cell_type, - void *cell, cell_queue_entry_t *q) +int +channel_write_packed_cell(channel_t *chan, packed_cell_t *cell) { + int ret = -1; tor_assert(chan); tor_assert(cell); if (CHANNEL_IS_CLOSING(chan)) { - log_debug(LD_CHANNEL, "Discarding %c %p on closing channel %p with " - "global ID "U64_FORMAT, *cell_type, cell, chan, + log_debug(LD_CHANNEL, "Discarding %p on closing channel %p with " + "global ID "U64_FORMAT, cell, chan, U64_PRINTF_ARG(chan->global_identifier)); - tor_free(cell); - return; + goto end; } log_debug(LD_CHANNEL, - "Writing %c %p to channel %p with global ID " - U64_FORMAT, *cell_type, - cell, chan, U64_PRINTF_ARG(chan->global_identifier)); + "Writing %p to channel %p with global ID " + U64_FORMAT, cell, chan, U64_PRINTF_ARG(chan->global_identifier)); - channel_write_cell_queue_entry(chan, q); - /* Update the queue size estimate */ - channel_update_xmit_queue_size(chan); -} - -/** - * Write a cell to a channel - * - * Write a fixed-length cell to a channel using the write_cell() method. - * This is equivalent to the pre-channels connection_or_write_cell_to_buf(); - * it is called by the transport-independent code to deliver a cell to a - * channel for transmission. - */ + ret = write_packed_cell(chan, cell); -void -channel_write_cell(channel_t *chan, cell_t *cell) -{ - cell_queue_entry_t q; - q.type = CELL_QUEUE_FIXED; - q.u.fixed.cell = cell; - channel_write_cell_generic_(chan, "cell_t", cell, &q); -} - -/** - * Write a packed cell to a channel - * - * Write a packed cell to a channel using the write_cell() method. This is - * called by the transport-independent code to deliver a packed cell to a - * channel for transmission. - */ - -void -channel_write_packed_cell(channel_t *chan, packed_cell_t *packed_cell) -{ - cell_queue_entry_t q; - q.type = CELL_QUEUE_PACKED; - q.u.packed.packed_cell = packed_cell; - channel_write_cell_generic_(chan, "packed_cell_t", packed_cell, &q); -} - -/** - * Write a variable-length cell to a channel - * - * Write a variable-length cell to a channel using the write_cell() method. - * This is equivalent to the pre-channels - * connection_or_write_var_cell_to_buf(); it's called by the transport- - * independent code to deliver a var_cell to a channel for transmission. - */ - -void -channel_write_var_cell(channel_t *chan, var_cell_t *var_cell) -{ - cell_queue_entry_t q; - q.type = CELL_QUEUE_VAR; - q.u.var.var_cell = var_cell; - channel_write_cell_generic_(chan, "var_cell_t", var_cell, &q); + end: + /* Whatever happens, we free the cell. Either an error occured or the cell + * was put on the connection outbuf, both cases we have ownership of the + * cell and we free it. */ + packed_cell_free(cell); + return ret; } /** @@ -2119,15 +1557,6 @@ channel_change_state_(channel_t *chan, channel_state_t to_state) tor_assert(chan->reason_for_closing != CHANNEL_NOT_CLOSING); } - /* - * We need to maintain the queues here for some transitions: - * when we enter CHANNEL_STATE_OPEN (especially from CHANNEL_STATE_MAINT) - * we may have a backlog of cells to transmit, so drain the queues in - * that case, and when going to CHANNEL_STATE_CLOSED the subclass - * should have made sure to finish sending things (or gone to - * CHANNEL_STATE_ERROR if not possible), so we assert for that here. - */ - log_debug(LD_CHANNEL, "Changing state of channel %p (global ID " U64_FORMAT ") from \"%s\" to \"%s\"", @@ -2184,36 +1613,6 @@ channel_change_state_(channel_t *chan, channel_state_t to_state) } else if (to_state == CHANNEL_STATE_MAINT) { scheduler_channel_doesnt_want_writes(chan); } - - /* - * If we're closing, this channel no longer counts toward the global - * estimated queue size; if we're open, it now does. - */ - if ((to_state == CHANNEL_STATE_CLOSING || - to_state == CHANNEL_STATE_CLOSED || - to_state == CHANNEL_STATE_ERROR) && - (from_state == CHANNEL_STATE_OPEN || - from_state == CHANNEL_STATE_MAINT)) { - estimated_total_queue_size -= chan->bytes_in_queue; - } - - /* - * If we're opening, this channel now does count toward the global - * estimated queue size. - */ - if ((to_state == CHANNEL_STATE_OPEN || - to_state == CHANNEL_STATE_MAINT) && - !(from_state == CHANNEL_STATE_OPEN || - from_state == CHANNEL_STATE_MAINT)) { - estimated_total_queue_size += chan->bytes_in_queue; - } - - if (to_state == CHANNEL_STATE_CLOSED || - to_state == CHANNEL_STATE_ERROR) { - /* Assert that all queues are empty */ - tor_assert(TOR_SIMPLEQ_EMPTY(&chan->incoming_queue)); - tor_assert(TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue)); - } } /** @@ -2237,12 +1636,6 @@ channel_change_state_open(channel_t *chan) /* Tell circuits if we opened and stuff */ channel_do_open_actions(chan); chan->has_been_open = 1; - - /* Check for queued cells to process */ - if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue)) - channel_process_cells(chan); - if (! TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue)) - channel_flush_cells(chan); } /** @@ -2284,15 +1677,6 @@ channel_listener_change_state(channel_listener_t *chan_l, tor_assert(chan_l->reason_for_closing != CHANNEL_LISTENER_NOT_CLOSING); } - /* - * We need to maintain the queues here for some transitions: - * when we enter CHANNEL_STATE_OPEN (especially from CHANNEL_STATE_MAINT) - * we may have a backlog of cells to transmit, so drain the queues in - * that case, and when going to CHANNEL_STATE_CLOSED the subclass - * should have made sure to finish sending things (or gone to - * CHANNEL_STATE_ERROR if not possible), so we assert for that here. - */ - log_debug(LD_CHANNEL, "Changing state of channel listener %p (global ID " U64_FORMAT "from \"%s\" to \"%s\"", @@ -2325,30 +1709,38 @@ channel_listener_change_state(channel_listener_t *chan_l, if (to_state == CHANNEL_LISTENER_STATE_CLOSED || to_state == CHANNEL_LISTENER_STATE_ERROR) { - /* Assert that the queue is empty */ tor_assert(!(chan_l->incoming_list) || smartlist_len(chan_l->incoming_list) == 0); } } -/** - * Try to flush cells to the lower layer - * - * this is called by the lower layer to indicate that it wants more cells; - * it will try to write up to num_cells cells from the channel's cell queue or - * from circuits active on that channel, or as many as it has available if - * num_cells == -1. - */ - +/* Maximum number of cells that is allowed to flush at once withing + * channel_flush_some_cells(). */ #define MAX_CELLS_TO_GET_FROM_CIRCUITS_FOR_UNLIMITED 256 +/* Try to flush cells of the given channel chan up to a maximum of num_cells. + * + * This is called by the scheduler when it wants to flush cells from the + * channel's circuit queue(s) to the connection outbuf (not yet on the wire). + * + * If the channel is not in state CHANNEL_STATE_OPEN, this does nothing and + * will return 0 meaning no cells were flushed. + * + * If num_cells is -1, we'll try to flush up to the maximum cells allowed + * defined in MAX_CELLS_TO_GET_FROM_CIRCUITS_FOR_UNLIMITED. + * + * On success, the number of flushed cells are returned and it can never be + * above num_cells. If 0 is returned, no cells were flushed either because the + * channel was not opened or we had no cells on the channel. A negative number + * can NOT be sent back. + * + * This function is part of the fast path. */ MOCK_IMPL(ssize_t, channel_flush_some_cells, (channel_t *chan, ssize_t num_cells)) { unsigned int unlimited = 0; ssize_t flushed = 0; - int num_cells_from_circs, clamped_num_cells; - int q_len_before, q_len_after; + int clamped_num_cells; tor_assert(chan); @@ -2357,11 +1749,6 @@ channel_flush_some_cells, (channel_t *chan, ssize_t num_cells)) /* If we aren't in CHANNEL_STATE_OPEN, nothing goes through */ if (CHANNEL_IS_OPEN(chan)) { - /* Try to flush as much as we can that's already queued */ - flushed += channel_flush_some_cells_from_outgoing_queue(chan, - (unlimited ? -1 : num_cells - flushed)); - if (!unlimited && num_cells <= flushed) goto done; - if (circuitmux_num_cells(chan->cmux) > 0) { /* Calculate number of cells, including clamp */ if (unlimited) { @@ -2375,45 +1762,9 @@ channel_flush_some_cells, (channel_t *chan, ssize_t num_cells)) } } - /* - * Keep track of the change in queue size; we have to count cells - * channel_flush_from_first_active_circuit() writes out directly, - * but not double-count ones we might get later in - * channel_flush_some_cells_from_outgoing_queue() - */ - q_len_before = chan_cell_queue_len(&(chan->outgoing_queue)); - /* Try to get more cells from any active circuits */ - num_cells_from_circs = channel_flush_from_first_active_circuit( + flushed = channel_flush_from_first_active_circuit( chan, clamped_num_cells); - - q_len_after = chan_cell_queue_len(&(chan->outgoing_queue)); - - /* - * If it claims we got some, adjust the flushed counter and consider - * processing the queue again - */ - if (num_cells_from_circs > 0) { - /* - * Adjust flushed by the number of cells counted in - * num_cells_from_circs that didn't go to the cell queue. - */ - - if (q_len_after > q_len_before) { - num_cells_from_circs -= (q_len_after - q_len_before); - if (num_cells_from_circs < 0) num_cells_from_circs = 0; - } - - flushed += num_cells_from_circs; - - /* Now process the queue if necessary */ - - if ((q_len_after > q_len_before) && - (unlimited || (flushed < num_cells))) { - flushed += channel_flush_some_cells_from_outgoing_queue(chan, - (unlimited ? -1 : num_cells - flushed)); - } - } } } @@ -2422,197 +1773,16 @@ channel_flush_some_cells, (channel_t *chan, ssize_t num_cells)) } /** - * Flush cells from just the channel's outgoing cell queue - * - * This gets called from channel_flush_some_cells() above to flush cells - * just from the queue without trying for active_circuits. - */ - -static ssize_t -channel_flush_some_cells_from_outgoing_queue(channel_t *chan, - ssize_t num_cells) -{ - unsigned int unlimited = 0; - ssize_t flushed = 0; - cell_queue_entry_t *q = NULL; - size_t cell_size; - int free_q = 0, handed_off = 0; - - tor_assert(chan); - tor_assert(chan->write_cell); - tor_assert(chan->write_packed_cell); - tor_assert(chan->write_var_cell); - - if (num_cells < 0) unlimited = 1; - if (!unlimited && num_cells <= flushed) return 0; - - /* If we aren't in CHANNEL_STATE_OPEN, nothing goes through */ - if (CHANNEL_IS_OPEN(chan)) { - while ((unlimited || num_cells > flushed) && - NULL != (q = TOR_SIMPLEQ_FIRST(&chan->outgoing_queue))) { - free_q = 0; - handed_off = 0; - - /* Figure out how big it is for statistical purposes */ - cell_size = channel_get_cell_queue_entry_size(chan, q); - /* - * Okay, we have a good queue entry, try to give it to the lower - * layer. - */ - switch (q->type) { - case CELL_QUEUE_FIXED: - if (q->u.fixed.cell) { - if (chan->write_cell(chan, - q->u.fixed.cell)) { - ++flushed; - channel_timestamp_xmit(chan); - ++(chan->n_cells_xmitted); - chan->n_bytes_xmitted += cell_size; - free_q = 1; - handed_off = 1; - } - /* Else couldn't write it; leave it on the queue */ - } else { - /* This shouldn't happen */ - log_info(LD_CHANNEL, - "Saw broken cell queue entry of type CELL_QUEUE_FIXED " - "with no cell on channel %p " - "(global ID " U64_FORMAT ").", - chan, U64_PRINTF_ARG(chan->global_identifier)); - /* Throw it away */ - free_q = 1; - handed_off = 0; - } - break; - case CELL_QUEUE_PACKED: - if (q->u.packed.packed_cell) { - if (chan->write_packed_cell(chan, - q->u.packed.packed_cell)) { - ++flushed; - channel_timestamp_xmit(chan); - ++(chan->n_cells_xmitted); - chan->n_bytes_xmitted += cell_size; - free_q = 1; - handed_off = 1; - } - /* Else couldn't write it; leave it on the queue */ - } else { - /* This shouldn't happen */ - log_info(LD_CHANNEL, - "Saw broken cell queue entry of type CELL_QUEUE_PACKED " - "with no cell on channel %p " - "(global ID " U64_FORMAT ").", - chan, U64_PRINTF_ARG(chan->global_identifier)); - /* Throw it away */ - free_q = 1; - handed_off = 0; - } - break; - case CELL_QUEUE_VAR: - if (q->u.var.var_cell) { - if (chan->write_var_cell(chan, - q->u.var.var_cell)) { - ++flushed; - channel_timestamp_xmit(chan); - ++(chan->n_cells_xmitted); - chan->n_bytes_xmitted += cell_size; - free_q = 1; - handed_off = 1; - } - /* Else couldn't write it; leave it on the queue */ - } else { - /* This shouldn't happen */ - log_info(LD_CHANNEL, - "Saw broken cell queue entry of type CELL_QUEUE_VAR " - "with no cell on channel %p " - "(global ID " U64_FORMAT ").", - chan, U64_PRINTF_ARG(chan->global_identifier)); - /* Throw it away */ - free_q = 1; - handed_off = 0; - } - break; - default: - /* Unknown type, log and free it */ - log_info(LD_CHANNEL, - "Saw an unknown cell queue entry type %d on channel %p " - "(global ID " U64_FORMAT "; ignoring it." - " Someone should fix this.", - q->type, chan, U64_PRINTF_ARG(chan->global_identifier)); - free_q = 1; - handed_off = 0; - } - - /* - * if free_q is set, we used it and should remove the queue entry; - * we have to do the free down here so TOR_SIMPLEQ_REMOVE_HEAD isn't - * accessing freed memory - */ - if (free_q) { - TOR_SIMPLEQ_REMOVE_HEAD(&chan->outgoing_queue, next); - /* - * ...and we handed a cell off to the lower layer, so we should - * update the counters. - */ - ++n_channel_cells_passed_to_lower_layer; - --n_channel_cells_in_queues; - n_channel_bytes_passed_to_lower_layer += cell_size; - n_channel_bytes_in_queues -= cell_size; - channel_assert_counter_consistency(); - /* Update the channel's queue size too */ - chan->bytes_in_queue -= cell_size; - /* Finally, free q */ - cell_queue_entry_free(q, handed_off); - q = NULL; - } else { - /* No cell removed from list, so we can't go on any further */ - break; - } - } - } - - /* Did we drain the queue? */ - if (TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue)) { - channel_timestamp_drained(chan); - } - - /* Update the estimate queue size */ - channel_update_xmit_queue_size(chan); - - return flushed; -} - -/** - * Flush as many cells as we possibly can from the queue - * - * This tries to flush as many cells from the queue as the lower layer - * will take. It just calls channel_flush_some_cells_from_outgoing_queue() - * in unlimited mode. - */ - -void -channel_flush_cells(channel_t *chan) -{ - channel_flush_some_cells_from_outgoing_queue(chan, -1); -} - -/** * Check if any cells are available * - * This gets used from the lower layer to check if any more cells are - * available. + * This is used by the scheduler to know if the channel has more to flush + * after a scheduling round. */ - MOCK_IMPL(int, channel_more_to_flush, (channel_t *chan)) { tor_assert(chan); - /* Check if we have any queued */ - if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue)) - return 1; - - /* Check if any circuits would like to queue some */ if (circuitmux_num_cells(chan->cmux) > 0) return 1; /* Else no */ @@ -2816,207 +1986,31 @@ channel_listener_queue_incoming(channel_listener_t *listener, } /** - * Process queued incoming cells - * - * Process as many queued cells as we can from the incoming - * cell queue. + * Process a cell from the given channel. */ - void -channel_process_cells(channel_t *chan) +channel_process_cell(channel_t *chan, cell_t *cell) { - cell_queue_entry_t *q; tor_assert(chan); tor_assert(CHANNEL_IS_CLOSING(chan) || CHANNEL_IS_MAINT(chan) || CHANNEL_IS_OPEN(chan)); - - log_debug(LD_CHANNEL, - "Processing as many incoming cells as we can for channel %p", - chan); - - /* Nothing we can do if we have no registered cell handlers */ - if (!(chan->cell_handler || - chan->var_cell_handler)) return; - /* Nothing we can do if we have no cells */ - if (TOR_SIMPLEQ_EMPTY(&chan->incoming_queue)) return; - - /* - * Process cells until we're done or find one we have no current handler - * for. - * - * We must free the cells here after calling the handler, since custody - * of the buffer was given to the channel layer when they were queued; - * see comments on memory management in channel_queue_cell() and in - * channel_queue_var_cell() below. - */ - while (NULL != (q = TOR_SIMPLEQ_FIRST(&chan->incoming_queue))) { - tor_assert(q); - tor_assert(q->type == CELL_QUEUE_FIXED || - q->type == CELL_QUEUE_VAR); - - if (q->type == CELL_QUEUE_FIXED && - chan->cell_handler) { - /* Handle a fixed-length cell */ - TOR_SIMPLEQ_REMOVE_HEAD(&chan->incoming_queue, next); - tor_assert(q->u.fixed.cell); - log_debug(LD_CHANNEL, - "Processing incoming cell_t %p for channel %p (global ID " - U64_FORMAT ")", - q->u.fixed.cell, chan, - U64_PRINTF_ARG(chan->global_identifier)); - chan->cell_handler(chan, q->u.fixed.cell); - tor_free(q->u.fixed.cell); - tor_free(q); - } else if (q->type == CELL_QUEUE_VAR && - chan->var_cell_handler) { - /* Handle a variable-length cell */ - TOR_SIMPLEQ_REMOVE_HEAD(&chan->incoming_queue, next); - tor_assert(q->u.var.var_cell); - log_debug(LD_CHANNEL, - "Processing incoming var_cell_t %p for channel %p (global ID " - U64_FORMAT ")", - q->u.var.var_cell, chan, - U64_PRINTF_ARG(chan->global_identifier)); - chan->var_cell_handler(chan, q->u.var.var_cell); - tor_free(q->u.var.var_cell); - tor_free(q); - } else { - /* Can't handle this one */ - break; - } - } -} - -/** - * Queue incoming cell - * - * This should be called by a channel_t subclass to queue an incoming fixed- - * length cell for processing, and process it if possible. - */ - -void -channel_queue_cell(channel_t *chan, cell_t *cell) -{ - int need_to_queue = 0; - cell_queue_entry_t *q; - cell_t *cell_copy = NULL; - - tor_assert(chan); tor_assert(cell); - tor_assert(CHANNEL_IS_OPEN(chan)); - /* Do we need to queue it, or can we just call the handler right away? */ - if (!(chan->cell_handler)) need_to_queue = 1; - if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue)) - need_to_queue = 1; + /* Nothing we can do if we have no registered cell handlers */ + if (!chan->cell_handler) + return; /* Timestamp for receiving */ channel_timestamp_recv(chan); - - /* Update the counters */ + /* Update received counter. */ ++(chan->n_cells_recved); chan->n_bytes_recved += get_cell_network_size(chan->wide_circ_ids); - /* If we don't need to queue we can just call cell_handler */ - if (!need_to_queue) { - tor_assert(chan->cell_handler); - log_debug(LD_CHANNEL, - "Directly handling incoming cell_t %p for channel %p " - "(global ID " U64_FORMAT ")", - cell, chan, - U64_PRINTF_ARG(chan->global_identifier)); - chan->cell_handler(chan, cell); - } else { - /* - * Otherwise queue it and then process the queue if possible. - * - * We queue a copy, not the original pointer - it might have been on the - * stack in connection_or_process_cells_from_inbuf() (or another caller - * if we ever have a subclass other than channel_tls_t), or be freed - * there after we return. This is the uncommon case; the non-copying - * fast path occurs in the if (!need_to_queue) case above when the - * upper layer has installed cell handlers. - */ - cell_copy = tor_malloc_zero(sizeof(cell_t)); - memcpy(cell_copy, cell, sizeof(cell_t)); - q = cell_queue_entry_new_fixed(cell_copy); - log_debug(LD_CHANNEL, - "Queueing incoming cell_t %p for channel %p " - "(global ID " U64_FORMAT ")", - cell, chan, - U64_PRINTF_ARG(chan->global_identifier)); - TOR_SIMPLEQ_INSERT_TAIL(&chan->incoming_queue, q, next); - if (chan->cell_handler || - chan->var_cell_handler) { - channel_process_cells(chan); - } - } -} - -/** - * Queue incoming variable-length cell - * - * This should be called by a channel_t subclass to queue an incoming - * variable-length cell for processing, and process it if possible. - */ - -void -channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell) -{ - int need_to_queue = 0; - cell_queue_entry_t *q; - var_cell_t *cell_copy = NULL; - - tor_assert(chan); - tor_assert(var_cell); - tor_assert(CHANNEL_IS_OPEN(chan)); - - /* Do we need to queue it, or can we just call the handler right away? */ - if (!(chan->var_cell_handler)) need_to_queue = 1; - if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue)) - need_to_queue = 1; - - /* Timestamp for receiving */ - channel_timestamp_recv(chan); - - /* Update the counter */ - ++(chan->n_cells_recved); - chan->n_bytes_recved += get_var_cell_header_size(chan->wide_circ_ids) + - var_cell->payload_len; - - /* If we don't need to queue we can just call cell_handler */ - if (!need_to_queue) { - tor_assert(chan->var_cell_handler); - log_debug(LD_CHANNEL, - "Directly handling incoming var_cell_t %p for channel %p " - "(global ID " U64_FORMAT ")", - var_cell, chan, - U64_PRINTF_ARG(chan->global_identifier)); - chan->var_cell_handler(chan, var_cell); - } else { - /* - * Otherwise queue it and then process the queue if possible. - * - * We queue a copy, not the original pointer - it might have been on the - * stack in connection_or_process_cells_from_inbuf() (or another caller - * if we ever have a subclass other than channel_tls_t), or be freed - * there after we return. This is the uncommon case; the non-copying - * fast path occurs in the if (!need_to_queue) case above when the - * upper layer has installed cell handlers. - */ - cell_copy = var_cell_copy(var_cell); - q = cell_queue_entry_new_var(cell_copy); - log_debug(LD_CHANNEL, - "Queueing incoming var_cell_t %p for channel %p " - "(global ID " U64_FORMAT ")", - var_cell, chan, - U64_PRINTF_ARG(chan->global_identifier)); - TOR_SIMPLEQ_INSERT_TAIL(&chan->incoming_queue, q, next); - if (chan->cell_handler || - chan->var_cell_handler) { - channel_process_cells(chan); - } - } + log_debug(LD_CHANNEL, + "Processing incoming cell_t %p for channel %p (global ID " + U64_FORMAT ")", cell, chan, + U64_PRINTF_ARG(chan->global_identifier)); + chan->cell_handler(chan, cell); } /** If <b>packed_cell</b> on <b>chan</b> is a destroy cell, then set @@ -3043,44 +2037,6 @@ packed_cell_is_destroy(channel_t *chan, } /** - * Assert that the global channel stats counters are internally consistent - */ - -static void -channel_assert_counter_consistency(void) -{ - tor_assert(n_channel_cells_queued == - (n_channel_cells_in_queues + n_channel_cells_passed_to_lower_layer)); - tor_assert(n_channel_bytes_queued == - (n_channel_bytes_in_queues + n_channel_bytes_passed_to_lower_layer)); -} - -/* DOCDOC */ -static int -is_destroy_cell(channel_t *chan, - const cell_queue_entry_t *q, circid_t *circid_out) -{ - *circid_out = 0; - switch (q->type) { - case CELL_QUEUE_FIXED: - if (q->u.fixed.cell->command == CELL_DESTROY) { - *circid_out = q->u.fixed.cell->circ_id; - return 1; - } - break; - case CELL_QUEUE_VAR: - if (q->u.var.var_cell->command == CELL_DESTROY) { - *circid_out = q->u.var.var_cell->circ_id; - return 1; - } - break; - case CELL_QUEUE_PACKED: - return packed_cell_is_destroy(chan, q->u.packed.packed_cell, circid_out); - } - return 0; -} - -/** * Send destroy cell on a channel * * Write a destroy cell with circ ID <b>circ_id</b> and reason <b>reason</b> @@ -3134,19 +2090,6 @@ channel_dumpstats(int severity) { if (all_channels && smartlist_len(all_channels) > 0) { tor_log(severity, LD_GENERAL, - "Channels have queued " U64_FORMAT " bytes in " U64_FORMAT " cells, " - "and handed " U64_FORMAT " bytes in " U64_FORMAT " cells to the lower" - " layer.", - U64_PRINTF_ARG(n_channel_bytes_queued), - U64_PRINTF_ARG(n_channel_cells_queued), - U64_PRINTF_ARG(n_channel_bytes_passed_to_lower_layer), - U64_PRINTF_ARG(n_channel_cells_passed_to_lower_layer)); - tor_log(severity, LD_GENERAL, - "There are currently " U64_FORMAT " bytes in " U64_FORMAT " cells " - "in channel queues.", - U64_PRINTF_ARG(n_channel_bytes_in_queues), - U64_PRINTF_ARG(n_channel_cells_in_queues)); - tor_log(severity, LD_GENERAL, "Dumping statistics about %d channels:", smartlist_len(all_channels)); tor_log(severity, LD_GENERAL, @@ -3629,19 +2572,6 @@ channel_listener_describe_transport(channel_listener_t *chan_l) } /** - * Return the number of entries in <b>queue</b> - */ -STATIC int -chan_cell_queue_len(const chan_cell_queue_t *queue) -{ - int r = 0; - cell_queue_entry_t *cell; - TOR_SIMPLEQ_FOREACH(cell, queue, next) - ++r; - return r; -} - -/** * Dump channel statistics * * Dump statistics for one channel to the log @@ -3676,35 +2606,18 @@ channel_dump_statistics, (channel_t *chan, int severity)) U64_PRINTF_ARG(chan->timestamp_active), U64_PRINTF_ARG(now - chan->timestamp_active)); - /* Handle digest and nickname */ + /* Handle digest. */ if (!tor_digest_is_zero(chan->identity_digest)) { - if (chan->nickname) { - tor_log(severity, LD_GENERAL, - " * Channel " U64_FORMAT " says it is connected " - "to an OR with digest %s and nickname %s", - U64_PRINTF_ARG(chan->global_identifier), - hex_str(chan->identity_digest, DIGEST_LEN), - chan->nickname); - } else { - tor_log(severity, LD_GENERAL, - " * Channel " U64_FORMAT " says it is connected " - "to an OR with digest %s and no known nickname", - U64_PRINTF_ARG(chan->global_identifier), - hex_str(chan->identity_digest, DIGEST_LEN)); - } + tor_log(severity, LD_GENERAL, + " * Channel " U64_FORMAT " says it is connected " + "to an OR with digest %s", + U64_PRINTF_ARG(chan->global_identifier), + hex_str(chan->identity_digest, DIGEST_LEN)); } else { - if (chan->nickname) { - tor_log(severity, LD_GENERAL, - " * Channel " U64_FORMAT " does not know the digest" - " of the OR it is connected to, but reports its nickname is %s", - U64_PRINTF_ARG(chan->global_identifier), - chan->nickname); - } else { - tor_log(severity, LD_GENERAL, - " * Channel " U64_FORMAT " does not know the digest" - " or the nickname of the OR it is connected to", - U64_PRINTF_ARG(chan->global_identifier)); - } + tor_log(severity, LD_GENERAL, + " * Channel " U64_FORMAT " does not know the digest" + " of the OR it is connected to", + U64_PRINTF_ARG(chan->global_identifier)); } /* Handle remote address and descriptions */ @@ -3753,14 +2666,6 @@ channel_dump_statistics, (channel_t *chan, int severity)) channel_is_incoming(chan) ? "incoming" : "outgoing"); - /* Describe queues */ - tor_log(severity, LD_GENERAL, - " * Channel " U64_FORMAT " has %d queued incoming cells" - " and %d queued outgoing cells", - U64_PRINTF_ARG(chan->global_identifier), - chan_cell_queue_len(&chan->incoming_queue), - chan_cell_queue_len(&chan->outgoing_queue)); - /* Describe circuits */ tor_log(severity, LD_GENERAL, " * Channel " U64_FORMAT " has %d active circuits out of" @@ -3779,12 +2684,6 @@ channel_dump_statistics, (channel_t *chan, int severity)) U64_PRINTF_ARG(chan->timestamp_client), U64_PRINTF_ARG(now - chan->timestamp_client)); tor_log(severity, LD_GENERAL, - " * Channel " U64_FORMAT " was last drained at " - U64_FORMAT " (" U64_FORMAT " seconds ago)", - U64_PRINTF_ARG(chan->global_identifier), - U64_PRINTF_ARG(chan->timestamp_drained), - U64_PRINTF_ARG(now - chan->timestamp_drained)); - tor_log(severity, LD_GENERAL, " * Channel " U64_FORMAT " last received a cell " "at " U64_FORMAT " (" U64_FORMAT " seconds ago)", U64_PRINTF_ARG(chan->global_identifier), @@ -4027,29 +2926,18 @@ channel_get_addr_if_possible(channel_t *chan, tor_addr_t *addr_out) else return 0; } -/** - * Check if there are outgoing queue writes on this channel - * - * Indicate if either we have queued cells, or if not, whether the underlying - * lower-layer transport thinks it has an output queue. +/* + * Return true iff the channel has any cells on the connection outbuf waiting + * to be sent onto the network. */ - int channel_has_queued_writes(channel_t *chan) { - int has_writes = 0; - tor_assert(chan); tor_assert(chan->has_queued_writes); - if (! TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue)) { - has_writes = 1; - } else { - /* Check with the lower layer */ - has_writes = chan->has_queued_writes(chan); - } - - return has_writes; + /* Check with the lower layer */ + return chan->has_queued_writes(chan); } /** @@ -4274,23 +3162,10 @@ channel_mark_outgoing(channel_t *chan) ***********************/ /* - * Get the latest estimate for the total queue size of all open channels - */ - -uint64_t -channel_get_global_queue_estimate(void) -{ - return estimated_total_queue_size; -} - -/* * Estimate the number of writeable cells * - * Ask the lower layer for an estimate of how many cells it can accept, and - * then subtract the length of our outgoing_queue, if any, to produce an - * estimate of the number of cells this channel can accept for writes. + * Ask the lower layer for an estimate of how many cells it can accept. */ - int channel_num_cells_writeable(channel_t *chan) { @@ -4302,8 +3177,6 @@ channel_num_cells_writeable(channel_t *chan) if (chan->state == CHANNEL_STATE_OPEN) { /* Query lower layer */ result = chan->num_cells_writeable(chan); - /* Subtract cell queue length, if any */ - result -= chan_cell_queue_len(&chan->outgoing_queue); if (result < 0) result = 0; } else { /* No cells are writeable in any other state */ @@ -4427,25 +3300,6 @@ channel_timestamp_client(channel_t *chan) } /** - * Update the last drained timestamp - * - * This is called whenever we transmit a cell which leaves the outgoing cell - * queue completely empty. It also updates the xmit time and the active time. - */ - -void -channel_timestamp_drained(channel_t *chan) -{ - time_t now = time(NULL); - - tor_assert(chan); - - chan->timestamp_active = now; - chan->timestamp_drained = now; - chan->timestamp_xmit = now; -} - -/** * Update the recv timestamp * * This is called whenever we get an incoming cell from the lower layer. @@ -4504,54 +3358,6 @@ channel_when_created(channel_t *chan) } /** - * Query created timestamp for a channel listener - */ - -time_t -channel_listener_when_created(channel_listener_t *chan_l) -{ - tor_assert(chan_l); - - return chan_l->timestamp_created; -} - -/** - * Query last active timestamp for a channel - */ - -time_t -channel_when_last_active(channel_t *chan) -{ - tor_assert(chan); - - return chan->timestamp_active; -} - -/** - * Query last active timestamp for a channel listener - */ - -time_t -channel_listener_when_last_active(channel_listener_t *chan_l) -{ - tor_assert(chan_l); - - return chan_l->timestamp_active; -} - -/** - * Query last accepted timestamp for a channel listener - */ - -time_t -channel_listener_when_last_accepted(channel_listener_t *chan_l) -{ - tor_assert(chan_l); - - return chan_l->timestamp_accepted; -} - -/** * Query client timestamp */ @@ -4564,30 +3370,6 @@ channel_when_last_client(channel_t *chan) } /** - * Query drained timestamp - */ - -time_t -channel_when_last_drained(channel_t *chan) -{ - tor_assert(chan); - - return chan->timestamp_drained; -} - -/** - * Query recv timestamp - */ - -time_t -channel_when_last_recv(channel_t *chan) -{ - tor_assert(chan); - - return chan->timestamp_recv; -} - -/** * Query xmit timestamp */ @@ -4600,42 +3382,6 @@ channel_when_last_xmit(channel_t *chan) } /** - * Query accepted counter - */ - -uint64_t -channel_listener_count_accepted(channel_listener_t *chan_l) -{ - tor_assert(chan_l); - - return chan_l->n_accepted; -} - -/** - * Query received cell counter - */ - -uint64_t -channel_count_recved(channel_t *chan) -{ - tor_assert(chan); - - return chan->n_cells_recved; -} - -/** - * Query transmitted cell counter - */ - -uint64_t -channel_count_xmitted(channel_t *chan) -{ - tor_assert(chan); - - return chan->n_cells_xmitted; -} - -/** * Check if a channel matches an extend_info_t * * This function calls the lower layer and asks if this channel matches a @@ -4819,83 +3565,3 @@ channel_update_bad_for_new_circs(const char *digest, int force) } } -/** - * Update the estimated number of bytes queued to transmit for this channel, - * and notify the scheduler. The estimate includes both the channel queue and - * the queue size reported by the lower layer, and an overhead estimate - * optionally provided by the lower layer. - */ - -void -channel_update_xmit_queue_size(channel_t *chan) -{ - uint64_t queued, adj; - double overhead; - - tor_assert(chan); - tor_assert(chan->num_bytes_queued); - - /* - * First, get the number of bytes we have queued without factoring in - * lower-layer overhead. - */ - queued = chan->num_bytes_queued(chan) + chan->bytes_in_queue; - /* Next, adjust by the overhead factor, if any is available */ - if (chan->get_overhead_estimate) { - overhead = chan->get_overhead_estimate(chan); - if (overhead >= 1.0) { - queued = (uint64_t)(queued * overhead); - } else { - /* Ignore silly overhead factors */ - log_notice(LD_CHANNEL, "Ignoring silly overhead factor %f", overhead); - } - } - - /* Now, compare to the previous estimate */ - if (queued > chan->bytes_queued_for_xmit) { - adj = queued - chan->bytes_queued_for_xmit; - log_debug(LD_CHANNEL, - "Increasing queue size for channel " U64_FORMAT " by " U64_FORMAT - " from " U64_FORMAT " to " U64_FORMAT, - U64_PRINTF_ARG(chan->global_identifier), - U64_PRINTF_ARG(adj), - U64_PRINTF_ARG(chan->bytes_queued_for_xmit), - U64_PRINTF_ARG(queued)); - /* Update the channel's estimate */ - chan->bytes_queued_for_xmit = queued; - - /* Update the global queue size estimate if appropriate */ - if (chan->state == CHANNEL_STATE_OPEN || - chan->state == CHANNEL_STATE_MAINT) { - estimated_total_queue_size += adj; - log_debug(LD_CHANNEL, - "Increasing global queue size by " U64_FORMAT " for channel " - U64_FORMAT ", new size is " U64_FORMAT, - U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier), - U64_PRINTF_ARG(estimated_total_queue_size)); - } - } else if (queued < chan->bytes_queued_for_xmit) { - adj = chan->bytes_queued_for_xmit - queued; - log_debug(LD_CHANNEL, - "Decreasing queue size for channel " U64_FORMAT " by " U64_FORMAT - " from " U64_FORMAT " to " U64_FORMAT, - U64_PRINTF_ARG(chan->global_identifier), - U64_PRINTF_ARG(adj), - U64_PRINTF_ARG(chan->bytes_queued_for_xmit), - U64_PRINTF_ARG(queued)); - /* Update the channel's estimate */ - chan->bytes_queued_for_xmit = queued; - - /* Update the global queue size estimate if appropriate */ - if (chan->state == CHANNEL_STATE_OPEN || - chan->state == CHANNEL_STATE_MAINT) { - estimated_total_queue_size -= adj; - log_debug(LD_CHANNEL, - "Decreasing global queue size by " U64_FORMAT " for channel " - U64_FORMAT ", new size is " U64_FORMAT, - U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier), - U64_PRINTF_ARG(estimated_total_queue_size)); - } - } -} - diff --git a/src/or/channel.h b/src/or/channel.h index 32336fe1d2..d88a77c9ae 100644 --- a/src/or/channel.h +++ b/src/or/channel.h @@ -19,10 +19,6 @@ typedef void (*channel_listener_fn_ptr)(channel_listener_t *, channel_t *); typedef void (*channel_cell_handler_fn_ptr)(channel_t *, cell_t *); typedef void (*channel_var_cell_handler_fn_ptr)(channel_t *, var_cell_t *); -struct cell_queue_entry_s; -TOR_SIMPLEQ_HEAD(chan_cell_queue, cell_queue_entry_s); -typedef struct chan_cell_queue chan_cell_queue_t; - /** * This enum is used by channelpadding to decide when to pad channels. * Don't add values to it without updating the checks in @@ -259,21 +255,12 @@ struct channel_s { */ ed25519_public_key_t ed25519_identity; - /** Nickname of the OR on the other side, or NULL if none. */ - char *nickname; - /** * Linked list of channels with the same RSA identity digest, for use with * the digest->channel map */ TOR_LIST_ENTRY(channel_s) next_with_same_id; - /** List of incoming cells to handle */ - chan_cell_queue_t incoming_queue; - - /** List of queued outgoing cells */ - chan_cell_queue_t outgoing_queue; - /** Circuit mux for circuits sending on this channel */ circuitmux_t *cmux; @@ -320,7 +307,6 @@ struct channel_s { /** Channel timestamps for cell channels */ time_t timestamp_client; /* Client used this, according to relay.c */ - time_t timestamp_drained; /* Output queue empty */ time_t timestamp_recv; /* Cell received from lower layer */ time_t timestamp_xmit; /* Cell sent to lower layer */ @@ -337,14 +323,6 @@ struct channel_s { /** Channel counters for cell channels */ uint64_t n_cells_recved, n_bytes_recved; uint64_t n_cells_xmitted, n_bytes_xmitted; - - /** Our current contribution to the scheduler's total xmit queue */ - uint64_t bytes_queued_for_xmit; - - /** Number of bytes in this channel's cell queue; does not include - * lower-layer queueing. - */ - uint64_t bytes_in_queue; }; struct channel_listener_s { @@ -412,18 +390,13 @@ channel_listener_state_to_string(channel_listener_state_t state); /* Abstract channel operations */ void channel_mark_for_close(channel_t *chan); -void channel_write_cell(channel_t *chan, cell_t *cell); -void channel_write_packed_cell(channel_t *chan, packed_cell_t *cell); -void channel_write_var_cell(channel_t *chan, var_cell_t *cell); +int channel_write_packed_cell(channel_t *chan, packed_cell_t *cell); void channel_listener_mark_for_close(channel_listener_t *chan_l); /* Channel callback registrations */ /* Listener callback */ -channel_listener_fn_ptr -channel_listener_get_listener_fn(channel_listener_t *chan); - void channel_listener_set_listener_fn(channel_listener_t *chan, channel_listener_fn_ptr listener); @@ -457,36 +430,9 @@ void channel_set_cmux_policy_everywhere(circuitmux_policy_t *pol); #ifdef TOR_CHANNEL_INTERNAL_ #ifdef CHANNEL_PRIVATE_ -/* Cell queue structure (here rather than channel.c for test suite use) */ - -typedef struct cell_queue_entry_s cell_queue_entry_t; -struct cell_queue_entry_s { - TOR_SIMPLEQ_ENTRY(cell_queue_entry_s) next; - enum { - CELL_QUEUE_FIXED, - CELL_QUEUE_VAR, - CELL_QUEUE_PACKED - } type; - union { - struct { - cell_t *cell; - } fixed; - struct { - var_cell_t *var_cell; - } var; - struct { - packed_cell_t *packed_cell; - } packed; - } u; -}; - -/* Cell queue functions for benefit of test suite */ -STATIC int chan_cell_queue_len(const chan_cell_queue_t *queue); -STATIC void cell_queue_entry_free(cell_queue_entry_t *q, int handed_off); +STATIC void channel_add_to_digest_map(channel_t *chan); -void channel_write_cell_generic_(channel_t *chan, const char *cell_type, - void *cell, cell_queue_entry_t *q); #endif /* defined(CHANNEL_PRIVATE_) */ /* Channel operations for subclasses and internal use only */ @@ -511,10 +457,6 @@ void channel_close_from_lower_layer(channel_t *chan); void channel_close_for_error(channel_t *chan); void channel_closed(channel_t *chan); -void channel_listener_close_from_lower_layer(channel_listener_t *chan_l); -void channel_listener_close_for_error(channel_listener_t *chan_l); -void channel_listener_closed(channel_listener_t *chan_l); - /* Free a channel */ void channel_free(channel_t *chan); void channel_listener_free(channel_listener_t *chan_l); @@ -532,9 +474,6 @@ void channel_mark_remote(channel_t *chan); void channel_set_identity_digest(channel_t *chan, const char *identity_digest, const ed25519_public_key_t *ed_identity); -void channel_set_remote_end(channel_t *chan, - const char *identity_digest, - const char *nickname); void channel_listener_change_state(channel_listener_t *chan_l, channel_listener_state_t to_state); @@ -542,7 +481,6 @@ void channel_listener_change_state(channel_listener_t *chan_l, /* Timestamp updates */ void channel_timestamp_created(channel_t *chan); void channel_timestamp_active(channel_t *chan); -void channel_timestamp_drained(channel_t *chan); void channel_timestamp_recv(channel_t *chan); void channel_timestamp_xmit(channel_t *chan); @@ -556,12 +494,7 @@ void channel_listener_queue_incoming(channel_listener_t *listener, channel_t *incoming); /* Incoming cell handling */ -void channel_process_cells(channel_t *chan); -void channel_queue_cell(channel_t *chan, cell_t *cell); -void channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell); - -/* Outgoing cell handling */ -void channel_flush_cells(channel_t *chan); +void channel_process_cell(channel_t *chan, cell_t *cell); /* Request from lower layer for more cells if available */ MOCK_DECL(ssize_t, channel_flush_some_cells, @@ -576,10 +509,6 @@ void channel_notify_flushed(channel_t *chan); /* Handle stuff we need to do on open like notifying circuits */ void channel_do_open_actions(channel_t *chan); -#ifdef TOR_UNIT_TESTS -extern uint64_t estimated_total_queue_size; -#endif - #endif /* defined(TOR_CHANNEL_INTERNAL_) */ /* Helper functions to perform operations on channels */ @@ -680,7 +609,6 @@ MOCK_DECL(void,channel_set_circid_type,(channel_t *chan, crypto_pk_t *identity_rcvd, int consider_identity)); void channel_timestamp_client(channel_t *chan); -void channel_update_xmit_queue_size(channel_t *chan); const char * channel_listener_describe_transport(channel_listener_t *chan_l); void channel_listener_dump_statistics(channel_listener_t *chan_l, @@ -692,27 +620,14 @@ void channel_check_for_duplicates(void); void channel_update_bad_for_new_circs(const char *digest, int force); /* Flow control queries */ -uint64_t channel_get_global_queue_estimate(void); int channel_num_cells_writeable(channel_t *chan); /* Timestamp queries */ time_t channel_when_created(channel_t *chan); -time_t channel_when_last_active(channel_t *chan); time_t channel_when_last_client(channel_t *chan); -time_t channel_when_last_drained(channel_t *chan); -time_t channel_when_last_recv(channel_t *chan); time_t channel_when_last_xmit(channel_t *chan); -time_t channel_listener_when_created(channel_listener_t *chan_l); -time_t channel_listener_when_last_active(channel_listener_t *chan_l); -time_t channel_listener_when_last_accepted(channel_listener_t *chan_l); - /* Counter queries */ -uint64_t channel_count_recved(channel_t *chan); -uint64_t channel_count_xmitted(channel_t *chan); - -uint64_t channel_listener_count_accepted(channel_listener_t *chan_l); - int packed_cell_is_destroy(channel_t *chan, const packed_cell_t *packed_cell, circid_t *circid_out); diff --git a/src/or/channeltls.c b/src/or/channeltls.c index 8277813186..023ccdefd3 100644 --- a/src/or/channeltls.c +++ b/src/or/channeltls.c @@ -832,6 +832,9 @@ channel_tls_write_cell_method(channel_t *chan, cell_t *cell) * * This implements the write_packed_cell method for channel_tls_t; given a * channel_tls_t and a packed_cell_t, transmit the packed_cell_t. + * + * Return 0 on success or negative value on error. The caller must free the + * packed cell. */ static int @@ -841,7 +844,6 @@ channel_tls_write_packed_cell_method(channel_t *chan, tor_assert(chan); channel_tls_t *tlschan = BASE_CHAN_TO_TLS(chan); size_t cell_network_size = get_cell_network_size(chan->wide_circ_ids); - int written = 0; tor_assert(tlschan); tor_assert(packed_cell); @@ -849,18 +851,15 @@ channel_tls_write_packed_cell_method(channel_t *chan, if (tlschan->conn) { connection_buf_add(packed_cell->body, cell_network_size, TO_CONN(tlschan->conn)); - - /* This is where the cell is finished; used to be done from relay.c */ - packed_cell_free(packed_cell); - ++written; } else { log_info(LD_CHANNEL, "something called write_packed_cell on a tlschan " "(%p with ID " U64_FORMAT " but no conn", chan, U64_PRINTF_ARG(chan->global_identifier)); + return -1; } - return written; + return 0; } /** @@ -1149,7 +1148,7 @@ channel_tls_handle_cell(cell_t *cell, or_connection_t *conn) * These are all transport independent and we pass them up through the * channel_t mechanism. They are ultimately handled in command.c. */ - channel_queue_cell(TLS_CHAN_TO_BASE(chan), cell); + channel_process_cell(TLS_CHAN_TO_BASE(chan), cell); break; default: log_fn(LOG_INFO, LD_PROTOCOL, diff --git a/src/or/circuitbuild.c b/src/or/circuitbuild.c index 2e6b63b4d6..4e9d2457c4 100644 --- a/src/or/circuitbuild.c +++ b/src/or/circuitbuild.c @@ -631,8 +631,7 @@ circuit_n_chan_done(channel_t *chan, int status, int close_origin_circuits) tor_assert(chan); - log_debug(LD_CIRC,"chan to %s/%s, status=%d", - chan->nickname ? chan->nickname : "NULL", + log_debug(LD_CIRC,"chan to %s, status=%d", channel_get_canonical_remote_descr(chan), status); pending_circs = smartlist_new(); diff --git a/src/or/circuitlist.c b/src/or/circuitlist.c index fa19c0afd0..6157bf68b7 100644 --- a/src/or/circuitlist.c +++ b/src/or/circuitlist.c @@ -505,8 +505,7 @@ circuit_count_pending_on_channel(channel_t *chan) circuit_get_all_pending_on_channel(sl, chan); cnt = smartlist_len(sl); smartlist_free(sl); - log_debug(LD_CIRC,"or_conn to %s at %s, %d pending circs", - chan->nickname ? chan->nickname : "NULL", + log_debug(LD_CIRC,"or_conn to %s, %d pending circs", channel_get_canonical_remote_descr(chan), cnt); return cnt; diff --git a/src/or/connection_or.c b/src/or/connection_or.c index fdf1b2ebb1..c680c5b218 100644 --- a/src/or/connection_or.c +++ b/src/or/connection_or.c @@ -592,9 +592,6 @@ connection_or_flushed_some(or_connection_t *conn) { size_t datalen; - /* The channel will want to update its estimated queue size */ - channel_update_xmit_queue_size(TLS_CHAN_TO_BASE(conn->chan)); - /* If we're under the low water mark, add cells until we're just over the * high water mark. */ datalen = connection_get_outbuf_len(TO_CONN(conn)); diff --git a/src/or/relay.c b/src/or/relay.c index bbaf4ee785..0ad064417d 100644 --- a/src/or/relay.c +++ b/src/or/relay.c @@ -2746,7 +2746,13 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max)) /* this code is duplicated from some of the logic below. Ugly! XXXX */ tor_assert(destroy_queue->n > 0); cell = cell_queue_pop(destroy_queue); - channel_write_packed_cell(chan, cell); + /* Send the DESTROY cell. It is very unlikely that this fails but just + * in case, get rid of the channel. */ + if (channel_write_packed_cell(chan, cell) < 0) { + /* The cell has been freed. */ + channel_mark_for_close(chan); + continue; + } /* Update the cmux destroy counter */ circuitmux_notify_xmit_destroy(cmux); cell = NULL; @@ -2823,8 +2829,13 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max)) DIRREQ_TUNNELED, DIRREQ_CIRC_QUEUE_FLUSHED); - /* Now send the cell */ - channel_write_packed_cell(chan, cell); + /* Now send the cell. It is very unlikely that this fails but just in + * case, get rid of the channel. */ + if (channel_write_packed_cell(chan, cell) < 0) { + /* The cell has been freed at this point. */ + channel_mark_for_close(chan); + continue; + } cell = NULL; /* @@ -2859,22 +2870,13 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max)) return n_flushed; } -#if 0 -/** Indicate the current preferred cap for middle circuits; zero disables - * the cap. Right now it's just a constant, ORCIRC_MAX_MIDDLE_CELLS, but - * the logic in append_cell_to_circuit_queue() is written to be correct - * if we want to base it on a consensus param or something that might change - * in the future. - */ -static int -get_max_middle_cells(void) -{ - return ORCIRC_MAX_MIDDLE_CELLS; -} -#endif /* 0 */ - /** Add <b>cell</b> to the queue of <b>circ</b> writing to <b>chan</b> - * transmitting in <b>direction</b>. */ + * transmitting in <b>direction</b>. + * + * The given <b>cell</b> is copied over the circuit queue so the caller must + * cleanup the memory. + * + * This function is part of the fast path. */ void append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, cell_t *cell, cell_direction_t direction, @@ -2883,10 +2885,6 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, or_circuit_t *orcirc = NULL; cell_queue_t *queue; int streams_blocked; -#if 0 - uint32_t tgt_max_middle_cells, p_len, n_len, tmp, hard_max_middle_cells; -#endif - int exitward; if (circ->marked_for_close) return; @@ -2901,93 +2899,14 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, streams_blocked = circ->streams_blocked_on_p_chan; } - /* - * Disabling this for now because of a possible guard discovery attack - */ -#if 0 - /* Are we a middle circuit about to exceed ORCIRC_MAX_MIDDLE_CELLS? */ - if ((circ->n_chan != NULL) && CIRCUIT_IS_ORCIRC(circ)) { - orcirc = TO_OR_CIRCUIT(circ); - if (orcirc->p_chan) { - /* We are a middle circuit if we have both n_chan and p_chan */ - /* We'll need to know the current preferred maximum */ - tgt_max_middle_cells = get_max_middle_cells(); - if (tgt_max_middle_cells > 0) { - /* Do we need to initialize middle_max_cells? */ - if (orcirc->max_middle_cells == 0) { - orcirc->max_middle_cells = tgt_max_middle_cells; - } else { - if (tgt_max_middle_cells > orcirc->max_middle_cells) { - /* If we want to increase the cap, we can do so right away */ - orcirc->max_middle_cells = tgt_max_middle_cells; - } else if (tgt_max_middle_cells < orcirc->max_middle_cells) { - /* - * If we're shrinking the cap, we can't shrink past either queue; - * compare tgt_max_middle_cells rather than tgt_max_middle_cells * - * ORCIRC_MAX_MIDDLE_KILL_THRESH so the queues don't shrink enough - * to generate spurious warnings, either. - */ - n_len = circ->n_chan_cells.n; - p_len = orcirc->p_chan_cells.n; - tmp = tgt_max_middle_cells; - if (tmp < n_len) tmp = n_len; - if (tmp < p_len) tmp = p_len; - orcirc->max_middle_cells = tmp; - } - /* else no change */ - } - } else { - /* tgt_max_middle_cells == 0 indicates we should disable the cap */ - orcirc->max_middle_cells = 0; - } - - /* Now we know orcirc->max_middle_cells is set correctly */ - if (orcirc->max_middle_cells > 0) { - hard_max_middle_cells = - (uint32_t)(((double)orcirc->max_middle_cells) * - ORCIRC_MAX_MIDDLE_KILL_THRESH); - - if ((unsigned)queue->n + 1 >= hard_max_middle_cells) { - /* Queueing this cell would put queue over the kill theshold */ - log_warn(LD_CIRC, - "Got a cell exceeding the hard cap of %u in the " - "%s direction on middle circ ID %u on chan ID " - U64_FORMAT "; killing the circuit.", - hard_max_middle_cells, - (direction == CELL_DIRECTION_OUT) ? "n" : "p", - (direction == CELL_DIRECTION_OUT) ? - circ->n_circ_id : orcirc->p_circ_id, - U64_PRINTF_ARG( - (direction == CELL_DIRECTION_OUT) ? - circ->n_chan->global_identifier : - orcirc->p_chan->global_identifier)); - circuit_mark_for_close(circ, END_CIRC_REASON_RESOURCELIMIT); - return; - } else if ((unsigned)queue->n + 1 == orcirc->max_middle_cells) { - /* Only use ==, not >= for this test so we don't spam the log */ - log_warn(LD_CIRC, - "While trying to queue a cell, reached the soft cap of %u " - "in the %s direction on middle circ ID %u " - "on chan ID " U64_FORMAT ".", - orcirc->max_middle_cells, - (direction == CELL_DIRECTION_OUT) ? "n" : "p", - (direction == CELL_DIRECTION_OUT) ? - circ->n_circ_id : orcirc->p_circ_id, - U64_PRINTF_ARG( - (direction == CELL_DIRECTION_OUT) ? - circ->n_chan->global_identifier : - orcirc->p_chan->global_identifier)); - } - } - } - } -#endif /* 0 */ - + /* Very important that we copy to the circuit queue because all calls to + * this function use the stack for the cell memory. */ cell_queue_append_packed_copy(circ, queue, exitward, cell, chan->wide_circ_ids, 1); + /* Check and run the OOM if needed. */ if (PREDICT_UNLIKELY(cell_queues_check_size())) { - /* We ran the OOM handler */ + /* We ran the OOM handler which might have closed this circuit. */ if (circ->marked_for_close) return; } diff --git a/src/test/fakechans.h b/src/test/fakechans.h index c0de430e3d..ab5d8461b6 100644 --- a/src/test/fakechans.h +++ b/src/test/fakechans.h @@ -20,7 +20,6 @@ void scheduler_release_channel_mock(channel_t *ch); /* Query some counters used by the exposed mocks */ int get_mock_scheduler_has_waiting_cells_count(void); -int get_mock_scheduler_release_channel_count(void); #endif /* !defined(TOR_FAKECHANS_H) */ diff --git a/src/test/test_channel.c b/src/test/test_channel.c index 023c2950c9..38b69a9ad3 100644 --- a/src/test/test_channel.c +++ b/src/test/test_channel.c @@ -6,8 +6,10 @@ #include "or.h" #include "channel.h" /* For channel_note_destroy_not_pending */ +#define CIRCUITLIST_PRIVATE #include "circuitlist.h" #include "circuitmux.h" +#include "circuitmux_ewma.h" /* For var_cell_free */ #include "connection_or.h" /* For packed_cell stuff */ @@ -15,8 +17,10 @@ #include "relay.h" /* For init/free stuff */ #include "scheduler.h" +#include "networkstatus.h" /* Test suite stuff */ +#include "log_test_helpers.h" #include "test.h" #include "fakechans.h" @@ -26,62 +30,18 @@ static cell_t * test_chan_last_seen_fixed_cell_ptr = NULL; static int test_chan_var_cells_recved = 0; static var_cell_t * test_chan_last_seen_var_cell_ptr = NULL; static int test_cells_written = 0; -static int test_destroy_not_pending_calls = 0; static int test_doesnt_want_writes_count = 0; static int test_dumpstats_calls = 0; static int test_has_waiting_cells_count = 0; -static double test_overhead_estimate = 1.0; static int test_releases_count = 0; -static circuitmux_t *test_target_cmux = NULL; -static unsigned int test_cmux_cells = 0; static channel_t *dump_statistics_mock_target = NULL; static int dump_statistics_mock_matches = 0; - -static void chan_test_channel_dump_statistics_mock( - channel_t *chan, int severity); -static int chan_test_channel_flush_from_first_active_circuit_mock( - channel_t *chan, int max); -static unsigned int chan_test_circuitmux_num_cells_mock(circuitmux_t *cmux); -static void channel_note_destroy_not_pending_mock(channel_t *ch, - circid_t circid); -static void chan_test_cell_handler(channel_t *ch, - cell_t *cell); -static const char * chan_test_describe_transport(channel_t *ch); -static void chan_test_dumpstats(channel_t *ch, int severity); -static void chan_test_var_cell_handler(channel_t *ch, - var_cell_t *var_cell); -static void chan_test_close(channel_t *ch); -static void chan_test_error(channel_t *ch); -static void chan_test_finish_close(channel_t *ch); -static const char * chan_test_get_remote_descr(channel_t *ch, int flags); -static int chan_test_is_canonical(channel_t *ch, int req); -static size_t chan_test_num_bytes_queued(channel_t *ch); -static int chan_test_num_cells_writeable(channel_t *ch); -static int chan_test_write_cell(channel_t *ch, cell_t *cell); -static int chan_test_write_packed_cell(channel_t *ch, - packed_cell_t *packed_cell); -static int chan_test_write_var_cell(channel_t *ch, var_cell_t *var_cell); -static void scheduler_channel_doesnt_want_writes_mock(channel_t *ch); - -static void test_channel_dumpstats(void *arg); -static void test_channel_flush(void *arg); -static void test_channel_flushmux(void *arg); -static void test_channel_incoming(void *arg); -static void test_channel_lifecycle(void *arg); -static void test_channel_multi(void *arg); -static void test_channel_queue_incoming(void *arg); -static void test_channel_queue_size(void *arg); -static void test_channel_write(void *arg); - -static void -channel_note_destroy_not_pending_mock(channel_t *ch, - circid_t circid) -{ - (void)ch; - (void)circid; - - ++test_destroy_not_pending_calls; -} +static int test_close_called = 0; +static int test_chan_should_be_canonical = 0; +static int test_chan_should_match_target = 0; +static int test_chan_canonical_should_be_reliable = 0; +static int test_chan_listener_close_fn_called = 0; +static int test_chan_listener_fn_called = 0; static const char * chan_test_describe_transport(channel_t *ch) @@ -112,71 +72,14 @@ chan_test_channel_dump_statistics_mock(channel_t *chan, int severity) return; } -/** - * If the target cmux is the cmux for chan, make fake cells up to the - * target number of cells and write them to chan. Otherwise, invoke - * the real channel_flush_from_first_active_circuit(). - */ - -static int -chan_test_channel_flush_from_first_active_circuit_mock(channel_t *chan, - int max) -{ - int result = 0, c = 0; - packed_cell_t *cell = NULL; - - tt_ptr_op(chan, OP_NE, NULL); - if (test_target_cmux != NULL && - test_target_cmux == chan->cmux) { - while (c <= max && test_cmux_cells > 0) { - cell = packed_cell_new(); - channel_write_packed_cell(chan, cell); - ++c; - --test_cmux_cells; - } - result = c; - } else { - result = channel_flush_from_first_active_circuit__real(chan, max); - } - - done: - return result; -} - -/** - * If we have a target cmux set and this matches it, lie about how - * many cells we have according to the number indicated; otherwise - * pass to the real circuitmux_num_cells(). - */ - -static unsigned int -chan_test_circuitmux_num_cells_mock(circuitmux_t *cmux) -{ - unsigned int result = 0; - - tt_ptr_op(cmux, OP_NE, NULL); - if (cmux != NULL) { - if (cmux == test_target_cmux) { - result = test_cmux_cells; - } else { - result = circuitmux_num_cells__real(cmux); - } - } - - done: - - return result; -} - /* * Handle an incoming fixed-size cell for unit tests */ static void -chan_test_cell_handler(channel_t *ch, - cell_t *cell) +chan_test_cell_handler(channel_t *chan, cell_t *cell) { - tt_assert(ch); + tt_assert(chan); tt_assert(cell); test_chan_last_seen_fixed_cell_ptr = cell; @@ -226,6 +129,8 @@ chan_test_close(channel_t *ch) { tt_assert(ch); + ++test_close_called; + done: return; } @@ -274,35 +179,6 @@ chan_test_get_remote_descr(channel_t *ch, int flags) return "Fake channel for unit tests; no real endpoint"; } -static double -chan_test_get_overhead_estimate(channel_t *ch) -{ - tt_assert(ch); - - done: - return test_overhead_estimate; -} - -static int -chan_test_is_canonical(channel_t *ch, int req) -{ - tt_ptr_op(ch, OP_NE, NULL); - tt_assert(req == 0 || req == 1); - - done: - /* Fake channels are always canonical */ - return 1; -} - -static size_t -chan_test_num_bytes_queued(channel_t *ch) -{ - tt_assert(ch); - - done: - return 0; -} - static int chan_test_num_cells_writeable(channel_t *ch) { @@ -313,26 +189,6 @@ chan_test_num_cells_writeable(channel_t *ch) } static int -chan_test_write_cell(channel_t *ch, cell_t *cell) -{ - int rv = 0; - - tt_assert(ch); - tt_assert(cell); - - if (test_chan_accept_cells) { - /* Free the cell and bump the counter */ - tor_free(cell); - ++test_cells_written; - rv = 1; - } - /* else return 0, we didn't accept it */ - - done: - return rv; -} - -static int chan_test_write_packed_cell(channel_t *ch, packed_cell_t *packed_cell) { @@ -343,7 +199,6 @@ chan_test_write_packed_cell(channel_t *ch, if (test_chan_accept_cells) { /* Free the cell and bump the counter */ - packed_cell_free(packed_cell); ++test_cells_written; rv = 1; } @@ -419,36 +274,26 @@ new_fake_channel(void) channel_init(chan); chan->close = chan_test_close; - chan->get_overhead_estimate = chan_test_get_overhead_estimate; - chan->get_remote_descr = chan_test_get_remote_descr; - chan->num_bytes_queued = chan_test_num_bytes_queued; chan->num_cells_writeable = chan_test_num_cells_writeable; - chan->write_cell = chan_test_write_cell; + chan->get_remote_descr = chan_test_get_remote_descr; chan->write_packed_cell = chan_test_write_packed_cell; chan->write_var_cell = chan_test_write_var_cell; chan->state = CHANNEL_STATE_OPEN; + chan->cmux = circuitmux_alloc(); + return chan; } void free_fake_channel(channel_t *chan) { - cell_queue_entry_t *cell, *cell_tmp; - if (! chan) return; if (chan->cmux) circuitmux_free(chan->cmux); - TOR_SIMPLEQ_FOREACH_SAFE(cell, &chan->incoming_queue, next, cell_tmp) { - cell_queue_entry_free(cell, 0); - } - TOR_SIMPLEQ_FOREACH_SAFE(cell, &chan->outgoing_queue, next, cell_tmp) { - cell_queue_entry_free(cell, 0); - } - tor_free(chan); } @@ -489,16 +334,6 @@ scheduler_channel_doesnt_want_writes_mock(channel_t *ch) } /** - * Counter query for scheduler_release_channel_mock() - */ - -int -get_mock_scheduler_release_channel_count(void) -{ - return test_releases_count; -} - -/** * Mock for scheduler_release_channel() */ @@ -513,6 +348,58 @@ scheduler_release_channel_mock(channel_t *ch) return; } +static int +test_chan_is_canonical(channel_t *chan, int req) +{ + tor_assert(chan); + + if (req && test_chan_canonical_should_be_reliable) { + return 1; + } + + if (test_chan_should_be_canonical) { + return 1; + } + return 0; +} + +static int +test_chan_matches_target(channel_t *chan, const tor_addr_t *target) +{ + (void) chan; + (void) target; + + if (test_chan_should_match_target) { + return 1; + } + return 0; +} + +static void +test_chan_listener_close(channel_listener_t *chan) +{ + (void) chan; + ++test_chan_listener_close_fn_called; + return; +} + +static void +test_chan_listener_fn(channel_listener_t *listener, channel_t *chan) +{ + (void) listener; + (void) chan; + + ++test_chan_listener_fn_called; + return; +} + +static const char * +test_chan_listener_describe_transport(channel_listener_t *chan) +{ + (void) chan; + return "Fake listener channel."; +} + /** * Test for channel_dumpstats() and limited test for * channel_dump_statistics() @@ -523,6 +410,7 @@ test_channel_dumpstats(void *arg) { channel_t *ch = NULL; cell_t *cell = NULL; + packed_cell_t *p_cell = NULL; int old_count; (void)arg; @@ -536,7 +424,6 @@ test_channel_dumpstats(void *arg) /* Set up a new fake channel */ ch = new_fake_channel(); tt_assert(ch); - ch->cmux = circuitmux_alloc(); /* Try to register it */ channel_register(ch); @@ -579,24 +466,21 @@ test_channel_dumpstats(void *arg) /* Now make another channel */ ch = new_fake_channel(); tt_assert(ch); - ch->cmux = circuitmux_alloc(); channel_register(ch); - tt_assert(ch->registered); + tt_int_op(ch->registered, OP_EQ, 1); /* Lie about its age so dumpstats gets coverage for rate calculations */ ch->timestamp_created = time(NULL) - 30; - tt_assert(ch->timestamp_created > 0); - tt_assert(time(NULL) > ch->timestamp_created); + tt_int_op(ch->timestamp_created, OP_GT, 0); + tt_int_op(time(NULL), OP_GT, ch->timestamp_created); /* Put cells through it both ways to make the counters non-zero */ - cell = tor_malloc_zero(sizeof(*cell)); - make_fake_cell(cell); + p_cell = packed_cell_new(); test_chan_accept_cells = 1; old_count = test_cells_written; - channel_write_cell(ch, cell); - cell = NULL; + channel_write_packed_cell(ch, p_cell); tt_int_op(test_cells_written, OP_EQ, old_count + 1); - tt_assert(ch->n_bytes_xmitted > 0); - tt_assert(ch->n_cells_xmitted > 0); + tt_u64_op(ch->n_bytes_xmitted, OP_GT, 0); + tt_u64_op(ch->n_cells_xmitted, OP_GT, 0); /* Receive path */ channel_set_cell_handlers(ch, @@ -605,19 +489,18 @@ test_channel_dumpstats(void *arg) tt_ptr_op(channel_get_cell_handler(ch), OP_EQ, chan_test_cell_handler); tt_ptr_op(channel_get_var_cell_handler(ch), OP_EQ, chan_test_var_cell_handler); - cell = tor_malloc_zero(sizeof(cell_t)); - make_fake_cell(cell); + cell = tor_malloc_zero(sizeof(*cell)); old_count = test_chan_fixed_cells_recved; - channel_queue_cell(ch, cell); - tor_free(cell); + channel_process_cell(ch, cell); tt_int_op(test_chan_fixed_cells_recved, OP_EQ, old_count + 1); - tt_assert(ch->n_bytes_recved > 0); - tt_assert(ch->n_cells_recved > 0); + tt_u64_op(ch->n_bytes_recved, OP_GT, 0); + tt_u64_op(ch->n_cells_recved, OP_GT, 0); /* Test channel_dump_statistics */ ch->describe_transport = chan_test_describe_transport; ch->dumpstats = chan_test_dumpstats; - ch->is_canonical = chan_test_is_canonical; + test_chan_should_be_canonical = 1; + ch->is_canonical = test_chan_is_canonical; old_count = test_dumpstats_calls; channel_dump_statistics(ch, LOG_DEBUG); tt_int_op(test_dumpstats_calls, OP_EQ, old_count + 1); @@ -631,8 +514,8 @@ test_channel_dumpstats(void *arg) ch = NULL; done: - tor_free(cell); free_fake_channel(ch); + tor_free(cell); UNMOCK(scheduler_channel_doesnt_want_writes); UNMOCK(scheduler_release_channel); @@ -640,215 +523,229 @@ test_channel_dumpstats(void *arg) return; } +/* Test outbound cell. The callstack is: + * channel_flush_some_cells() + * -> channel_flush_from_first_active_circuit() + * -> channel_write_packed_cell() + * -> write_packed_cell() + * -> chan->write_packed_cell() fct ptr. + * + * This test goes from a cell in a circuit up to the channel write handler + * that should put them on the connection outbuf. */ static void -test_channel_flush(void *arg) +test_channel_outbound_cell(void *arg) { - channel_t *ch = NULL; - cell_t *cell = NULL; - packed_cell_t *p_cell = NULL; - var_cell_t *v_cell = NULL; - int init_count; - - (void)arg; + int old_count; + channel_t *chan = NULL; + packed_cell_t *p_cell = NULL, *p_cell2 = NULL; + origin_circuit_t *circ = NULL; + cell_queue_t *queue; - ch = new_fake_channel(); - tt_assert(ch); + (void) arg; - /* Cache the original count */ - init_count = test_cells_written; + /* The channel will be freed so we need to hijack this so the scheduler + * doesn't get confused. */ + MOCK(scheduler_release_channel, scheduler_release_channel_mock); - /* Stop accepting so we can queue some */ - test_chan_accept_cells = 0; + /* Accept cells to lower layer */ + test_chan_accept_cells = 1; - /* Queue a regular cell */ - cell = tor_malloc_zero(sizeof(cell_t)); - make_fake_cell(cell); - channel_write_cell(ch, cell); - /* It should be queued, so assert that we didn't write it */ - tt_int_op(test_cells_written, OP_EQ, init_count); - - /* Queue a var cell */ - v_cell = tor_malloc_zero(sizeof(var_cell_t) + CELL_PAYLOAD_SIZE); - make_fake_var_cell(v_cell); - channel_write_var_cell(ch, v_cell); - /* It should be queued, so assert that we didn't write it */ - tt_int_op(test_cells_written, OP_EQ, init_count); - - /* Try a packed cell now */ + /* Setup a valid circuit to queue a cell. */ + circ = origin_circuit_new(); + tt_assert(circ); + /* Circuit needs an origin purpose to be considered origin. */ + TO_CIRCUIT(circ)->purpose = CIRCUIT_PURPOSE_C_GENERAL; + TO_CIRCUIT(circ)->n_circ_id = 42; + /* This is the outbound test so use the next channel queue. */ + queue = &TO_CIRCUIT(circ)->n_chan_cells; + /* Setup packed cell to queue on the circuit. */ p_cell = packed_cell_new(); tt_assert(p_cell); - channel_write_packed_cell(ch, p_cell); - /* It should be queued, so assert that we didn't write it */ - tt_int_op(test_cells_written, OP_EQ, init_count); - - /* Now allow writes through again */ - test_chan_accept_cells = 1; - - /* ...and flush */ - channel_flush_cells(ch); - - /* All three should have gone through */ - tt_int_op(test_cells_written, OP_EQ, init_count + 3); - - done: - tor_free(ch); - - return; -} - -/** - * Channel flush tests that require cmux mocking - */ - -static void -test_channel_flushmux(void *arg) -{ - channel_t *ch = NULL; - int old_count, q_len_before, q_len_after; - ssize_t result; - - (void)arg; - - /* Install mocks we need for this test */ - MOCK(channel_flush_from_first_active_circuit, - chan_test_channel_flush_from_first_active_circuit_mock); - MOCK(circuitmux_num_cells, - chan_test_circuitmux_num_cells_mock); - - ch = new_fake_channel(); - tt_assert(ch); - ch->cmux = circuitmux_alloc(); - + p_cell2 = packed_cell_new(); + tt_assert(p_cell2); + /* Setup a channel to put the circuit on. */ + chan = new_fake_channel(); + tt_assert(chan); + chan->state = CHANNEL_STATE_OPENING; + channel_change_state_open(chan); + /* Outbound channel. */ + channel_mark_outgoing(chan); + /* Try to register it so we can clean it through the channel cleanup + * process. */ + channel_register(chan); + tt_int_op(chan->registered, OP_EQ, 1); + /* Set EWMA policy so we can pick it when flushing. */ + channel_set_cmux_policy_everywhere(&ewma_policy); + tt_ptr_op(circuitmux_get_policy(chan->cmux), OP_EQ, &ewma_policy); + + /* Register circuit to the channel circid map which will attach the circuit + * to the channel's cmux as well. */ + circuit_set_n_circid_chan(TO_CIRCUIT(circ), 42, chan); + tt_int_op(channel_num_circuits(chan), OP_EQ, 1); + tt_assert(!TO_CIRCUIT(circ)->next_active_on_n_chan); + tt_assert(!TO_CIRCUIT(circ)->prev_active_on_n_chan); + /* Test the cmux state. */ + tt_ptr_op(TO_CIRCUIT(circ)->n_mux, OP_EQ, chan->cmux); + tt_int_op(circuitmux_is_circuit_attached(chan->cmux, TO_CIRCUIT(circ)), + OP_EQ, 1); + + /* Flush the channel without any cell on it. */ old_count = test_cells_written; - - test_target_cmux = ch->cmux; - test_cmux_cells = 1; - - /* Enable cell acceptance */ - test_chan_accept_cells = 1; - - result = channel_flush_some_cells(ch, 1); - - tt_int_op(result, OP_EQ, 1); + ssize_t flushed = channel_flush_some_cells(chan, 1); + tt_i64_op(flushed, OP_EQ, 0); + tt_int_op(test_cells_written, OP_EQ, old_count); + tt_int_op(channel_more_to_flush(chan), OP_EQ, 0); + tt_int_op(circuitmux_num_active_circuits(chan->cmux), OP_EQ, 0); + tt_int_op(circuitmux_num_cells(chan->cmux), OP_EQ, 0); + tt_int_op(circuitmux_is_circuit_active(chan->cmux, TO_CIRCUIT(circ)), + OP_EQ, 0); + tt_u64_op(chan->n_cells_xmitted, OP_EQ, 0); + tt_u64_op(chan->n_bytes_xmitted, OP_EQ, 0); + + /* Queue cell onto the next queue that is the outbound direction. Than + * update its cmux so the circuit can be picked when flushing cells. */ + cell_queue_append(queue, p_cell); + p_cell = NULL; + tt_int_op(queue->n, OP_EQ, 1); + cell_queue_append(queue, p_cell2); + p_cell2 = NULL; + tt_int_op(queue->n, OP_EQ, 2); + + update_circuit_on_cmux(TO_CIRCUIT(circ), CELL_DIRECTION_OUT); + tt_int_op(circuitmux_num_active_circuits(chan->cmux), OP_EQ, 1); + tt_int_op(circuitmux_num_cells(chan->cmux), OP_EQ, 2); + tt_int_op(circuitmux_is_circuit_active(chan->cmux, TO_CIRCUIT(circ)), + OP_EQ, 1); + + /* From this point on, we have a queued cell on an active circuit attached + * to the channel's cmux. */ + + /* Flush the first cell. This is going to go down the call stack. */ + old_count = test_cells_written; + flushed = channel_flush_some_cells(chan, 1); + tt_i64_op(flushed, OP_EQ, 1); tt_int_op(test_cells_written, OP_EQ, old_count + 1); - tt_int_op(test_cmux_cells, OP_EQ, 0); - - /* Now try it without accepting to force them into the queue */ - test_chan_accept_cells = 0; - test_cmux_cells = 1; - q_len_before = chan_cell_queue_len(&(ch->outgoing_queue)); - - result = channel_flush_some_cells(ch, 1); - - /* We should not have actually flushed any */ - tt_int_op(result, OP_EQ, 0); + tt_int_op(circuitmux_num_cells(chan->cmux), OP_EQ, 1); + tt_int_op(channel_more_to_flush(chan), OP_EQ, 1); + /* Circuit should remain active because there is a second cell queued. */ + tt_int_op(circuitmux_is_circuit_active(chan->cmux, TO_CIRCUIT(circ)), + OP_EQ, 1); + /* Should still be attached. */ + tt_int_op(circuitmux_is_circuit_attached(chan->cmux, TO_CIRCUIT(circ)), + OP_EQ, 1); + tt_u64_op(chan->n_cells_xmitted, OP_EQ, 1); + tt_u64_op(chan->n_bytes_xmitted, OP_EQ, get_cell_network_size(0)); + + /* Flush second cell. This is going to go down the call stack. */ + old_count = test_cells_written; + flushed = channel_flush_some_cells(chan, 1); + tt_i64_op(flushed, OP_EQ, 1); tt_int_op(test_cells_written, OP_EQ, old_count + 1); - /* But we should have gotten to the fake cellgen loop */ - tt_int_op(test_cmux_cells, OP_EQ, 0); - /* ...and we should have a queued cell */ - q_len_after = chan_cell_queue_len(&(ch->outgoing_queue)); - tt_int_op(q_len_after, OP_EQ, q_len_before + 1); - - /* Now accept cells again and drain the queue */ - test_chan_accept_cells = 1; - channel_flush_cells(ch); - tt_int_op(test_cells_written, OP_EQ, old_count + 2); - tt_int_op(chan_cell_queue_len(&(ch->outgoing_queue)), OP_EQ, 0); - - test_target_cmux = NULL; - test_cmux_cells = 0; + tt_int_op(circuitmux_num_cells(chan->cmux), OP_EQ, 0); + tt_int_op(channel_more_to_flush(chan), OP_EQ, 0); + /* No more cells should make the circuit inactive. */ + tt_int_op(circuitmux_is_circuit_active(chan->cmux, TO_CIRCUIT(circ)), + OP_EQ, 0); + /* Should still be attached. */ + tt_int_op(circuitmux_is_circuit_attached(chan->cmux, TO_CIRCUIT(circ)), + OP_EQ, 1); + tt_u64_op(chan->n_cells_xmitted, OP_EQ, 2); + tt_u64_op(chan->n_bytes_xmitted, OP_EQ, get_cell_network_size(0) * 2); done: - if (ch) - circuitmux_free(ch->cmux); - tor_free(ch); - - UNMOCK(channel_flush_from_first_active_circuit); - UNMOCK(circuitmux_num_cells); - - test_chan_accept_cells = 0; - - return; + if (circ) { + circuit_free(TO_CIRCUIT(circ)); + } + tor_free(p_cell); + channel_free_all(); + UNMOCK(scheduler_release_channel); } +/* Test inbound cell. The callstack is: + * channel_process_cell() + * -> chan->cell_handler() + * + * This test is about checking if we can process an inbound cell down to the + * channel handler. */ static void -test_channel_incoming(void *arg) +test_channel_inbound_cell(void *arg) { - channel_t *ch = NULL; + channel_t *chan = NULL; cell_t *cell = NULL; - var_cell_t *var_cell = NULL; int old_count; - (void)arg; + (void) arg; - /* Mock these for duration of the test */ - MOCK(scheduler_channel_doesnt_want_writes, - scheduler_channel_doesnt_want_writes_mock); - MOCK(scheduler_release_channel, - scheduler_release_channel_mock); + /* The channel will be freed so we need to hijack this so the scheduler + * doesn't get confused. */ + MOCK(scheduler_release_channel, scheduler_release_channel_mock); /* Accept cells to lower layer */ test_chan_accept_cells = 1; - /* Use default overhead factor */ - test_overhead_estimate = 1.0; - ch = new_fake_channel(); - tt_assert(ch); + chan = new_fake_channel(); + tt_assert(chan); /* Start it off in OPENING */ - ch->state = CHANNEL_STATE_OPENING; - /* We'll need a cmux */ - ch->cmux = circuitmux_alloc(); - - /* Install incoming cell handlers */ - channel_set_cell_handlers(ch, - chan_test_cell_handler, - chan_test_var_cell_handler); - /* Test cell handler getters */ - tt_ptr_op(channel_get_cell_handler(ch), OP_EQ, chan_test_cell_handler); - tt_ptr_op(channel_get_var_cell_handler(ch), OP_EQ, - chan_test_var_cell_handler); + chan->state = CHANNEL_STATE_OPENING; /* Try to register it */ - channel_register(ch); - tt_assert(ch->registered); + channel_register(chan); + tt_int_op(chan->registered, OP_EQ, 1); /* Open it */ - channel_change_state_open(ch); - tt_int_op(ch->state, OP_EQ, CHANNEL_STATE_OPEN); + channel_change_state_open(chan); + tt_int_op(chan->state, OP_EQ, CHANNEL_STATE_OPEN); + tt_int_op(chan->has_been_open, OP_EQ, 1); - /* Receive a fixed cell */ - cell = tor_malloc_zero(sizeof(cell_t)); + /* Receive a cell now. */ + cell = tor_malloc_zero(sizeof(*cell)); make_fake_cell(cell); old_count = test_chan_fixed_cells_recved; - channel_queue_cell(ch, cell); - tor_free(cell); + channel_process_cell(chan, cell); + tt_int_op(test_chan_fixed_cells_recved, OP_EQ, old_count); + tt_u64_op(chan->timestamp_xfer_ms, OP_EQ, 0); + tt_u64_op(chan->timestamp_active, OP_EQ, 0); + tt_u64_op(chan->timestamp_recv, OP_EQ, 0); + + /* Setup incoming cell handlers. We don't care about var cell, the channel + * layers is not handling those. */ + channel_set_cell_handlers(chan, chan_test_cell_handler, NULL); + tt_ptr_op(chan->cell_handler, OP_EQ, chan_test_cell_handler); + /* Now process the cell, we should see it. */ + old_count = test_chan_fixed_cells_recved; + channel_process_cell(chan, cell); tt_int_op(test_chan_fixed_cells_recved, OP_EQ, old_count + 1); - - /* Receive a variable-size cell */ - var_cell = tor_malloc_zero(sizeof(var_cell_t) + CELL_PAYLOAD_SIZE); - make_fake_var_cell(var_cell); - old_count = test_chan_var_cells_recved; - channel_queue_var_cell(ch, var_cell); - tor_free(cell); - tt_int_op(test_chan_var_cells_recved, OP_EQ, old_count + 1); + /* We should have a series of timestamp set. */ + tt_u64_op(chan->timestamp_xfer_ms, OP_NE, 0); + tt_u64_op(chan->timestamp_active, OP_NE, 0); + tt_u64_op(chan->timestamp_recv, OP_NE, 0); + tt_u64_op(chan->next_padding_time_ms, OP_EQ, 0); + tt_u64_op(chan->n_cells_recved, OP_EQ, 1); + tt_u64_op(chan->n_bytes_recved, OP_EQ, get_cell_network_size(0)); /* Close it */ - channel_mark_for_close(ch); - tt_int_op(ch->state, OP_EQ, CHANNEL_STATE_CLOSING); - chan_test_finish_close(ch); - tt_int_op(ch->state, OP_EQ, CHANNEL_STATE_CLOSED); + old_count = test_close_called; + channel_mark_for_close(chan); + tt_int_op(chan->state, OP_EQ, CHANNEL_STATE_CLOSING); + tt_int_op(chan->reason_for_closing, OP_EQ, CHANNEL_CLOSE_REQUESTED); + tt_int_op(test_close_called, OP_EQ, old_count + 1); + + /* This closes the channe so it calls in the scheduler, make sure of it. */ + old_count = test_releases_count; + chan_test_finish_close(chan); + tt_int_op(test_releases_count, OP_EQ, old_count + 1); + tt_int_op(chan->state, OP_EQ, CHANNEL_STATE_CLOSED); + + /* The channel will be free, lets make sure it is not accessible. */ + uint64_t chan_id = chan->global_identifier; + tt_ptr_op(channel_find_by_global_id(chan_id), OP_EQ, chan); channel_run_cleanup(); - ch = NULL; + chan = channel_find_by_global_id(chan_id); + tt_assert(chan == NULL); done: - free_fake_channel(ch); tor_free(cell); - tor_free(var_cell); - - UNMOCK(scheduler_channel_doesnt_want_writes); UNMOCK(scheduler_release_channel); - - return; } /** @@ -861,7 +758,7 @@ static void test_channel_lifecycle(void *arg) { channel_t *ch1 = NULL, *ch2 = NULL; - cell_t *cell = NULL; + packed_cell_t *p_cell = NULL; int old_count, init_doesnt_want_writes_count; int init_releases_count; @@ -879,38 +776,29 @@ test_channel_lifecycle(void *arg) /* Accept cells to lower layer */ test_chan_accept_cells = 1; - /* Use default overhead factor */ - test_overhead_estimate = 1.0; ch1 = new_fake_channel(); tt_assert(ch1); /* Start it off in OPENING */ ch1->state = CHANNEL_STATE_OPENING; - /* We'll need a cmux */ - ch1->cmux = circuitmux_alloc(); /* Try to register it */ channel_register(ch1); tt_assert(ch1->registered); /* Try to write a cell through (should queue) */ - cell = tor_malloc_zero(sizeof(cell_t)); - make_fake_cell(cell); + p_cell = packed_cell_new(); old_count = test_cells_written; - channel_write_cell(ch1, cell); + channel_write_packed_cell(ch1, p_cell); tt_int_op(old_count, OP_EQ, test_cells_written); /* Move it to OPEN and flush */ channel_change_state_open(ch1); - /* Queue should drain */ - tt_int_op(old_count + 1, OP_EQ, test_cells_written); - - /* Get another one */ +/* Get another one */ ch2 = new_fake_channel(); tt_assert(ch2); ch2->state = CHANNEL_STATE_OPENING; - ch2->cmux = circuitmux_alloc(); /* Register */ channel_register(ch2); @@ -960,8 +848,6 @@ test_channel_lifecycle(void *arg) UNMOCK(scheduler_channel_doesnt_want_writes); UNMOCK(scheduler_release_channel); - - return; } /** @@ -988,15 +874,11 @@ test_channel_lifecycle_2(void *arg) /* Accept cells to lower layer */ test_chan_accept_cells = 1; - /* Use default overhead factor */ - test_overhead_estimate = 1.0; ch = new_fake_channel(); tt_assert(ch); /* Start it off in OPENING */ ch->state = CHANNEL_STATE_OPENING; - /* The full lifecycle test needs a cmux */ - ch->cmux = circuitmux_alloc(); /* Try to register it */ channel_register(ch); @@ -1016,7 +898,6 @@ test_channel_lifecycle_2(void *arg) ch = new_fake_channel(); tt_assert(ch); ch->state = CHANNEL_STATE_OPENING; - ch->cmux = circuitmux_alloc(); channel_register(ch); tt_assert(ch->registered); @@ -1035,7 +916,6 @@ test_channel_lifecycle_2(void *arg) ch = new_fake_channel(); tt_assert(ch); ch->state = CHANNEL_STATE_OPENING; - ch->cmux = circuitmux_alloc(); channel_register(ch); tt_assert(ch->registered); @@ -1064,7 +944,6 @@ test_channel_lifecycle_2(void *arg) ch = new_fake_channel(); tt_assert(ch); ch->state = CHANNEL_STATE_OPENING; - ch->cmux = circuitmux_alloc(); channel_register(ch); tt_assert(ch->registered); @@ -1090,7 +969,6 @@ test_channel_lifecycle_2(void *arg) ch = new_fake_channel(); tt_assert(ch); ch->state = CHANNEL_STATE_OPENING; - ch->cmux = circuitmux_alloc(); channel_register(ch); tt_assert(ch->registered); @@ -1125,655 +1003,6 @@ test_channel_lifecycle_2(void *arg) } static void -test_channel_multi(void *arg) -{ - channel_t *ch1 = NULL, *ch2 = NULL; - uint64_t global_queue_estimate; - cell_t *cell = NULL; - - (void)arg; - - /* Accept cells to lower layer */ - test_chan_accept_cells = 1; - /* Use default overhead factor */ - test_overhead_estimate = 1.0; - - ch1 = new_fake_channel(); - tt_assert(ch1); - ch2 = new_fake_channel(); - tt_assert(ch2); - - /* Initial queue size update */ - channel_update_xmit_queue_size(ch1); - tt_u64_op(ch1->bytes_queued_for_xmit, OP_EQ, 0); - channel_update_xmit_queue_size(ch2); - tt_u64_op(ch2->bytes_queued_for_xmit, OP_EQ, 0); - global_queue_estimate = channel_get_global_queue_estimate(); - tt_u64_op(global_queue_estimate, OP_EQ, 0); - - /* Queue some cells, check queue estimates */ - cell = tor_malloc_zero(sizeof(cell_t)); - make_fake_cell(cell); - channel_write_cell(ch1, cell); - - cell = tor_malloc_zero(sizeof(cell_t)); - make_fake_cell(cell); - channel_write_cell(ch2, cell); - - channel_update_xmit_queue_size(ch1); - channel_update_xmit_queue_size(ch2); - tt_u64_op(ch1->bytes_queued_for_xmit, OP_EQ, 0); - tt_u64_op(ch2->bytes_queued_for_xmit, OP_EQ, 0); - global_queue_estimate = channel_get_global_queue_estimate(); - tt_u64_op(global_queue_estimate, OP_EQ, 0); - - /* Stop accepting cells at lower layer */ - test_chan_accept_cells = 0; - - /* Queue some cells and check queue estimates */ - cell = tor_malloc_zero(sizeof(cell_t)); - make_fake_cell(cell); - channel_write_cell(ch1, cell); - - channel_update_xmit_queue_size(ch1); - tt_u64_op(ch1->bytes_queued_for_xmit, OP_EQ, 512); - global_queue_estimate = channel_get_global_queue_estimate(); - tt_u64_op(global_queue_estimate, OP_EQ, 512); - - cell = tor_malloc_zero(sizeof(cell_t)); - make_fake_cell(cell); - channel_write_cell(ch2, cell); - - channel_update_xmit_queue_size(ch2); - tt_u64_op(ch2->bytes_queued_for_xmit, OP_EQ, 512); - global_queue_estimate = channel_get_global_queue_estimate(); - tt_u64_op(global_queue_estimate, OP_EQ, 1024); - - /* Allow cells through again */ - test_chan_accept_cells = 1; - - /* Flush chan 2 */ - channel_flush_cells(ch2); - - /* Update and check queue sizes */ - channel_update_xmit_queue_size(ch1); - channel_update_xmit_queue_size(ch2); - tt_u64_op(ch1->bytes_queued_for_xmit, OP_EQ, 512); - tt_u64_op(ch2->bytes_queued_for_xmit, OP_EQ, 0); - global_queue_estimate = channel_get_global_queue_estimate(); - tt_u64_op(global_queue_estimate, OP_EQ, 512); - - /* Flush chan 1 */ - channel_flush_cells(ch1); - - /* Update and check queue sizes */ - channel_update_xmit_queue_size(ch1); - channel_update_xmit_queue_size(ch2); - tt_u64_op(ch1->bytes_queued_for_xmit, OP_EQ, 0); - tt_u64_op(ch2->bytes_queued_for_xmit, OP_EQ, 0); - global_queue_estimate = channel_get_global_queue_estimate(); - tt_u64_op(global_queue_estimate, OP_EQ, 0); - - /* Now block again */ - test_chan_accept_cells = 0; - - /* Queue some cells */ - cell = tor_malloc_zero(sizeof(cell_t)); - make_fake_cell(cell); - channel_write_cell(ch1, cell); - - cell = tor_malloc_zero(sizeof(cell_t)); - make_fake_cell(cell); - channel_write_cell(ch2, cell); - cell = NULL; - - /* Check the estimates */ - channel_update_xmit_queue_size(ch1); - channel_update_xmit_queue_size(ch2); - tt_u64_op(ch1->bytes_queued_for_xmit, OP_EQ, 512); - tt_u64_op(ch2->bytes_queued_for_xmit, OP_EQ, 512); - global_queue_estimate = channel_get_global_queue_estimate(); - tt_u64_op(global_queue_estimate, OP_EQ, 1024); - - /* Now close channel 2; it should be subtracted from the global queue */ - MOCK(scheduler_release_channel, scheduler_release_channel_mock); - channel_mark_for_close(ch2); - UNMOCK(scheduler_release_channel); - - global_queue_estimate = channel_get_global_queue_estimate(); - tt_u64_op(global_queue_estimate, OP_EQ, 512); - - /* - * Since the fake channels aren't registered, channel_free_all() can't - * see them properly. - */ - MOCK(scheduler_release_channel, scheduler_release_channel_mock); - channel_mark_for_close(ch1); - UNMOCK(scheduler_release_channel); - - global_queue_estimate = channel_get_global_queue_estimate(); - tt_u64_op(global_queue_estimate, OP_EQ, 0); - - /* Now free everything */ - MOCK(scheduler_release_channel, scheduler_release_channel_mock); - channel_free_all(); - UNMOCK(scheduler_release_channel); - - done: - free_fake_channel(ch1); - free_fake_channel(ch2); - - return; -} - -/** - * Check some hopefully-impossible edge cases in the channel queue we - * can only trigger by doing evil things to the queue directly. - */ - -static void -test_channel_queue_impossible(void *arg) -{ - channel_t *ch = NULL; - cell_t *cell = NULL; - packed_cell_t *packed_cell = NULL; - var_cell_t *var_cell = NULL; - int old_count; - cell_queue_entry_t *q = NULL; - uint64_t global_queue_estimate; - uintptr_t cellintptr; - - /* Cache the global queue size (see below) */ - global_queue_estimate = channel_get_global_queue_estimate(); - - (void)arg; - - ch = new_fake_channel(); - tt_assert(ch); - - /* We test queueing here; tell it not to accept cells */ - test_chan_accept_cells = 0; - /* ...and keep it from trying to flush the queue */ - ch->state = CHANNEL_STATE_MAINT; - - /* Cache the cell written count */ - old_count = test_cells_written; - - /* Assert that the queue is initially empty */ - tt_int_op(chan_cell_queue_len(&(ch->outgoing_queue)), OP_EQ, 0); - - /* Get a fresh cell and write it to the channel*/ - cell = tor_malloc_zero(sizeof(cell_t)); - make_fake_cell(cell); - cellintptr = (uintptr_t)(void*)cell; - channel_write_cell(ch, cell); - - /* Now it should be queued */ - tt_int_op(chan_cell_queue_len(&(ch->outgoing_queue)), OP_EQ, 1); - q = TOR_SIMPLEQ_FIRST(&(ch->outgoing_queue)); - tt_assert(q); - if (q) { - tt_int_op(q->type, OP_EQ, CELL_QUEUE_FIXED); - tt_assert((uintptr_t)q->u.fixed.cell == cellintptr); - } - /* Do perverse things to it */ - tor_free(q->u.fixed.cell); - q->u.fixed.cell = NULL; - - /* - * Now change back to open with channel_change_state() and assert that it - * gets thrown away properly. - */ - test_chan_accept_cells = 1; - channel_change_state_open(ch); - tt_assert(test_cells_written == old_count); - tt_int_op(chan_cell_queue_len(&(ch->outgoing_queue)), OP_EQ, 0); - - /* Same thing but for a var_cell */ - - test_chan_accept_cells = 0; - ch->state = CHANNEL_STATE_MAINT; - var_cell = tor_malloc_zero(sizeof(var_cell_t) + CELL_PAYLOAD_SIZE); - make_fake_var_cell(var_cell); - cellintptr = (uintptr_t)(void*)var_cell; - channel_write_var_cell(ch, var_cell); - - /* Check that it's queued */ - tt_int_op(chan_cell_queue_len(&(ch->outgoing_queue)), OP_EQ, 1); - q = TOR_SIMPLEQ_FIRST(&(ch->outgoing_queue)); - tt_assert(q); - if (q) { - tt_int_op(q->type, OP_EQ, CELL_QUEUE_VAR); - tt_assert((uintptr_t)q->u.var.var_cell == cellintptr); - } - - /* Remove the cell from the queue entry */ - tor_free(q->u.var.var_cell); - q->u.var.var_cell = NULL; - - /* Let it drain and check that the bad entry is discarded */ - test_chan_accept_cells = 1; - channel_change_state_open(ch); - tt_assert(test_cells_written == old_count); - tt_int_op(chan_cell_queue_len(&(ch->outgoing_queue)), OP_EQ, 0); - - /* Same thing with a packed_cell */ - - test_chan_accept_cells = 0; - ch->state = CHANNEL_STATE_MAINT; - packed_cell = packed_cell_new(); - tt_assert(packed_cell); - cellintptr = (uintptr_t)(void*)packed_cell; - channel_write_packed_cell(ch, packed_cell); - - /* Check that it's queued */ - tt_int_op(chan_cell_queue_len(&(ch->outgoing_queue)), OP_EQ, 1); - q = TOR_SIMPLEQ_FIRST(&(ch->outgoing_queue)); - tt_assert(q); - if (q) { - tt_int_op(q->type, OP_EQ, CELL_QUEUE_PACKED); - tt_assert((uintptr_t)q->u.packed.packed_cell == cellintptr); - } - - /* Remove the cell from the queue entry */ - packed_cell_free(q->u.packed.packed_cell); - q->u.packed.packed_cell = NULL; - - /* Let it drain and check that the bad entry is discarded */ - test_chan_accept_cells = 1; - channel_change_state_open(ch); - tt_assert(test_cells_written == old_count); - tt_int_op(chan_cell_queue_len(&(ch->outgoing_queue)), OP_EQ, 0); - - /* Unknown cell type case */ - test_chan_accept_cells = 0; - ch->state = CHANNEL_STATE_MAINT; - cell = tor_malloc_zero(sizeof(cell_t)); - make_fake_cell(cell); - cellintptr = (uintptr_t)(void*)cell; - channel_write_cell(ch, cell); - - /* Check that it's queued */ - tt_int_op(chan_cell_queue_len(&(ch->outgoing_queue)), OP_EQ, 1); - q = TOR_SIMPLEQ_FIRST(&(ch->outgoing_queue)); - tt_assert(q); - if (q) { - tt_int_op(q->type, OP_EQ, CELL_QUEUE_FIXED); - tt_assert((uintptr_t)q->u.fixed.cell == cellintptr); - } - /* Clobber it, including the queue entry type */ - tor_free(q->u.fixed.cell); - q->u.fixed.cell = NULL; - q->type = CELL_QUEUE_PACKED + 1; - - /* Let it drain and check that the bad entry is discarded */ - test_chan_accept_cells = 1; - tor_capture_bugs_(1); - channel_change_state_open(ch); - tt_assert(test_cells_written == old_count); - tt_int_op(chan_cell_queue_len(&(ch->outgoing_queue)), OP_EQ, 0); - - tt_int_op(smartlist_len(tor_get_captured_bug_log_()), OP_EQ, 1); - tor_end_capture_bugs_(); - - done: - free_fake_channel(ch); - - /* - * Doing that meant that we couldn't correctly adjust the queue size - * for the var cell, so manually reset the global queue size estimate - * so the next test doesn't break if we run with --no-fork. - */ - estimated_total_queue_size = global_queue_estimate; - - return; -} - -static void -test_channel_queue_incoming(void *arg) -{ - channel_t *ch = NULL; - cell_t *cell = NULL; - var_cell_t *var_cell = NULL; - int old_fixed_count, old_var_count; - - (void)arg; - - /* Mock these for duration of the test */ - MOCK(scheduler_channel_doesnt_want_writes, - scheduler_channel_doesnt_want_writes_mock); - MOCK(scheduler_release_channel, - scheduler_release_channel_mock); - - /* Accept cells to lower layer */ - test_chan_accept_cells = 1; - /* Use default overhead factor */ - test_overhead_estimate = 1.0; - - ch = new_fake_channel(); - tt_assert(ch); - /* Start it off in OPENING */ - ch->state = CHANNEL_STATE_OPENING; - /* We'll need a cmux */ - ch->cmux = circuitmux_alloc(); - - /* Test cell handler getters */ - tt_ptr_op(channel_get_cell_handler(ch), OP_EQ, NULL); - tt_ptr_op(channel_get_var_cell_handler(ch), OP_EQ, NULL); - - /* Try to register it */ - channel_register(ch); - tt_assert(ch->registered); - - /* Open it */ - channel_change_state_open(ch); - tt_int_op(ch->state, OP_EQ, CHANNEL_STATE_OPEN); - - /* Assert that the incoming queue is empty */ - tt_assert(TOR_SIMPLEQ_EMPTY(&(ch->incoming_queue))); - - /* Queue an incoming fixed-length cell */ - cell = tor_malloc_zero(sizeof(cell_t)); - make_fake_cell(cell); - channel_queue_cell(ch, cell); - - /* Assert that the incoming queue has one entry */ - tt_int_op(chan_cell_queue_len(&(ch->incoming_queue)), OP_EQ, 1); - - /* Queue an incoming var cell */ - var_cell = tor_malloc_zero(sizeof(var_cell_t) + CELL_PAYLOAD_SIZE); - make_fake_var_cell(var_cell); - channel_queue_var_cell(ch, var_cell); - - /* Assert that the incoming queue has two entries */ - tt_int_op(chan_cell_queue_len(&(ch->incoming_queue)), OP_EQ, 2); - - /* - * Install cell handlers; this will drain the queue, so save the old - * cell counters first - */ - old_fixed_count = test_chan_fixed_cells_recved; - old_var_count = test_chan_var_cells_recved; - channel_set_cell_handlers(ch, - chan_test_cell_handler, - chan_test_var_cell_handler); - tt_ptr_op(channel_get_cell_handler(ch), OP_EQ, chan_test_cell_handler); - tt_ptr_op(channel_get_var_cell_handler(ch), OP_EQ, - chan_test_var_cell_handler); - - /* Assert cells were received */ - tt_int_op(test_chan_fixed_cells_recved, OP_EQ, old_fixed_count + 1); - tt_int_op(test_chan_var_cells_recved, OP_EQ, old_var_count + 1); - - /* - * Assert that the pointers are different from the cells we allocated; - * when queueing cells with no incoming cell handlers installed, the - * channel layer should copy them to a new buffer, and free them after - * delivery. These pointers will have already been freed by the time - * we get here, so don't dereference them. - */ - tt_ptr_op(test_chan_last_seen_fixed_cell_ptr, OP_NE, cell); - tt_ptr_op(test_chan_last_seen_var_cell_ptr, OP_NE, var_cell); - - /* Assert queue is now empty */ - tt_assert(TOR_SIMPLEQ_EMPTY(&(ch->incoming_queue))); - - /* Close it; this contains an assertion that the incoming queue is empty */ - channel_mark_for_close(ch); - tt_int_op(ch->state, OP_EQ, CHANNEL_STATE_CLOSING); - chan_test_finish_close(ch); - tt_int_op(ch->state, OP_EQ, CHANNEL_STATE_CLOSED); - channel_run_cleanup(); - ch = NULL; - - done: - free_fake_channel(ch); - tor_free(cell); - tor_free(var_cell); - - UNMOCK(scheduler_channel_doesnt_want_writes); - UNMOCK(scheduler_release_channel); - - return; -} - -static void -test_channel_queue_size(void *arg) -{ - channel_t *ch = NULL; - cell_t *cell = NULL; - int n, old_count; - uint64_t global_queue_estimate; - - (void)arg; - - ch = new_fake_channel(); - tt_assert(ch); - - /* Initial queue size update */ - channel_update_xmit_queue_size(ch); - tt_u64_op(ch->bytes_queued_for_xmit, OP_EQ, 0); - global_queue_estimate = channel_get_global_queue_estimate(); - tt_u64_op(global_queue_estimate, OP_EQ, 0); - - /* Test the call-through to our fake lower layer */ - n = channel_num_cells_writeable(ch); - /* chan_test_num_cells_writeable() always returns 32 */ - tt_int_op(n, OP_EQ, 32); - - /* - * Now we queue some cells and check that channel_num_cells_writeable() - * adjusts properly - */ - - /* tell it not to accept cells */ - test_chan_accept_cells = 0; - /* ...and keep it from trying to flush the queue */ - ch->state = CHANNEL_STATE_MAINT; - - /* Get a fresh cell */ - cell = tor_malloc_zero(sizeof(cell_t)); - make_fake_cell(cell); - - old_count = test_cells_written; - channel_write_cell(ch, cell); - /* Assert that it got queued, not written through, correctly */ - tt_int_op(test_cells_written, OP_EQ, old_count); - - /* Now check chan_test_num_cells_writeable() again */ - n = channel_num_cells_writeable(ch); - /* Should return 0 since we're in CHANNEL_STATE_MAINT */ - tt_int_op(n, OP_EQ, 0); - - /* Update queue size estimates */ - channel_update_xmit_queue_size(ch); - /* One cell, times an overhead factor of 1.0 */ - tt_u64_op(ch->bytes_queued_for_xmit, OP_EQ, 512); - /* Try a different overhead factor */ - test_overhead_estimate = 0.5; - /* This one should be ignored since it's below 1.0 */ - channel_update_xmit_queue_size(ch); - tt_u64_op(ch->bytes_queued_for_xmit, OP_EQ, 512); - /* Now try a larger one */ - test_overhead_estimate = 2.0; - channel_update_xmit_queue_size(ch); - tt_u64_op(ch->bytes_queued_for_xmit, OP_EQ, 1024); - /* Go back to 1.0 */ - test_overhead_estimate = 1.0; - channel_update_xmit_queue_size(ch); - tt_u64_op(ch->bytes_queued_for_xmit, OP_EQ, 512); - /* Check the global estimate too */ - global_queue_estimate = channel_get_global_queue_estimate(); - tt_u64_op(global_queue_estimate, OP_EQ, 512); - - /* Go to open */ - old_count = test_cells_written; - channel_change_state_open(ch); - - /* - * It should try to write, but we aren't accepting cells right now, so - * it'll requeue - */ - tt_int_op(test_cells_written, OP_EQ, old_count); - - /* Check the queue size again */ - channel_update_xmit_queue_size(ch); - tt_u64_op(ch->bytes_queued_for_xmit, OP_EQ, 512); - global_queue_estimate = channel_get_global_queue_estimate(); - tt_u64_op(global_queue_estimate, OP_EQ, 512); - - /* - * Now the cell is in the queue, and we're open, so we should get 31 - * writeable cells. - */ - n = channel_num_cells_writeable(ch); - tt_int_op(n, OP_EQ, 31); - - /* Accept cells again */ - test_chan_accept_cells = 1; - /* ...and re-process the queue */ - old_count = test_cells_written; - channel_flush_cells(ch); - tt_int_op(test_cells_written, OP_EQ, old_count + 1); - - /* Should have 32 writeable now */ - n = channel_num_cells_writeable(ch); - tt_int_op(n, OP_EQ, 32); - - /* Should have queue size estimate of zero */ - channel_update_xmit_queue_size(ch); - tt_u64_op(ch->bytes_queued_for_xmit, OP_EQ, 0); - global_queue_estimate = channel_get_global_queue_estimate(); - tt_u64_op(global_queue_estimate, OP_EQ, 0); - - /* Okay, now we're done with this one */ - MOCK(scheduler_release_channel, scheduler_release_channel_mock); - channel_mark_for_close(ch); - UNMOCK(scheduler_release_channel); - - done: - free_fake_channel(ch); - - return; -} - -static void -test_channel_write(void *arg) -{ - channel_t *ch = NULL; - cell_t *cell = tor_malloc_zero(sizeof(cell_t)); - packed_cell_t *packed_cell = NULL; - var_cell_t *var_cell = - tor_malloc_zero(sizeof(var_cell_t) + CELL_PAYLOAD_SIZE); - int old_count; - - (void)arg; - - packed_cell = packed_cell_new(); - tt_assert(packed_cell); - - ch = new_fake_channel(); - tt_assert(ch); - make_fake_cell(cell); - make_fake_var_cell(var_cell); - - /* Tell it to accept cells */ - test_chan_accept_cells = 1; - - old_count = test_cells_written; - channel_write_cell(ch, cell); - cell = NULL; - tt_assert(test_cells_written == old_count + 1); - - channel_write_var_cell(ch, var_cell); - var_cell = NULL; - tt_assert(test_cells_written == old_count + 2); - - channel_write_packed_cell(ch, packed_cell); - packed_cell = NULL; - tt_assert(test_cells_written == old_count + 3); - - /* Now we test queueing; tell it not to accept cells */ - test_chan_accept_cells = 0; - /* ...and keep it from trying to flush the queue */ - ch->state = CHANNEL_STATE_MAINT; - - /* Get a fresh cell */ - cell = tor_malloc_zero(sizeof(cell_t)); - make_fake_cell(cell); - - old_count = test_cells_written; - channel_write_cell(ch, cell); - tt_assert(test_cells_written == old_count); - - /* - * Now change back to open with channel_change_state() and assert that it - * gets drained from the queue. - */ - test_chan_accept_cells = 1; - channel_change_state_open(ch); - tt_assert(test_cells_written == old_count + 1); - - /* - * Check the note destroy case - */ - cell = tor_malloc_zero(sizeof(cell_t)); - make_fake_cell(cell); - cell->command = CELL_DESTROY; - - /* Set up the mock */ - MOCK(channel_note_destroy_not_pending, - channel_note_destroy_not_pending_mock); - - old_count = test_destroy_not_pending_calls; - channel_write_cell(ch, cell); - tt_assert(test_destroy_not_pending_calls == old_count + 1); - - /* Now send a non-destroy and check we don't call it */ - cell = tor_malloc_zero(sizeof(cell_t)); - make_fake_cell(cell); - channel_write_cell(ch, cell); - tt_assert(test_destroy_not_pending_calls == old_count + 1); - - UNMOCK(channel_note_destroy_not_pending); - - /* - * Now switch it to CLOSING so we can test the discard-cells case - * in the channel_write_*() functions. - */ - MOCK(scheduler_release_channel, scheduler_release_channel_mock); - channel_mark_for_close(ch); - UNMOCK(scheduler_release_channel); - - /* Send cells that will drop in the closing state */ - old_count = test_cells_written; - - cell = tor_malloc_zero(sizeof(cell_t)); - make_fake_cell(cell); - channel_write_cell(ch, cell); - cell = NULL; - tt_assert(test_cells_written == old_count); - - var_cell = tor_malloc_zero(sizeof(var_cell_t) + CELL_PAYLOAD_SIZE); - make_fake_var_cell(var_cell); - channel_write_var_cell(ch, var_cell); - var_cell = NULL; - tt_assert(test_cells_written == old_count); - - packed_cell = packed_cell_new(); - channel_write_packed_cell(ch, packed_cell); - packed_cell = NULL; - tt_assert(test_cells_written == old_count); - - done: - free_fake_channel(ch); - tor_free(var_cell); - tor_free(cell); - packed_cell_free(packed_cell); - return; -} - -static void test_channel_id_map(void *arg) { (void)arg; @@ -1879,19 +1108,449 @@ test_channel_id_map(void *arg) #undef N_CHAN } +static void +test_channel_state(void *arg) +{ + (void) arg; + + /* Test state validity. */ + tt_int_op(channel_state_is_valid(CHANNEL_STATE_CLOSED), OP_EQ, 1); + tt_int_op(channel_state_is_valid(CHANNEL_STATE_CLOSING), OP_EQ, 1); + tt_int_op(channel_state_is_valid(CHANNEL_STATE_ERROR), OP_EQ, 1); + tt_int_op(channel_state_is_valid(CHANNEL_STATE_OPEN), OP_EQ, 1); + tt_int_op(channel_state_is_valid(CHANNEL_STATE_OPENING), OP_EQ, 1); + tt_int_op(channel_state_is_valid(CHANNEL_STATE_MAINT), OP_EQ, 1); + tt_int_op(channel_state_is_valid(CHANNEL_STATE_LAST), OP_EQ, 0); + tt_int_op(channel_state_is_valid(INT_MAX), OP_EQ, 0); + + /* Test listener state validity. */ + tt_int_op(channel_listener_state_is_valid(CHANNEL_LISTENER_STATE_CLOSED), + OP_EQ, 1); + tt_int_op(channel_listener_state_is_valid(CHANNEL_LISTENER_STATE_LISTENING), + OP_EQ, 1); + tt_int_op(channel_listener_state_is_valid(CHANNEL_LISTENER_STATE_CLOSING), + OP_EQ, 1); + tt_int_op(channel_listener_state_is_valid(CHANNEL_LISTENER_STATE_ERROR), + OP_EQ, 1); + tt_int_op(channel_listener_state_is_valid(CHANNEL_LISTENER_STATE_LAST), + OP_EQ, 0); + tt_int_op(channel_listener_state_is_valid(INT_MAX), OP_EQ, 0); + + /* Test state transition. */ + tt_int_op(channel_state_can_transition(CHANNEL_STATE_CLOSED, + CHANNEL_STATE_OPENING), OP_EQ, 1); + tt_int_op(channel_state_can_transition(CHANNEL_STATE_CLOSED, + CHANNEL_STATE_ERROR), OP_EQ, 0); + tt_int_op(channel_state_can_transition(CHANNEL_STATE_CLOSING, + CHANNEL_STATE_ERROR), OP_EQ, 1); + tt_int_op(channel_state_can_transition(CHANNEL_STATE_CLOSING, + CHANNEL_STATE_CLOSED), OP_EQ, 1); + tt_int_op(channel_state_can_transition(CHANNEL_STATE_CLOSING, + CHANNEL_STATE_OPEN), OP_EQ, 0); + tt_int_op(channel_state_can_transition(CHANNEL_STATE_MAINT, + CHANNEL_STATE_CLOSING), OP_EQ, 1); + tt_int_op(channel_state_can_transition(CHANNEL_STATE_MAINT, + CHANNEL_STATE_ERROR), OP_EQ, 1); + tt_int_op(channel_state_can_transition(CHANNEL_STATE_MAINT, + CHANNEL_STATE_OPEN), OP_EQ, 1); + tt_int_op(channel_state_can_transition(CHANNEL_STATE_MAINT, + CHANNEL_STATE_OPENING), OP_EQ, 0); + tt_int_op(channel_state_can_transition(CHANNEL_STATE_OPENING, + CHANNEL_STATE_OPEN), OP_EQ, 1); + tt_int_op(channel_state_can_transition(CHANNEL_STATE_OPENING, + CHANNEL_STATE_CLOSING), OP_EQ, 1); + tt_int_op(channel_state_can_transition(CHANNEL_STATE_OPENING, + CHANNEL_STATE_ERROR), OP_EQ, 1); + tt_int_op(channel_state_can_transition(CHANNEL_STATE_OPEN, + CHANNEL_STATE_ERROR), OP_EQ, 1); + tt_int_op(channel_state_can_transition(CHANNEL_STATE_OPEN, + CHANNEL_STATE_CLOSING), OP_EQ, 1); + tt_int_op(channel_state_can_transition(CHANNEL_STATE_OPEN, + CHANNEL_STATE_ERROR), OP_EQ, 1); + tt_int_op(channel_state_can_transition(CHANNEL_STATE_OPEN, + CHANNEL_STATE_MAINT), OP_EQ, 1); + tt_int_op(channel_state_can_transition(CHANNEL_STATE_LAST, + CHANNEL_STATE_MAINT), OP_EQ, 0); + tt_int_op(channel_state_can_transition(CHANNEL_STATE_LAST, INT_MAX), + OP_EQ, 0); + + /* Test listener state transition. */ + tt_int_op(channel_listener_state_can_transition( + CHANNEL_LISTENER_STATE_CLOSED, + CHANNEL_LISTENER_STATE_LISTENING), + OP_EQ, 1); + tt_int_op(channel_listener_state_can_transition( + CHANNEL_LISTENER_STATE_CLOSED, + CHANNEL_LISTENER_STATE_ERROR), + OP_EQ, 0); + + tt_int_op(channel_listener_state_can_transition( + CHANNEL_LISTENER_STATE_CLOSING, + CHANNEL_LISTENER_STATE_CLOSED), + OP_EQ, 1); + + tt_int_op(channel_listener_state_can_transition( + CHANNEL_LISTENER_STATE_CLOSING, + CHANNEL_LISTENER_STATE_ERROR), + OP_EQ, 1); + tt_int_op(channel_listener_state_can_transition( + CHANNEL_LISTENER_STATE_ERROR, + CHANNEL_LISTENER_STATE_CLOSING), + OP_EQ, 0); + + tt_int_op(channel_listener_state_can_transition( + CHANNEL_LISTENER_STATE_LISTENING, + CHANNEL_LISTENER_STATE_CLOSING), + OP_EQ, 1); + tt_int_op(channel_listener_state_can_transition( + CHANNEL_LISTENER_STATE_LISTENING, + CHANNEL_LISTENER_STATE_ERROR), + OP_EQ, 1); + tt_int_op(channel_listener_state_can_transition( + CHANNEL_LISTENER_STATE_LAST, + INT_MAX), + OP_EQ, 0); + + /* Test state string. */ + tt_str_op(channel_state_to_string(CHANNEL_STATE_CLOSING), OP_EQ, + "closing"); + tt_str_op(channel_state_to_string(CHANNEL_STATE_ERROR), OP_EQ, + "channel error"); + tt_str_op(channel_state_to_string(CHANNEL_STATE_CLOSED), OP_EQ, + "closed"); + tt_str_op(channel_state_to_string(CHANNEL_STATE_OPEN), OP_EQ, + "open"); + tt_str_op(channel_state_to_string(CHANNEL_STATE_OPENING), OP_EQ, + "opening"); + tt_str_op(channel_state_to_string(CHANNEL_STATE_MAINT), OP_EQ, + "temporarily suspended for maintenance"); + tt_str_op(channel_state_to_string(CHANNEL_STATE_LAST), OP_EQ, + "unknown or invalid channel state"); + tt_str_op(channel_state_to_string(INT_MAX), OP_EQ, + "unknown or invalid channel state"); + + /* Test listener state string. */ + tt_str_op(channel_listener_state_to_string(CHANNEL_LISTENER_STATE_CLOSING), + OP_EQ, "closing"); + tt_str_op(channel_listener_state_to_string(CHANNEL_LISTENER_STATE_ERROR), + OP_EQ, "channel listener error"); + tt_str_op(channel_listener_state_to_string(CHANNEL_LISTENER_STATE_LISTENING), + OP_EQ, "listening"); + tt_str_op(channel_listener_state_to_string(CHANNEL_LISTENER_STATE_LAST), + OP_EQ, "unknown or invalid channel listener state"); + tt_str_op(channel_listener_state_to_string(INT_MAX), + OP_EQ, "unknown or invalid channel listener state"); + + done: + ; +} + +static networkstatus_t *mock_ns = NULL; + +static networkstatus_t * +mock_networkstatus_get_latest_consensus(void) +{ + return mock_ns; +} + +static void +test_channel_duplicates(void *arg) +{ + channel_t *chan = NULL; + routerstatus_t rs; + + (void) arg; + + setup_full_capture_of_logs(LOG_INFO); + /* Try a flat call with channel nor connections. */ + channel_check_for_duplicates(); + expect_log_msg_containing( + "Found 0 connections to 0 relays. Found 0 current canonical " + "connections, in 0 of which we were a non-canonical peer. " + "0 relays had more than 1 connection, 0 had more than 2, and " + "0 had more than 4 connections."); + + mock_ns = tor_malloc_zero(sizeof(*mock_ns)); + mock_ns->routerstatus_list = smartlist_new(); + MOCK(networkstatus_get_latest_consensus, + mock_networkstatus_get_latest_consensus); + + chan = new_fake_channel(); + tt_assert(chan); + chan->is_canonical = test_chan_is_canonical; + memset(chan->identity_digest, 'A', sizeof(chan->identity_digest)); + channel_add_to_digest_map(chan); + tt_ptr_op(channel_find_by_remote_identity(chan->identity_digest, NULL), + OP_EQ, chan); + + /* No relay has been associated with this channel. */ + channel_check_for_duplicates(); + expect_log_msg_containing( + "Found 0 connections to 0 relays. Found 0 current canonical " + "connections, in 0 of which we were a non-canonical peer. " + "0 relays had more than 1 connection, 0 had more than 2, and " + "0 had more than 4 connections."); + + /* Associate relay to this connection in the consensus. */ + memset(&rs, 0, sizeof(rs)); + memset(rs.identity_digest, 'A', sizeof(rs.identity_digest)); + smartlist_add(mock_ns->routerstatus_list, &rs); + + /* Non opened channel. */ + chan->state = CHANNEL_STATE_CLOSING; + channel_check_for_duplicates(); + expect_log_msg_containing( + "Found 0 connections to 0 relays. Found 0 current canonical " + "connections, in 0 of which we were a non-canonical peer. " + "0 relays had more than 1 connection, 0 had more than 2, and " + "0 had more than 4 connections."); + chan->state = CHANNEL_STATE_OPEN; + + channel_check_for_duplicates(); + expect_log_msg_containing( + "Found 1 connections to 1 relays. Found 0 current canonical " + "connections, in 0 of which we were a non-canonical peer. " + "0 relays had more than 1 connection, 0 had more than 2, and " + "0 had more than 4 connections."); + + test_chan_should_be_canonical = 1; + channel_check_for_duplicates(); + expect_log_msg_containing( + "Found 1 connections to 1 relays. Found 1 current canonical " + "connections, in 1 of which we were a non-canonical peer. " + "0 relays had more than 1 connection, 0 had more than 2, and " + "0 had more than 4 connections."); + teardown_capture_of_logs(); + + done: + free_fake_channel(chan); + smartlist_clear(mock_ns->routerstatus_list); + networkstatus_vote_free(mock_ns); + UNMOCK(networkstatus_get_latest_consensus); +} + +static void +test_channel_for_extend(void *arg) +{ + channel_t *chan1 = NULL, *chan2 = NULL; + channel_t *ret_chan = NULL; + char digest[DIGEST_LEN]; + ed25519_public_key_t ed_id; + tor_addr_t addr; + const char *msg; + int launch; + time_t now = time(NULL); + + (void) arg; + + memset(digest, 'A', sizeof(digest)); + memset(&ed_id, 'B', sizeof(ed_id)); + + chan1 = new_fake_channel(); + tt_assert(chan1); + /* Need to be registered to get added to the id map. */ + channel_register(chan1); + tt_int_op(chan1->registered, OP_EQ, 1); + /* We need those for the test. */ + chan1->is_canonical = test_chan_is_canonical; + chan1->matches_target = test_chan_matches_target; + chan1->timestamp_created = now - 9; + + chan2 = new_fake_channel(); + tt_assert(chan2); + /* Need to be registered to get added to the id map. */ + channel_register(chan2); + tt_int_op(chan2->registered, OP_EQ, 1); + /* We need those for the test. */ + chan2->is_canonical = test_chan_is_canonical; + chan2->matches_target = test_chan_matches_target; + /* Make it older than chan1. */ + chan2->timestamp_created = chan1->timestamp_created - 1; + + /* Set channel identities and add it to the channel map. The last one to be + * added is made the first one in the list so the lookup will always return + * that one first. */ + channel_set_identity_digest(chan2, digest, &ed_id); + channel_set_identity_digest(chan1, digest, &ed_id); + tt_ptr_op(channel_find_by_remote_identity(digest, NULL), OP_EQ, chan1); + tt_ptr_op(channel_find_by_remote_identity(digest, &ed_id), OP_EQ, chan1); + + /* The expected result is chan2 because it is older than chan1. */ + ret_chan = channel_get_for_extend(digest, &ed_id, &addr, &msg, &launch); + tt_assert(ret_chan); + tt_ptr_op(ret_chan, OP_EQ, chan2); + tt_int_op(launch, OP_EQ, 0); + tt_str_op(msg, OP_EQ, "Connection is fine; using it."); + + /* Switch that around from previous test. */ + chan2->timestamp_created = chan1->timestamp_created + 1; + ret_chan = channel_get_for_extend(digest, &ed_id, &addr, &msg, &launch); + tt_assert(ret_chan); + tt_ptr_op(ret_chan, OP_EQ, chan1); + tt_int_op(launch, OP_EQ, 0); + tt_str_op(msg, OP_EQ, "Connection is fine; using it."); + + /* Same creation time, num circuits will be used and they both have 0 so the + * channel 2 should be picked due to how channel_is_better() work. */ + chan2->timestamp_created = chan1->timestamp_created; + ret_chan = channel_get_for_extend(digest, &ed_id, &addr, &msg, &launch); + tt_assert(ret_chan); + tt_ptr_op(ret_chan, OP_EQ, chan1); + tt_int_op(launch, OP_EQ, 0); + tt_str_op(msg, OP_EQ, "Connection is fine; using it."); + + /* For the rest of the tests, we need channel 1 to be the older. */ + chan2->timestamp_created = chan1->timestamp_created + 1; + + /* Condemned the older channel. */ + chan1->state = CHANNEL_STATE_CLOSING; + ret_chan = channel_get_for_extend(digest, &ed_id, &addr, &msg, &launch); + tt_assert(ret_chan); + tt_ptr_op(ret_chan, OP_EQ, chan2); + tt_int_op(launch, OP_EQ, 0); + tt_str_op(msg, OP_EQ, "Connection is fine; using it."); + chan1->state = CHANNEL_STATE_OPEN; + + /* Make the older channel a client one. */ + channel_mark_client(chan1); + ret_chan = channel_get_for_extend(digest, &ed_id, &addr, &msg, &launch); + tt_assert(ret_chan); + tt_ptr_op(ret_chan, OP_EQ, chan2); + tt_int_op(launch, OP_EQ, 0); + tt_str_op(msg, OP_EQ, "Connection is fine; using it."); + channel_clear_client(chan1); + + /* Non matching ed identity with valid digest. */ + ed25519_public_key_t dumb_ed_id; + memset(&dumb_ed_id, 0, sizeof(dumb_ed_id)); + ret_chan = channel_get_for_extend(digest, &dumb_ed_id, &addr, &msg, + &launch); + tt_assert(!ret_chan); + tt_str_op(msg, OP_EQ, "Not connected. Connecting."); + tt_int_op(launch, OP_EQ, 1); + + /* Opening channel, we'll check if the target address matches. */ + test_chan_should_match_target = 1; + chan1->state = CHANNEL_STATE_OPENING; + chan2->state = CHANNEL_STATE_OPENING; + ret_chan = channel_get_for_extend(digest, &ed_id, &addr, &msg, &launch); + tt_assert(!ret_chan); + tt_str_op(msg, OP_EQ, "Connection in progress; waiting."); + tt_int_op(launch, OP_EQ, 0); + chan1->state = CHANNEL_STATE_OPEN; + chan2->state = CHANNEL_STATE_OPEN; + + /* Mark channel 1 as bad for circuits. */ + channel_mark_bad_for_new_circs(chan1); + ret_chan = channel_get_for_extend(digest, &ed_id, &addr, &msg, &launch); + tt_assert(ret_chan); + tt_ptr_op(ret_chan, OP_EQ, chan2); + tt_int_op(launch, OP_EQ, 0); + tt_str_op(msg, OP_EQ, "Connection is fine; using it."); + chan1->is_bad_for_new_circs = 0; + + /* Mark both channels as unusable. */ + channel_mark_bad_for_new_circs(chan1); + channel_mark_bad_for_new_circs(chan2); + ret_chan = channel_get_for_extend(digest, &ed_id, &addr, &msg, &launch); + tt_assert(!ret_chan); + tt_str_op(msg, OP_EQ, "Connections all too old, or too non-canonical. " + " Launching a new one."); + tt_int_op(launch, OP_EQ, 1); + chan1->is_bad_for_new_circs = 0; + chan2->is_bad_for_new_circs = 0; + + /* Non canonical channels. */ + test_chan_should_match_target = 0; + test_chan_canonical_should_be_reliable = 1; + ret_chan = channel_get_for_extend(digest, &ed_id, &addr, &msg, &launch); + tt_assert(!ret_chan); + tt_str_op(msg, OP_EQ, "Connections all too old, or too non-canonical. " + " Launching a new one."); + tt_int_op(launch, OP_EQ, 1); + + done: + free_fake_channel(chan1); + free_fake_channel(chan2); +} + +static void +test_channel_listener(void *arg) +{ + int old_count; + time_t now = time(NULL); + channel_listener_t *chan = NULL; + + (void) arg; + + chan = tor_malloc_zero(sizeof(*chan)); + tt_assert(chan); + channel_init_listener(chan); + tt_u64_op(chan->global_identifier, OP_EQ, 1); + tt_int_op(chan->timestamp_created, OP_GE, now); + chan->close = test_chan_listener_close; + + /* Register it. At this point, it is not open so it will be put in the + * finished list. */ + channel_listener_register(chan); + tt_int_op(chan->registered, OP_EQ, 1); + channel_listener_unregister(chan); + + /* Register it as listening now thus active. */ + chan->state = CHANNEL_LISTENER_STATE_LISTENING; + channel_listener_register(chan); + tt_int_op(chan->registered, OP_EQ, 1); + + /* Set the listener function. */ + channel_listener_set_listener_fn(chan, test_chan_listener_fn); + tt_ptr_op(chan->listener, OP_EQ, test_chan_listener_fn); + + /* Put a channel in the listener incoming list and queue it. + * function. By doing this, the listener() handler will be called. */ + channel_t *in_chan = new_fake_channel(); + old_count = test_chan_listener_fn_called; + channel_listener_queue_incoming(chan, in_chan); + free_fake_channel(in_chan); + tt_int_op(test_chan_listener_fn_called, OP_EQ, old_count + 1); + + /* Put listener channel in CLOSING state. */ + old_count = test_chan_listener_close_fn_called; + channel_listener_mark_for_close(chan); + tt_int_op(test_chan_listener_close_fn_called, OP_EQ, old_count + 1); + channel_listener_change_state(chan, CHANNEL_LISTENER_STATE_CLOSED); + + /* Dump stats so we at least hit the code path. */ + chan->describe_transport = test_chan_listener_describe_transport; + /* There is a check for "now > timestamp_created" when dumping the stats so + * make sure we go in. */ + chan->timestamp_created = now - 10; + channel_listener_dump_statistics(chan, LOG_INFO); + + done: + channel_free_all(); +} + struct testcase_t channel_tests[] = { - { "dumpstats", test_channel_dumpstats, TT_FORK, NULL, NULL }, - { "flush", test_channel_flush, TT_FORK, NULL, NULL }, - { "flushmux", test_channel_flushmux, TT_FORK, NULL, NULL }, - { "incoming", test_channel_incoming, TT_FORK, NULL, NULL }, - { "lifecycle", test_channel_lifecycle, TT_FORK, NULL, NULL }, - { "lifecycle_2", test_channel_lifecycle_2, TT_FORK, NULL, NULL }, - { "multi", test_channel_multi, TT_FORK, NULL, NULL }, - { "queue_impossible", test_channel_queue_impossible, TT_FORK, NULL, NULL }, - { "queue_incoming", test_channel_queue_incoming, TT_FORK, NULL, NULL }, - { "queue_size", test_channel_queue_size, TT_FORK, NULL, NULL }, - { "write", test_channel_write, TT_FORK, NULL, NULL }, - { "id_map", test_channel_id_map, TT_FORK, NULL, NULL }, + { "inbound_cell", test_channel_inbound_cell, TT_FORK, + NULL, NULL }, + { "outbound_cell", test_channel_outbound_cell, TT_FORK, + NULL, NULL }, + { "id_map", test_channel_id_map, TT_FORK, + NULL, NULL }, + { "lifecycle", test_channel_lifecycle, TT_FORK, + NULL, NULL }, + { "lifecycle_2", test_channel_lifecycle_2, TT_FORK, + NULL, NULL }, + { "dumpstats", test_channel_dumpstats, TT_FORK, + NULL, NULL }, + { "state", test_channel_state, TT_FORK, + NULL, NULL }, + { "duplicates", test_channel_duplicates, TT_FORK, + NULL, NULL }, + { "get_channel_for_extend", test_channel_for_extend, TT_FORK, + NULL, NULL }, + { "listener", test_channel_listener, TT_FORK, + NULL, NULL }, END_OF_TESTCASES }; diff --git a/src/test/test_relay.c b/src/test/test_relay.c index e3489627a0..73c0ed5586 100644 --- a/src/test/test_relay.c +++ b/src/test/test_relay.c @@ -67,10 +67,6 @@ test_relay_append_cell_to_circuit_queue(void *arg) pchan = new_fake_channel(); tt_assert(pchan); - /* We'll need chans with working cmuxes */ - nchan->cmux = circuitmux_alloc(); - pchan->cmux = circuitmux_alloc(); - /* Make a fake orcirc */ orcirc = new_fake_orcirc(nchan, pchan); tt_assert(orcirc); diff --git a/src/test/test_scheduler.c b/src/test/test_scheduler.c index 63add2f382..0d8a9eaa1f 100644 --- a/src/test/test_scheduler.c +++ b/src/test/test_scheduler.c @@ -299,10 +299,6 @@ channel_more_to_flush_mock(channel_t *chan) flush_mock_channel_t *found_mock_ch = NULL; - /* Check if we have any queued */ - if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue)) - return 1; - SMARTLIST_FOREACH_BEGIN(chans_for_flush_mock, flush_mock_channel_t *, flush_mock_ch) { @@ -444,8 +440,6 @@ perform_channel_state_tests(int KISTSchedRunInterval, int sched_type) /* Start it off in OPENING */ ch1->state = CHANNEL_STATE_OPENING; - /* We'll need a cmux */ - ch1->cmux = circuitmux_alloc(); /* Try to register it */ channel_register(ch1); tt_assert(ch1->registered); @@ -457,7 +451,6 @@ perform_channel_state_tests(int KISTSchedRunInterval, int sched_type) ch2 = new_fake_channel(); tt_assert(ch2); ch2->state = CHANNEL_STATE_OPENING; - ch2->cmux = circuitmux_alloc(); channel_register(ch2); tt_assert(ch2->registered); @@ -656,8 +649,6 @@ test_scheduler_loop_vanilla(void *arg) /* Start it off in OPENING */ ch1->state = CHANNEL_STATE_OPENING; - /* We'll need a cmux */ - ch1->cmux = circuitmux_alloc(); /* Try to register it */ channel_register(ch1); tt_assert(ch1->registered); @@ -672,7 +663,6 @@ test_scheduler_loop_vanilla(void *arg) ch2->magic = TLS_CHAN_MAGIC; tt_assert(ch2); ch2->state = CHANNEL_STATE_OPENING; - ch2->cmux = circuitmux_alloc(); channel_register(ch2); tt_assert(ch2->registered); /* @@ -824,7 +814,6 @@ test_scheduler_loop_kist(void *arg) tt_assert(ch1); ch1->magic = TLS_CHAN_MAGIC; ch1->state = CHANNEL_STATE_OPENING; - ch1->cmux = circuitmux_alloc(); channel_register(ch1); tt_assert(ch1->registered); channel_change_state_open(ch1); @@ -835,7 +824,6 @@ test_scheduler_loop_kist(void *arg) tt_assert(ch2); ch2->magic = TLS_CHAN_MAGIC; ch2->state = CHANNEL_STATE_OPENING; - ch2->cmux = circuitmux_alloc(); channel_register(ch2); tt_assert(ch2->registered); channel_change_state_open(ch2); |