summaryrefslogtreecommitdiff
path: root/src/or/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/or/channel.c')
-rw-r--r--src/or/channel.c1680
1 files changed, 182 insertions, 1498 deletions
diff --git a/src/or/channel.c b/src/or/channel.c
index 0b5a7fde90..7fa9768171 100644
--- a/src/or/channel.c
+++ b/src/or/channel.c
@@ -1,3 +1,4 @@
+
/* * Copyright (c) 2012-2017, The Tor Project, Inc. */
/* See LICENSE for licensing information */
@@ -5,9 +6,8 @@
* \file channel.c
*
* \brief OR/OP-to-OR channel abstraction layer. A channel's job is to
- * transfer cells from Tor instance to Tor instance.
- * Currently, there is only one implementation of the channel abstraction: in
- * channeltls.c.
+ * transfer cells from Tor instance to Tor instance. Currently, there is only
+ * one implementation of the channel abstraction: in channeltls.c.
*
* Channels are a higher-level abstraction than or_connection_t: In general,
* any means that two Tor relays use to exchange cells, or any means that a
@@ -24,16 +24,28 @@
* connection.
*
* Every channel implementation is responsible for being able to transmit
- * cells that are added to it with channel_write_cell() and related functions,
- * and to receive incoming cells with the channel_queue_cell() and related
- * functions. See the channel_t documentation for more information.
- *
- * When new cells arrive on a channel, they are passed to cell handler
- * functions, which can be set by channel_set_cell_handlers()
- * functions. (Tor's cell handlers are in command.c.)
- *
- * Tor flushes cells to channels from relay.c in
- * channel_flush_from_first_active_circuit().
+ * cells that are passed to it
+ *
+ * For *inbound* cells, the entry point is: channel_process_cell(). It takes a
+ * cell and will pass it to the cell handler set by
+ * channel_set_cell_handlers(). Currently, this is passed back to the command
+ * subsystem which is command_process_cell().
+ *
+ * NOTE: For now, the seperation between channels and specialized channels
+ * (like channeltls) is not that well defined. So the channeltls layer calls
+ * channel_process_cell() which originally comes from the connection subsytem.
+ * This should be hopefully be fixed with #23993.
+ *
+ * For *outbound* cells, the entry point is: channel_write_packed_cell().
+ * Only packed cells are dequeued from the circuit queue by the scheduler
+ * which uses channel_flush_from_first_active_circuit() to decide which cells
+ * to flush from which circuit on the channel. They are then passed down to
+ * the channel subsystem. This calls the low layer with the function pointer
+ * .write_packed_cell().
+ *
+ * Each specialized channel (currently only channeltls_t) MUST implement a
+ * series of function found in channel_t. See channel.h for more
+ * documentation.
**/
/*
@@ -112,59 +124,6 @@ HANDLE_IMPL(channel, channel_s,)
/* Counter for ID numbers */
static uint64_t n_channels_allocated = 0;
-/*
- * Channel global byte/cell counters, for statistics and for scheduler high
- * /low-water marks.
- */
-
-/*
- * Total number of cells ever given to any channel with the
- * channel_write_*_cell() functions.
- */
-
-static uint64_t n_channel_cells_queued = 0;
-
-/*
- * Total number of cells ever passed to a channel lower layer with the
- * write_*_cell() methods.
- */
-
-static uint64_t n_channel_cells_passed_to_lower_layer = 0;
-
-/*
- * Current number of cells in all channel queues; should be
- * n_channel_cells_queued - n_channel_cells_passed_to_lower_layer.
- */
-
-static uint64_t n_channel_cells_in_queues = 0;
-
-/*
- * Total number of bytes for all cells ever queued to a channel and
- * counted in n_channel_cells_queued.
- */
-
-static uint64_t n_channel_bytes_queued = 0;
-
-/*
- * Total number of bytes for all cells ever passed to a channel lower layer
- * and counted in n_channel_cells_passed_to_lower_layer.
- */
-
-static uint64_t n_channel_bytes_passed_to_lower_layer = 0;
-
-/*
- * Current number of bytes in all channel queues; should be
- * n_channel_bytes_queued - n_channel_bytes_passed_to_lower_layer.
- */
-
-static uint64_t n_channel_bytes_in_queues = 0;
-
-/*
- * Current total estimated queue size *including lower layer queues and
- * transmit overhead*
- */
-
-STATIC uint64_t estimated_total_queue_size = 0;
/* Digest->channel map
*
@@ -201,40 +160,15 @@ HT_PROTOTYPE(channel_idmap, channel_idmap_entry_s, node, channel_idmap_hash,
HT_GENERATE2(channel_idmap, channel_idmap_entry_s, node, channel_idmap_hash,
channel_idmap_eq, 0.5, tor_reallocarray_, tor_free_)
-static cell_queue_entry_t * cell_queue_entry_dup(cell_queue_entry_t *q);
-#if 0
-static int cell_queue_entry_is_padding(cell_queue_entry_t *q);
-#endif
-static cell_queue_entry_t *
-cell_queue_entry_new_fixed(cell_t *cell);
-static cell_queue_entry_t *
-cell_queue_entry_new_var(var_cell_t *var_cell);
-static int is_destroy_cell(channel_t *chan,
- const cell_queue_entry_t *q, circid_t *circid_out);
-
-static void channel_assert_counter_consistency(void);
-
/* Functions to maintain the digest map */
-static void channel_add_to_digest_map(channel_t *chan);
static void channel_remove_from_digest_map(channel_t *chan);
-/*
- * Flush cells from just the outgoing queue without trying to get them
- * from circuits; used internall by channel_flush_some_cells().
- */
-static ssize_t
-channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
- ssize_t num_cells);
-static void channel_force_free(channel_t *chan);
-static void
-channel_free_list(smartlist_t *channels, int mark_for_close);
-static void
-channel_listener_free_list(smartlist_t *channels, int mark_for_close);
-static void channel_listener_force_free(channel_listener_t *chan_l);
-static size_t channel_get_cell_queue_entry_size(channel_t *chan,
- cell_queue_entry_t *q);
-static void
-channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q);
+static void channel_force_xfree(channel_t *chan);
+static void channel_free_list(smartlist_t *channels,
+ int mark_for_close);
+static void channel_listener_free_list(smartlist_t *channels,
+ int mark_for_close);
+static void channel_listener_force_xfree(channel_listener_t *chan_l);
/***********************************
* Channel state utility functions *
@@ -628,7 +562,7 @@ channel_listener_unregister(channel_listener_t *chan_l)
* already exist.
*/
-static void
+STATIC void
channel_add_to_digest_map(channel_t *chan)
{
channel_idmap_entry_t *ent, search;
@@ -676,33 +610,6 @@ channel_remove_from_digest_map(channel_t *chan)
/* Assert that there is a digest */
tor_assert(!tor_digest_is_zero(chan->identity_digest));
-#if 0
- /* Make sure we have a map */
- if (!channel_identity_map) {
- /*
- * No identity map, so we can't find it by definition. This
- * case is similar to digestmap_get() failing below.
- */
- log_warn(LD_BUG,
- "Trying to remove channel %p (global ID " U64_FORMAT ") "
- "with digest %s from identity map, but didn't have any identity "
- "map",
- chan, U64_PRINTF_ARG(chan->global_identifier),
- hex_str(chan->identity_digest, DIGEST_LEN));
- /* Clear out its next/prev pointers */
- if (chan->next_with_same_id) {
- chan->next_with_same_id->prev_with_same_id = chan->prev_with_same_id;
- }
- if (chan->prev_with_same_id) {
- chan->prev_with_same_id->next_with_same_id = chan->next_with_same_id;
- }
- chan->next_with_same_id = NULL;
- chan->prev_with_same_id = NULL;
-
- return;
- }
-#endif /* 0 */
-
/* Pull it out of its list, wherever that list is */
TOR_LIST_REMOVE(chan, next_with_same_id);
@@ -936,10 +843,6 @@ channel_init(channel_t *chan)
/* Warn about exhausted circuit IDs no more than hourly. */
chan->last_warned_circ_ids_exhausted.rate = 3600;
- /* Initialize queues. */
- TOR_SIMPLEQ_INIT(&chan->incoming_queue);
- TOR_SIMPLEQ_INIT(&chan->outgoing_queue);
-
/* Initialize list entries. */
memset(&chan->next_with_same_id, 0, sizeof(chan->next_with_same_id));
@@ -979,7 +882,7 @@ channel_init_listener(channel_listener_t *chan_l)
*/
void
-channel_free(channel_t *chan)
+channel_free_(channel_t *chan)
{
if (!chan) return;
@@ -1022,8 +925,6 @@ channel_free(channel_t *chan)
chan->cmux = NULL;
}
- /* We're in CLOSED or ERROR, so the cell queue is already empty */
-
tor_free(chan);
}
@@ -1034,7 +935,7 @@ channel_free(channel_t *chan)
*/
void
-channel_listener_free(channel_listener_t *chan_l)
+channel_listener_free_(channel_listener_t *chan_l)
{
if (!chan_l) return;
@@ -1052,11 +953,6 @@ channel_listener_free(channel_listener_t *chan_l)
/* Call a free method if there is one */
if (chan_l->free_fn) chan_l->free_fn(chan_l);
- /*
- * We're in CLOSED or ERROR, so the incoming channel queue is already
- * empty.
- */
-
tor_free(chan_l);
}
@@ -1067,9 +963,8 @@ channel_listener_free(channel_listener_t *chan_l)
*/
static void
-channel_force_free(channel_t *chan)
+channel_force_xfree(channel_t *chan)
{
- cell_queue_entry_t *cell, *cell_tmp;
tor_assert(chan);
log_debug(LD_CHANNEL,
@@ -1103,18 +998,6 @@ channel_force_free(channel_t *chan)
chan->cmux = NULL;
}
- /* We might still have a cell queue; kill it */
- TOR_SIMPLEQ_FOREACH_SAFE(cell, &chan->incoming_queue, next, cell_tmp) {
- cell_queue_entry_free(cell, 0);
- }
- TOR_SIMPLEQ_INIT(&chan->incoming_queue);
-
- /* Outgoing cell queue is similar, but we can have to free packed cells */
- TOR_SIMPLEQ_FOREACH_SAFE(cell, &chan->outgoing_queue, next, cell_tmp) {
- cell_queue_entry_free(cell, 0);
- }
- TOR_SIMPLEQ_INIT(&chan->outgoing_queue);
-
tor_free(chan);
}
@@ -1125,7 +1008,7 @@ channel_force_free(channel_t *chan)
*/
static void
-channel_listener_force_free(channel_listener_t *chan_l)
+channel_listener_force_xfree(channel_listener_t *chan_l)
{
tor_assert(chan_l);
@@ -1156,24 +1039,6 @@ channel_listener_force_free(channel_listener_t *chan_l)
}
/**
- * Return the current registered listener for a channel listener
- *
- * This function returns a function pointer to the current registered
- * handler for new incoming channels on a channel listener.
- */
-
-channel_listener_fn_ptr
-channel_listener_get_listener_fn(channel_listener_t *chan_l)
-{
- tor_assert(chan_l);
-
- if (chan_l->state == CHANNEL_LISTENER_STATE_LISTENING)
- return chan_l->listener;
-
- return NULL;
-}
-
-/**
* Set the listener for a channel listener
*
* This function sets the handler for new incoming channels on a channel
@@ -1237,8 +1102,7 @@ channel_get_var_cell_handler(channel_t *chan)
* Set both cell handlers for a channel
*
* This function sets both the fixed-length and variable length cell handlers
- * for a channel and processes any incoming cells that had been blocked in the
- * queue because none were available.
+ * for a channel.
*/
void
@@ -1247,8 +1111,6 @@ channel_set_cell_handlers(channel_t *chan,
channel_var_cell_handler_fn_ptr
var_cell_handler)
{
- int try_again = 0;
-
tor_assert(chan);
tor_assert(CHANNEL_CAN_HANDLE_CELLS(chan));
@@ -1259,21 +1121,9 @@ channel_set_cell_handlers(channel_t *chan,
"Setting var_cell_handler callback for channel %p to %p",
chan, var_cell_handler);
- /* Should we try the queue? */
- if (cell_handler &&
- cell_handler != chan->cell_handler) try_again = 1;
- if (var_cell_handler &&
- var_cell_handler != chan->var_cell_handler) try_again = 1;
-
/* Change them */
chan->cell_handler = cell_handler;
chan->var_cell_handler = var_cell_handler;
-
- /* Re-run the queue if we have one and there's any reason to */
- if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue) &&
- try_again &&
- (chan->cell_handler ||
- chan->var_cell_handler)) channel_process_cells(chan);
}
/*
@@ -1400,36 +1250,6 @@ channel_close_from_lower_layer(channel_t *chan)
}
/**
- * Close a channel listener from the lower layer
- *
- * Notify the channel code that the channel listener is being closed due to a
- * non-error condition in the lower layer. This does not call the close()
- * method, since the lower layer already knows.
- */
-
-void
-channel_listener_close_from_lower_layer(channel_listener_t *chan_l)
-{
- tor_assert(chan_l != NULL);
-
- /* If it's already in CLOSING, CLOSED or ERROR, this is a no-op */
- if (chan_l->state == CHANNEL_LISTENER_STATE_CLOSING ||
- chan_l->state == CHANNEL_LISTENER_STATE_CLOSED ||
- chan_l->state == CHANNEL_LISTENER_STATE_ERROR) return;
-
- log_debug(LD_CHANNEL,
- "Closing channel listener %p (global ID " U64_FORMAT ") "
- "due to lower-layer event",
- chan_l, U64_PRINTF_ARG(chan_l->global_identifier));
-
- /* Note closing by event from below */
- chan_l->reason_for_closing = CHANNEL_LISTENER_CLOSE_FROM_BELOW;
-
- /* Change state to CLOSING */
- channel_listener_change_state(chan_l, CHANNEL_LISTENER_STATE_CLOSING);
-}
-
-/**
* Notify that the channel is being closed due to an error condition
*
* This function is called by the lower layer implementing the transport
@@ -1458,37 +1278,6 @@ channel_close_for_error(channel_t *chan)
}
/**
- * Notify that the channel listener is being closed due to an error condition
- *
- * This function is called by the lower layer implementing the transport
- * when a channel listener must be closed due to an error condition. This
- * does not call the channel listener's close method, since the lower layer
- * already knows.
- */
-
-void
-channel_listener_close_for_error(channel_listener_t *chan_l)
-{
- tor_assert(chan_l != NULL);
-
- /* If it's already in CLOSING, CLOSED or ERROR, this is a no-op */
- if (chan_l->state == CHANNEL_LISTENER_STATE_CLOSING ||
- chan_l->state == CHANNEL_LISTENER_STATE_CLOSED ||
- chan_l->state == CHANNEL_LISTENER_STATE_ERROR) return;
-
- log_debug(LD_CHANNEL,
- "Closing channel listener %p (global ID " U64_FORMAT ") "
- "due to lower-layer error",
- chan_l, U64_PRINTF_ARG(chan_l->global_identifier));
-
- /* Note closing by event from below */
- chan_l->reason_for_closing = CHANNEL_LISTENER_CLOSE_FOR_ERROR;
-
- /* Change state to CLOSING */
- channel_listener_change_state(chan_l, CHANNEL_LISTENER_STATE_CLOSING);
-}
-
-/**
* Notify that the lower layer is finished closing the channel
*
* This function should be called by the lower layer when a channel
@@ -1522,33 +1311,6 @@ channel_closed(channel_t *chan)
}
/**
- * Notify that the lower layer is finished closing the channel listener
- *
- * This function should be called by the lower layer when a channel listener
- * is finished closing and it should be regarded as inactive and
- * freed by the channel code.
- */
-
-void
-channel_listener_closed(channel_listener_t *chan_l)
-{
- tor_assert(chan_l);
- tor_assert(chan_l->state == CHANNEL_LISTENER_STATE_CLOSING ||
- chan_l->state == CHANNEL_LISTENER_STATE_CLOSED ||
- chan_l->state == CHANNEL_LISTENER_STATE_ERROR);
-
- /* No-op if already inactive */
- if (chan_l->state == CHANNEL_LISTENER_STATE_CLOSED ||
- chan_l->state == CHANNEL_LISTENER_STATE_ERROR) return;
-
- if (chan_l->reason_for_closing != CHANNEL_LISTENER_CLOSE_FOR_ERROR) {
- channel_listener_change_state(chan_l, CHANNEL_LISTENER_STATE_CLOSED);
- } else {
- channel_listener_change_state(chan_l, CHANNEL_LISTENER_STATE_ERROR);
- }
-}
-
-/**
* Clear the identity_digest of a channel
*
* This function clears the identity digest of the remote endpoint for a
@@ -1638,7 +1400,7 @@ channel_set_identity_digest(channel_t *chan,
}
/**
- * Clear the remote end metadata (identity_digest/nickname) of a channel
+ * Clear the remote end metadata (identity_digest) of a channel
*
* This function clears all the remote end info from a channel; this is
* intended for use by the lower layer.
@@ -1665,419 +1427,95 @@ channel_clear_remote_end(channel_t *chan)
memset(chan->identity_digest, 0,
sizeof(chan->identity_digest));
- tor_free(chan->nickname);
}
/**
- * Set the remote end metadata (identity_digest/nickname) of a channel
+ * Write to a channel the given packed cell.
*
- * This function sets new remote end info on a channel; this is intended
- * for use by the lower layer.
- */
-
-void
-channel_set_remote_end(channel_t *chan,
- const char *identity_digest,
- const char *nickname)
-{
- int was_in_digest_map, should_be_in_digest_map, state_not_in_map;
-
- tor_assert(chan);
-
- log_debug(LD_CHANNEL,
- "Setting remote endpoint identity on channel %p with "
- "global ID " U64_FORMAT " to nickname %s, digest %s",
- chan, U64_PRINTF_ARG(chan->global_identifier),
- nickname ? nickname : "(null)",
- identity_digest ?
- hex_str(identity_digest, DIGEST_LEN) : "(null)");
-
- state_not_in_map = CHANNEL_CONDEMNED(chan);
-
- was_in_digest_map =
- !state_not_in_map &&
- chan->registered &&
- !tor_digest_is_zero(chan->identity_digest);
- should_be_in_digest_map =
- !state_not_in_map &&
- chan->registered &&
- (identity_digest &&
- !tor_digest_is_zero(identity_digest));
-
- if (was_in_digest_map)
- /* We should always remove it; we'll add it back if we're writing
- * in a new digest.
- */
- channel_remove_from_digest_map(chan);
-
- if (identity_digest) {
- memcpy(chan->identity_digest,
- identity_digest,
- sizeof(chan->identity_digest));
-
- } else {
- memset(chan->identity_digest, 0,
- sizeof(chan->identity_digest));
- }
-
- tor_free(chan->nickname);
- if (nickname)
- chan->nickname = tor_strdup(nickname);
-
- /* Put it in the digest map if we should */
- if (should_be_in_digest_map)
- channel_add_to_digest_map(chan);
-}
-
-/**
- * Duplicate a cell queue entry; this is a shallow copy intended for use
- * in channel_write_cell_queue_entry().
- */
-
-static cell_queue_entry_t *
-cell_queue_entry_dup(cell_queue_entry_t *q)
-{
- cell_queue_entry_t *rv = NULL;
-
- tor_assert(q);
-
- rv = tor_malloc(sizeof(*rv));
- memcpy(rv, q, sizeof(*rv));
-
- return rv;
-}
-
-/**
- * Free a cell_queue_entry_t; the handed_off parameter indicates whether
- * the contents were passed to the lower layer (it is responsible for
- * them) or not (we should free).
- */
-
-STATIC void
-cell_queue_entry_free(cell_queue_entry_t *q, int handed_off)
-{
- if (!q) return;
-
- if (!handed_off) {
- /*
- * If we handed it off, the recipient becomes responsible (or
- * with packed cells the channel_t subclass calls packed_cell
- * free after writing out its contents; see, e.g.,
- * channel_tls_write_packed_cell_method(). Otherwise, we have
- * to take care of it here if possible.
- */
- switch (q->type) {
- case CELL_QUEUE_FIXED:
- if (q->u.fixed.cell) {
- /*
- * There doesn't seem to be a cell_free() function anywhere in the
- * pre-channel code; just use tor_free()
- */
- tor_free(q->u.fixed.cell);
- }
- break;
- case CELL_QUEUE_PACKED:
- if (q->u.packed.packed_cell) {
- packed_cell_free(q->u.packed.packed_cell);
- }
- break;
- case CELL_QUEUE_VAR:
- if (q->u.var.var_cell) {
- /*
- * This one's in connection_or.c; it'd be nice to figure out the
- * whole flow of cells from one end to the other and factor the
- * cell memory management functions like this out of the specific
- * TLS lower layer.
- */
- var_cell_free(q->u.var.var_cell);
- }
- break;
- default:
- /*
- * Nothing we can do if we don't know the type; this will
- * have been warned about elsewhere.
- */
- break;
- }
- }
- tor_free(q);
-}
-
-#if 0
-/**
- * Check whether a cell queue entry is padding; this is a helper function
- * for channel_write_cell_queue_entry()
- */
-
-static int
-cell_queue_entry_is_padding(cell_queue_entry_t *q)
-{
- tor_assert(q);
-
- if (q->type == CELL_QUEUE_FIXED) {
- if (q->u.fixed.cell) {
- if (q->u.fixed.cell->command == CELL_PADDING ||
- q->u.fixed.cell->command == CELL_VPADDING) {
- return 1;
- }
- }
- } else if (q->type == CELL_QUEUE_VAR) {
- if (q->u.var.var_cell) {
- if (q->u.var.var_cell->command == CELL_PADDING ||
- q->u.var.var_cell->command == CELL_VPADDING) {
- return 1;
- }
- }
- }
-
- return 0;
-}
-#endif /* 0 */
-
-/**
- * Allocate a new cell queue entry for a fixed-size cell
- */
-
-static cell_queue_entry_t *
-cell_queue_entry_new_fixed(cell_t *cell)
-{
- cell_queue_entry_t *q = NULL;
-
- tor_assert(cell);
-
- q = tor_malloc(sizeof(*q));
- q->type = CELL_QUEUE_FIXED;
- q->u.fixed.cell = cell;
-
- return q;
-}
-
-/**
- * Allocate a new cell queue entry for a variable-size cell
- */
-
-static cell_queue_entry_t *
-cell_queue_entry_new_var(var_cell_t *var_cell)
-{
- cell_queue_entry_t *q = NULL;
-
- tor_assert(var_cell);
-
- q = tor_malloc(sizeof(*q));
- q->type = CELL_QUEUE_VAR;
- q->u.var.var_cell = var_cell;
-
- return q;
-}
-
-/**
- * Ask how big the cell contained in a cell_queue_entry_t is
- */
-
-static size_t
-channel_get_cell_queue_entry_size(channel_t *chan, cell_queue_entry_t *q)
-{
- size_t rv = 0;
-
- tor_assert(chan);
- tor_assert(q);
-
- switch (q->type) {
- case CELL_QUEUE_FIXED:
- rv = get_cell_network_size(chan->wide_circ_ids);
- break;
- case CELL_QUEUE_VAR:
- rv = get_var_cell_header_size(chan->wide_circ_ids) +
- (q->u.var.var_cell ? q->u.var.var_cell->payload_len : 0);
- break;
- case CELL_QUEUE_PACKED:
- rv = get_cell_network_size(chan->wide_circ_ids);
- break;
- default:
- tor_assert_nonfatal_unreached_once();
- }
-
- return rv;
-}
-
-/**
- * Write to a channel based on a cell_queue_entry_t
*
- * Given a cell_queue_entry_t filled out by the caller, try to send the cell
- * and queue it if we can't.
+ * Two possible errors can happen. Either the channel is not opened or the
+ * lower layer (specialized channel) failed to write it. In both cases, it is
+ * the caller responsability to free the cell.
*/
-
-static void
-channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
+static int
+write_packed_cell(channel_t *chan, packed_cell_t *cell)
{
- int result = 0, sent = 0;
- cell_queue_entry_t *tmp = NULL;
+ int ret = -1;
size_t cell_bytes;
tor_assert(chan);
- tor_assert(q);
+ tor_assert(cell);
/* Assert that the state makes sense for a cell write */
tor_assert(CHANNEL_CAN_HANDLE_CELLS(chan));
{
circid_t circ_id;
- if (is_destroy_cell(chan, q, &circ_id)) {
+ if (packed_cell_is_destroy(chan, cell, &circ_id)) {
channel_note_destroy_not_pending(chan, circ_id);
}
}
/* For statistical purposes, figure out how big this cell is */
- cell_bytes = channel_get_cell_queue_entry_size(chan, q);
+ cell_bytes = get_cell_network_size(chan->wide_circ_ids);
/* Can we send it right out? If so, try */
- if (TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue) &&
- CHANNEL_IS_OPEN(chan)) {
- /* Pick the right write function for this cell type and save the result */
- switch (q->type) {
- case CELL_QUEUE_FIXED:
- tor_assert(chan->write_cell);
- tor_assert(q->u.fixed.cell);
- result = chan->write_cell(chan, q->u.fixed.cell);
- break;
- case CELL_QUEUE_PACKED:
- tor_assert(chan->write_packed_cell);
- tor_assert(q->u.packed.packed_cell);
- result = chan->write_packed_cell(chan, q->u.packed.packed_cell);
- break;
- case CELL_QUEUE_VAR:
- tor_assert(chan->write_var_cell);
- tor_assert(q->u.var.var_cell);
- result = chan->write_var_cell(chan, q->u.var.var_cell);
- break;
- default:
- tor_assert(1);
- }
-
- /* Check if we got it out */
- if (result > 0) {
- sent = 1;
- /* Timestamp for transmission */
- channel_timestamp_xmit(chan);
- /* If we're here the queue is empty, so it's drained too */
- channel_timestamp_drained(chan);
- /* Update the counter */
- ++(chan->n_cells_xmitted);
- chan->n_bytes_xmitted += cell_bytes;
- /* Update global counters */
- ++n_channel_cells_queued;
- ++n_channel_cells_passed_to_lower_layer;
- n_channel_bytes_queued += cell_bytes;
- n_channel_bytes_passed_to_lower_layer += cell_bytes;
- channel_assert_counter_consistency();
- }
+ if (!CHANNEL_IS_OPEN(chan)) {
+ goto done;
}
- if (!sent) {
- /* Not sent, queue it */
- /*
- * We have to copy the queue entry passed in, since the caller probably
- * used the stack.
- */
- tmp = cell_queue_entry_dup(q);
- TOR_SIMPLEQ_INSERT_TAIL(&chan->outgoing_queue, tmp, next);
- /* Update global counters */
- ++n_channel_cells_queued;
- ++n_channel_cells_in_queues;
- n_channel_bytes_queued += cell_bytes;
- n_channel_bytes_in_queues += cell_bytes;
- channel_assert_counter_consistency();
- /* Update channel queue size */
- chan->bytes_in_queue += cell_bytes;
- /* Try to process the queue? */
- if (CHANNEL_IS_OPEN(chan)) channel_flush_cells(chan);
+ /* Write the cell on the connection's outbuf. */
+ if (chan->write_packed_cell(chan, cell) < 0) {
+ goto done;
}
+ /* Timestamp for transmission */
+ channel_timestamp_xmit(chan);
+ /* Update the counter */
+ ++(chan->n_cells_xmitted);
+ chan->n_bytes_xmitted += cell_bytes;
+ /* Successfully sent the cell. */
+ ret = 0;
+
+ done:
+ return ret;
}
-/** Write a generic cell type to a channel
+/**
+ * Write a packed cell to a channel
+ *
+ * Write a packed cell to a channel using the write_cell() method. This is
+ * called by the transport-independent code to deliver a packed cell to a
+ * channel for transmission.
*
- * Write a generic cell to a channel. It is called by channel_write_cell(),
- * channel_write_var_cell() and channel_write_packed_cell() in order to reduce
- * code duplication. Notice that it takes cell as pointer of type void,
- * this can be dangerous because no type check is performed.
+ * Return 0 on success else a negative value. In both cases, the caller should
+ * not access the cell anymore, it is freed both on success and error.
*/
-
-void
-channel_write_cell_generic_(channel_t *chan, const char *cell_type,
- void *cell, cell_queue_entry_t *q)
+int
+channel_write_packed_cell(channel_t *chan, packed_cell_t *cell)
{
+ int ret = -1;
tor_assert(chan);
tor_assert(cell);
if (CHANNEL_IS_CLOSING(chan)) {
- log_debug(LD_CHANNEL, "Discarding %c %p on closing channel %p with "
- "global ID "U64_FORMAT, *cell_type, cell, chan,
+ log_debug(LD_CHANNEL, "Discarding %p on closing channel %p with "
+ "global ID "U64_FORMAT, cell, chan,
U64_PRINTF_ARG(chan->global_identifier));
- tor_free(cell);
- return;
+ goto end;
}
log_debug(LD_CHANNEL,
- "Writing %c %p to channel %p with global ID "
- U64_FORMAT, *cell_type,
- cell, chan, U64_PRINTF_ARG(chan->global_identifier));
-
- channel_write_cell_queue_entry(chan, q);
- /* Update the queue size estimate */
- channel_update_xmit_queue_size(chan);
-}
-
-/**
- * Write a cell to a channel
- *
- * Write a fixed-length cell to a channel using the write_cell() method.
- * This is equivalent to the pre-channels connection_or_write_cell_to_buf();
- * it is called by the transport-independent code to deliver a cell to a
- * channel for transmission.
- */
+ "Writing %p to channel %p with global ID "
+ U64_FORMAT, cell, chan, U64_PRINTF_ARG(chan->global_identifier));
-void
-channel_write_cell(channel_t *chan, cell_t *cell)
-{
- cell_queue_entry_t q;
- q.type = CELL_QUEUE_FIXED;
- q.u.fixed.cell = cell;
- channel_write_cell_generic_(chan, "cell_t", cell, &q);
-}
+ ret = write_packed_cell(chan, cell);
-/**
- * Write a packed cell to a channel
- *
- * Write a packed cell to a channel using the write_cell() method. This is
- * called by the transport-independent code to deliver a packed cell to a
- * channel for transmission.
- */
-
-void
-channel_write_packed_cell(channel_t *chan, packed_cell_t *packed_cell)
-{
- cell_queue_entry_t q;
- q.type = CELL_QUEUE_PACKED;
- q.u.packed.packed_cell = packed_cell;
- channel_write_cell_generic_(chan, "packed_cell_t", packed_cell, &q);
-}
-
-/**
- * Write a variable-length cell to a channel
- *
- * Write a variable-length cell to a channel using the write_cell() method.
- * This is equivalent to the pre-channels
- * connection_or_write_var_cell_to_buf(); it's called by the transport-
- * independent code to deliver a var_cell to a channel for transmission.
- */
-
-void
-channel_write_var_cell(channel_t *chan, var_cell_t *var_cell)
-{
- cell_queue_entry_t q;
- q.type = CELL_QUEUE_VAR;
- q.u.var.var_cell = var_cell;
- channel_write_cell_generic_(chan, "var_cell_t", var_cell, &q);
+ end:
+ /* Whatever happens, we free the cell. Either an error occured or the cell
+ * was put on the connection outbuf, both cases we have ownership of the
+ * cell and we free it. */
+ packed_cell_free(cell);
+ return ret;
}
/**
@@ -2119,15 +1557,6 @@ channel_change_state_(channel_t *chan, channel_state_t to_state)
tor_assert(chan->reason_for_closing != CHANNEL_NOT_CLOSING);
}
- /*
- * We need to maintain the queues here for some transitions:
- * when we enter CHANNEL_STATE_OPEN (especially from CHANNEL_STATE_MAINT)
- * we may have a backlog of cells to transmit, so drain the queues in
- * that case, and when going to CHANNEL_STATE_CLOSED the subclass
- * should have made sure to finish sending things (or gone to
- * CHANNEL_STATE_ERROR if not possible), so we assert for that here.
- */
-
log_debug(LD_CHANNEL,
"Changing state of channel %p (global ID " U64_FORMAT
") from \"%s\" to \"%s\"",
@@ -2184,36 +1613,6 @@ channel_change_state_(channel_t *chan, channel_state_t to_state)
} else if (to_state == CHANNEL_STATE_MAINT) {
scheduler_channel_doesnt_want_writes(chan);
}
-
- /*
- * If we're closing, this channel no longer counts toward the global
- * estimated queue size; if we're open, it now does.
- */
- if ((to_state == CHANNEL_STATE_CLOSING ||
- to_state == CHANNEL_STATE_CLOSED ||
- to_state == CHANNEL_STATE_ERROR) &&
- (from_state == CHANNEL_STATE_OPEN ||
- from_state == CHANNEL_STATE_MAINT)) {
- estimated_total_queue_size -= chan->bytes_in_queue;
- }
-
- /*
- * If we're opening, this channel now does count toward the global
- * estimated queue size.
- */
- if ((to_state == CHANNEL_STATE_OPEN ||
- to_state == CHANNEL_STATE_MAINT) &&
- !(from_state == CHANNEL_STATE_OPEN ||
- from_state == CHANNEL_STATE_MAINT)) {
- estimated_total_queue_size += chan->bytes_in_queue;
- }
-
- if (to_state == CHANNEL_STATE_CLOSED ||
- to_state == CHANNEL_STATE_ERROR) {
- /* Assert that all queues are empty */
- tor_assert(TOR_SIMPLEQ_EMPTY(&chan->incoming_queue));
- tor_assert(TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue));
- }
}
/**
@@ -2237,12 +1636,6 @@ channel_change_state_open(channel_t *chan)
/* Tell circuits if we opened and stuff */
channel_do_open_actions(chan);
chan->has_been_open = 1;
-
- /* Check for queued cells to process */
- if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue))
- channel_process_cells(chan);
- if (! TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue))
- channel_flush_cells(chan);
}
/**
@@ -2284,15 +1677,6 @@ channel_listener_change_state(channel_listener_t *chan_l,
tor_assert(chan_l->reason_for_closing != CHANNEL_LISTENER_NOT_CLOSING);
}
- /*
- * We need to maintain the queues here for some transitions:
- * when we enter CHANNEL_STATE_OPEN (especially from CHANNEL_STATE_MAINT)
- * we may have a backlog of cells to transmit, so drain the queues in
- * that case, and when going to CHANNEL_STATE_CLOSED the subclass
- * should have made sure to finish sending things (or gone to
- * CHANNEL_STATE_ERROR if not possible), so we assert for that here.
- */
-
log_debug(LD_CHANNEL,
"Changing state of channel listener %p (global ID " U64_FORMAT
"from \"%s\" to \"%s\"",
@@ -2325,30 +1709,38 @@ channel_listener_change_state(channel_listener_t *chan_l,
if (to_state == CHANNEL_LISTENER_STATE_CLOSED ||
to_state == CHANNEL_LISTENER_STATE_ERROR) {
- /* Assert that the queue is empty */
tor_assert(!(chan_l->incoming_list) ||
smartlist_len(chan_l->incoming_list) == 0);
}
}
-/**
- * Try to flush cells to the lower layer
- *
- * this is called by the lower layer to indicate that it wants more cells;
- * it will try to write up to num_cells cells from the channel's cell queue or
- * from circuits active on that channel, or as many as it has available if
- * num_cells == -1.
- */
-
+/* Maximum number of cells that is allowed to flush at once withing
+ * channel_flush_some_cells(). */
#define MAX_CELLS_TO_GET_FROM_CIRCUITS_FOR_UNLIMITED 256
+/* Try to flush cells of the given channel chan up to a maximum of num_cells.
+ *
+ * This is called by the scheduler when it wants to flush cells from the
+ * channel's circuit queue(s) to the connection outbuf (not yet on the wire).
+ *
+ * If the channel is not in state CHANNEL_STATE_OPEN, this does nothing and
+ * will return 0 meaning no cells were flushed.
+ *
+ * If num_cells is -1, we'll try to flush up to the maximum cells allowed
+ * defined in MAX_CELLS_TO_GET_FROM_CIRCUITS_FOR_UNLIMITED.
+ *
+ * On success, the number of flushed cells are returned and it can never be
+ * above num_cells. If 0 is returned, no cells were flushed either because the
+ * channel was not opened or we had no cells on the channel. A negative number
+ * can NOT be sent back.
+ *
+ * This function is part of the fast path. */
MOCK_IMPL(ssize_t,
channel_flush_some_cells, (channel_t *chan, ssize_t num_cells))
{
unsigned int unlimited = 0;
ssize_t flushed = 0;
- int num_cells_from_circs, clamped_num_cells;
- int q_len_before, q_len_after;
+ int clamped_num_cells;
tor_assert(chan);
@@ -2357,11 +1749,6 @@ channel_flush_some_cells, (channel_t *chan, ssize_t num_cells))
/* If we aren't in CHANNEL_STATE_OPEN, nothing goes through */
if (CHANNEL_IS_OPEN(chan)) {
- /* Try to flush as much as we can that's already queued */
- flushed += channel_flush_some_cells_from_outgoing_queue(chan,
- (unlimited ? -1 : num_cells - flushed));
- if (!unlimited && num_cells <= flushed) goto done;
-
if (circuitmux_num_cells(chan->cmux) > 0) {
/* Calculate number of cells, including clamp */
if (unlimited) {
@@ -2375,45 +1762,9 @@ channel_flush_some_cells, (channel_t *chan, ssize_t num_cells))
}
}
- /*
- * Keep track of the change in queue size; we have to count cells
- * channel_flush_from_first_active_circuit() writes out directly,
- * but not double-count ones we might get later in
- * channel_flush_some_cells_from_outgoing_queue()
- */
- q_len_before = chan_cell_queue_len(&(chan->outgoing_queue));
-
/* Try to get more cells from any active circuits */
- num_cells_from_circs = channel_flush_from_first_active_circuit(
+ flushed = channel_flush_from_first_active_circuit(
chan, clamped_num_cells);
-
- q_len_after = chan_cell_queue_len(&(chan->outgoing_queue));
-
- /*
- * If it claims we got some, adjust the flushed counter and consider
- * processing the queue again
- */
- if (num_cells_from_circs > 0) {
- /*
- * Adjust flushed by the number of cells counted in
- * num_cells_from_circs that didn't go to the cell queue.
- */
-
- if (q_len_after > q_len_before) {
- num_cells_from_circs -= (q_len_after - q_len_before);
- if (num_cells_from_circs < 0) num_cells_from_circs = 0;
- }
-
- flushed += num_cells_from_circs;
-
- /* Now process the queue if necessary */
-
- if ((q_len_after > q_len_before) &&
- (unlimited || (flushed < num_cells))) {
- flushed += channel_flush_some_cells_from_outgoing_queue(chan,
- (unlimited ? -1 : num_cells - flushed));
- }
- }
}
}
@@ -2422,197 +1773,16 @@ channel_flush_some_cells, (channel_t *chan, ssize_t num_cells))
}
/**
- * Flush cells from just the channel's outgoing cell queue
- *
- * This gets called from channel_flush_some_cells() above to flush cells
- * just from the queue without trying for active_circuits.
- */
-
-static ssize_t
-channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
- ssize_t num_cells)
-{
- unsigned int unlimited = 0;
- ssize_t flushed = 0;
- cell_queue_entry_t *q = NULL;
- size_t cell_size;
- int free_q = 0, handed_off = 0;
-
- tor_assert(chan);
- tor_assert(chan->write_cell);
- tor_assert(chan->write_packed_cell);
- tor_assert(chan->write_var_cell);
-
- if (num_cells < 0) unlimited = 1;
- if (!unlimited && num_cells <= flushed) return 0;
-
- /* If we aren't in CHANNEL_STATE_OPEN, nothing goes through */
- if (CHANNEL_IS_OPEN(chan)) {
- while ((unlimited || num_cells > flushed) &&
- NULL != (q = TOR_SIMPLEQ_FIRST(&chan->outgoing_queue))) {
- free_q = 0;
- handed_off = 0;
-
- /* Figure out how big it is for statistical purposes */
- cell_size = channel_get_cell_queue_entry_size(chan, q);
- /*
- * Okay, we have a good queue entry, try to give it to the lower
- * layer.
- */
- switch (q->type) {
- case CELL_QUEUE_FIXED:
- if (q->u.fixed.cell) {
- if (chan->write_cell(chan,
- q->u.fixed.cell)) {
- ++flushed;
- channel_timestamp_xmit(chan);
- ++(chan->n_cells_xmitted);
- chan->n_bytes_xmitted += cell_size;
- free_q = 1;
- handed_off = 1;
- }
- /* Else couldn't write it; leave it on the queue */
- } else {
- /* This shouldn't happen */
- log_info(LD_CHANNEL,
- "Saw broken cell queue entry of type CELL_QUEUE_FIXED "
- "with no cell on channel %p "
- "(global ID " U64_FORMAT ").",
- chan, U64_PRINTF_ARG(chan->global_identifier));
- /* Throw it away */
- free_q = 1;
- handed_off = 0;
- }
- break;
- case CELL_QUEUE_PACKED:
- if (q->u.packed.packed_cell) {
- if (chan->write_packed_cell(chan,
- q->u.packed.packed_cell)) {
- ++flushed;
- channel_timestamp_xmit(chan);
- ++(chan->n_cells_xmitted);
- chan->n_bytes_xmitted += cell_size;
- free_q = 1;
- handed_off = 1;
- }
- /* Else couldn't write it; leave it on the queue */
- } else {
- /* This shouldn't happen */
- log_info(LD_CHANNEL,
- "Saw broken cell queue entry of type CELL_QUEUE_PACKED "
- "with no cell on channel %p "
- "(global ID " U64_FORMAT ").",
- chan, U64_PRINTF_ARG(chan->global_identifier));
- /* Throw it away */
- free_q = 1;
- handed_off = 0;
- }
- break;
- case CELL_QUEUE_VAR:
- if (q->u.var.var_cell) {
- if (chan->write_var_cell(chan,
- q->u.var.var_cell)) {
- ++flushed;
- channel_timestamp_xmit(chan);
- ++(chan->n_cells_xmitted);
- chan->n_bytes_xmitted += cell_size;
- free_q = 1;
- handed_off = 1;
- }
- /* Else couldn't write it; leave it on the queue */
- } else {
- /* This shouldn't happen */
- log_info(LD_CHANNEL,
- "Saw broken cell queue entry of type CELL_QUEUE_VAR "
- "with no cell on channel %p "
- "(global ID " U64_FORMAT ").",
- chan, U64_PRINTF_ARG(chan->global_identifier));
- /* Throw it away */
- free_q = 1;
- handed_off = 0;
- }
- break;
- default:
- /* Unknown type, log and free it */
- log_info(LD_CHANNEL,
- "Saw an unknown cell queue entry type %d on channel %p "
- "(global ID " U64_FORMAT "; ignoring it."
- " Someone should fix this.",
- q->type, chan, U64_PRINTF_ARG(chan->global_identifier));
- free_q = 1;
- handed_off = 0;
- }
-
- /*
- * if free_q is set, we used it and should remove the queue entry;
- * we have to do the free down here so TOR_SIMPLEQ_REMOVE_HEAD isn't
- * accessing freed memory
- */
- if (free_q) {
- TOR_SIMPLEQ_REMOVE_HEAD(&chan->outgoing_queue, next);
- /*
- * ...and we handed a cell off to the lower layer, so we should
- * update the counters.
- */
- ++n_channel_cells_passed_to_lower_layer;
- --n_channel_cells_in_queues;
- n_channel_bytes_passed_to_lower_layer += cell_size;
- n_channel_bytes_in_queues -= cell_size;
- channel_assert_counter_consistency();
- /* Update the channel's queue size too */
- chan->bytes_in_queue -= cell_size;
- /* Finally, free q */
- cell_queue_entry_free(q, handed_off);
- q = NULL;
- } else {
- /* No cell removed from list, so we can't go on any further */
- break;
- }
- }
- }
-
- /* Did we drain the queue? */
- if (TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue)) {
- channel_timestamp_drained(chan);
- }
-
- /* Update the estimate queue size */
- channel_update_xmit_queue_size(chan);
-
- return flushed;
-}
-
-/**
- * Flush as many cells as we possibly can from the queue
- *
- * This tries to flush as many cells from the queue as the lower layer
- * will take. It just calls channel_flush_some_cells_from_outgoing_queue()
- * in unlimited mode.
- */
-
-void
-channel_flush_cells(channel_t *chan)
-{
- channel_flush_some_cells_from_outgoing_queue(chan, -1);
-}
-
-/**
* Check if any cells are available
*
- * This gets used from the lower layer to check if any more cells are
- * available.
+ * This is used by the scheduler to know if the channel has more to flush
+ * after a scheduling round.
*/
-
MOCK_IMPL(int,
channel_more_to_flush, (channel_t *chan))
{
tor_assert(chan);
- /* Check if we have any queued */
- if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue))
- return 1;
-
- /* Check if any circuits would like to queue some */
if (circuitmux_num_cells(chan->cmux) > 0) return 1;
/* Else no */
@@ -2816,207 +1986,31 @@ channel_listener_queue_incoming(channel_listener_t *listener,
}
/**
- * Process queued incoming cells
- *
- * Process as many queued cells as we can from the incoming
- * cell queue.
+ * Process a cell from the given channel.
*/
-
void
-channel_process_cells(channel_t *chan)
+channel_process_cell(channel_t *chan, cell_t *cell)
{
- cell_queue_entry_t *q;
tor_assert(chan);
tor_assert(CHANNEL_IS_CLOSING(chan) || CHANNEL_IS_MAINT(chan) ||
CHANNEL_IS_OPEN(chan));
-
- log_debug(LD_CHANNEL,
- "Processing as many incoming cells as we can for channel %p",
- chan);
-
- /* Nothing we can do if we have no registered cell handlers */
- if (!(chan->cell_handler ||
- chan->var_cell_handler)) return;
- /* Nothing we can do if we have no cells */
- if (TOR_SIMPLEQ_EMPTY(&chan->incoming_queue)) return;
-
- /*
- * Process cells until we're done or find one we have no current handler
- * for.
- *
- * We must free the cells here after calling the handler, since custody
- * of the buffer was given to the channel layer when they were queued;
- * see comments on memory management in channel_queue_cell() and in
- * channel_queue_var_cell() below.
- */
- while (NULL != (q = TOR_SIMPLEQ_FIRST(&chan->incoming_queue))) {
- tor_assert(q);
- tor_assert(q->type == CELL_QUEUE_FIXED ||
- q->type == CELL_QUEUE_VAR);
-
- if (q->type == CELL_QUEUE_FIXED &&
- chan->cell_handler) {
- /* Handle a fixed-length cell */
- TOR_SIMPLEQ_REMOVE_HEAD(&chan->incoming_queue, next);
- tor_assert(q->u.fixed.cell);
- log_debug(LD_CHANNEL,
- "Processing incoming cell_t %p for channel %p (global ID "
- U64_FORMAT ")",
- q->u.fixed.cell, chan,
- U64_PRINTF_ARG(chan->global_identifier));
- chan->cell_handler(chan, q->u.fixed.cell);
- tor_free(q->u.fixed.cell);
- tor_free(q);
- } else if (q->type == CELL_QUEUE_VAR &&
- chan->var_cell_handler) {
- /* Handle a variable-length cell */
- TOR_SIMPLEQ_REMOVE_HEAD(&chan->incoming_queue, next);
- tor_assert(q->u.var.var_cell);
- log_debug(LD_CHANNEL,
- "Processing incoming var_cell_t %p for channel %p (global ID "
- U64_FORMAT ")",
- q->u.var.var_cell, chan,
- U64_PRINTF_ARG(chan->global_identifier));
- chan->var_cell_handler(chan, q->u.var.var_cell);
- tor_free(q->u.var.var_cell);
- tor_free(q);
- } else {
- /* Can't handle this one */
- break;
- }
- }
-}
-
-/**
- * Queue incoming cell
- *
- * This should be called by a channel_t subclass to queue an incoming fixed-
- * length cell for processing, and process it if possible.
- */
-
-void
-channel_queue_cell(channel_t *chan, cell_t *cell)
-{
- int need_to_queue = 0;
- cell_queue_entry_t *q;
- cell_t *cell_copy = NULL;
-
- tor_assert(chan);
tor_assert(cell);
- tor_assert(CHANNEL_IS_OPEN(chan));
- /* Do we need to queue it, or can we just call the handler right away? */
- if (!(chan->cell_handler)) need_to_queue = 1;
- if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue))
- need_to_queue = 1;
+ /* Nothing we can do if we have no registered cell handlers */
+ if (!chan->cell_handler)
+ return;
/* Timestamp for receiving */
channel_timestamp_recv(chan);
-
- /* Update the counters */
+ /* Update received counter. */
++(chan->n_cells_recved);
chan->n_bytes_recved += get_cell_network_size(chan->wide_circ_ids);
- /* If we don't need to queue we can just call cell_handler */
- if (!need_to_queue) {
- tor_assert(chan->cell_handler);
- log_debug(LD_CHANNEL,
- "Directly handling incoming cell_t %p for channel %p "
- "(global ID " U64_FORMAT ")",
- cell, chan,
- U64_PRINTF_ARG(chan->global_identifier));
- chan->cell_handler(chan, cell);
- } else {
- /*
- * Otherwise queue it and then process the queue if possible.
- *
- * We queue a copy, not the original pointer - it might have been on the
- * stack in connection_or_process_cells_from_inbuf() (or another caller
- * if we ever have a subclass other than channel_tls_t), or be freed
- * there after we return. This is the uncommon case; the non-copying
- * fast path occurs in the if (!need_to_queue) case above when the
- * upper layer has installed cell handlers.
- */
- cell_copy = tor_malloc_zero(sizeof(cell_t));
- memcpy(cell_copy, cell, sizeof(cell_t));
- q = cell_queue_entry_new_fixed(cell_copy);
- log_debug(LD_CHANNEL,
- "Queueing incoming cell_t %p for channel %p "
- "(global ID " U64_FORMAT ")",
- cell, chan,
- U64_PRINTF_ARG(chan->global_identifier));
- TOR_SIMPLEQ_INSERT_TAIL(&chan->incoming_queue, q, next);
- if (chan->cell_handler ||
- chan->var_cell_handler) {
- channel_process_cells(chan);
- }
- }
-}
-
-/**
- * Queue incoming variable-length cell
- *
- * This should be called by a channel_t subclass to queue an incoming
- * variable-length cell for processing, and process it if possible.
- */
-
-void
-channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell)
-{
- int need_to_queue = 0;
- cell_queue_entry_t *q;
- var_cell_t *cell_copy = NULL;
-
- tor_assert(chan);
- tor_assert(var_cell);
- tor_assert(CHANNEL_IS_OPEN(chan));
-
- /* Do we need to queue it, or can we just call the handler right away? */
- if (!(chan->var_cell_handler)) need_to_queue = 1;
- if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue))
- need_to_queue = 1;
-
- /* Timestamp for receiving */
- channel_timestamp_recv(chan);
-
- /* Update the counter */
- ++(chan->n_cells_recved);
- chan->n_bytes_recved += get_var_cell_header_size(chan->wide_circ_ids) +
- var_cell->payload_len;
-
- /* If we don't need to queue we can just call cell_handler */
- if (!need_to_queue) {
- tor_assert(chan->var_cell_handler);
- log_debug(LD_CHANNEL,
- "Directly handling incoming var_cell_t %p for channel %p "
- "(global ID " U64_FORMAT ")",
- var_cell, chan,
- U64_PRINTF_ARG(chan->global_identifier));
- chan->var_cell_handler(chan, var_cell);
- } else {
- /*
- * Otherwise queue it and then process the queue if possible.
- *
- * We queue a copy, not the original pointer - it might have been on the
- * stack in connection_or_process_cells_from_inbuf() (or another caller
- * if we ever have a subclass other than channel_tls_t), or be freed
- * there after we return. This is the uncommon case; the non-copying
- * fast path occurs in the if (!need_to_queue) case above when the
- * upper layer has installed cell handlers.
- */
- cell_copy = var_cell_copy(var_cell);
- q = cell_queue_entry_new_var(cell_copy);
- log_debug(LD_CHANNEL,
- "Queueing incoming var_cell_t %p for channel %p "
- "(global ID " U64_FORMAT ")",
- var_cell, chan,
- U64_PRINTF_ARG(chan->global_identifier));
- TOR_SIMPLEQ_INSERT_TAIL(&chan->incoming_queue, q, next);
- if (chan->cell_handler ||
- chan->var_cell_handler) {
- channel_process_cells(chan);
- }
- }
+ log_debug(LD_CHANNEL,
+ "Processing incoming cell_t %p for channel %p (global ID "
+ U64_FORMAT ")", cell, chan,
+ U64_PRINTF_ARG(chan->global_identifier));
+ chan->cell_handler(chan, cell);
}
/** If <b>packed_cell</b> on <b>chan</b> is a destroy cell, then set
@@ -3043,44 +2037,6 @@ packed_cell_is_destroy(channel_t *chan,
}
/**
- * Assert that the global channel stats counters are internally consistent
- */
-
-static void
-channel_assert_counter_consistency(void)
-{
- tor_assert(n_channel_cells_queued ==
- (n_channel_cells_in_queues + n_channel_cells_passed_to_lower_layer));
- tor_assert(n_channel_bytes_queued ==
- (n_channel_bytes_in_queues + n_channel_bytes_passed_to_lower_layer));
-}
-
-/* DOCDOC */
-static int
-is_destroy_cell(channel_t *chan,
- const cell_queue_entry_t *q, circid_t *circid_out)
-{
- *circid_out = 0;
- switch (q->type) {
- case CELL_QUEUE_FIXED:
- if (q->u.fixed.cell->command == CELL_DESTROY) {
- *circid_out = q->u.fixed.cell->circ_id;
- return 1;
- }
- break;
- case CELL_QUEUE_VAR:
- if (q->u.var.var_cell->command == CELL_DESTROY) {
- *circid_out = q->u.var.var_cell->circ_id;
- return 1;
- }
- break;
- case CELL_QUEUE_PACKED:
- return packed_cell_is_destroy(chan, q->u.packed.packed_cell, circid_out);
- }
- return 0;
-}
-
-/**
* Send destroy cell on a channel
*
* Write a destroy cell with circ ID <b>circ_id</b> and reason <b>reason</b>
@@ -3134,19 +2090,6 @@ channel_dumpstats(int severity)
{
if (all_channels && smartlist_len(all_channels) > 0) {
tor_log(severity, LD_GENERAL,
- "Channels have queued " U64_FORMAT " bytes in " U64_FORMAT " cells, "
- "and handed " U64_FORMAT " bytes in " U64_FORMAT " cells to the lower"
- " layer.",
- U64_PRINTF_ARG(n_channel_bytes_queued),
- U64_PRINTF_ARG(n_channel_cells_queued),
- U64_PRINTF_ARG(n_channel_bytes_passed_to_lower_layer),
- U64_PRINTF_ARG(n_channel_cells_passed_to_lower_layer));
- tor_log(severity, LD_GENERAL,
- "There are currently " U64_FORMAT " bytes in " U64_FORMAT " cells "
- "in channel queues.",
- U64_PRINTF_ARG(n_channel_bytes_in_queues),
- U64_PRINTF_ARG(n_channel_cells_in_queues));
- tor_log(severity, LD_GENERAL,
"Dumping statistics about %d channels:",
smartlist_len(all_channels));
tor_log(severity, LD_GENERAL,
@@ -3296,7 +2239,7 @@ channel_free_list(smartlist_t *channels, int mark_for_close)
if (!CHANNEL_CONDEMNED(curr)) {
channel_mark_for_close(curr);
}
- channel_force_free(curr);
+ channel_force_xfree(curr);
} else channel_free(curr);
} SMARTLIST_FOREACH_END(curr);
}
@@ -3325,7 +2268,7 @@ channel_listener_free_list(smartlist_t *listeners, int mark_for_close)
curr->state == CHANNEL_LISTENER_STATE_ERROR)) {
channel_listener_mark_for_close(curr);
}
- channel_listener_force_free(curr);
+ channel_listener_force_xfree(curr);
} else channel_listener_free(curr);
} SMARTLIST_FOREACH_END(curr);
}
@@ -3629,19 +2572,6 @@ channel_listener_describe_transport(channel_listener_t *chan_l)
}
/**
- * Return the number of entries in <b>queue</b>
- */
-STATIC int
-chan_cell_queue_len(const chan_cell_queue_t *queue)
-{
- int r = 0;
- cell_queue_entry_t *cell;
- TOR_SIMPLEQ_FOREACH(cell, queue, next)
- ++r;
- return r;
-}
-
-/**
* Dump channel statistics
*
* Dump statistics for one channel to the log
@@ -3676,35 +2606,18 @@ channel_dump_statistics, (channel_t *chan, int severity))
U64_PRINTF_ARG(chan->timestamp_active),
U64_PRINTF_ARG(now - chan->timestamp_active));
- /* Handle digest and nickname */
+ /* Handle digest. */
if (!tor_digest_is_zero(chan->identity_digest)) {
- if (chan->nickname) {
- tor_log(severity, LD_GENERAL,
- " * Channel " U64_FORMAT " says it is connected "
- "to an OR with digest %s and nickname %s",
- U64_PRINTF_ARG(chan->global_identifier),
- hex_str(chan->identity_digest, DIGEST_LEN),
- chan->nickname);
- } else {
- tor_log(severity, LD_GENERAL,
- " * Channel " U64_FORMAT " says it is connected "
- "to an OR with digest %s and no known nickname",
- U64_PRINTF_ARG(chan->global_identifier),
- hex_str(chan->identity_digest, DIGEST_LEN));
- }
+ tor_log(severity, LD_GENERAL,
+ " * Channel " U64_FORMAT " says it is connected "
+ "to an OR with digest %s",
+ U64_PRINTF_ARG(chan->global_identifier),
+ hex_str(chan->identity_digest, DIGEST_LEN));
} else {
- if (chan->nickname) {
- tor_log(severity, LD_GENERAL,
- " * Channel " U64_FORMAT " does not know the digest"
- " of the OR it is connected to, but reports its nickname is %s",
- U64_PRINTF_ARG(chan->global_identifier),
- chan->nickname);
- } else {
- tor_log(severity, LD_GENERAL,
- " * Channel " U64_FORMAT " does not know the digest"
- " or the nickname of the OR it is connected to",
- U64_PRINTF_ARG(chan->global_identifier));
- }
+ tor_log(severity, LD_GENERAL,
+ " * Channel " U64_FORMAT " does not know the digest"
+ " of the OR it is connected to",
+ U64_PRINTF_ARG(chan->global_identifier));
}
/* Handle remote address and descriptions */
@@ -3753,14 +2666,6 @@ channel_dump_statistics, (channel_t *chan, int severity))
channel_is_incoming(chan) ?
"incoming" : "outgoing");
- /* Describe queues */
- tor_log(severity, LD_GENERAL,
- " * Channel " U64_FORMAT " has %d queued incoming cells"
- " and %d queued outgoing cells",
- U64_PRINTF_ARG(chan->global_identifier),
- chan_cell_queue_len(&chan->incoming_queue),
- chan_cell_queue_len(&chan->outgoing_queue));
-
/* Describe circuits */
tor_log(severity, LD_GENERAL,
" * Channel " U64_FORMAT " has %d active circuits out of"
@@ -3779,12 +2684,6 @@ channel_dump_statistics, (channel_t *chan, int severity))
U64_PRINTF_ARG(chan->timestamp_client),
U64_PRINTF_ARG(now - chan->timestamp_client));
tor_log(severity, LD_GENERAL,
- " * Channel " U64_FORMAT " was last drained at "
- U64_FORMAT " (" U64_FORMAT " seconds ago)",
- U64_PRINTF_ARG(chan->global_identifier),
- U64_PRINTF_ARG(chan->timestamp_drained),
- U64_PRINTF_ARG(now - chan->timestamp_drained));
- tor_log(severity, LD_GENERAL,
" * Channel " U64_FORMAT " last received a cell "
"at " U64_FORMAT " (" U64_FORMAT " seconds ago)",
U64_PRINTF_ARG(chan->global_identifier),
@@ -4027,29 +2926,18 @@ channel_get_addr_if_possible(channel_t *chan, tor_addr_t *addr_out)
else return 0;
}
-/**
- * Check if there are outgoing queue writes on this channel
- *
- * Indicate if either we have queued cells, or if not, whether the underlying
- * lower-layer transport thinks it has an output queue.
+/*
+ * Return true iff the channel has any cells on the connection outbuf waiting
+ * to be sent onto the network.
*/
-
int
channel_has_queued_writes(channel_t *chan)
{
- int has_writes = 0;
-
tor_assert(chan);
tor_assert(chan->has_queued_writes);
- if (! TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue)) {
- has_writes = 1;
- } else {
- /* Check with the lower layer */
- has_writes = chan->has_queued_writes(chan);
- }
-
- return has_writes;
+ /* Check with the lower layer */
+ return chan->has_queued_writes(chan);
}
/**
@@ -4274,23 +3162,10 @@ channel_mark_outgoing(channel_t *chan)
***********************/
/*
- * Get the latest estimate for the total queue size of all open channels
- */
-
-uint64_t
-channel_get_global_queue_estimate(void)
-{
- return estimated_total_queue_size;
-}
-
-/*
* Estimate the number of writeable cells
*
- * Ask the lower layer for an estimate of how many cells it can accept, and
- * then subtract the length of our outgoing_queue, if any, to produce an
- * estimate of the number of cells this channel can accept for writes.
+ * Ask the lower layer for an estimate of how many cells it can accept.
*/
-
int
channel_num_cells_writeable(channel_t *chan)
{
@@ -4302,8 +3177,6 @@ channel_num_cells_writeable(channel_t *chan)
if (chan->state == CHANNEL_STATE_OPEN) {
/* Query lower layer */
result = chan->num_cells_writeable(chan);
- /* Subtract cell queue length, if any */
- result -= chan_cell_queue_len(&chan->outgoing_queue);
if (result < 0) result = 0;
} else {
/* No cells are writeable in any other state */
@@ -4427,25 +3300,6 @@ channel_timestamp_client(channel_t *chan)
}
/**
- * Update the last drained timestamp
- *
- * This is called whenever we transmit a cell which leaves the outgoing cell
- * queue completely empty. It also updates the xmit time and the active time.
- */
-
-void
-channel_timestamp_drained(channel_t *chan)
-{
- time_t now = time(NULL);
-
- tor_assert(chan);
-
- chan->timestamp_active = now;
- chan->timestamp_drained = now;
- chan->timestamp_xmit = now;
-}
-
-/**
* Update the recv timestamp
*
* This is called whenever we get an incoming cell from the lower layer.
@@ -4504,54 +3358,6 @@ channel_when_created(channel_t *chan)
}
/**
- * Query created timestamp for a channel listener
- */
-
-time_t
-channel_listener_when_created(channel_listener_t *chan_l)
-{
- tor_assert(chan_l);
-
- return chan_l->timestamp_created;
-}
-
-/**
- * Query last active timestamp for a channel
- */
-
-time_t
-channel_when_last_active(channel_t *chan)
-{
- tor_assert(chan);
-
- return chan->timestamp_active;
-}
-
-/**
- * Query last active timestamp for a channel listener
- */
-
-time_t
-channel_listener_when_last_active(channel_listener_t *chan_l)
-{
- tor_assert(chan_l);
-
- return chan_l->timestamp_active;
-}
-
-/**
- * Query last accepted timestamp for a channel listener
- */
-
-time_t
-channel_listener_when_last_accepted(channel_listener_t *chan_l)
-{
- tor_assert(chan_l);
-
- return chan_l->timestamp_accepted;
-}
-
-/**
* Query client timestamp
*/
@@ -4564,30 +3370,6 @@ channel_when_last_client(channel_t *chan)
}
/**
- * Query drained timestamp
- */
-
-time_t
-channel_when_last_drained(channel_t *chan)
-{
- tor_assert(chan);
-
- return chan->timestamp_drained;
-}
-
-/**
- * Query recv timestamp
- */
-
-time_t
-channel_when_last_recv(channel_t *chan)
-{
- tor_assert(chan);
-
- return chan->timestamp_recv;
-}
-
-/**
* Query xmit timestamp
*/
@@ -4600,42 +3382,6 @@ channel_when_last_xmit(channel_t *chan)
}
/**
- * Query accepted counter
- */
-
-uint64_t
-channel_listener_count_accepted(channel_listener_t *chan_l)
-{
- tor_assert(chan_l);
-
- return chan_l->n_accepted;
-}
-
-/**
- * Query received cell counter
- */
-
-uint64_t
-channel_count_recved(channel_t *chan)
-{
- tor_assert(chan);
-
- return chan->n_cells_recved;
-}
-
-/**
- * Query transmitted cell counter
- */
-
-uint64_t
-channel_count_xmitted(channel_t *chan)
-{
- tor_assert(chan);
-
- return chan->n_cells_xmitted;
-}
-
-/**
* Check if a channel matches an extend_info_t
*
* This function calls the lower layer and asks if this channel matches a
@@ -4726,6 +3472,16 @@ channel_set_circid_type,(channel_t *chan,
}
}
+static int
+channel_sort_by_ed25519_identity(const void **a_, const void **b_)
+{
+ const channel_t *a = *a_,
+ *b = *b_;
+ return fast_memcmp(&a->ed25519_identity.pubkey,
+ &b->ed25519_identity.pubkey,
+ sizeof(a->ed25519_identity.pubkey));
+}
+
/** Helper for channel_update_bad_for_new_circs(): Perform the
* channel_update_bad_for_new_circs operation on all channels in <b>lst</b>,
* all of which MUST have the same RSA ID. (They MAY have different
@@ -4734,44 +3490,52 @@ static void
channel_rsa_id_group_set_badness(struct channel_list_s *lst, int force)
{
/*XXXX This function should really be about channels. 15056 */
- channel_t *chan;
+ channel_t *chan = TOR_LIST_FIRST(lst);
+
+ if (!chan)
+ return;
+
+ /* if there is only one channel, don't bother looping */
+ if (PREDICT_LIKELY(!TOR_LIST_NEXT(chan, next_with_same_id))) {
+ connection_or_single_set_badness_(
+ time(NULL), BASE_CHAN_TO_TLS(chan)->conn, force);
+ return;
+ }
+
+ smartlist_t *channels = smartlist_new();
- /* First, get a minimal list of the ed25519 identites */
- smartlist_t *ed_identities = smartlist_new();
TOR_LIST_FOREACH(chan, lst, next_with_same_id) {
- uint8_t *id_copy =
- tor_memdup(&chan->ed25519_identity.pubkey, DIGEST256_LEN);
- smartlist_add(ed_identities, id_copy);
+ if (BASE_CHAN_TO_TLS(chan)->conn) {
+ smartlist_add(channels, chan);
+ }
}
- smartlist_sort_digests256(ed_identities);
- smartlist_uniq_digests256(ed_identities);
- /* Now, for each Ed identity, build a smartlist and find the best entry on
- * it. */
+ smartlist_sort(channels, channel_sort_by_ed25519_identity);
+
+ const ed25519_public_key_t *common_ed25519_identity = NULL;
+ /* it would be more efficient to do a slice, but this case is rare */
smartlist_t *or_conns = smartlist_new();
- SMARTLIST_FOREACH_BEGIN(ed_identities, const uint8_t *, ed_id) {
- TOR_LIST_FOREACH(chan, lst, next_with_same_id) {
- channel_tls_t *chantls = BASE_CHAN_TO_TLS(chan);
- if (tor_memneq(ed_id, &chan->ed25519_identity.pubkey, DIGEST256_LEN))
- continue;
- or_connection_t *orconn = chantls->conn;
- if (orconn) {
- tor_assert(orconn->chan == chantls);
- smartlist_add(or_conns, orconn);
- }
+ SMARTLIST_FOREACH_BEGIN(channels, channel_t *, channel) {
+ if (!common_ed25519_identity)
+ common_ed25519_identity = &channel->ed25519_identity;
+
+ if (! ed25519_pubkey_eq(&channel->ed25519_identity,
+ common_ed25519_identity)) {
+ connection_or_group_set_badness_(or_conns, force);
+ smartlist_clear(or_conns);
+ common_ed25519_identity = &channel->ed25519_identity;
}
- connection_or_group_set_badness_(or_conns, force);
- smartlist_clear(or_conns);
- } SMARTLIST_FOREACH_END(ed_id);
+ smartlist_add(or_conns, BASE_CHAN_TO_TLS(channel)->conn);
+ } SMARTLIST_FOREACH_END(channel);
+
+ connection_or_group_set_badness_(or_conns, force);
/* XXXX 15056 we may want to do something special with connections that have
* no set Ed25519 identity! */
smartlist_free(or_conns);
-
- SMARTLIST_FOREACH(ed_identities, uint8_t *, ed_id, tor_free(ed_id));
- smartlist_free(ed_identities);
+ smartlist_free(channels);
}
/** Go through all the channels (or if <b>digest</b> is non-NULL, just
@@ -4801,83 +3565,3 @@ channel_update_bad_for_new_circs(const char *digest, int force)
}
}
-/**
- * Update the estimated number of bytes queued to transmit for this channel,
- * and notify the scheduler. The estimate includes both the channel queue and
- * the queue size reported by the lower layer, and an overhead estimate
- * optionally provided by the lower layer.
- */
-
-void
-channel_update_xmit_queue_size(channel_t *chan)
-{
- uint64_t queued, adj;
- double overhead;
-
- tor_assert(chan);
- tor_assert(chan->num_bytes_queued);
-
- /*
- * First, get the number of bytes we have queued without factoring in
- * lower-layer overhead.
- */
- queued = chan->num_bytes_queued(chan) + chan->bytes_in_queue;
- /* Next, adjust by the overhead factor, if any is available */
- if (chan->get_overhead_estimate) {
- overhead = chan->get_overhead_estimate(chan);
- if (overhead >= 1.0) {
- queued = (uint64_t)(queued * overhead);
- } else {
- /* Ignore silly overhead factors */
- log_notice(LD_CHANNEL, "Ignoring silly overhead factor %f", overhead);
- }
- }
-
- /* Now, compare to the previous estimate */
- if (queued > chan->bytes_queued_for_xmit) {
- adj = queued - chan->bytes_queued_for_xmit;
- log_debug(LD_CHANNEL,
- "Increasing queue size for channel " U64_FORMAT " by " U64_FORMAT
- " from " U64_FORMAT " to " U64_FORMAT,
- U64_PRINTF_ARG(chan->global_identifier),
- U64_PRINTF_ARG(adj),
- U64_PRINTF_ARG(chan->bytes_queued_for_xmit),
- U64_PRINTF_ARG(queued));
- /* Update the channel's estimate */
- chan->bytes_queued_for_xmit = queued;
-
- /* Update the global queue size estimate if appropriate */
- if (chan->state == CHANNEL_STATE_OPEN ||
- chan->state == CHANNEL_STATE_MAINT) {
- estimated_total_queue_size += adj;
- log_debug(LD_CHANNEL,
- "Increasing global queue size by " U64_FORMAT " for channel "
- U64_FORMAT ", new size is " U64_FORMAT,
- U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
- U64_PRINTF_ARG(estimated_total_queue_size));
- }
- } else if (queued < chan->bytes_queued_for_xmit) {
- adj = chan->bytes_queued_for_xmit - queued;
- log_debug(LD_CHANNEL,
- "Decreasing queue size for channel " U64_FORMAT " by " U64_FORMAT
- " from " U64_FORMAT " to " U64_FORMAT,
- U64_PRINTF_ARG(chan->global_identifier),
- U64_PRINTF_ARG(adj),
- U64_PRINTF_ARG(chan->bytes_queued_for_xmit),
- U64_PRINTF_ARG(queued));
- /* Update the channel's estimate */
- chan->bytes_queued_for_xmit = queued;
-
- /* Update the global queue size estimate if appropriate */
- if (chan->state == CHANNEL_STATE_OPEN ||
- chan->state == CHANNEL_STATE_MAINT) {
- estimated_total_queue_size -= adj;
- log_debug(LD_CHANNEL,
- "Decreasing global queue size by " U64_FORMAT " for channel "
- U64_FORMAT ", new size is " U64_FORMAT,
- U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
- U64_PRINTF_ARG(estimated_total_queue_size));
- }
- }
-}
-