diff options
Diffstat (limited to 'src/or/channel.c')
-rw-r--r-- | src/or/channel.c | 722 |
1 files changed, 564 insertions, 158 deletions
diff --git a/src/or/channel.c b/src/or/channel.c index b2b670e4fb..54e10666d2 100644 --- a/src/or/channel.c +++ b/src/or/channel.c @@ -1,9 +1,39 @@ -/* * Copyright (c) 2012-2013, The Tor Project, Inc. */ +/* * Copyright (c) 2012-2016, The Tor Project, Inc. */ /* See LICENSE for licensing information */ /** * \file channel.c - * \brief OR-to-OR channel abstraction layer + * + * \brief OR/OP-to-OR channel abstraction layer. A channel's job is to + * transfer cells from Tor instance to Tor instance. + * Currently, there is only one implementation of the channel abstraction: in + * channeltls.c. + * + * Channels are a higher-level abstraction than or_connection_t: In general, + * any means that two Tor relays use to exchange cells, or any means that a + * relay and a client use to exchange cells, is a channel. + * + * Channels differ from pluggable transports in that they do not wrap an + * underlying protocol over which cells are transmitted: they <em>are</em> the + * underlying protocol. + * + * This module defines the generic parts of the channel_t interface, and + * provides the machinery necessary for specialized implementations to be + * created. At present, there is one specialized implementation in + * channeltls.c, which uses connection_or.c to send cells over a TLS + * connection. + * + * Every channel implementation is responsible for being able to transmit + * cells that are added to it with channel_write_cell() and related functions, + * and to receive incoming cells with the channel_queue_cell() and related + * functions. See the channel_t documentation for more information. + * + * When new cells arrive on a channel, they are passed to cell handler + * functions, which can be set by channel_set_cell_handlers() + * functions. (Tor's cell handlers are in command.c.) + * + * Tor flushes cells to channels from relay.c in + * channel_flush_from_first_active_circuit(). **/ /* @@ -13,6 +43,9 @@ #define TOR_CHANNEL_INTERNAL_ +/* This one's for stuff only channel.c and the test suite should see */ +#define CHANNEL_PRIVATE_ + #include "or.h" #include "channel.h" #include "channeltls.h" @@ -29,29 +62,7 @@ #include "rephist.h" #include "router.h" #include "routerlist.h" - -/* Cell queue structure */ - -typedef struct cell_queue_entry_s cell_queue_entry_t; -struct cell_queue_entry_s { - TOR_SIMPLEQ_ENTRY(cell_queue_entry_s) next; - enum { - CELL_QUEUE_FIXED, - CELL_QUEUE_VAR, - CELL_QUEUE_PACKED - } type; - union { - struct { - cell_t *cell; - } fixed; - struct { - var_cell_t *var_cell; - } var; - struct { - packed_cell_t *packed_cell; - } packed; - } u; -}; +#include "scheduler.h" /* Global lists of channels */ @@ -75,6 +86,59 @@ static smartlist_t *finished_listeners = NULL; /* Counter for ID numbers */ static uint64_t n_channels_allocated = 0; +/* + * Channel global byte/cell counters, for statistics and for scheduler high + * /low-water marks. + */ + +/* + * Total number of cells ever given to any channel with the + * channel_write_*_cell() functions. + */ + +static uint64_t n_channel_cells_queued = 0; + +/* + * Total number of cells ever passed to a channel lower layer with the + * write_*_cell() methods. + */ + +static uint64_t n_channel_cells_passed_to_lower_layer = 0; + +/* + * Current number of cells in all channel queues; should be + * n_channel_cells_queued - n_channel_cells_passed_to_lower_layer. + */ + +static uint64_t n_channel_cells_in_queues = 0; + +/* + * Total number of bytes for all cells ever queued to a channel and + * counted in n_channel_cells_queued. + */ + +static uint64_t n_channel_bytes_queued = 0; + +/* + * Total number of bytes for all cells ever passed to a channel lower layer + * and counted in n_channel_cells_passed_to_lower_layer. + */ + +static uint64_t n_channel_bytes_passed_to_lower_layer = 0; + +/* + * Current number of bytes in all channel queues; should be + * n_channel_bytes_queued - n_channel_bytes_passed_to_lower_layer. + */ + +static uint64_t n_channel_bytes_in_queues = 0; + +/* + * Current total estimated queue size *including lower layer queues and + * transmit overhead* + */ + +STATIC uint64_t estimated_total_queue_size = 0; /* Digest->channel map * @@ -84,7 +148,7 @@ static uint64_t n_channels_allocated = 0; * If more than one channel exists, follow the next_with_same_id pointer * as a linked list. */ -HT_HEAD(channel_idmap, channel_idmap_entry_s) channel_identity_map = +static HT_HEAD(channel_idmap, channel_idmap_entry_s) channel_identity_map = HT_INITIALIZER(); typedef struct channel_idmap_entry_s { @@ -93,13 +157,13 @@ typedef struct channel_idmap_entry_s { TOR_LIST_HEAD(channel_list_s, channel_s) channel_list; } channel_idmap_entry_t; -static INLINE unsigned +static inline unsigned channel_idmap_hash(const channel_idmap_entry_t *ent) { return (unsigned) siphash24g(ent->digest, DIGEST_LEN); } -static INLINE int +static inline int channel_idmap_eq(const channel_idmap_entry_t *a, const channel_idmap_entry_t *b) { @@ -107,12 +171,11 @@ channel_idmap_eq(const channel_idmap_entry_t *a, } HT_PROTOTYPE(channel_idmap, channel_idmap_entry_s, node, channel_idmap_hash, - channel_idmap_eq); -HT_GENERATE(channel_idmap, channel_idmap_entry_s, node, channel_idmap_hash, - channel_idmap_eq, 0.5, tor_malloc, tor_realloc, tor_free_); + channel_idmap_eq) +HT_GENERATE2(channel_idmap, channel_idmap_entry_s, node, channel_idmap_hash, + channel_idmap_eq, 0.5, tor_reallocarray_, tor_free_) static cell_queue_entry_t * cell_queue_entry_dup(cell_queue_entry_t *q); -static void cell_queue_entry_free(cell_queue_entry_t *q, int handed_off); #if 0 static int cell_queue_entry_is_padding(cell_queue_entry_t *q); #endif @@ -123,6 +186,8 @@ cell_queue_entry_new_var(var_cell_t *var_cell); static int is_destroy_cell(channel_t *chan, const cell_queue_entry_t *q, circid_t *circid_out); +static void channel_assert_counter_consistency(void); + /* Functions to maintain the digest map */ static void channel_add_to_digest_map(channel_t *chan); static void channel_remove_from_digest_map(channel_t *chan); @@ -140,6 +205,8 @@ channel_free_list(smartlist_t *channels, int mark_for_close); static void channel_listener_free_list(smartlist_t *channels, int mark_for_close); static void channel_listener_force_free(channel_listener_t *chan_l); +static size_t channel_get_cell_queue_entry_size(channel_t *chan, + cell_queue_entry_t *q); static void channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q); @@ -378,8 +445,7 @@ channel_register(channel_t *chan) smartlist_add(all_channels, chan); /* Is it finished? */ - if (chan->state == CHANNEL_STATE_CLOSED || - chan->state == CHANNEL_STATE_ERROR) { + if (CHANNEL_FINISHED(chan)) { /* Put it in the finished list, creating it if necessary */ if (!finished_channels) finished_channels = smartlist_new(); smartlist_add(finished_channels, chan); @@ -388,7 +454,7 @@ channel_register(channel_t *chan) if (!active_channels) active_channels = smartlist_new(); smartlist_add(active_channels, chan); - if (chan->state != CHANNEL_STATE_CLOSING) { + if (!CHANNEL_IS_CLOSING(chan)) { /* It should have a digest set */ if (!tor_digest_is_zero(chan->identity_digest)) { /* Yeah, we're good, add it to the map */ @@ -423,8 +489,7 @@ channel_unregister(channel_t *chan) if (!(chan->registered)) return; /* Is it finished? */ - if (chan->state == CHANNEL_STATE_CLOSED || - chan->state == CHANNEL_STATE_ERROR) { + if (CHANNEL_FINISHED(chan)) { /* Get it out of the finished list */ if (finished_channels) smartlist_remove(finished_channels, chan); } else { @@ -440,9 +505,7 @@ channel_unregister(channel_t *chan) /* Should it be in the digest map? */ if (!tor_digest_is_zero(chan->identity_digest) && - !(chan->state == CHANNEL_STATE_CLOSING || - chan->state == CHANNEL_STATE_CLOSED || - chan->state == CHANNEL_STATE_ERROR)) { + !(CHANNEL_CONDEMNED(chan))) { /* Remove it */ channel_remove_from_digest_map(chan); } @@ -542,9 +605,7 @@ channel_add_to_digest_map(channel_t *chan) tor_assert(chan); /* Assert that the state makes sense */ - tor_assert(!(chan->state == CHANNEL_STATE_CLOSING || - chan->state == CHANNEL_STATE_CLOSED || - chan->state == CHANNEL_STATE_ERROR)); + tor_assert(!CHANNEL_CONDEMNED(chan)); /* Assert that there is a digest */ tor_assert(!tor_digest_is_zero(chan->identity_digest)); @@ -746,6 +807,9 @@ channel_init(channel_t *chan) /* It hasn't been open yet. */ chan->has_been_open = 0; + + /* Scheduler state is idle */ + chan->scheduler_state = SCHED_CHAN_IDLE; } /** @@ -779,8 +843,8 @@ channel_free(channel_t *chan) if (!chan) return; /* It must be closed or errored */ - tor_assert(chan->state == CHANNEL_STATE_CLOSED || - chan->state == CHANNEL_STATE_ERROR); + tor_assert(CHANNEL_FINISHED(chan)); + /* It must be deregistered */ tor_assert(!(chan->registered)); @@ -788,6 +852,9 @@ channel_free(channel_t *chan) "Freeing channel " U64_FORMAT " at %p", U64_PRINTF_ARG(chan->global_identifier), chan); + /* Get this one out of the scheduler */ + scheduler_release_channel(chan); + /* * Get rid of cmux policy before we do anything, so cmux policies don't * see channels in weird half-freed states. @@ -797,7 +864,7 @@ channel_free(channel_t *chan) } /* Call a free method if there is one */ - if (chan->free) chan->free(chan); + if (chan->free_fn) chan->free_fn(chan); channel_clear_remote_end(chan); @@ -837,7 +904,7 @@ channel_listener_free(channel_listener_t *chan_l) tor_assert(!(chan_l->registered)); /* Call a free method if there is one */ - if (chan_l->free) chan_l->free(chan_l); + if (chan_l->free_fn) chan_l->free_fn(chan_l); /* * We're in CLOSED or ERROR, so the incoming channel queue is already @@ -863,6 +930,9 @@ channel_force_free(channel_t *chan) "Force-freeing channel " U64_FORMAT " at %p", U64_PRINTF_ARG(chan->global_identifier), chan); + /* Get this one out of the scheduler */ + scheduler_release_channel(chan); + /* * Get rid of cmux policy before we do anything, so cmux policies don't * see channels in weird half-freed states. @@ -872,7 +942,7 @@ channel_force_free(channel_t *chan) } /* Call a free method if there is one */ - if (chan->free) chan->free(chan); + if (chan->free_fn) chan->free_fn(chan); channel_clear_remote_end(chan); @@ -914,7 +984,7 @@ channel_listener_force_free(channel_listener_t *chan_l) chan_l); /* Call a free method if there is one */ - if (chan_l->free) chan_l->free(chan_l); + if (chan_l->free_fn) chan_l->free_fn(chan_l); /* * The incoming list just gets emptied and freed; we request close on @@ -988,9 +1058,7 @@ channel_get_cell_handler(channel_t *chan) { tor_assert(chan); - if (chan->state == CHANNEL_STATE_OPENING || - chan->state == CHANNEL_STATE_OPEN || - chan->state == CHANNEL_STATE_MAINT) + if (CHANNEL_CAN_HANDLE_CELLS(chan)) return chan->cell_handler; return NULL; @@ -1008,9 +1076,7 @@ channel_get_var_cell_handler(channel_t *chan) { tor_assert(chan); - if (chan->state == CHANNEL_STATE_OPENING || - chan->state == CHANNEL_STATE_OPEN || - chan->state == CHANNEL_STATE_MAINT) + if (CHANNEL_CAN_HANDLE_CELLS(chan)) return chan->var_cell_handler; return NULL; @@ -1033,9 +1099,7 @@ channel_set_cell_handlers(channel_t *chan, int try_again = 0; tor_assert(chan); - tor_assert(chan->state == CHANNEL_STATE_OPENING || - chan->state == CHANNEL_STATE_OPEN || - chan->state == CHANNEL_STATE_MAINT); + tor_assert(CHANNEL_CAN_HANDLE_CELLS(chan)); log_debug(LD_CHANNEL, "Setting cell_handler callback for channel %p to %p", @@ -1089,9 +1153,8 @@ channel_mark_for_close(channel_t *chan) tor_assert(chan->close != NULL); /* If it's already in CLOSING, CLOSED or ERROR, this is a no-op */ - if (chan->state == CHANNEL_STATE_CLOSING || - chan->state == CHANNEL_STATE_CLOSED || - chan->state == CHANNEL_STATE_ERROR) return; + if (CHANNEL_CONDEMNED(chan)) + return; log_debug(LD_CHANNEL, "Closing channel %p (global ID " U64_FORMAT ") " @@ -1170,9 +1233,8 @@ channel_close_from_lower_layer(channel_t *chan) tor_assert(chan != NULL); /* If it's already in CLOSING, CLOSED or ERROR, this is a no-op */ - if (chan->state == CHANNEL_STATE_CLOSING || - chan->state == CHANNEL_STATE_CLOSED || - chan->state == CHANNEL_STATE_ERROR) return; + if (CHANNEL_CONDEMNED(chan)) + return; log_debug(LD_CHANNEL, "Closing channel %p (global ID " U64_FORMAT ") " @@ -1230,9 +1292,8 @@ channel_close_for_error(channel_t *chan) tor_assert(chan != NULL); /* If it's already in CLOSING, CLOSED or ERROR, this is a no-op */ - if (chan->state == CHANNEL_STATE_CLOSING || - chan->state == CHANNEL_STATE_CLOSED || - chan->state == CHANNEL_STATE_ERROR) return; + if (CHANNEL_CONDEMNED(chan)) + return; log_debug(LD_CHANNEL, "Closing channel %p due to lower-layer error", @@ -1288,18 +1349,16 @@ void channel_closed(channel_t *chan) { tor_assert(chan); - tor_assert(chan->state == CHANNEL_STATE_CLOSING || - chan->state == CHANNEL_STATE_CLOSED || - chan->state == CHANNEL_STATE_ERROR); + tor_assert(CHANNEL_CONDEMNED(chan)); /* No-op if already inactive */ - if (chan->state == CHANNEL_STATE_CLOSED || - chan->state == CHANNEL_STATE_ERROR) return; + if (CHANNEL_FINISHED(chan)) + return; /* Inform any pending (not attached) circs that they should * give up. */ if (! chan->has_been_open) - circuit_n_chan_done(chan, 0); + circuit_n_chan_done(chan, 0, 0); /* Now close all the attached circuits on it. */ circuit_unlink_all_from_channel(chan, END_CIRC_REASON_CHANNEL_CLOSED); @@ -1357,10 +1416,7 @@ channel_clear_identity_digest(channel_t *chan) "global ID " U64_FORMAT, chan, U64_PRINTF_ARG(chan->global_identifier)); - state_not_in_map = - (chan->state == CHANNEL_STATE_CLOSING || - chan->state == CHANNEL_STATE_CLOSED || - chan->state == CHANNEL_STATE_ERROR); + state_not_in_map = CHANNEL_CONDEMNED(chan); if (!state_not_in_map && chan->registered && !tor_digest_is_zero(chan->identity_digest)) @@ -1393,10 +1449,8 @@ channel_set_identity_digest(channel_t *chan, identity_digest ? hex_str(identity_digest, DIGEST_LEN) : "(null)"); - state_not_in_map = - (chan->state == CHANNEL_STATE_CLOSING || - chan->state == CHANNEL_STATE_CLOSED || - chan->state == CHANNEL_STATE_ERROR); + state_not_in_map = CHANNEL_CONDEMNED(chan); + was_in_digest_map = !state_not_in_map && chan->registered && @@ -1446,10 +1500,7 @@ channel_clear_remote_end(channel_t *chan) "global ID " U64_FORMAT, chan, U64_PRINTF_ARG(chan->global_identifier)); - state_not_in_map = - (chan->state == CHANNEL_STATE_CLOSING || - chan->state == CHANNEL_STATE_CLOSED || - chan->state == CHANNEL_STATE_ERROR); + state_not_in_map = CHANNEL_CONDEMNED(chan); if (!state_not_in_map && chan->registered && !tor_digest_is_zero(chan->identity_digest)) @@ -1485,10 +1536,8 @@ channel_set_remote_end(channel_t *chan, identity_digest ? hex_str(identity_digest, DIGEST_LEN) : "(null)"); - state_not_in_map = - (chan->state == CHANNEL_STATE_CLOSING || - chan->state == CHANNEL_STATE_CLOSED || - chan->state == CHANNEL_STATE_ERROR); + state_not_in_map = CHANNEL_CONDEMNED(chan); + was_in_digest_map = !state_not_in_map && chan->registered && @@ -1548,7 +1597,7 @@ cell_queue_entry_dup(cell_queue_entry_t *q) * them) or not (we should free). */ -static void +STATIC void cell_queue_entry_free(cell_queue_entry_t *q, int handed_off) { if (!q) return; @@ -1666,6 +1715,36 @@ cell_queue_entry_new_var(var_cell_t *var_cell) } /** + * Ask how big the cell contained in a cell_queue_entry_t is + */ + +static size_t +channel_get_cell_queue_entry_size(channel_t *chan, cell_queue_entry_t *q) +{ + size_t rv = 0; + + tor_assert(chan); + tor_assert(q); + + switch (q->type) { + case CELL_QUEUE_FIXED: + rv = get_cell_network_size(chan->wide_circ_ids); + break; + case CELL_QUEUE_VAR: + rv = get_var_cell_header_size(chan->wide_circ_ids) + + (q->u.var.var_cell ? q->u.var.var_cell->payload_len : 0); + break; + case CELL_QUEUE_PACKED: + rv = get_cell_network_size(chan->wide_circ_ids); + break; + default: + tor_assert(1); + } + + return rv; +} + +/** * Write to a channel based on a cell_queue_entry_t * * Given a cell_queue_entry_t filled out by the caller, try to send the cell @@ -1677,14 +1756,13 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q) { int result = 0, sent = 0; cell_queue_entry_t *tmp = NULL; + size_t cell_bytes; tor_assert(chan); tor_assert(q); /* Assert that the state makes sense for a cell write */ - tor_assert(chan->state == CHANNEL_STATE_OPENING || - chan->state == CHANNEL_STATE_OPEN || - chan->state == CHANNEL_STATE_MAINT); + tor_assert(CHANNEL_CAN_HANDLE_CELLS(chan)); { circid_t circ_id; @@ -1693,9 +1771,12 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q) } } + /* For statistical purposes, figure out how big this cell is */ + cell_bytes = channel_get_cell_queue_entry_size(chan, q); + /* Can we send it right out? If so, try */ if (TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue) && - chan->state == CHANNEL_STATE_OPEN) { + CHANNEL_IS_OPEN(chan)) { /* Pick the right write function for this cell type and save the result */ switch (q->type) { case CELL_QUEUE_FIXED: @@ -1726,6 +1807,13 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q) channel_timestamp_drained(chan); /* Update the counter */ ++(chan->n_cells_xmitted); + chan->n_bytes_xmitted += cell_bytes; + /* Update global counters */ + ++n_channel_cells_queued; + ++n_channel_cells_passed_to_lower_layer; + n_channel_bytes_queued += cell_bytes; + n_channel_bytes_passed_to_lower_layer += cell_bytes; + channel_assert_counter_consistency(); } } @@ -1737,8 +1825,16 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q) */ tmp = cell_queue_entry_dup(q); TOR_SIMPLEQ_INSERT_TAIL(&chan->outgoing_queue, tmp, next); + /* Update global counters */ + ++n_channel_cells_queued; + ++n_channel_cells_in_queues; + n_channel_bytes_queued += cell_bytes; + n_channel_bytes_in_queues += cell_bytes; + channel_assert_counter_consistency(); + /* Update channel queue size */ + chan->bytes_in_queue += cell_bytes; /* Try to process the queue? */ - if (chan->state == CHANNEL_STATE_OPEN) channel_flush_cells(chan); + if (CHANNEL_IS_OPEN(chan)) channel_flush_cells(chan); } } @@ -1759,7 +1855,7 @@ channel_write_cell(channel_t *chan, cell_t *cell) tor_assert(chan); tor_assert(cell); - if (chan->state == CHANNEL_STATE_CLOSING) { + if (CHANNEL_IS_CLOSING(chan)) { log_debug(LD_CHANNEL, "Discarding cell_t %p on closing channel %p with " "global ID "U64_FORMAT, cell, chan, U64_PRINTF_ARG(chan->global_identifier)); @@ -1775,6 +1871,9 @@ channel_write_cell(channel_t *chan, cell_t *cell) q.type = CELL_QUEUE_FIXED; q.u.fixed.cell = cell; channel_write_cell_queue_entry(chan, &q); + + /* Update the queue size estimate */ + channel_update_xmit_queue_size(chan); } /** @@ -1793,7 +1892,7 @@ channel_write_packed_cell(channel_t *chan, packed_cell_t *packed_cell) tor_assert(chan); tor_assert(packed_cell); - if (chan->state == CHANNEL_STATE_CLOSING) { + if (CHANNEL_IS_CLOSING(chan)) { log_debug(LD_CHANNEL, "Discarding packed_cell_t %p on closing channel %p " "with global ID "U64_FORMAT, packed_cell, chan, U64_PRINTF_ARG(chan->global_identifier)); @@ -1810,6 +1909,9 @@ channel_write_packed_cell(channel_t *chan, packed_cell_t *packed_cell) q.type = CELL_QUEUE_PACKED; q.u.packed.packed_cell = packed_cell; channel_write_cell_queue_entry(chan, &q); + + /* Update the queue size estimate */ + channel_update_xmit_queue_size(chan); } /** @@ -1829,7 +1931,7 @@ channel_write_var_cell(channel_t *chan, var_cell_t *var_cell) tor_assert(chan); tor_assert(var_cell); - if (chan->state == CHANNEL_STATE_CLOSING) { + if (CHANNEL_IS_CLOSING(chan)) { log_debug(LD_CHANNEL, "Discarding var_cell_t %p on closing channel %p " "with global ID "U64_FORMAT, var_cell, chan, U64_PRINTF_ARG(chan->global_identifier)); @@ -1846,6 +1948,9 @@ channel_write_var_cell(channel_t *chan, var_cell_t *var_cell) q.type = CELL_QUEUE_VAR; q.u.var.var_cell = var_cell; channel_write_cell_queue_entry(chan, &q); + + /* Update the queue size estimate */ + channel_update_xmit_queue_size(chan); } /** @@ -1941,6 +2046,41 @@ channel_change_state(channel_t *chan, channel_state_t to_state) } } + /* + * If we're going to a closed/closing state, we don't need scheduling any + * more; in CHANNEL_STATE_MAINT we can't accept writes. + */ + if (to_state == CHANNEL_STATE_CLOSING || + to_state == CHANNEL_STATE_CLOSED || + to_state == CHANNEL_STATE_ERROR) { + scheduler_release_channel(chan); + } else if (to_state == CHANNEL_STATE_MAINT) { + scheduler_channel_doesnt_want_writes(chan); + } + + /* + * If we're closing, this channel no longer counts toward the global + * estimated queue size; if we're open, it now does. + */ + if ((to_state == CHANNEL_STATE_CLOSING || + to_state == CHANNEL_STATE_CLOSED || + to_state == CHANNEL_STATE_ERROR) && + (from_state == CHANNEL_STATE_OPEN || + from_state == CHANNEL_STATE_MAINT)) { + estimated_total_queue_size -= chan->bytes_in_queue; + } + + /* + * If we're opening, this channel now does count toward the global + * estimated queue size. + */ + if ((to_state == CHANNEL_STATE_OPEN || + to_state == CHANNEL_STATE_MAINT) && + !(from_state == CHANNEL_STATE_OPEN || + from_state == CHANNEL_STATE_MAINT)) { + estimated_total_queue_size += chan->bytes_in_queue; + } + /* Tell circuits if we opened and stuff */ if (to_state == CHANNEL_STATE_OPEN) { channel_do_open_actions(chan); @@ -2056,12 +2196,13 @@ channel_listener_change_state(channel_listener_t *chan_l, #define MAX_CELLS_TO_GET_FROM_CIRCUITS_FOR_UNLIMITED 256 -ssize_t -channel_flush_some_cells(channel_t *chan, ssize_t num_cells) +MOCK_IMPL(ssize_t, +channel_flush_some_cells, (channel_t *chan, ssize_t num_cells)) { unsigned int unlimited = 0; ssize_t flushed = 0; int num_cells_from_circs, clamped_num_cells; + int q_len_before, q_len_after; tor_assert(chan); @@ -2069,7 +2210,7 @@ channel_flush_some_cells(channel_t *chan, ssize_t num_cells) if (!unlimited && num_cells <= flushed) goto done; /* If we aren't in CHANNEL_STATE_OPEN, nothing goes through */ - if (chan->state == CHANNEL_STATE_OPEN) { + if (CHANNEL_IS_OPEN(chan)) { /* Try to flush as much as we can that's already queued */ flushed += channel_flush_some_cells_from_outgoing_queue(chan, (unlimited ? -1 : num_cells - flushed)); @@ -2087,14 +2228,45 @@ channel_flush_some_cells(channel_t *chan, ssize_t num_cells) clamped_num_cells = (int)(num_cells - flushed); } } + + /* + * Keep track of the change in queue size; we have to count cells + * channel_flush_from_first_active_circuit() writes out directly, + * but not double-count ones we might get later in + * channel_flush_some_cells_from_outgoing_queue() + */ + q_len_before = chan_cell_queue_len(&(chan->outgoing_queue)); + /* Try to get more cells from any active circuits */ num_cells_from_circs = channel_flush_from_first_active_circuit( chan, clamped_num_cells); - /* If it claims we got some, process the queue again */ + q_len_after = chan_cell_queue_len(&(chan->outgoing_queue)); + + /* + * If it claims we got some, adjust the flushed counter and consider + * processing the queue again + */ if (num_cells_from_circs > 0) { - flushed += channel_flush_some_cells_from_outgoing_queue(chan, - (unlimited ? -1 : num_cells - flushed)); + /* + * Adjust flushed by the number of cells counted in + * num_cells_from_circs that didn't go to the cell queue. + */ + + if (q_len_after > q_len_before) { + num_cells_from_circs -= (q_len_after - q_len_before); + if (num_cells_from_circs < 0) num_cells_from_circs = 0; + } + + flushed += num_cells_from_circs; + + /* Now process the queue if necessary */ + + if ((q_len_after > q_len_before) && + (unlimited || (flushed < num_cells))) { + flushed += channel_flush_some_cells_from_outgoing_queue(chan, + (unlimited ? -1 : num_cells - flushed)); + } } } } @@ -2117,6 +2289,8 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, unsigned int unlimited = 0; ssize_t flushed = 0; cell_queue_entry_t *q = NULL; + size_t cell_size; + int free_q = 0, handed_off = 0; tor_assert(chan); tor_assert(chan->write_cell); @@ -2127,11 +2301,15 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, if (!unlimited && num_cells <= flushed) return 0; /* If we aren't in CHANNEL_STATE_OPEN, nothing goes through */ - if (chan->state == CHANNEL_STATE_OPEN) { + if (CHANNEL_IS_OPEN(chan)) { while ((unlimited || num_cells > flushed) && NULL != (q = TOR_SIMPLEQ_FIRST(&chan->outgoing_queue))) { + free_q = 0; + handed_off = 0; if (1) { + /* Figure out how big it is for statistical purposes */ + cell_size = channel_get_cell_queue_entry_size(chan, q); /* * Okay, we have a good queue entry, try to give it to the lower * layer. @@ -2144,8 +2322,9 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, ++flushed; channel_timestamp_xmit(chan); ++(chan->n_cells_xmitted); - cell_queue_entry_free(q, 1); - q = NULL; + chan->n_bytes_xmitted += cell_size; + free_q = 1; + handed_off = 1; } /* Else couldn't write it; leave it on the queue */ } else { @@ -2156,8 +2335,8 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, "(global ID " U64_FORMAT ").", chan, U64_PRINTF_ARG(chan->global_identifier)); /* Throw it away */ - cell_queue_entry_free(q, 0); - q = NULL; + free_q = 1; + handed_off = 0; } break; case CELL_QUEUE_PACKED: @@ -2167,8 +2346,9 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, ++flushed; channel_timestamp_xmit(chan); ++(chan->n_cells_xmitted); - cell_queue_entry_free(q, 1); - q = NULL; + chan->n_bytes_xmitted += cell_size; + free_q = 1; + handed_off = 1; } /* Else couldn't write it; leave it on the queue */ } else { @@ -2179,8 +2359,8 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, "(global ID " U64_FORMAT ").", chan, U64_PRINTF_ARG(chan->global_identifier)); /* Throw it away */ - cell_queue_entry_free(q, 0); - q = NULL; + free_q = 1; + handed_off = 0; } break; case CELL_QUEUE_VAR: @@ -2190,8 +2370,9 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, ++flushed; channel_timestamp_xmit(chan); ++(chan->n_cells_xmitted); - cell_queue_entry_free(q, 1); - q = NULL; + chan->n_bytes_xmitted += cell_size; + free_q = 1; + handed_off = 1; } /* Else couldn't write it; leave it on the queue */ } else { @@ -2202,8 +2383,8 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, "(global ID " U64_FORMAT ").", chan, U64_PRINTF_ARG(chan->global_identifier)); /* Throw it away */ - cell_queue_entry_free(q, 0); - q = NULL; + free_q = 1; + handed_off = 0; } break; default: @@ -2213,12 +2394,32 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, "(global ID " U64_FORMAT "; ignoring it." " Someone should fix this.", q->type, chan, U64_PRINTF_ARG(chan->global_identifier)); - cell_queue_entry_free(q, 0); - q = NULL; + free_q = 1; + handed_off = 0; } - /* if q got NULLed out, we used it and should remove the queue entry */ - if (!q) TOR_SIMPLEQ_REMOVE_HEAD(&chan->outgoing_queue, next); + /* + * if free_q is set, we used it and should remove the queue entry; + * we have to do the free down here so TOR_SIMPLEQ_REMOVE_HEAD isn't + * accessing freed memory + */ + if (free_q) { + TOR_SIMPLEQ_REMOVE_HEAD(&chan->outgoing_queue, next); + /* + * ...and we handed a cell off to the lower layer, so we should + * update the counters. + */ + ++n_channel_cells_passed_to_lower_layer; + --n_channel_cells_in_queues; + n_channel_bytes_passed_to_lower_layer += cell_size; + n_channel_bytes_in_queues -= cell_size; + channel_assert_counter_consistency(); + /* Update the channel's queue size too */ + chan->bytes_in_queue -= cell_size; + /* Finally, free q */ + cell_queue_entry_free(q, handed_off); + q = NULL; + } /* No cell removed from list, so we can't go on any further */ else break; } @@ -2230,6 +2431,9 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, channel_timestamp_drained(chan); } + /* Update the estimate queue size */ + channel_update_xmit_queue_size(chan); + return flushed; } @@ -2352,8 +2556,9 @@ void channel_do_open_actions(channel_t *chan) { tor_addr_t remote_addr; - int started_here, not_using = 0; + int started_here; time_t now = time(NULL); + int close_origin_circuits = 0; tor_assert(chan); @@ -2370,8 +2575,7 @@ channel_do_open_actions(channel_t *chan) log_debug(LD_OR, "New entry guard was reachable, but closing this " "connection so we can retry the earlier entry guards."); - circuit_n_chan_done(chan, 0); - not_using = 1; + close_origin_circuits = 1; } router_set_status(chan->identity_digest, 1); } else { @@ -2379,6 +2583,7 @@ channel_do_open_actions(channel_t *chan) if (!router_get_by_id_digest(chan->identity_digest)) { if (channel_get_addr_if_possible(chan, &remote_addr)) { char *transport_name = NULL; + channel_tls_t *tlschan = BASE_CHAN_TO_TLS(chan); if (chan->get_transport_name(chan, &transport_name) < 0) transport_name = NULL; @@ -2386,12 +2591,16 @@ channel_do_open_actions(channel_t *chan) &remote_addr, transport_name, now); tor_free(transport_name); + /* Notify the DoS subsystem of a new client. */ + if (tlschan && tlschan->conn) { + dos_new_client_conn(tlschan->conn); + } } /* Otherwise the underlying transport can't tell us this, so skip it */ } } - if (!not_using) circuit_n_chan_done(chan, 1); + circuit_n_chan_done(chan, 1, close_origin_circuits); } /** @@ -2462,9 +2671,8 @@ channel_process_cells(channel_t *chan) { cell_queue_entry_t *q; tor_assert(chan); - tor_assert(chan->state == CHANNEL_STATE_CLOSING || - chan->state == CHANNEL_STATE_MAINT || - chan->state == CHANNEL_STATE_OPEN); + tor_assert(CHANNEL_IS_CLOSING(chan) || CHANNEL_IS_MAINT(chan) || + CHANNEL_IS_OPEN(chan)); log_debug(LD_CHANNEL, "Processing as many incoming cells as we can for channel %p", @@ -2479,6 +2687,11 @@ channel_process_cells(channel_t *chan) /* * Process cells until we're done or find one we have no current handler * for. + * + * We must free the cells here after calling the handler, since custody + * of the buffer was given to the channel layer when they were queued; + * see comments on memory management in channel_queue_cell() and in + * channel_queue_var_cell() below. */ while (NULL != (q = TOR_SIMPLEQ_FIRST(&chan->incoming_queue))) { tor_assert(q); @@ -2496,6 +2709,7 @@ channel_process_cells(channel_t *chan) q->u.fixed.cell, chan, U64_PRINTF_ARG(chan->global_identifier)); chan->cell_handler(chan, q->u.fixed.cell); + tor_free(q->u.fixed.cell); tor_free(q); } else if (q->type == CELL_QUEUE_VAR && chan->var_cell_handler) { @@ -2508,6 +2722,7 @@ channel_process_cells(channel_t *chan) q->u.var.var_cell, chan, U64_PRINTF_ARG(chan->global_identifier)); chan->var_cell_handler(chan, q->u.var.var_cell); + tor_free(q->u.var.var_cell); tor_free(q); } else { /* Can't handle this one */ @@ -2528,10 +2743,11 @@ channel_queue_cell(channel_t *chan, cell_t *cell) { int need_to_queue = 0; cell_queue_entry_t *q; + cell_t *cell_copy = NULL; tor_assert(chan); tor_assert(cell); - tor_assert(chan->state == CHANNEL_STATE_OPEN); + tor_assert(CHANNEL_IS_OPEN(chan)); /* Do we need to queue it, or can we just call the handler right away? */ if (!(chan->cell_handler)) need_to_queue = 1; @@ -2541,8 +2757,9 @@ channel_queue_cell(channel_t *chan, cell_t *cell) /* Timestamp for receiving */ channel_timestamp_recv(chan); - /* Update the counter */ + /* Update the counters */ ++(chan->n_cells_recved); + chan->n_bytes_recved += get_cell_network_size(chan->wide_circ_ids); /* If we don't need to queue we can just call cell_handler */ if (!need_to_queue) { @@ -2554,8 +2771,19 @@ channel_queue_cell(channel_t *chan, cell_t *cell) U64_PRINTF_ARG(chan->global_identifier)); chan->cell_handler(chan, cell); } else { - /* Otherwise queue it and then process the queue if possible. */ - q = cell_queue_entry_new_fixed(cell); + /* + * Otherwise queue it and then process the queue if possible. + * + * We queue a copy, not the original pointer - it might have been on the + * stack in connection_or_process_cells_from_inbuf() (or another caller + * if we ever have a subclass other than channel_tls_t), or be freed + * there after we return. This is the uncommon case; the non-copying + * fast path occurs in the if (!need_to_queue) case above when the + * upper layer has installed cell handlers. + */ + cell_copy = tor_malloc_zero(sizeof(cell_t)); + memcpy(cell_copy, cell, sizeof(cell_t)); + q = cell_queue_entry_new_fixed(cell_copy); log_debug(LD_CHANNEL, "Queueing incoming cell_t %p for channel %p " "(global ID " U64_FORMAT ")", @@ -2581,10 +2809,11 @@ channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell) { int need_to_queue = 0; cell_queue_entry_t *q; + var_cell_t *cell_copy = NULL; tor_assert(chan); tor_assert(var_cell); - tor_assert(chan->state == CHANNEL_STATE_OPEN); + tor_assert(CHANNEL_IS_OPEN(chan)); /* Do we need to queue it, or can we just call the handler right away? */ if (!(chan->var_cell_handler)) need_to_queue = 1; @@ -2596,6 +2825,8 @@ channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell) /* Update the counter */ ++(chan->n_cells_recved); + chan->n_bytes_recved += get_var_cell_header_size(chan->wide_circ_ids) + + var_cell->payload_len; /* If we don't need to queue we can just call cell_handler */ if (!need_to_queue) { @@ -2607,8 +2838,18 @@ channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell) U64_PRINTF_ARG(chan->global_identifier)); chan->var_cell_handler(chan, var_cell); } else { - /* Otherwise queue it and then process the queue if possible. */ - q = cell_queue_entry_new_var(var_cell); + /* + * Otherwise queue it and then process the queue if possible. + * + * We queue a copy, not the original pointer - it might have been on the + * stack in connection_or_process_cells_from_inbuf() (or another caller + * if we ever have a subclass other than channel_tls_t), or be freed + * there after we return. This is the uncommon case; the non-copying + * fast path occurs in the if (!need_to_queue) case above when the + * upper layer has installed cell handlers. + */ + cell_copy = var_cell_copy(var_cell); + q = cell_queue_entry_new_var(cell_copy); log_debug(LD_CHANNEL, "Queueing incoming var_cell_t %p for channel %p " "(global ID " U64_FORMAT ")", @@ -2645,7 +2886,20 @@ packed_cell_is_destroy(channel_t *chan, return 0; } -/** DOCDOC */ +/** + * Assert that the global channel stats counters are internally consistent + */ + +static void +channel_assert_counter_consistency(void) +{ + tor_assert(n_channel_cells_queued == + (n_channel_cells_in_queues + n_channel_cells_passed_to_lower_layer)); + tor_assert(n_channel_bytes_queued == + (n_channel_bytes_in_queues + n_channel_bytes_passed_to_lower_layer)); +} + +/* DOCDOC */ static int is_destroy_cell(channel_t *chan, const cell_queue_entry_t *q, circid_t *circid_out) @@ -2692,10 +2946,7 @@ channel_send_destroy(circid_t circ_id, channel_t *chan, int reason) } /* Check to make sure we can send on this channel first */ - if (!(chan->state == CHANNEL_STATE_CLOSING || - chan->state == CHANNEL_STATE_CLOSED || - chan->state == CHANNEL_STATE_ERROR) && - chan->cmux) { + if (!CHANNEL_CONDEMNED(chan) && chan->cmux) { channel_note_destroy_pending(chan, circ_id); circuitmux_append_destroy_cell(chan, chan->cmux, circ_id, reason); log_debug(LD_OR, @@ -2727,6 +2978,19 @@ channel_dumpstats(int severity) { if (all_channels && smartlist_len(all_channels) > 0) { tor_log(severity, LD_GENERAL, + "Channels have queued " U64_FORMAT " bytes in " U64_FORMAT " cells, " + "and handed " U64_FORMAT " bytes in " U64_FORMAT " cells to the lower" + " layer.", + U64_PRINTF_ARG(n_channel_bytes_queued), + U64_PRINTF_ARG(n_channel_cells_queued), + U64_PRINTF_ARG(n_channel_bytes_passed_to_lower_layer), + U64_PRINTF_ARG(n_channel_cells_passed_to_lower_layer)); + tor_log(severity, LD_GENERAL, + "There are currently " U64_FORMAT " bytes in " U64_FORMAT " cells " + "in channel queues.", + U64_PRINTF_ARG(n_channel_bytes_in_queues), + U64_PRINTF_ARG(n_channel_cells_in_queues)); + tor_log(severity, LD_GENERAL, "Dumping statistics about %d channels:", smartlist_len(all_channels)); tor_log(severity, LD_GENERAL, @@ -2870,11 +3134,10 @@ channel_free_list(smartlist_t *channels, int mark_for_close) if (curr->cmux) { circuitmux_detach_all_circuits(curr->cmux, NULL); } + SMARTLIST_DEL_CURRENT(channels, curr); channel_unregister(curr); if (mark_for_close) { - if (!(curr->state == CHANNEL_STATE_CLOSING || - curr->state == CHANNEL_STATE_CLOSED || - curr->state == CHANNEL_STATE_ERROR)) { + if (!CHANNEL_CONDEMNED(curr)) { channel_mark_for_close(curr); } channel_force_free(curr); @@ -3088,9 +3351,7 @@ channel_get_for_extend(const char *digest, tor_assert(tor_memeq(chan->identity_digest, digest, DIGEST_LEN)); - if (chan->state == CHANNEL_STATE_CLOSING || - chan->state == CHANNEL_STATE_CLOSED || - chan->state == CHANNEL_STATE_ERROR) + if (CHANNEL_CONDEMNED(chan)) continue; /* Never return a channel on which the other end appears to be @@ -3100,7 +3361,7 @@ channel_get_for_extend(const char *digest, } /* Never return a non-open connection. */ - if (chan->state != CHANNEL_STATE_OPEN) { + if (!CHANNEL_IS_OPEN(chan)) { /* If the address matches, don't launch a new connection for this * circuit. */ if (channel_matches_target_addr_for_extend(chan, target_addr)) @@ -3200,7 +3461,7 @@ channel_listener_describe_transport(channel_listener_t *chan_l) /** * Return the number of entries in <b>queue</b> */ -static int +STATIC int chan_cell_queue_len(const chan_cell_queue_t *queue) { int r = 0; @@ -3216,8 +3477,8 @@ chan_cell_queue_len(const chan_cell_queue_t *queue) * Dump statistics for one channel to the log */ -void -channel_dump_statistics(channel_t *chan, int severity) +MOCK_IMPL(void, +channel_dump_statistics, (channel_t *chan, int severity)) { double avg, interval, age; time_t now = time(NULL); @@ -3280,7 +3541,7 @@ channel_dump_statistics(channel_t *chan, int severity) have_remote_addr = channel_get_addr_if_possible(chan, &remote_addr); if (have_remote_addr) { char *actual = tor_strdup(channel_get_actual_remote_descr(chan)); - remote_addr_str = tor_dup_addr(&remote_addr); + remote_addr_str = tor_addr_to_str_dup(&remote_addr); tor_log(severity, LD_GENERAL, " * Channel " U64_FORMAT " says its remote address" " is %s, and gives a canonical description of \"%s\" and an " @@ -3369,12 +3630,22 @@ channel_dump_statistics(channel_t *chan, int severity) /* Describe counters and rates */ tor_log(severity, LD_GENERAL, " * Channel " U64_FORMAT " has received " - U64_FORMAT " cells and transmitted " U64_FORMAT, + U64_FORMAT " bytes in " U64_FORMAT " cells and transmitted " + U64_FORMAT " bytes in " U64_FORMAT " cells", U64_PRINTF_ARG(chan->global_identifier), + U64_PRINTF_ARG(chan->n_bytes_recved), U64_PRINTF_ARG(chan->n_cells_recved), + U64_PRINTF_ARG(chan->n_bytes_xmitted), U64_PRINTF_ARG(chan->n_cells_xmitted)); if (now > chan->timestamp_created && chan->timestamp_created > 0) { + if (chan->n_bytes_recved > 0) { + avg = (double)(chan->n_bytes_recved) / age; + tor_log(severity, LD_GENERAL, + " * Channel " U64_FORMAT " has averaged %f " + "bytes received per second", + U64_PRINTF_ARG(chan->global_identifier), avg); + } if (chan->n_cells_recved > 0) { avg = (double)(chan->n_cells_recved) / age; if (avg >= 1.0) { @@ -3390,6 +3661,13 @@ channel_dump_statistics(channel_t *chan, int severity) U64_PRINTF_ARG(chan->global_identifier), interval); } } + if (chan->n_bytes_xmitted > 0) { + avg = (double)(chan->n_bytes_xmitted) / age; + tor_log(severity, LD_GENERAL, + " * Channel " U64_FORMAT " has averaged %f " + "bytes transmitted per second", + U64_PRINTF_ARG(chan->global_identifier), avg); + } if (chan->n_cells_xmitted > 0) { avg = (double)(chan->n_cells_xmitted) / age; if (avg >= 1.0) { @@ -3567,8 +3845,8 @@ channel_get_canonical_remote_descr(channel_t *chan) * supports this operation, and return 1. Return 0 if the underlying transport * doesn't let us do this. */ -int -channel_get_addr_if_possible(channel_t *chan, tor_addr_t *addr_out) +MOCK_IMPL(int, +channel_get_addr_if_possible,(channel_t *chan, tor_addr_t *addr_out)) { tor_assert(chan); tor_assert(addr_out); @@ -3807,6 +4085,50 @@ channel_mark_outgoing(channel_t *chan) chan->is_incoming = 0; } +/************************ + * Flow control queries * + ***********************/ + +/* + * Get the latest estimate for the total queue size of all open channels + */ + +uint64_t +channel_get_global_queue_estimate(void) +{ + return estimated_total_queue_size; +} + +/* + * Estimate the number of writeable cells + * + * Ask the lower layer for an estimate of how many cells it can accept, and + * then subtract the length of our outgoing_queue, if any, to produce an + * estimate of the number of cells this channel can accept for writes. + */ + +int +channel_num_cells_writeable(channel_t *chan) +{ + int result; + + tor_assert(chan); + tor_assert(chan->num_cells_writeable); + + if (chan->state == CHANNEL_STATE_OPEN) { + /* Query lower layer */ + result = chan->num_cells_writeable(chan); + /* Subtract cell queue length, if any */ + result -= chan_cell_queue_len(&chan->outgoing_queue); + if (result < 0) result = 0; + } else { + /* No cells are writeable in any other state */ + result = 0; + } + + return result; +} + /********************* * Timestamp updates * ********************/ @@ -4175,10 +4497,10 @@ channel_num_circuits(channel_t *chan) * This is called when setting up a channel and replaces the old * connection_or_set_circid_type() */ -void -channel_set_circid_type(channel_t *chan, - crypto_pk_t *identity_rcvd, - int consider_identity) +MOCK_IMPL(void, +channel_set_circid_type,(channel_t *chan, + crypto_pk_t *identity_rcvd, + int consider_identity)) { int started_here; crypto_pk_t *our_identity; @@ -4209,3 +4531,87 @@ channel_set_circid_type(channel_t *chan, } } +/** + * Update the estimated number of bytes queued to transmit for this channel, + * and notify the scheduler. The estimate includes both the channel queue and + * the queue size reported by the lower layer, and an overhead estimate + * optionally provided by the lower layer. + */ + +void +channel_update_xmit_queue_size(channel_t *chan) +{ + uint64_t queued, adj; + double overhead; + + tor_assert(chan); + tor_assert(chan->num_bytes_queued); + + /* + * First, get the number of bytes we have queued without factoring in + * lower-layer overhead. + */ + queued = chan->num_bytes_queued(chan) + chan->bytes_in_queue; + /* Next, adjust by the overhead factor, if any is available */ + if (chan->get_overhead_estimate) { + overhead = chan->get_overhead_estimate(chan); + if (overhead >= 1.0) { + queued = (uint64_t)(queued * overhead); + } else { + /* Ignore silly overhead factors */ + log_notice(LD_CHANNEL, "Ignoring silly overhead factor %f", overhead); + } + } + + /* Now, compare to the previous estimate */ + if (queued > chan->bytes_queued_for_xmit) { + adj = queued - chan->bytes_queued_for_xmit; + log_debug(LD_CHANNEL, + "Increasing queue size for channel " U64_FORMAT " by " U64_FORMAT + " from " U64_FORMAT " to " U64_FORMAT, + U64_PRINTF_ARG(chan->global_identifier), + U64_PRINTF_ARG(adj), + U64_PRINTF_ARG(chan->bytes_queued_for_xmit), + U64_PRINTF_ARG(queued)); + /* Update the channel's estimate */ + chan->bytes_queued_for_xmit = queued; + + /* Update the global queue size estimate if appropriate */ + if (chan->state == CHANNEL_STATE_OPEN || + chan->state == CHANNEL_STATE_MAINT) { + estimated_total_queue_size += adj; + log_debug(LD_CHANNEL, + "Increasing global queue size by " U64_FORMAT " for channel " + U64_FORMAT ", new size is " U64_FORMAT, + U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier), + U64_PRINTF_ARG(estimated_total_queue_size)); + /* Tell the scheduler we're increasing the queue size */ + scheduler_adjust_queue_size(chan, 1, adj); + } + } else if (queued < chan->bytes_queued_for_xmit) { + adj = chan->bytes_queued_for_xmit - queued; + log_debug(LD_CHANNEL, + "Decreasing queue size for channel " U64_FORMAT " by " U64_FORMAT + " from " U64_FORMAT " to " U64_FORMAT, + U64_PRINTF_ARG(chan->global_identifier), + U64_PRINTF_ARG(adj), + U64_PRINTF_ARG(chan->bytes_queued_for_xmit), + U64_PRINTF_ARG(queued)); + /* Update the channel's estimate */ + chan->bytes_queued_for_xmit = queued; + + /* Update the global queue size estimate if appropriate */ + if (chan->state == CHANNEL_STATE_OPEN || + chan->state == CHANNEL_STATE_MAINT) { + estimated_total_queue_size -= adj; + log_debug(LD_CHANNEL, + "Decreasing global queue size by " U64_FORMAT " for channel " + U64_FORMAT ", new size is " U64_FORMAT, + U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier), + U64_PRINTF_ARG(estimated_total_queue_size)); + /* Tell the scheduler we're decreasing the queue size */ + scheduler_adjust_queue_size(chan, -1, adj); + } + } +} + |