diff options
39 files changed, 4558 insertions, 177 deletions
diff --git a/changes/global_scheduler b/changes/global_scheduler new file mode 100644 index 0000000000..d0b8305ceb --- /dev/null +++ b/changes/global_scheduler @@ -0,0 +1,4 @@ + o Major changes: + - Implement a new inter-cmux comparison API, a global high/low watermark + mechanism and a global scheduler loop for transmission prioritization + across all channels as well as among circuits on one channel. diff --git a/scripts/maint/checkSpace.pl b/scripts/maint/checkSpace.pl index b529103367..c785d89567 100755 --- a/scripts/maint/checkSpace.pl +++ b/scripts/maint/checkSpace.pl @@ -128,7 +128,8 @@ for $fn (@ARGV) { if ($1 ne "if" and $1 ne "while" and $1 ne "for" and $1 ne "switch" and $1 ne "return" and $1 ne "int" and $1 ne "elsif" and $1 ne "WINAPI" and $2 ne "WINAPI" and - $1 ne "void" and $1 ne "__attribute__" and $1 ne "op") { + $1 ne "void" and $1 ne "__attribute__" and $1 ne "op" and + $1 ne "size_t" and $1 ne "double") { print " fn ():$fn:$.\n"; } } diff --git a/src/common/compat_libevent.c b/src/common/compat_libevent.c index 4cfe5cc93b..85ed58456e 100644 --- a/src/common/compat_libevent.c +++ b/src/common/compat_libevent.c @@ -283,8 +283,8 @@ tor_libevent_initialize(tor_libevent_cfg *torcfg) } /** Return the current Libevent event base that we're set up to use. */ -struct event_base * -tor_libevent_get_base(void) +MOCK_IMPL(struct event_base *, +tor_libevent_get_base, (void)) { return the_event_base; } diff --git a/src/common/compat_libevent.h b/src/common/compat_libevent.h index c5c78b822d..57d0c4da1b 100644 --- a/src/common/compat_libevent.h +++ b/src/common/compat_libevent.h @@ -72,7 +72,7 @@ typedef struct tor_libevent_cfg { } tor_libevent_cfg; void tor_libevent_initialize(tor_libevent_cfg *cfg); -struct event_base *tor_libevent_get_base(void); +MOCK_DECL(struct event_base *, tor_libevent_get_base, (void)); const char *tor_libevent_get_method(void); void tor_check_libevent_version(const char *m, int server, const char **badness_out); diff --git a/src/common/torlog.h b/src/common/torlog.h index fa7266c199..483a97935f 100644 --- a/src/common/torlog.h +++ b/src/common/torlog.h @@ -97,8 +97,10 @@ #define LD_HEARTBEAT (1u<<20) /** Abstract channel_t code */ #define LD_CHANNEL (1u<<21) +/** Scheduler */ +#define LD_SCHED (1u<<22) /** Number of logging domains in the code. */ -#define N_LOGGING_DOMAINS 22 +#define N_LOGGING_DOMAINS 23 /** This log message is not safe to send to a callback-based logger * immediately. Used as a flag, not a log domain. */ diff --git a/src/or/Makefile.nmake b/src/or/Makefile.nmake index 523bf3306b..2ac98cd372 100644 --- a/src/or/Makefile.nmake +++ b/src/or/Makefile.nmake @@ -63,6 +63,7 @@ LIBTOR_OBJECTS = \ routerlist.obj \ routerparse.obj \ routerset.obj \ + scheduler.obj \ statefile.obj \ status.obj \ transports.obj diff --git a/src/or/buffers.c b/src/or/buffers.c index bd33fe451d..4cdc03bc03 100644 --- a/src/or/buffers.c +++ b/src/or/buffers.c @@ -562,8 +562,8 @@ buf_clear(buf_t *buf) } /** Return the number of bytes stored in <b>buf</b> */ -size_t -buf_datalen(const buf_t *buf) +MOCK_IMPL(size_t, +buf_datalen, (const buf_t *buf)) { return buf->datalen; } diff --git a/src/or/buffers.h b/src/or/buffers.h index 9c2c7d0e9d..4687fbefd7 100644 --- a/src/or/buffers.h +++ b/src/or/buffers.h @@ -24,7 +24,7 @@ void buf_shrink(buf_t *buf); size_t buf_shrink_freelists(int free_all); void buf_dump_freelist_sizes(int severity); -size_t buf_datalen(const buf_t *buf); +MOCK_DECL(size_t, buf_datalen, (const buf_t *buf)); size_t buf_allocation(const buf_t *buf); size_t buf_slack(const buf_t *buf); diff --git a/src/or/channel.c b/src/or/channel.c index 13a122662a..5ee0748802 100644 --- a/src/or/channel.c +++ b/src/or/channel.c @@ -13,6 +13,9 @@ #define TOR_CHANNEL_INTERNAL_ +/* This one's for stuff only channel.c and the test suite should see */ +#define CHANNEL_PRIVATE_ + #include "or.h" #include "channel.h" #include "channeltls.h" @@ -29,29 +32,7 @@ #include "rephist.h" #include "router.h" #include "routerlist.h" - -/* Cell queue structure */ - -typedef struct cell_queue_entry_s cell_queue_entry_t; -struct cell_queue_entry_s { - TOR_SIMPLEQ_ENTRY(cell_queue_entry_s) next; - enum { - CELL_QUEUE_FIXED, - CELL_QUEUE_VAR, - CELL_QUEUE_PACKED - } type; - union { - struct { - cell_t *cell; - } fixed; - struct { - var_cell_t *var_cell; - } var; - struct { - packed_cell_t *packed_cell; - } packed; - } u; -}; +#include "scheduler.h" /* Global lists of channels */ @@ -76,6 +57,60 @@ static smartlist_t *finished_listeners = NULL; /* Counter for ID numbers */ static uint64_t n_channels_allocated = 0; +/* + * Channel global byte/cell counters, for statistics and for scheduler high + * /low-water marks. + */ + +/* + * Total number of cells ever given to any channel with the + * channel_write_*_cell() functions. + */ + +static uint64_t n_channel_cells_queued = 0; + +/* + * Total number of cells ever passed to a channel lower layer with the + * write_*_cell() methods. + */ + +static uint64_t n_channel_cells_passed_to_lower_layer = 0; + +/* + * Current number of cells in all channel queues; should be + * n_channel_cells_queued - n_channel_cells_passed_to_lower_layer. + */ + +static uint64_t n_channel_cells_in_queues = 0; + +/* + * Total number of bytes for all cells ever queued to a channel and + * counted in n_channel_cells_queued. + */ + +static uint64_t n_channel_bytes_queued = 0; + +/* + * Total number of bytes for all cells ever passed to a channel lower layer + * and counted in n_channel_cells_passed_to_lower_layer. + */ + +static uint64_t n_channel_bytes_passed_to_lower_layer = 0; + +/* + * Current number of bytes in all channel queues; should be + * n_channel_bytes_queued - n_channel_bytes_passed_to_lower_layer. + */ + +static uint64_t n_channel_bytes_in_queues = 0; + +/* + * Current total estimated queue size *including lower layer queues and + * transmit overhead* + */ + +STATIC uint64_t estimated_total_queue_size = 0; + /* Digest->channel map * * Similar to the one used in connection_or.c, this maps from the identity @@ -123,6 +158,8 @@ cell_queue_entry_new_var(var_cell_t *var_cell); static int is_destroy_cell(channel_t *chan, const cell_queue_entry_t *q, circid_t *circid_out); +static void channel_assert_counter_consistency(void); + /* Functions to maintain the digest map */ static void channel_add_to_digest_map(channel_t *chan); static void channel_remove_from_digest_map(channel_t *chan); @@ -140,6 +177,8 @@ channel_free_list(smartlist_t *channels, int mark_for_close); static void channel_listener_free_list(smartlist_t *channels, int mark_for_close); static void channel_listener_force_free(channel_listener_t *chan_l); +static size_t channel_get_cell_queue_entry_size(channel_t *chan, + cell_queue_entry_t *q); static void channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q); @@ -746,6 +785,9 @@ channel_init(channel_t *chan) /* It hasn't been open yet. */ chan->has_been_open = 0; + + /* Scheduler state is idle */ + chan->scheduler_state = SCHED_CHAN_IDLE; } /** @@ -788,6 +830,9 @@ channel_free(channel_t *chan) "Freeing channel " U64_FORMAT " at %p", U64_PRINTF_ARG(chan->global_identifier), chan); + /* Get this one out of the scheduler */ + scheduler_release_channel(chan); + /* * Get rid of cmux policy before we do anything, so cmux policies don't * see channels in weird half-freed states. @@ -863,6 +908,9 @@ channel_force_free(channel_t *chan) "Force-freeing channel " U64_FORMAT " at %p", U64_PRINTF_ARG(chan->global_identifier), chan); + /* Get this one out of the scheduler */ + scheduler_release_channel(chan); + /* * Get rid of cmux policy before we do anything, so cmux policies don't * see channels in weird half-freed states. @@ -1666,6 +1714,36 @@ cell_queue_entry_new_var(var_cell_t *var_cell) } /** + * Ask how big the cell contained in a cell_queue_entry_t is + */ + +static size_t +channel_get_cell_queue_entry_size(channel_t *chan, cell_queue_entry_t *q) +{ + size_t rv = 0; + + tor_assert(chan); + tor_assert(q); + + switch (q->type) { + case CELL_QUEUE_FIXED: + rv = get_cell_network_size(chan->wide_circ_ids); + break; + case CELL_QUEUE_VAR: + rv = get_var_cell_header_size(chan->wide_circ_ids) + + (q->u.var.var_cell ? q->u.var.var_cell->payload_len : 0); + break; + case CELL_QUEUE_PACKED: + rv = get_cell_network_size(chan->wide_circ_ids); + break; + default: + tor_assert(1); + } + + return rv; +} + +/** * Write to a channel based on a cell_queue_entry_t * * Given a cell_queue_entry_t filled out by the caller, try to send the cell @@ -1677,6 +1755,7 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q) { int result = 0, sent = 0; cell_queue_entry_t *tmp = NULL; + size_t cell_bytes; tor_assert(chan); tor_assert(q); @@ -1693,6 +1772,9 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q) } } + /* For statistical purposes, figure out how big this cell is */ + cell_bytes = channel_get_cell_queue_entry_size(chan, q); + /* Can we send it right out? If so, try */ if (TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue) && chan->state == CHANNEL_STATE_OPEN) { @@ -1726,6 +1808,13 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q) channel_timestamp_drained(chan); /* Update the counter */ ++(chan->n_cells_xmitted); + chan->n_bytes_xmitted += cell_bytes; + /* Update global counters */ + ++n_channel_cells_queued; + ++n_channel_cells_passed_to_lower_layer; + n_channel_bytes_queued += cell_bytes; + n_channel_bytes_passed_to_lower_layer += cell_bytes; + channel_assert_counter_consistency(); } } @@ -1737,6 +1826,14 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q) */ tmp = cell_queue_entry_dup(q); TOR_SIMPLEQ_INSERT_TAIL(&chan->outgoing_queue, tmp, next); + /* Update global counters */ + ++n_channel_cells_queued; + ++n_channel_cells_in_queues; + n_channel_bytes_queued += cell_bytes; + n_channel_bytes_in_queues += cell_bytes; + channel_assert_counter_consistency(); + /* Update channel queue size */ + chan->bytes_in_queue += cell_bytes; /* Try to process the queue? */ if (chan->state == CHANNEL_STATE_OPEN) channel_flush_cells(chan); } @@ -1775,6 +1872,9 @@ channel_write_cell(channel_t *chan, cell_t *cell) q.type = CELL_QUEUE_FIXED; q.u.fixed.cell = cell; channel_write_cell_queue_entry(chan, &q); + + /* Update the queue size estimate */ + channel_update_xmit_queue_size(chan); } /** @@ -1810,6 +1910,9 @@ channel_write_packed_cell(channel_t *chan, packed_cell_t *packed_cell) q.type = CELL_QUEUE_PACKED; q.u.packed.packed_cell = packed_cell; channel_write_cell_queue_entry(chan, &q); + + /* Update the queue size estimate */ + channel_update_xmit_queue_size(chan); } /** @@ -1846,6 +1949,9 @@ channel_write_var_cell(channel_t *chan, var_cell_t *var_cell) q.type = CELL_QUEUE_VAR; q.u.var.var_cell = var_cell; channel_write_cell_queue_entry(chan, &q); + + /* Update the queue size estimate */ + channel_update_xmit_queue_size(chan); } /** @@ -1941,6 +2047,41 @@ channel_change_state(channel_t *chan, channel_state_t to_state) } } + /* + * If we're going to a closed/closing state, we don't need scheduling any + * more; in CHANNEL_STATE_MAINT we can't accept writes. + */ + if (to_state == CHANNEL_STATE_CLOSING || + to_state == CHANNEL_STATE_CLOSED || + to_state == CHANNEL_STATE_ERROR) { + scheduler_release_channel(chan); + } else if (to_state == CHANNEL_STATE_MAINT) { + scheduler_channel_doesnt_want_writes(chan); + } + + /* + * If we're closing, this channel no longer counts toward the global + * estimated queue size; if we're open, it now does. + */ + if ((to_state == CHANNEL_STATE_CLOSING || + to_state == CHANNEL_STATE_CLOSED || + to_state == CHANNEL_STATE_ERROR) && + (from_state == CHANNEL_STATE_OPEN || + from_state == CHANNEL_STATE_MAINT)) { + estimated_total_queue_size -= chan->bytes_in_queue; + } + + /* + * If we're opening, this channel now does count toward the global + * estimated queue size. + */ + if ((to_state == CHANNEL_STATE_OPEN || + to_state == CHANNEL_STATE_MAINT) && + !(from_state == CHANNEL_STATE_OPEN || + from_state == CHANNEL_STATE_MAINT)) { + estimated_total_queue_size += chan->bytes_in_queue; + } + /* Tell circuits if we opened and stuff */ if (to_state == CHANNEL_STATE_OPEN) { channel_do_open_actions(chan); @@ -2056,12 +2197,13 @@ channel_listener_change_state(channel_listener_t *chan_l, #define MAX_CELLS_TO_GET_FROM_CIRCUITS_FOR_UNLIMITED 256 -ssize_t -channel_flush_some_cells(channel_t *chan, ssize_t num_cells) +MOCK_IMPL(ssize_t, +channel_flush_some_cells, (channel_t *chan, ssize_t num_cells)) { unsigned int unlimited = 0; ssize_t flushed = 0; int num_cells_from_circs, clamped_num_cells; + int q_len_before, q_len_after; tor_assert(chan); @@ -2087,14 +2229,45 @@ channel_flush_some_cells(channel_t *chan, ssize_t num_cells) clamped_num_cells = (int)(num_cells - flushed); } } + + /* + * Keep track of the change in queue size; we have to count cells + * channel_flush_from_first_active_circuit() writes out directly, + * but not double-count ones we might get later in + * channel_flush_some_cells_from_outgoing_queue() + */ + q_len_before = chan_cell_queue_len(&(chan->outgoing_queue)); + /* Try to get more cells from any active circuits */ num_cells_from_circs = channel_flush_from_first_active_circuit( chan, clamped_num_cells); - /* If it claims we got some, process the queue again */ + q_len_after = chan_cell_queue_len(&(chan->outgoing_queue)); + + /* + * If it claims we got some, adjust the flushed counter and consider + * processing the queue again + */ if (num_cells_from_circs > 0) { - flushed += channel_flush_some_cells_from_outgoing_queue(chan, - (unlimited ? -1 : num_cells - flushed)); + /* + * Adjust flushed by the number of cells counted in + * num_cells_from_circs that didn't go to the cell queue. + */ + + if (q_len_after > q_len_before) { + num_cells_from_circs -= (q_len_after - q_len_before); + if (num_cells_from_circs < 0) num_cells_from_circs = 0; + } + + flushed += num_cells_from_circs; + + /* Now process the queue if necessary */ + + if ((q_len_after > q_len_before) && + (unlimited || (flushed < num_cells))) { + flushed += channel_flush_some_cells_from_outgoing_queue(chan, + (unlimited ? -1 : num_cells - flushed)); + } } } } @@ -2117,6 +2290,8 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, unsigned int unlimited = 0; ssize_t flushed = 0; cell_queue_entry_t *q = NULL; + size_t cell_size; + int free_q = 0, handed_off = 0; tor_assert(chan); tor_assert(chan->write_cell); @@ -2130,8 +2305,12 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, if (chan->state == CHANNEL_STATE_OPEN) { while ((unlimited || num_cells > flushed) && NULL != (q = TOR_SIMPLEQ_FIRST(&chan->outgoing_queue))) { + free_q = 0; + handed_off = 0; if (1) { + /* Figure out how big it is for statistical purposes */ + cell_size = channel_get_cell_queue_entry_size(chan, q); /* * Okay, we have a good queue entry, try to give it to the lower * layer. @@ -2144,8 +2323,9 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, ++flushed; channel_timestamp_xmit(chan); ++(chan->n_cells_xmitted); - cell_queue_entry_free(q, 1); - q = NULL; + chan->n_bytes_xmitted += cell_size; + free_q = 1; + handed_off = 1; } /* Else couldn't write it; leave it on the queue */ } else { @@ -2156,8 +2336,8 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, "(global ID " U64_FORMAT ").", chan, U64_PRINTF_ARG(chan->global_identifier)); /* Throw it away */ - cell_queue_entry_free(q, 0); - q = NULL; + free_q = 1; + handed_off = 0; } break; case CELL_QUEUE_PACKED: @@ -2167,8 +2347,9 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, ++flushed; channel_timestamp_xmit(chan); ++(chan->n_cells_xmitted); - cell_queue_entry_free(q, 1); - q = NULL; + chan->n_bytes_xmitted += cell_size; + free_q = 1; + handed_off = 1; } /* Else couldn't write it; leave it on the queue */ } else { @@ -2179,8 +2360,8 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, "(global ID " U64_FORMAT ").", chan, U64_PRINTF_ARG(chan->global_identifier)); /* Throw it away */ - cell_queue_entry_free(q, 0); - q = NULL; + free_q = 1; + handed_off = 0; } break; case CELL_QUEUE_VAR: @@ -2190,8 +2371,9 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, ++flushed; channel_timestamp_xmit(chan); ++(chan->n_cells_xmitted); - cell_queue_entry_free(q, 1); - q = NULL; + chan->n_bytes_xmitted += cell_size; + free_q = 1; + handed_off = 1; } /* Else couldn't write it; leave it on the queue */ } else { @@ -2202,8 +2384,8 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, "(global ID " U64_FORMAT ").", chan, U64_PRINTF_ARG(chan->global_identifier)); /* Throw it away */ - cell_queue_entry_free(q, 0); - q = NULL; + free_q = 1; + handed_off = 0; } break; default: @@ -2213,12 +2395,32 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, "(global ID " U64_FORMAT "; ignoring it." " Someone should fix this.", q->type, chan, U64_PRINTF_ARG(chan->global_identifier)); - cell_queue_entry_free(q, 0); - q = NULL; + free_q = 1; + handed_off = 0; } - /* if q got NULLed out, we used it and should remove the queue entry */ - if (!q) TOR_SIMPLEQ_REMOVE_HEAD(&chan->outgoing_queue, next); + /* + * if free_q is set, we used it and should remove the queue entry; + * we have to do the free down here so TOR_SIMPLEQ_REMOVE_HEAD isn't + * accessing freed memory + */ + if (free_q) { + TOR_SIMPLEQ_REMOVE_HEAD(&chan->outgoing_queue, next); + /* + * ...and we handed a cell off to the lower layer, so we should + * update the counters. + */ + ++n_channel_cells_passed_to_lower_layer; + --n_channel_cells_in_queues; + n_channel_bytes_passed_to_lower_layer += cell_size; + n_channel_bytes_in_queues -= cell_size; + channel_assert_counter_consistency(); + /* Update the channel's queue size too */ + chan->bytes_in_queue -= cell_size; + /* Finally, free q */ + cell_queue_entry_free(q, handed_off); + q = NULL; + } /* No cell removed from list, so we can't go on any further */ else break; } @@ -2230,6 +2432,9 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, channel_timestamp_drained(chan); } + /* Update the estimate queue size */ + channel_update_xmit_queue_size(chan); + return flushed; } @@ -2541,8 +2746,9 @@ channel_queue_cell(channel_t *chan, cell_t *cell) /* Timestamp for receiving */ channel_timestamp_recv(chan); - /* Update the counter */ + /* Update the counters */ ++(chan->n_cells_recved); + chan->n_bytes_recved += get_cell_network_size(chan->wide_circ_ids); /* If we don't need to queue we can just call cell_handler */ if (!need_to_queue) { @@ -2596,6 +2802,8 @@ channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell) /* Update the counter */ ++(chan->n_cells_recved); + chan->n_bytes_recved += get_var_cell_header_size(chan->wide_circ_ids) + + var_cell->payload_len; /* If we don't need to queue we can just call cell_handler */ if (!need_to_queue) { @@ -2645,6 +2853,19 @@ packed_cell_is_destroy(channel_t *chan, return 0; } +/** + * Assert that the global channel stats counters are internally consistent + */ + +static void +channel_assert_counter_consistency(void) +{ + tor_assert(n_channel_cells_queued == + (n_channel_cells_in_queues + n_channel_cells_passed_to_lower_layer)); + tor_assert(n_channel_bytes_queued == + (n_channel_bytes_in_queues + n_channel_bytes_passed_to_lower_layer)); +} + /** DOCDOC */ static int is_destroy_cell(channel_t *chan, @@ -2727,6 +2948,19 @@ channel_dumpstats(int severity) { if (all_channels && smartlist_len(all_channels) > 0) { tor_log(severity, LD_GENERAL, + "Channels have queued " U64_FORMAT " bytes in " U64_FORMAT " cells, " + "and handed " U64_FORMAT " bytes in " U64_FORMAT " cells to the lower" + " layer.", + U64_PRINTF_ARG(n_channel_bytes_queued), + U64_PRINTF_ARG(n_channel_cells_queued), + U64_PRINTF_ARG(n_channel_bytes_passed_to_lower_layer), + U64_PRINTF_ARG(n_channel_cells_passed_to_lower_layer)); + tor_log(severity, LD_GENERAL, + "There are currently " U64_FORMAT " bytes in " U64_FORMAT " cells " + "in channel queues.", + U64_PRINTF_ARG(n_channel_bytes_in_queues), + U64_PRINTF_ARG(n_channel_cells_in_queues)); + tor_log(severity, LD_GENERAL, "Dumping statistics about %d channels:", smartlist_len(all_channels)); tor_log(severity, LD_GENERAL, @@ -3200,7 +3434,7 @@ channel_listener_describe_transport(channel_listener_t *chan_l) /** * Return the number of entries in <b>queue</b> */ -static int +STATIC int chan_cell_queue_len(const chan_cell_queue_t *queue) { int r = 0; @@ -3216,8 +3450,8 @@ chan_cell_queue_len(const chan_cell_queue_t *queue) * Dump statistics for one channel to the log */ -void -channel_dump_statistics(channel_t *chan, int severity) +MOCK_IMPL(void, +channel_dump_statistics, (channel_t *chan, int severity)) { double avg, interval, age; time_t now = time(NULL); @@ -3369,12 +3603,22 @@ channel_dump_statistics(channel_t *chan, int severity) /* Describe counters and rates */ tor_log(severity, LD_GENERAL, " * Channel " U64_FORMAT " has received " - U64_FORMAT " cells and transmitted " U64_FORMAT, + U64_FORMAT " bytes in " U64_FORMAT " cells and transmitted " + U64_FORMAT " bytes in " U64_FORMAT " cells", U64_PRINTF_ARG(chan->global_identifier), + U64_PRINTF_ARG(chan->n_bytes_recved), U64_PRINTF_ARG(chan->n_cells_recved), + U64_PRINTF_ARG(chan->n_bytes_xmitted), U64_PRINTF_ARG(chan->n_cells_xmitted)); if (now > chan->timestamp_created && chan->timestamp_created > 0) { + if (chan->n_bytes_recved > 0) { + avg = (double)(chan->n_bytes_recved) / age; + tor_log(severity, LD_GENERAL, + " * Channel " U64_FORMAT " has averaged %f " + "bytes received per second", + U64_PRINTF_ARG(chan->global_identifier), avg); + } if (chan->n_cells_recved > 0) { avg = (double)(chan->n_cells_recved) / age; if (avg >= 1.0) { @@ -3390,6 +3634,13 @@ channel_dump_statistics(channel_t *chan, int severity) U64_PRINTF_ARG(chan->global_identifier), interval); } } + if (chan->n_bytes_xmitted > 0) { + avg = (double)(chan->n_bytes_xmitted) / age; + tor_log(severity, LD_GENERAL, + " * Channel " U64_FORMAT " has averaged %f " + "bytes transmitted per second", + U64_PRINTF_ARG(chan->global_identifier), avg); + } if (chan->n_cells_xmitted > 0) { avg = (double)(chan->n_cells_xmitted) / age; if (avg >= 1.0) { @@ -3807,6 +4058,50 @@ channel_mark_outgoing(channel_t *chan) chan->is_incoming = 0; } +/************************ + * Flow control queries * + ***********************/ + +/* + * Get the latest estimate for the total queue size of all open channels + */ + +uint64_t +channel_get_global_queue_estimate(void) +{ + return estimated_total_queue_size; +} + +/* + * Estimate the number of writeable cells + * + * Ask the lower layer for an estimate of how many cells it can accept, and + * then subtract the length of our outgoing_queue, if any, to produce an + * estimate of the number of cells this channel can accept for writes. + */ + +int +channel_num_cells_writeable(channel_t *chan) +{ + int result; + + tor_assert(chan); + tor_assert(chan->num_cells_writeable); + + if (chan->state == CHANNEL_STATE_OPEN) { + /* Query lower layer */ + result = chan->num_cells_writeable(chan); + /* Subtract cell queue length, if any */ + result -= chan_cell_queue_len(&chan->outgoing_queue); + if (result < 0) result = 0; + } else { + /* No cells are writeable in any other state */ + result = 0; + } + + return result; +} + /********************* * Timestamp updates * ********************/ @@ -4209,3 +4504,87 @@ channel_set_circid_type(channel_t *chan, } } +/** + * Update the estimated number of bytes queued to transmit for this channel, + * and notify the scheduler. The estimate includes both the channel queue and + * the queue size reported by the lower layer, and an overhead estimate + * optionally provided by the lower layer. + */ + +void +channel_update_xmit_queue_size(channel_t *chan) +{ + uint64_t queued, adj; + double overhead; + + tor_assert(chan); + tor_assert(chan->num_bytes_queued); + + /* + * First, get the number of bytes we have queued without factoring in + * lower-layer overhead. + */ + queued = chan->num_bytes_queued(chan) + chan->bytes_in_queue; + /* Next, adjust by the overhead factor, if any is available */ + if (chan->get_overhead_estimate) { + overhead = chan->get_overhead_estimate(chan); + if (overhead >= 1.0f) { + queued *= overhead; + } else { + /* Ignore silly overhead factors */ + log_notice(LD_CHANNEL, "Ignoring silly overhead factor %f", overhead); + } + } + + /* Now, compare to the previous estimate */ + if (queued > chan->bytes_queued_for_xmit) { + adj = queued - chan->bytes_queued_for_xmit; + log_debug(LD_CHANNEL, + "Increasing queue size for channel " U64_FORMAT " by " U64_FORMAT + " from " U64_FORMAT " to " U64_FORMAT, + U64_PRINTF_ARG(chan->global_identifier), + U64_PRINTF_ARG(adj), + U64_PRINTF_ARG(chan->bytes_queued_for_xmit), + U64_PRINTF_ARG(queued)); + /* Update the channel's estimate */ + chan->bytes_queued_for_xmit = queued; + + /* Update the global queue size estimate if appropriate */ + if (chan->state == CHANNEL_STATE_OPEN || + chan->state == CHANNEL_STATE_MAINT) { + estimated_total_queue_size += adj; + log_debug(LD_CHANNEL, + "Increasing global queue size by " U64_FORMAT " for channel " + U64_FORMAT ", new size is " U64_FORMAT, + U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier), + U64_PRINTF_ARG(estimated_total_queue_size)); + /* Tell the scheduler we're increasing the queue size */ + scheduler_adjust_queue_size(chan, 1, adj); + } + } else if (queued < chan->bytes_queued_for_xmit) { + adj = chan->bytes_queued_for_xmit - queued; + log_debug(LD_CHANNEL, + "Decreasing queue size for channel " U64_FORMAT " by " U64_FORMAT + " from " U64_FORMAT " to " U64_FORMAT, + U64_PRINTF_ARG(chan->global_identifier), + U64_PRINTF_ARG(adj), + U64_PRINTF_ARG(chan->bytes_queued_for_xmit), + U64_PRINTF_ARG(queued)); + /* Update the channel's estimate */ + chan->bytes_queued_for_xmit = queued; + + /* Update the global queue size estimate if appropriate */ + if (chan->state == CHANNEL_STATE_OPEN || + chan->state == CHANNEL_STATE_MAINT) { + estimated_total_queue_size -= adj; + log_debug(LD_CHANNEL, + "Decreasing global queue size by " U64_FORMAT " for channel " + U64_FORMAT ", new size is " U64_FORMAT, + U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier), + U64_PRINTF_ARG(estimated_total_queue_size)); + /* Tell the scheduler we're decreasing the queue size */ + scheduler_adjust_queue_size(chan, -1, adj); + } + } +} + diff --git a/src/or/channel.h b/src/or/channel.h index 4cd8f4391e..5c2a1a35e2 100644 --- a/src/or/channel.h +++ b/src/or/channel.h @@ -57,6 +57,32 @@ struct channel_s { CHANNEL_CLOSE_FOR_ERROR } reason_for_closing; + /** State variable for use by the scheduler */ + enum { + /* + * The channel is not open, or it has a full output buffer but no queued + * cells. + */ + SCHED_CHAN_IDLE = 0, + /* + * The channel has space on its output buffer to write, but no queued + * cells. + */ + SCHED_CHAN_WAITING_FOR_CELLS, + /* + * The scheduler has queued cells but no output buffer space to write. + */ + SCHED_CHAN_WAITING_TO_WRITE, + /* + * The scheduler has both queued cells and output buffer space, and is + * eligible for the scheduler loop. + */ + SCHED_CHAN_PENDING + } scheduler_state; + + /** Heap index for use by the scheduler */ + int sched_heap_idx; + /** Timestamps for both cell channels and listeners */ time_t timestamp_created; /* Channel created */ time_t timestamp_active; /* Any activity */ @@ -79,6 +105,11 @@ struct channel_s { /* Methods implemented by the lower layer */ /** + * Ask the lower layer for an estimate of the average overhead for + * transmissions on this channel. + */ + double (*get_overhead_estimate)(channel_t *); + /* * Ask the underlying transport what the remote endpoint address is, in * a tor_addr_t. This is optional and subclasses may leave this NULL. * If they implement it, they should write the address out to the @@ -110,7 +141,11 @@ struct channel_s { int (*matches_extend_info)(channel_t *, extend_info_t *); /** Check if this channel matches a target address when extending */ int (*matches_target)(channel_t *, const tor_addr_t *); - /** Write a cell to an open channel */ + /* Ask the lower layer how many bytes it has queued but not yet sent */ + size_t (*num_bytes_queued)(channel_t *); + /* Ask the lower layer how many cells can be written */ + int (*num_cells_writeable)(channel_t *); + /* Write a cell to an open channel */ int (*write_cell)(channel_t *, cell_t *); /** Write a packed cell to an open channel */ int (*write_packed_cell)(channel_t *, packed_cell_t *); @@ -198,8 +233,16 @@ struct channel_s { uint64_t dirreq_id; /** Channel counters for cell channels */ - uint64_t n_cells_recved; - uint64_t n_cells_xmitted; + uint64_t n_cells_recved, n_bytes_recved; + uint64_t n_cells_xmitted, n_bytes_xmitted; + + /** Our current contribution to the scheduler's total xmit queue */ + uint64_t bytes_queued_for_xmit; + + /** Number of bytes in this channel's cell queue; does not include + * lower-layer queueing. + */ + uint64_t bytes_in_queue; }; struct channel_listener_s { @@ -311,6 +354,34 @@ void channel_set_cmux_policy_everywhere(circuitmux_policy_t *pol); #ifdef TOR_CHANNEL_INTERNAL_ +#ifdef CHANNEL_PRIVATE_ +/* Cell queue structure (here rather than channel.c for test suite use) */ + +typedef struct cell_queue_entry_s cell_queue_entry_t; +struct cell_queue_entry_s { + TOR_SIMPLEQ_ENTRY(cell_queue_entry_s) next; + enum { + CELL_QUEUE_FIXED, + CELL_QUEUE_VAR, + CELL_QUEUE_PACKED + } type; + union { + struct { + cell_t *cell; + } fixed; + struct { + var_cell_t *var_cell; + } var; + struct { + packed_cell_t *packed_cell; + } packed; + } u; +}; + +/* Cell queue functions for benefit of test suite */ +STATIC int chan_cell_queue_len(const chan_cell_queue_t *queue); +#endif + /* Channel operations for subclasses and internal use only */ /* Initialize a newly allocated channel - do this first in subclass @@ -384,7 +455,8 @@ void channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell); void channel_flush_cells(channel_t *chan); /* Request from lower layer for more cells if available */ -ssize_t channel_flush_some_cells(channel_t *chan, ssize_t num_cells); +MOCK_DECL(ssize_t, channel_flush_some_cells, + (channel_t *chan, ssize_t num_cells)); /* Query if data available on this channel */ int channel_more_to_flush(channel_t *chan); @@ -435,7 +507,7 @@ channel_t * channel_next_with_digest(channel_t *chan); */ const char * channel_describe_transport(channel_t *chan); -void channel_dump_statistics(channel_t *chan, int severity); +MOCK_DECL(void, channel_dump_statistics, (channel_t *chan, int severity)); void channel_dump_transport_statistics(channel_t *chan, int severity); const char * channel_get_actual_remote_descr(channel_t *chan); const char * channel_get_actual_remote_address(channel_t *chan); @@ -458,6 +530,7 @@ unsigned int channel_num_circuits(channel_t *chan); void channel_set_circid_type(channel_t *chan, crypto_pk_t *identity_rcvd, int consider_identity); void channel_timestamp_client(channel_t *chan); +void channel_update_xmit_queue_size(channel_t *chan); const char * channel_listener_describe_transport(channel_listener_t *chan_l); void channel_listener_dump_statistics(channel_listener_t *chan_l, @@ -465,6 +538,10 @@ void channel_listener_dump_statistics(channel_listener_t *chan_l, void channel_listener_dump_transport_statistics(channel_listener_t *chan_l, int severity); +/* Flow control queries */ +uint64_t channel_get_global_queue_estimate(void); +int channel_num_cells_writeable(channel_t *chan); + /* Timestamp queries */ time_t channel_when_created(channel_t *chan); time_t channel_when_last_active(channel_t *chan); diff --git a/src/or/channeltls.c b/src/or/channeltls.c index db044aee56..719a153dd6 100644 --- a/src/or/channeltls.c +++ b/src/or/channeltls.c @@ -25,6 +25,7 @@ #include "relay.h" #include "router.h" #include "routerlist.h" +#include "scheduler.h" /** How many CELL_PADDING cells have we received, ever? */ uint64_t stats_n_padding_cells_processed = 0; @@ -54,6 +55,7 @@ static void channel_tls_common_init(channel_tls_t *tlschan); static void channel_tls_close_method(channel_t *chan); static const char * channel_tls_describe_transport_method(channel_t *chan); static void channel_tls_free_method(channel_t *chan); +static double channel_tls_get_overhead_estimate_method(channel_t *chan); static int channel_tls_get_remote_addr_method(channel_t *chan, tor_addr_t *addr_out); static int @@ -67,6 +69,8 @@ channel_tls_matches_extend_info_method(channel_t *chan, extend_info_t *extend_info); static int channel_tls_matches_target_method(channel_t *chan, const tor_addr_t *target); +static int channel_tls_num_cells_writeable_method(channel_t *chan); +static size_t channel_tls_num_bytes_queued_method(channel_t *chan); static int channel_tls_write_cell_method(channel_t *chan, cell_t *cell); static int channel_tls_write_packed_cell_method(channel_t *chan, @@ -116,6 +120,7 @@ channel_tls_common_init(channel_tls_t *tlschan) chan->close = channel_tls_close_method; chan->describe_transport = channel_tls_describe_transport_method; chan->free = channel_tls_free_method; + chan->get_overhead_estimate = channel_tls_get_overhead_estimate_method; chan->get_remote_addr = channel_tls_get_remote_addr_method; chan->get_remote_descr = channel_tls_get_remote_descr_method; chan->get_transport_name = channel_tls_get_transport_name_method; @@ -123,6 +128,8 @@ channel_tls_common_init(channel_tls_t *tlschan) chan->is_canonical = channel_tls_is_canonical_method; chan->matches_extend_info = channel_tls_matches_extend_info_method; chan->matches_target = channel_tls_matches_target_method; + chan->num_bytes_queued = channel_tls_num_bytes_queued_method; + chan->num_cells_writeable = channel_tls_num_cells_writeable_method; chan->write_cell = channel_tls_write_cell_method; chan->write_packed_cell = channel_tls_write_packed_cell_method; chan->write_var_cell = channel_tls_write_var_cell_method; @@ -435,6 +442,40 @@ channel_tls_free_method(channel_t *chan) } /** + * Get an estimate of the average TLS overhead for the upper layer + */ + +static double +channel_tls_get_overhead_estimate_method(channel_t *chan) +{ + double overhead = 1.0f; + channel_tls_t *tlschan = BASE_CHAN_TO_TLS(chan); + + tor_assert(tlschan); + tor_assert(tlschan->conn); + + /* Just return 1.0f if we don't have sensible data */ + if (tlschan->conn->bytes_xmitted > 0 && + tlschan->conn->bytes_xmitted_by_tls >= + tlschan->conn->bytes_xmitted) { + overhead = ((double)(tlschan->conn->bytes_xmitted_by_tls)) / + ((double)(tlschan->conn->bytes_xmitted)); + + /* + * Never estimate more than 2.0; otherwise we get silly large estimates + * at the very start of a new TLS connection. + */ + if (overhead > 2.0f) overhead = 2.0f; + } + + log_debug(LD_CHANNEL, + "Estimated overhead ratio for TLS chan " U64_FORMAT " is %f", + U64_PRINTF_ARG(chan->global_identifier), overhead); + + return overhead; +} + +/** * Get the remote address of a channel_tls_t * * This implements the get_remote_addr method for channel_tls_t; copy the @@ -673,6 +714,50 @@ channel_tls_matches_target_method(channel_t *chan, } /** + * Tell the upper layer how many bytes we have queued and not yet + * sent. + */ + +static size_t +channel_tls_num_bytes_queued_method(channel_t *chan) +{ + channel_tls_t *tlschan = BASE_CHAN_TO_TLS(chan); + + tor_assert(tlschan); + tor_assert(tlschan->conn); + + return connection_get_outbuf_len(TO_CONN(tlschan->conn)); +} + +/** + * Tell the upper layer how many cells we can accept to write + * + * This implements the num_cells_writeable method for channel_tls_t; it + * returns an estimate of the number of cells we can accept with + * channel_tls_write_*_cell(). + */ + +static int +channel_tls_num_cells_writeable_method(channel_t *chan) +{ + size_t outbuf_len; + int n; + channel_tls_t *tlschan = BASE_CHAN_TO_TLS(chan); + size_t cell_network_size; + + tor_assert(tlschan); + tor_assert(tlschan->conn); + + cell_network_size = get_cell_network_size(tlschan->conn->wide_circ_ids); + outbuf_len = connection_get_outbuf_len(TO_CONN(tlschan->conn)); + /* Get the number of cells */ + n = CEIL_DIV(OR_CONN_HIGHWATER - outbuf_len, cell_network_size); + if (n < 0) n = 0; + + return n; +} + +/** * Write a cell to a channel_tls_t * * This implements the write_cell method for channel_tls_t; given a @@ -867,6 +952,10 @@ channel_tls_handle_state_change_on_orconn(channel_tls_t *chan, * CHANNEL_STATE_MAINT on this. */ channel_change_state(base_chan, CHANNEL_STATE_OPEN); + /* We might have just become writeable; check and tell the scheduler */ + if (connection_or_num_cells_writeable(conn) > 0) { + scheduler_channel_wants_writes(base_chan); + } } else { /* * Not open, so from CHANNEL_STATE_OPEN we go to CHANNEL_STATE_MAINT, @@ -878,58 +967,6 @@ channel_tls_handle_state_change_on_orconn(channel_tls_t *chan, } } -/** - * Flush cells from a channel_tls_t - * - * Try to flush up to about num_cells cells, and return how many we flushed. - */ - -ssize_t -channel_tls_flush_some_cells(channel_tls_t *chan, ssize_t num_cells) -{ - ssize_t flushed = 0; - - tor_assert(chan); - - if (flushed >= num_cells) goto done; - - /* - * If channel_tls_t ever buffers anything below the channel_t layer, flush - * that first here. - */ - - flushed += channel_flush_some_cells(TLS_CHAN_TO_BASE(chan), - num_cells - flushed); - - /* - * If channel_tls_t ever buffers anything below the channel_t layer, check - * how much we actually got and push it on down here. - */ - - done: - return flushed; -} - -/** - * Check if a channel_tls_t has anything to flush - * - * Return true if there is any more to flush on this channel (cells in queue - * or active circuits). - */ - -int -channel_tls_more_to_flush(channel_tls_t *chan) -{ - tor_assert(chan); - - /* - * If channel_tls_t ever buffers anything below channel_t, the - * check for that should go here first. - */ - - return channel_more_to_flush(TLS_CHAN_TO_BASE(chan)); -} - #ifdef KEEP_TIMING_STATS /** diff --git a/src/or/channeltls.h b/src/or/channeltls.h index 06be6fa555..133ad43bb4 100644 --- a/src/or/channeltls.h +++ b/src/or/channeltls.h @@ -40,8 +40,6 @@ channel_t * channel_tls_to_base(channel_tls_t *tlschan); channel_tls_t * channel_tls_from_base(channel_t *chan); /* Things for connection_or.c to call back into */ -ssize_t channel_tls_flush_some_cells(channel_tls_t *chan, ssize_t num_cells); -int channel_tls_more_to_flush(channel_tls_t *chan); void channel_tls_handle_cell(cell_t *cell, or_connection_t *conn); void channel_tls_handle_state_change_on_orconn(channel_tls_t *chan, or_connection_t *conn, diff --git a/src/or/circuitbuild.c b/src/or/circuitbuild.c index 34934dc519..ddb186bea4 100644 --- a/src/or/circuitbuild.c +++ b/src/or/circuitbuild.c @@ -14,6 +14,7 @@ #include "or.h" #include "channel.h" #include "circpathbias.h" +#define CIRCUITBUILD_PRIVATE #include "circuitbuild.h" #include "circuitlist.h" #include "circuitstats.h" diff --git a/src/or/circuitlist.c b/src/or/circuitlist.c index c7287f921d..0e88b47676 100644 --- a/src/or/circuitlist.c +++ b/src/or/circuitlist.c @@ -302,8 +302,8 @@ channel_note_destroy_pending(channel_t *chan, circid_t id) /** Called to indicate that a DESTROY is no longer pending on <b>chan</b> with * circuit ID <b>id</b> -- typically, because it has been sent. */ -void -channel_note_destroy_not_pending(channel_t *chan, circid_t id) +MOCK_IMPL(void, channel_note_destroy_not_pending, + (channel_t *chan, circid_t id)) { circuit_t *circ = circuit_get_by_circid_channel_even_if_marked(id,chan); if (circ) { diff --git a/src/or/circuitlist.h b/src/or/circuitlist.h index addaa725d4..ea1076d53f 100644 --- a/src/or/circuitlist.h +++ b/src/or/circuitlist.h @@ -72,7 +72,8 @@ void circuit_free_all(void); void circuits_handle_oom(size_t current_allocation); void channel_note_destroy_pending(channel_t *chan, circid_t id); -void channel_note_destroy_not_pending(channel_t *chan, circid_t id); +MOCK_DECL(void, channel_note_destroy_not_pending, + (channel_t *chan, circid_t id)); #ifdef CIRCUITLIST_PRIVATE STATIC void circuit_free(circuit_t *circ); diff --git a/src/or/circuitmux.c b/src/or/circuitmux.c index 663711c6c0..443dad0a54 100644 --- a/src/or/circuitmux.c +++ b/src/or/circuitmux.c @@ -621,8 +621,8 @@ circuitmux_clear_policy(circuitmux_t *cmux) * Return the policy currently installed on a circuitmux_t */ -const circuitmux_policy_t * -circuitmux_get_policy(circuitmux_t *cmux) +MOCK_IMPL(const circuitmux_policy_t *, +circuitmux_get_policy, (circuitmux_t *cmux)) { tor_assert(cmux); @@ -896,8 +896,8 @@ circuitmux_num_cells_for_circuit(circuitmux_t *cmux, circuit_t *circ) * Query total number of available cells on a circuitmux */ -unsigned int -circuitmux_num_cells(circuitmux_t *cmux) +MOCK_IMPL(unsigned int, +circuitmux_num_cells, (circuitmux_t *cmux)) { tor_assert(cmux); @@ -1951,3 +1951,51 @@ circuitmux_count_queued_destroy_cells(const channel_t *chan, return n_destroy_cells; } +/** + * Compare cmuxes to see which is more preferred; return < 0 if + * cmux_1 has higher priority (i.e., cmux_1 < cmux_2 in the scheduler's + * sort order), > 0 if cmux_2 has higher priority, or 0 if they are + * equally preferred. + * + * If the cmuxes have different cmux policies or the policy does not + * support the cmp_cmux method, return 0. + */ + +MOCK_IMPL(int, +circuitmux_compare_muxes, (circuitmux_t *cmux_1, circuitmux_t *cmux_2)) +{ + const circuitmux_policy_t *policy; + + tor_assert(cmux_1); + tor_assert(cmux_2); + + if (cmux_1 == cmux_2) { + /* Equivalent because they're the same cmux */ + return 0; + } + + if (cmux_1->policy && cmux_2->policy) { + if (cmux_1->policy == cmux_2->policy) { + policy = cmux_1->policy; + + if (policy->cmp_cmux) { + /* Okay, we can compare! */ + return policy->cmp_cmux(cmux_1, cmux_1->policy_data, + cmux_2, cmux_2->policy_data); + } else { + /* + * Equivalent because the policy doesn't know how to compare between + * muxes. + */ + return 0; + } + } else { + /* Equivalent because they have different policies */ + return 0; + } + } else { + /* Equivalent because one or both are missing a policy */ + return 0; + } +} + diff --git a/src/or/circuitmux.h b/src/or/circuitmux.h index eade2486a2..53092cd66c 100644 --- a/src/or/circuitmux.h +++ b/src/or/circuitmux.h @@ -57,6 +57,9 @@ struct circuitmux_policy_s { /* Choose a circuit */ circuit_t * (*pick_active_circuit)(circuitmux_t *cmux, circuitmux_policy_data_t *pol_data); + /* Optional: channel comparator for use by the scheduler */ + int (*cmp_cmux)(circuitmux_t *cmux_1, circuitmux_policy_data_t *pol_data_1, + circuitmux_t *cmux_2, circuitmux_policy_data_t *pol_data_2); }; /* @@ -105,7 +108,8 @@ void circuitmux_free(circuitmux_t *cmux); /* Policy control */ void circuitmux_clear_policy(circuitmux_t *cmux); -const circuitmux_policy_t * circuitmux_get_policy(circuitmux_t *cmux); +MOCK_DECL(const circuitmux_policy_t *, + circuitmux_get_policy, (circuitmux_t *cmux)); void circuitmux_set_policy(circuitmux_t *cmux, const circuitmux_policy_t *pol); @@ -117,7 +121,7 @@ int circuitmux_is_circuit_attached(circuitmux_t *cmux, circuit_t *circ); int circuitmux_is_circuit_active(circuitmux_t *cmux, circuit_t *circ); unsigned int circuitmux_num_cells_for_circuit(circuitmux_t *cmux, circuit_t *circ); -unsigned int circuitmux_num_cells(circuitmux_t *cmux); +MOCK_DECL(unsigned int, circuitmux_num_cells, (circuitmux_t *cmux)); unsigned int circuitmux_num_circuits(circuitmux_t *cmux); unsigned int circuitmux_num_active_circuits(circuitmux_t *cmux); @@ -148,5 +152,9 @@ void circuitmux_append_destroy_cell(channel_t *chan, void circuitmux_mark_destroyed_circids_usable(circuitmux_t *cmux, channel_t *chan); +/* Optional interchannel comparisons for scheduling */ +MOCK_DECL(int, circuitmux_compare_muxes, + (circuitmux_t *cmux_1, circuitmux_t *cmux_2)); + #endif /* TOR_CIRCUITMUX_H */ diff --git a/src/or/circuitmux_ewma.c b/src/or/circuitmux_ewma.c index 49d899e5e7..0d7d6ef197 100644 --- a/src/or/circuitmux_ewma.c +++ b/src/or/circuitmux_ewma.c @@ -187,6 +187,9 @@ ewma_notify_xmit_cells(circuitmux_t *cmux, static circuit_t * ewma_pick_active_circuit(circuitmux_t *cmux, circuitmux_policy_data_t *pol_data); +static int +ewma_cmp_cmux(circuitmux_t *cmux_1, circuitmux_policy_data_t *pol_data_1, + circuitmux_t *cmux_2, circuitmux_policy_data_t *pol_data_2); /*** EWMA global variables ***/ @@ -209,7 +212,8 @@ circuitmux_policy_t ewma_policy = { /*.notify_circ_inactive =*/ ewma_notify_circ_inactive, /*.notify_set_n_cells =*/ NULL, /* EWMA doesn't need this */ /*.notify_xmit_cells =*/ ewma_notify_xmit_cells, - /*.pick_active_circuit =*/ ewma_pick_active_circuit + /*.pick_active_circuit =*/ ewma_pick_active_circuit, + /*.cmp_cmux =*/ ewma_cmp_cmux }; /*** EWMA method implementations using the below EWMA helper functions ***/ @@ -453,6 +457,58 @@ ewma_pick_active_circuit(circuitmux_t *cmux, return circ; } +/** + * Compare two EWMA cmuxes, and return -1, 0 or 1 to indicate which should + * be more preferred - see circuitmux_compare_muxes() of circuitmux.c. + */ + +static int +ewma_cmp_cmux(circuitmux_t *cmux_1, circuitmux_policy_data_t *pol_data_1, + circuitmux_t *cmux_2, circuitmux_policy_data_t *pol_data_2) +{ + ewma_policy_data_t *p1 = NULL, *p2 = NULL; + cell_ewma_t *ce1 = NULL, *ce2 = NULL; + + tor_assert(cmux_1); + tor_assert(pol_data_1); + tor_assert(cmux_2); + tor_assert(pol_data_2); + + p1 = TO_EWMA_POL_DATA(pol_data_1); + p2 = TO_EWMA_POL_DATA(pol_data_1); + + if (p1 != p2) { + /* Get the head cell_ewma_t from each queue */ + if (smartlist_len(p1->active_circuit_pqueue) > 0) { + ce1 = smartlist_get(p1->active_circuit_pqueue, 0); + } + + if (smartlist_len(p2->active_circuit_pqueue) > 0) { + ce2 = smartlist_get(p2->active_circuit_pqueue, 0); + } + + /* Got both of them? */ + if (ce1 != NULL && ce2 != NULL) { + /* Pick whichever one has the better best circuit */ + return compare_cell_ewma_counts(ce1, ce2); + } else { + if (ce1 != NULL ) { + /* We only have a circuit on cmux_1, so prefer it */ + return -1; + } else if (ce2 != NULL) { + /* We only have a circuit on cmux_2, so prefer it */ + return 1; + } else { + /* No circuits at all; no preference */ + return 0; + } + } + } else { + /* We got identical params */ + return 0; + } +} + /** Helper for sorting cell_ewma_t values in their priority queue. */ static int compare_cell_ewma_counts(const void *p1, const void *p2) diff --git a/src/or/config.c b/src/or/config.c index 46c090e65e..ced4288f00 100644 --- a/src/or/config.c +++ b/src/or/config.c @@ -43,6 +43,7 @@ #include "util.h" #include "routerlist.h" #include "routerset.h" +#include "scheduler.h" #include "statefile.h" #include "transports.h" #include "ext_orport.h" @@ -368,6 +369,9 @@ static config_var_t option_vars_[] = { V(ServerDNSSearchDomains, BOOL, "0"), V(ServerDNSTestAddresses, CSV, "www.google.com,www.mit.edu,www.yahoo.com,www.slashdot.org"), + V(SchedulerLowWaterMark, MEMUNIT, "16 kB"), + V(SchedulerHighWaterMark, MEMUNIT, "32 kB"), + V(SchedulerMaxFlushCells, UINT, "16"), V(ShutdownWaitLength, INTERVAL, "30 seconds"), V(SocksListenAddress, LINELIST, NULL), V(SocksPolicy, LINELIST, NULL), @@ -1045,6 +1049,14 @@ options_act_reversible(const or_options_t *old_options, char **msg) if (running_tor && !libevent_initialized) { init_libevent(options); libevent_initialized = 1; + + /* + * Initialize the scheduler - this has to come after + * options_init_from_torrc() sets up libevent - why yes, that seems + * completely sensible to hide the libevent setup in the option parsing + * code! It also needs to happen before init_keys(), so it needs to + * happen here too. How yucky. */ + scheduler_init(); } /* Adjust the port configuration so we can launch listeners. */ @@ -1526,6 +1538,25 @@ options_act(const or_options_t *old_options) return -1; } + /* Set up scheduler thresholds */ + if (options->SchedulerLowWaterMark > 0 && + options->SchedulerHighWaterMark > options->SchedulerLowWaterMark) { + scheduler_set_watermarks(options->SchedulerLowWaterMark, + options->SchedulerHighWaterMark, + (options->SchedulerMaxFlushCells > 0) ? + options->SchedulerMaxFlushCells : 16); + } else { + if (options->SchedulerLowWaterMark == 0) { + log_warn(LD_GENERAL, "Bad SchedulerLowWaterMark option"); + } + + if (options->SchedulerHighWaterMark <= options->SchedulerLowWaterMark) { + log_warn(LD_GENERAL, "Bad SchedulerHighWaterMark option"); + } + + return -1; + } + /* Set up accounting */ if (accounting_parse_options(options, 0)<0) { log_warn(LD_CONFIG,"Error in accounting options"); @@ -2269,8 +2300,8 @@ resolve_my_address(int warn_severity, const or_options_t *options, /** Return true iff <b>addr</b> is judged to be on the same network as us, or * on a private network. */ -int -is_local_addr(const tor_addr_t *addr) +MOCK_IMPL(int, +is_local_addr, (const tor_addr_t *addr)) { if (tor_addr_is_internal(addr, 0)) return 1; diff --git a/src/or/config.h b/src/or/config.h index 6cc81ab948..133b472eb2 100644 --- a/src/or/config.h +++ b/src/or/config.h @@ -33,7 +33,7 @@ void reset_last_resolved_addr(void); int resolve_my_address(int warn_severity, const or_options_t *options, uint32_t *addr_out, const char **method_out, char **hostname_out); -int is_local_addr(const tor_addr_t *addr); +MOCK_DECL(int, is_local_addr, (const tor_addr_t *addr)); void options_init(or_options_t *options); #define OPTIONS_DUMP_MINIMAL 1 diff --git a/src/or/connection.c b/src/or/connection.c index c9c371c001..ce3fda89c1 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -3839,6 +3839,8 @@ connection_handle_write_impl(connection_t *conn, int force) tor_tls_get_n_raw_bytes(or_conn->tls, &n_read, &n_written); log_debug(LD_GENERAL, "After TLS write of %d: %ld read, %ld written", result, (long)n_read, (long)n_written); + or_conn->bytes_xmitted += result; + or_conn->bytes_xmitted_by_tls += n_written; /* So we notice bytes were written even on error */ /* XXXX024 This cast is safe since we can never write INT_MAX bytes in a * single set of TLS operations. But it looks kinda ugly. If we refactor diff --git a/src/or/connection_or.c b/src/or/connection_or.c index 29b88041b7..f228450723 100644 --- a/src/or/connection_or.c +++ b/src/or/connection_or.c @@ -38,6 +38,8 @@ #include "router.h" #include "routerlist.h" #include "ext_orport.h" +#include "scheduler.h" + #ifdef USE_BUFFEREVENTS #include <event2/bufferevent_ssl.h> #endif @@ -574,48 +576,51 @@ connection_or_process_inbuf(or_connection_t *conn) return ret; } -/** When adding cells to an OR connection's outbuf, keep adding until the - * outbuf is at least this long, or we run out of cells. */ -#define OR_CONN_HIGHWATER (32*1024) - -/** Add cells to an OR connection's outbuf whenever the outbuf's data length - * drops below this size. */ -#define OR_CONN_LOWWATER (16*1024) - /** Called whenever we have flushed some data on an or_conn: add more data * from active circuits. */ int connection_or_flushed_some(or_connection_t *conn) { - size_t datalen, temp; - ssize_t n, flushed; - size_t cell_network_size = get_cell_network_size(conn->wide_circ_ids); + size_t datalen; + + /* The channel will want to update its estimated queue size */ + channel_update_xmit_queue_size(TLS_CHAN_TO_BASE(conn->chan)); /* If we're under the low water mark, add cells until we're just over the * high water mark. */ datalen = connection_get_outbuf_len(TO_CONN(conn)); if (datalen < OR_CONN_LOWWATER) { - while ((conn->chan) && channel_tls_more_to_flush(conn->chan)) { - /* Compute how many more cells we want at most */ - n = CEIL_DIV(OR_CONN_HIGHWATER - datalen, cell_network_size); - /* Bail out if we don't want any more */ - if (n <= 0) break; - /* We're still here; try to flush some more cells */ - flushed = channel_tls_flush_some_cells(conn->chan, n); - /* Bail out if it says it didn't flush anything */ - if (flushed <= 0) break; - /* How much in the outbuf now? */ - temp = connection_get_outbuf_len(TO_CONN(conn)); - /* Bail out if we didn't actually increase the outbuf size */ - if (temp <= datalen) break; - /* Update datalen for the next iteration */ - datalen = temp; - } + /* Let the scheduler know */ + scheduler_channel_wants_writes(TLS_CHAN_TO_BASE(conn->chan)); } return 0; } +/** This is for channeltls.c to ask how many cells we could accept if + * they were available. */ +ssize_t +connection_or_num_cells_writeable(or_connection_t *conn) +{ + size_t datalen, cell_network_size; + ssize_t n = 0; + + tor_assert(conn); + + /* + * If we're under the high water mark, we're potentially + * writeable; note this is different from the calculation above + * used to trigger when to start writing after we've stopped. + */ + datalen = connection_get_outbuf_len(TO_CONN(conn)); + if (datalen < OR_CONN_HIGHWATER) { + cell_network_size = get_cell_network_size(conn->wide_circ_ids); + n = CEIL_DIV(OR_CONN_HIGHWATER - datalen, cell_network_size); + } + + return n; +} + /** Connection <b>conn</b> has finished writing and has no bytes left on * its outbuf. * @@ -1169,10 +1174,10 @@ connection_or_notify_error(or_connection_t *conn, * * Return the launched conn, or NULL if it failed. */ -or_connection_t * -connection_or_connect(const tor_addr_t *_addr, uint16_t port, - const char *id_digest, - channel_tls_t *chan) + +MOCK_IMPL(or_connection_t *, +connection_or_connect, (const tor_addr_t *_addr, uint16_t port, + const char *id_digest, channel_tls_t *chan)) { or_connection_t *conn; const or_options_t *options = get_options(); diff --git a/src/or/connection_or.h b/src/or/connection_or.h index 1ceabdaa4b..b82896e26d 100644 --- a/src/or/connection_or.h +++ b/src/or/connection_or.h @@ -24,6 +24,7 @@ void connection_or_set_bad_connections(const char *digest, int force); void connection_or_block_renegotiation(or_connection_t *conn); int connection_or_reached_eof(or_connection_t *conn); int connection_or_process_inbuf(or_connection_t *conn); +ssize_t connection_or_num_cells_writeable(or_connection_t *conn); int connection_or_flushed_some(or_connection_t *conn); int connection_or_finished_flushing(or_connection_t *conn); int connection_or_finished_connecting(or_connection_t *conn); @@ -36,9 +37,10 @@ void connection_or_connect_failed(or_connection_t *conn, int reason, const char *msg); void connection_or_notify_error(or_connection_t *conn, int reason, const char *msg); -or_connection_t *connection_or_connect(const tor_addr_t *addr, uint16_t port, - const char *id_digest, - channel_tls_t *chan); +MOCK_DECL(or_connection_t *, + connection_or_connect, + (const tor_addr_t *addr, uint16_t port, + const char *id_digest, channel_tls_t *chan)); void connection_or_close_normally(or_connection_t *orconn, int flush); void connection_or_close_for_error(or_connection_t *orconn, int flush); diff --git a/src/or/include.am b/src/or/include.am index 0f53f007f0..643f7ce001 100644 --- a/src/or/include.am +++ b/src/or/include.am @@ -74,6 +74,7 @@ LIBTOR_A_SOURCES = \ src/or/routerlist.c \ src/or/routerparse.c \ src/or/routerset.c \ + src/or/scheduler.c \ src/or/statefile.c \ src/or/status.c \ src/or/onion_ntor.c \ @@ -179,6 +180,7 @@ ORHEADERS = \ src/or/routerlist.h \ src/or/routerset.h \ src/or/routerparse.h \ + src/or/scheduler.h \ src/or/statefile.h \ src/or/status.h diff --git a/src/or/main.c b/src/or/main.c index 54c739c4fd..64ccbd496c 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -53,6 +53,7 @@ #include "router.h" #include "routerlist.h" #include "routerparse.h" +#include "scheduler.h" #include "statefile.h" #include "status.h" #include "util_process.h" @@ -2583,6 +2584,7 @@ tor_free_all(int postfork) channel_tls_free_all(); channel_free_all(); connection_free_all(); + scheduler_free_all(); buf_shrink_freelists(1); memarea_clear_freelist(); nodelist_free_all(); diff --git a/src/or/or.h b/src/or/or.h index bff7267d37..ccb29ee7df 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -1431,6 +1431,18 @@ typedef struct or_handshake_state_t { /** Length of Extended ORPort connection identifier. */ #define EXT_OR_CONN_ID_LEN DIGEST_LEN /* 20 */ +/* + * OR_CONN_HIGHWATER and OR_CONN_LOWWATER moved from connection_or.c so + * channeltls.c can see them too. + */ + +/** When adding cells to an OR connection's outbuf, keep adding until the + * outbuf is at least this long, or we run out of cells. */ +#define OR_CONN_HIGHWATER (32*1024) + +/** Add cells to an OR connection's outbuf whenever the outbuf's data length + * drops below this size. */ +#define OR_CONN_LOWWATER (16*1024) /** Subtype of connection_t for an "OR connection" -- that is, one that speaks * cells over TLS. */ @@ -1522,6 +1534,12 @@ typedef struct or_connection_t { /** Last emptied write token bucket in msec since midnight; only used if * TB_EMPTY events are enabled. */ uint32_t write_emptied_time; + + /* + * Count the number of bytes flushed out on this orconn, and the number of + * bytes TLS actually sent - used for overhead estimation for scheduling. + */ + uint64_t bytes_xmitted, bytes_xmitted_by_tls; } or_connection_t; /** Subtype of connection_t for an "edge connection" -- that is, an entry (ap) @@ -4230,6 +4248,18 @@ typedef struct { /** How long (seconds) do we keep a guard before picking a new one? */ int GuardLifetime; + /** Low-water mark for global scheduler - start sending when estimated + * queued size falls below this threshold. + */ + uint32_t SchedulerLowWaterMark; + /** High-water mark for global scheduler - stop sending when estimated + * queued size exceeds this threshold. + */ + uint32_t SchedulerHighWaterMark; + /** Flush size for global scheduler - flush this many cells at a time + * when sending. + */ + unsigned int SchedulerMaxFlushCells; } or_options_t; /** Persistent state for an onion router, as saved to disk. */ diff --git a/src/or/relay.c b/src/or/relay.c index 05c7b3c955..b95e5841e7 100644 --- a/src/or/relay.c +++ b/src/or/relay.c @@ -39,6 +39,7 @@ #include "router.h" #include "routerlist.h" #include "routerparse.h" +#include "scheduler.h" static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell, cell_direction_t cell_direction, @@ -2591,8 +2592,8 @@ packed_cell_get_circid(const packed_cell_t *cell, int wide_circ_ids) * queue of the first active circuit on <b>chan</b>, and write them to * <b>chan</b>->outbuf. Return the number of cells written. Advance * the active circuit pointer to the next active circuit in the ring. */ -int -channel_flush_from_first_active_circuit(channel_t *chan, int max) +MOCK_IMPL(int, +channel_flush_from_first_active_circuit, (channel_t *chan, int max)) { circuitmux_t *cmux = NULL; int n_flushed = 0; @@ -2868,14 +2869,8 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, log_debug(LD_GENERAL, "Made a circuit active."); } - if (!channel_has_queued_writes(chan)) { - /* There is no data at all waiting to be sent on the outbuf. Add a - * cell, so that we can notice when it gets flushed, flushed_some can - * get called, and we can start putting more data onto the buffer then. - */ - log_debug(LD_GENERAL, "Primed a buffer."); - channel_flush_from_first_active_circuit(chan, 1); - } + /* New way: mark this as having waiting cells for the scheduler */ + scheduler_channel_has_waiting_cells(chan); } /** Append an encoded value of <b>addr</b> to <b>payload_out</b>, which must diff --git a/src/or/relay.h b/src/or/relay.h index 73c399154d..351516aada 100644 --- a/src/or/relay.h +++ b/src/or/relay.h @@ -64,7 +64,8 @@ void append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, cell_t *cell, cell_direction_t direction, streamid_t fromstream); void channel_unlink_all_circuits(channel_t *chan, smartlist_t *detached_out); -int channel_flush_from_first_active_circuit(channel_t *chan, int max); +MOCK_DECL(int, channel_flush_from_first_active_circuit, + (channel_t *chan, int max)); void assert_circuit_mux_okay(channel_t *chan); void update_circuit_on_cmux_(circuit_t *circ, cell_direction_t direction, const char *file, int lineno); diff --git a/src/or/scheduler.c b/src/or/scheduler.c new file mode 100644 index 0000000000..c2ede846bf --- /dev/null +++ b/src/or/scheduler.c @@ -0,0 +1,708 @@ +/* * Copyright (c) 2013, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file scheduler.c + * \brief Relay scheduling system + **/ + +#include "or.h" + +#define TOR_CHANNEL_INTERNAL_ /* For channel_flush_some_cells() */ +#include "channel.h" + +#include "compat_libevent.h" +#define SCHEDULER_PRIVATE_ +#include "scheduler.h" + +#ifdef HAVE_EVENT2_EVENT_H +#include <event2/event.h> +#else +#include <event.h> +#endif + +/* + * Scheduler high/low watermarks + */ + +static uint32_t sched_q_low_water = 16384; +static uint32_t sched_q_high_water = 32768; + +/* + * Maximum cells to flush in a single call to channel_flush_some_cells(); + * setting this low means more calls, but too high and we could overshoot + * sched_q_high_water. + */ + +static uint32_t sched_max_flush_cells = 16; + +/* + * Write scheduling works by keeping track of which channels can + * accept cells, and have cells to write. From the scheduler's perspective, + * a channel can be in four possible states: + * + * 1.) Not open for writes, no cells to send + * - Not much to do here, and the channel will have scheduler_state == + * SCHED_CHAN_IDLE + * - Transitions from: + * - Open for writes/has cells by simultaneously draining all circuit + * queues and filling the output buffer. + * - Transitions to: + * - Not open for writes/has cells by arrival of cells on an attached + * circuit (this would be driven from append_cell_to_circuit_queue()) + * - Open for writes/no cells by a channel type specific path; + * driven from connection_or_flushed_some() for channel_tls_t. + * + * 2.) Open for writes, no cells to send + * - Not much here either; this will be the state an idle but open channel + * can be expected to settle in. It will have scheduler_state == + * SCHED_CHAN_WAITING_FOR_CELLS + * - Transitions from: + * - Not open for writes/no cells by flushing some of the output + * buffer. + * - Open for writes/has cells by the scheduler moving cells from + * circuit queues to channel output queue, but not having enough + * to fill the output queue. + * - Transitions to: + * - Open for writes/has cells by arrival of new cells on an attached + * circuit, in append_cell_to_circuit_queue() + * + * 3.) Not open for writes, cells to send + * - This is the state of a busy circuit limited by output bandwidth; + * cells have piled up in the circuit queues waiting to be relayed. + * The channel will have scheduler_state == SCHED_CHAN_WAITING_TO_WRITE. + * - Transitions from: + * - Not open for writes/no cells by arrival of cells on an attached + * circuit + * - Open for writes/has cells by filling an output buffer without + * draining all cells from attached circuits + * - Transitions to: + * - Opens for writes/has cells by draining some of the output buffer + * via the connection_or_flushed_some() path (for channel_tls_t). + * + * 4.) Open for writes, cells to send + * - This connection is ready to relay some cells and waiting for + * the scheduler to choose it. The channel will have scheduler_state == + * SCHED_CHAN_PENDING. + * - Transitions from: + * - Not open for writes/has cells by the connection_or_flushed_some() + * path + * - Open for writes/no cells by the append_cell_to_circuit_queue() + * path + * - Transitions to: + * - Not open for writes/no cells by draining all circuit queues and + * simultaneously filling the output buffer. + * - Not open for writes/has cells by writing enough cells to fill the + * output buffer + * - Open for writes/no cells by draining all attached circuit queues + * without also filling the output buffer + * + * Other event-driven parts of the code move channels between these scheduling + * states by calling scheduler functions; the scheduler only runs on open-for- + * writes/has-cells channels and is the only path for those to transition to + * other states. The scheduler_run() function gives us the opportunity to do + * scheduling work, and is called from other scheduler functions whenever a + * state transition occurs, and periodically from the main event loop. + */ + +/* Scheduler global data structures */ + +/* + * We keep a list of channels that are pending - i.e, have cells to write + * and can accept them to send. The enum scheduler_state in channel_t + * is reserved for our use. + */ + +/* Pqueue of channels that can write and have cells (pending work) */ +STATIC smartlist_t *channels_pending = NULL; + +/* + * This event runs the scheduler from its callback, and is manually + * activated whenever a channel enters open for writes/cells to send. + */ + +STATIC struct event *run_sched_ev = NULL; + +/* + * Queue heuristic; this is not the queue size, but an 'effective queuesize' + * that ages out contributions from stalled channels. + */ + +STATIC uint64_t queue_heuristic = 0; + +/* + * Timestamp for last queue heuristic update + */ + +STATIC time_t queue_heuristic_timestamp = 0; + +/* Scheduler static function declarations */ + +static void scheduler_evt_callback(evutil_socket_t fd, + short events, void *arg); +static int scheduler_more_work(void); +static void scheduler_retrigger(void); +#if 0 +static void scheduler_trigger(void); +#endif + +/* Scheduler function implementations */ + +/** Free everything and shut down the scheduling system */ + +void +scheduler_free_all(void) +{ + log_debug(LD_SCHED, "Shutting down scheduler"); + + if (run_sched_ev) { + event_del(run_sched_ev); + tor_event_free(run_sched_ev); + run_sched_ev = NULL; + } + + if (channels_pending) { + smartlist_free(channels_pending); + channels_pending = NULL; + } +} + +/** + * Comparison function to use when sorting pending channels + */ + +MOCK_IMPL(STATIC int, +scheduler_compare_channels, (const void *c1_v, const void *c2_v)) +{ + channel_t *c1 = NULL, *c2 = NULL; + /* These are a workaround for -Wbad-function-cast throwing a fit */ + const circuitmux_policy_t *p1, *p2; + uintptr_t p1_i, p2_i; + + tor_assert(c1_v); + tor_assert(c2_v); + + c1 = (channel_t *)(c1_v); + c2 = (channel_t *)(c2_v); + + tor_assert(c1); + tor_assert(c2); + + if (c1 != c2) { + if (circuitmux_get_policy(c1->cmux) == + circuitmux_get_policy(c2->cmux)) { + /* Same cmux policy, so use the mux comparison */ + return circuitmux_compare_muxes(c1->cmux, c2->cmux); + } else { + /* + * Different policies; not important to get this edge case perfect + * because the current code never actually gives different channels + * different cmux policies anyway. Just use this arbitrary but + * definite choice. + */ + p1 = circuitmux_get_policy(c1->cmux); + p2 = circuitmux_get_policy(c2->cmux); + p1_i = (uintptr_t)p1; + p2_i = (uintptr_t)p2; + + return (p1_i < p2_i) ? -1 : 1; + } + } else { + /* c1 == c2, so always equal */ + return 0; + } +} + +/* + * Scheduler event callback; this should get triggered once per event loop + * if any scheduling work was created during the event loop. + */ + +static void +scheduler_evt_callback(evutil_socket_t fd, short events, void *arg) +{ + (void)fd; + (void)events; + (void)arg; + log_debug(LD_SCHED, "Scheduler event callback called"); + + tor_assert(run_sched_ev); + + /* Run the scheduler */ + scheduler_run(); + + /* Do we have more work to do? */ + if (scheduler_more_work()) scheduler_retrigger(); +} + +/** Mark a channel as no longer ready to accept writes */ + +MOCK_IMPL(void, +scheduler_channel_doesnt_want_writes,(channel_t *chan)) +{ + tor_assert(chan); + + tor_assert(channels_pending); + + /* If it's already in pending, we can put it in waiting_to_write */ + if (chan->scheduler_state == SCHED_CHAN_PENDING) { + /* + * It's in channels_pending, so it shouldn't be in any of + * the other lists. It can't write any more, so it goes to + * channels_waiting_to_write. + */ + smartlist_pqueue_remove(channels_pending, + scheduler_compare_channels, + STRUCT_OFFSET(channel_t, sched_heap_idx), + chan); + chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE; + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p went from pending " + "to waiting_to_write", + U64_PRINTF_ARG(chan->global_identifier), chan); + } else { + /* + * It's not in pending, so it can't become waiting_to_write; it's + * either not in any of the lists (nothing to do) or it's already in + * waiting_for_cells (remove it, can't write any more). + */ + if (chan->scheduler_state == SCHED_CHAN_WAITING_FOR_CELLS) { + chan->scheduler_state = SCHED_CHAN_IDLE; + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p left waiting_for_cells", + U64_PRINTF_ARG(chan->global_identifier), chan); + } + } +} + +/** Mark a channel as having waiting cells */ + +MOCK_IMPL(void, +scheduler_channel_has_waiting_cells,(channel_t *chan)) +{ + int became_pending = 0; + + tor_assert(chan); + tor_assert(channels_pending); + + /* First, check if this one also writeable */ + if (chan->scheduler_state == SCHED_CHAN_WAITING_FOR_CELLS) { + /* + * It's in channels_waiting_for_cells, so it shouldn't be in any of + * the other lists. It has waiting cells now, so it goes to + * channels_pending. + */ + chan->scheduler_state = SCHED_CHAN_PENDING; + smartlist_pqueue_add(channels_pending, + scheduler_compare_channels, + STRUCT_OFFSET(channel_t, sched_heap_idx), + chan); + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p went from waiting_for_cells " + "to pending", + U64_PRINTF_ARG(chan->global_identifier), chan); + became_pending = 1; + } else { + /* + * It's not in waiting_for_cells, so it can't become pending; it's + * either not in any of the lists (we add it to waiting_to_write) + * or it's already in waiting_to_write or pending (we do nothing) + */ + if (!(chan->scheduler_state == SCHED_CHAN_WAITING_TO_WRITE || + chan->scheduler_state == SCHED_CHAN_PENDING)) { + chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE; + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p entered waiting_to_write", + U64_PRINTF_ARG(chan->global_identifier), chan); + } + } + + /* + * If we made a channel pending, we potentially have scheduling work + * to do. + */ + if (became_pending) scheduler_retrigger(); +} + +/** Set up the scheduling system */ + +void +scheduler_init(void) +{ + log_debug(LD_SCHED, "Initting scheduler"); + + tor_assert(!run_sched_ev); + run_sched_ev = tor_event_new(tor_libevent_get_base(), -1, + 0, scheduler_evt_callback, NULL); + + channels_pending = smartlist_new(); + queue_heuristic = 0; + queue_heuristic_timestamp = approx_time(); +} + +/** Check if there's more scheduling work */ + +static int +scheduler_more_work(void) +{ + tor_assert(channels_pending); + + return ((scheduler_get_queue_heuristic() < sched_q_low_water) && + ((smartlist_len(channels_pending) > 0))) ? 1 : 0; +} + +/** Retrigger the scheduler in a way safe to use from the callback */ + +static void +scheduler_retrigger(void) +{ + tor_assert(run_sched_ev); + event_active(run_sched_ev, EV_TIMEOUT, 1); +} + +/** Notify the scheduler of a channel being closed */ + +MOCK_IMPL(void, +scheduler_release_channel,(channel_t *chan)) +{ + tor_assert(chan); + tor_assert(channels_pending); + + if (chan->scheduler_state == SCHED_CHAN_PENDING) { + smartlist_pqueue_remove(channels_pending, + scheduler_compare_channels, + STRUCT_OFFSET(channel_t, sched_heap_idx), + chan); + } + + chan->scheduler_state = SCHED_CHAN_IDLE; +} + +/** Run the scheduling algorithm if necessary */ + +MOCK_IMPL(void, +scheduler_run, (void)) +{ + int n_cells, n_chans_before, n_chans_after; + uint64_t q_len_before, q_heur_before, q_len_after, q_heur_after; + ssize_t flushed, flushed_this_time; + smartlist_t *to_readd = NULL; + channel_t *chan = NULL; + + log_debug(LD_SCHED, "We have a chance to run the scheduler"); + + if (scheduler_get_queue_heuristic() < sched_q_low_water) { + n_chans_before = smartlist_len(channels_pending); + q_len_before = channel_get_global_queue_estimate(); + q_heur_before = scheduler_get_queue_heuristic(); + + while (scheduler_get_queue_heuristic() <= sched_q_high_water && + smartlist_len(channels_pending) > 0) { + /* Pop off a channel */ + chan = smartlist_pqueue_pop(channels_pending, + scheduler_compare_channels, + STRUCT_OFFSET(channel_t, sched_heap_idx)); + tor_assert(chan); + + /* Figure out how many cells we can write */ + n_cells = channel_num_cells_writeable(chan); + if (n_cells > 0) { + log_debug(LD_SCHED, + "Scheduler saw pending channel " U64_FORMAT " at %p with " + "%d cells writeable", + U64_PRINTF_ARG(chan->global_identifier), chan, n_cells); + + flushed = 0; + while (flushed < n_cells && + scheduler_get_queue_heuristic() <= sched_q_high_water) { + flushed_this_time = + channel_flush_some_cells(chan, + MIN(sched_max_flush_cells, + n_cells - flushed)); + if (flushed_this_time <= 0) break; + flushed += flushed_this_time; + } + + if (flushed < n_cells) { + /* We ran out of cells to flush */ + chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS; + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p " + "entered waiting_for_cells from pending", + U64_PRINTF_ARG(chan->global_identifier), + chan); + } else { + /* The channel may still have some cells */ + if (channel_more_to_flush(chan)) { + /* The channel goes to either pending or waiting_to_write */ + if (channel_num_cells_writeable(chan) > 0) { + /* Add it back to pending later */ + if (!to_readd) to_readd = smartlist_new(); + smartlist_add(to_readd, chan); + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p " + "is still pending", + U64_PRINTF_ARG(chan->global_identifier), + chan); + } else { + /* It's waiting to be able to write more */ + chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE; + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p " + "entered waiting_to_write from pending", + U64_PRINTF_ARG(chan->global_identifier), + chan); + } + } else { + /* No cells left; it can go to idle or waiting_for_cells */ + if (channel_num_cells_writeable(chan) > 0) { + /* + * It can still accept writes, so it goes to + * waiting_for_cells + */ + chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS; + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p " + "entered waiting_for_cells from pending", + U64_PRINTF_ARG(chan->global_identifier), + chan); + } else { + /* + * We exactly filled up the output queue with all available + * cells; go to idle. + */ + chan->scheduler_state = SCHED_CHAN_IDLE; + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p " + "become idle from pending", + U64_PRINTF_ARG(chan->global_identifier), + chan); + } + } + } + + log_debug(LD_SCHED, + "Scheduler flushed %d cells onto pending channel " + U64_FORMAT " at %p", + (int)flushed, U64_PRINTF_ARG(chan->global_identifier), + chan); + } else { + log_info(LD_SCHED, + "Scheduler saw pending channel " U64_FORMAT " at %p with " + "no cells writeable", + U64_PRINTF_ARG(chan->global_identifier), chan); + /* Put it back to WAITING_TO_WRITE */ + chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE; + } + } + + /* Readd any channels we need to */ + if (to_readd) { + SMARTLIST_FOREACH_BEGIN(to_readd, channel_t *, chan) { + chan->scheduler_state = SCHED_CHAN_PENDING; + smartlist_pqueue_add(channels_pending, + scheduler_compare_channels, + STRUCT_OFFSET(channel_t, sched_heap_idx), + chan); + } SMARTLIST_FOREACH_END(chan); + smartlist_free(to_readd); + } + + n_chans_after = smartlist_len(channels_pending); + q_len_after = channel_get_global_queue_estimate(); + q_heur_after = scheduler_get_queue_heuristic(); + log_debug(LD_SCHED, + "Scheduler handled %d of %d pending channels, queue size from " + U64_FORMAT " to " U64_FORMAT ", queue heuristic from " + U64_FORMAT " to " U64_FORMAT, + n_chans_before - n_chans_after, n_chans_before, + U64_PRINTF_ARG(q_len_before), U64_PRINTF_ARG(q_len_after), + U64_PRINTF_ARG(q_heur_before), U64_PRINTF_ARG(q_heur_after)); + } +} + +/** Trigger the scheduling event so we run the scheduler later */ + +#if 0 +static void +scheduler_trigger(void) +{ + log_debug(LD_SCHED, "Triggering scheduler event"); + + tor_assert(run_sched_ev); + + event_add(run_sched_ev, EV_TIMEOUT, 1); +} +#endif + +/** Mark a channel as ready to accept writes */ + +void +scheduler_channel_wants_writes(channel_t *chan) +{ + int became_pending = 0; + + tor_assert(chan); + tor_assert(channels_pending); + + /* If it's already in waiting_to_write, we can put it in pending */ + if (chan->scheduler_state == SCHED_CHAN_WAITING_TO_WRITE) { + /* + * It can write now, so it goes to channels_pending. + */ + smartlist_pqueue_add(channels_pending, + scheduler_compare_channels, + STRUCT_OFFSET(channel_t, sched_heap_idx), + chan); + chan->scheduler_state = SCHED_CHAN_PENDING; + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p went from waiting_to_write " + "to pending", + U64_PRINTF_ARG(chan->global_identifier), chan); + became_pending = 1; + } else { + /* + * It's not in SCHED_CHAN_WAITING_TO_WRITE, so it can't become pending; + * it's either idle and goes to WAITING_FOR_CELLS, or it's a no-op. + */ + if (!(chan->scheduler_state == SCHED_CHAN_WAITING_FOR_CELLS || + chan->scheduler_state == SCHED_CHAN_PENDING)) { + chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS; + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p entered waiting_for_cells", + U64_PRINTF_ARG(chan->global_identifier), chan); + } + } + + /* + * If we made a channel pending, we potentially have scheduling work + * to do. + */ + if (became_pending) scheduler_retrigger(); +} + +/** + * Notify the scheduler that a channel's position in the pqueue may have + * changed + */ + +void +scheduler_touch_channel(channel_t *chan) +{ + tor_assert(chan); + + if (chan->scheduler_state == SCHED_CHAN_PENDING) { + /* Remove and re-add it */ + smartlist_pqueue_remove(channels_pending, + scheduler_compare_channels, + STRUCT_OFFSET(channel_t, sched_heap_idx), + chan); + smartlist_pqueue_add(channels_pending, + scheduler_compare_channels, + STRUCT_OFFSET(channel_t, sched_heap_idx), + chan); + } + /* else no-op, since it isn't in the queue */ +} + +/** + * Notify the scheduler of a queue size adjustment, to recalculate the + * queue heuristic. + */ + +void +scheduler_adjust_queue_size(channel_t *chan, char dir, uint64_t adj) +{ + time_t now = approx_time(); + + log_debug(LD_SCHED, + "Queue size adjustment by %s" U64_FORMAT " for channel " + U64_FORMAT, + (dir >= 0) ? "+" : "-", + U64_PRINTF_ARG(adj), + U64_PRINTF_ARG(chan->global_identifier)); + + /* Get the queue heuristic up to date */ + scheduler_update_queue_heuristic(now); + + /* Adjust as appropriate */ + if (dir >= 0) { + /* Increasing it */ + queue_heuristic += adj; + } else { + /* Decreasing it */ + if (queue_heuristic > adj) queue_heuristic -= adj; + else queue_heuristic = 0; + } + + log_debug(LD_SCHED, + "Queue heuristic is now " U64_FORMAT, + U64_PRINTF_ARG(queue_heuristic)); +} + +/** + * Query the current value of the queue heuristic + */ + +STATIC uint64_t +scheduler_get_queue_heuristic(void) +{ + time_t now = approx_time(); + + scheduler_update_queue_heuristic(now); + + return queue_heuristic; +} + +/** + * Adjust the queue heuristic value to the present time + */ + +STATIC void +scheduler_update_queue_heuristic(time_t now) +{ + time_t diff; + + if (queue_heuristic_timestamp == 0) { + /* + * Nothing we can sensibly do; must not have been initted properly. + * Oh well. + */ + queue_heuristic_timestamp = now; + } else if (queue_heuristic_timestamp < now) { + diff = now - queue_heuristic_timestamp; + /* + * This is a simple exponential age-out; the other proposed alternative + * was a linear age-out using the bandwidth history in rephist.c; I'm + * going with this out of concern that if an adversary can jam the + * scheduler long enough, it would cause the bandwidth to drop to + * zero and render the aging mechanism ineffective thereafter. + */ + if (0 <= diff && diff < 64) queue_heuristic >>= diff; + else queue_heuristic = 0; + + queue_heuristic_timestamp = now; + + log_debug(LD_SCHED, + "Queue heuristic is now " U64_FORMAT, + U64_PRINTF_ARG(queue_heuristic)); + } + /* else no update needed, or time went backward */ +} + +/** + * Set scheduler watermarks and flush size + */ + +void +scheduler_set_watermarks(uint32_t lo, uint32_t hi, uint32_t max_flush) +{ + /* Sanity assertions - caller should ensure these are true */ + tor_assert(lo > 0); + tor_assert(hi > lo); + tor_assert(max_flush > 0); + + sched_q_low_water = lo; + sched_q_high_water = hi; + sched_max_flush_cells = max_flush; +} diff --git a/src/or/scheduler.h b/src/or/scheduler.h new file mode 100644 index 0000000000..404776b18b --- /dev/null +++ b/src/or/scheduler.h @@ -0,0 +1,50 @@ +/* * Copyright (c) 2013, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file scheduler.h + * \brief Header file for scheduler.c + **/ + +#ifndef TOR_SCHEDULER_H +#define TOR_SCHEDULER_H + +#include "or.h" +#include "channel.h" +#include "testsupport.h" + +/* Global-visibility scheduler functions */ + +/* Set up and shut down the scheduler from main.c */ +void scheduler_free_all(void); +void scheduler_init(void); +MOCK_DECL(void, scheduler_run, (void)); + +/* Mark channels as having cells or wanting/not wanting writes */ +MOCK_DECL(void,scheduler_channel_doesnt_want_writes,(channel_t *chan)); +MOCK_DECL(void,scheduler_channel_has_waiting_cells,(channel_t *chan)); +void scheduler_channel_wants_writes(channel_t *chan); + +/* Notify the scheduler of a channel being closed */ +MOCK_DECL(void,scheduler_release_channel,(channel_t *chan)); + +/* Notify scheduler of queue size adjustments */ +void scheduler_adjust_queue_size(channel_t *chan, char dir, uint64_t adj); + +/* Notify scheduler that a channel's queue position may have changed */ +void scheduler_touch_channel(channel_t *chan); + +/* Adjust the watermarks from config file*/ +void scheduler_set_watermarks(uint32_t lo, uint32_t hi, uint32_t max_flush); + +/* Things only scheduler.c and its test suite should see */ + +#ifdef SCHEDULER_PRIVATE_ +MOCK_DECL(STATIC int, scheduler_compare_channels, + (const void *c1_v, const void *c2_v)); +STATIC uint64_t scheduler_get_queue_heuristic(void); +STATIC void scheduler_update_queue_heuristic(time_t now); +#endif + +#endif /* !defined(TOR_SCHEDULER_H) */ + diff --git a/src/test/Makefile.nmake b/src/test/Makefile.nmake index f6ee7f3f53..0435617683 100644 --- a/src/test/Makefile.nmake +++ b/src/test/Makefile.nmake @@ -11,11 +11,12 @@ LIBS = ..\..\..\build-alpha\lib\libevent.lib \ ws2_32.lib advapi32.lib shell32.lib \ crypt32.lib gdi32.lib user32.lib -TEST_OBJECTS = test.obj test_addr.obj test_containers.obj \ +TEST_OBJECTS = test.obj test_addr.obj test_channel.obj test_channeltls.obj \ + test_containers.obj \ test_controller_events.obj test_crypto.obj test_data.obj test_dir.obj \ test_checkdir.obj test_microdesc.obj test_pt.obj test_util.obj test_config.obj \ - test_cell_formats.obj test_replay.obj test_introduce.obj tinytest.obj \ - test_hs.obj + test_cell_formats.obj test_relay.obj test_replay.obj \ + test_scheduler.obj test_introduce.obj test_hs.obj tinytest.obj tinytest.obj: ..\ext\tinytest.c $(CC) $(CFLAGS) /D snprintf=_snprintf /c ..\ext\tinytest.c diff --git a/src/test/fakechans.h b/src/test/fakechans.h new file mode 100644 index 0000000000..b129ab49a5 --- /dev/null +++ b/src/test/fakechans.h @@ -0,0 +1,25 @@ + /* Copyright (c) 2014, The Tor Project, Inc. */ + /* See LICENSE for licensing information */ + +#ifndef TOR_FAKECHANS_H +#define TOR_FAKECHANS_H + +/** + * \file fakechans.h + * \brief Declarations for fake channels for test suite use + */ + +void make_fake_cell(cell_t *c); +void make_fake_var_cell(var_cell_t *c); +channel_t * new_fake_channel(void); + +/* Also exposes some a mock used by both test_channel.c and test_relay.c */ +void scheduler_channel_has_waiting_cells_mock(channel_t *ch); +void scheduler_release_channel_mock(channel_t *ch); + +/* Query some counters used by the exposed mocks */ +int get_mock_scheduler_has_waiting_cells_count(void); +int get_mock_scheduler_release_channel_count(void); + +#endif /* !defined(TOR_FAKECHANS_H) */ + diff --git a/src/test/include.am b/src/test/include.am index 9abf3094eb..51c9f73739 100644 --- a/src/test/include.am +++ b/src/test/include.am @@ -20,6 +20,8 @@ src_test_test_SOURCES = \ src/test/test_addr.c \ src/test/test_buffers.c \ src/test/test_cell_formats.c \ + src/test/test_channel.c \ + src/test/test_channeltls.c \ src/test/test_circuitlist.c \ src/test/test_circuitmux.c \ src/test/test_containers.c \ @@ -39,8 +41,10 @@ src_test_test_SOURCES = \ src/test/test_options.c \ src/test/test_pt.c \ src/test/test_relaycell.c \ + src/test/test_relay.c \ src/test/test_replay.c \ src/test/test_routerkeys.c \ + src/test/test_scheduler.c \ src/test/test_socks.c \ src/test/test_util.c \ src/test/test_config.c \ diff --git a/src/test/test.c b/src/test/test.c index 07901ab107..fbe5625300 100644 --- a/src/test/test.c +++ b/src/test/test.c @@ -1308,6 +1308,11 @@ extern struct testcase_t accounting_tests[]; extern struct testcase_t policy_tests[]; extern struct testcase_t status_tests[]; extern struct testcase_t routerset_tests[]; +extern struct testcase_t router_tests[]; +extern struct testcase_t channel_tests[]; +extern struct testcase_t channeltls_tests[]; +extern struct testcase_t relay_tests[]; +extern struct testcase_t scheduler_tests[]; static struct testgroup_t testgroups[] = { { "", test_array }, @@ -1342,6 +1347,10 @@ static struct testgroup_t testgroups[] = { { "policy/" , policy_tests }, { "status/" , status_tests }, { "routerset/" , routerset_tests }, + { "channel/", channel_tests }, + { "channeltls/", channeltls_tests }, + { "relay/" , relay_tests }, + { "scheduler/", scheduler_tests }, END_OF_GROUPS }; diff --git a/src/test/test_channel.c b/src/test/test_channel.c new file mode 100644 index 0000000000..49590f52c0 --- /dev/null +++ b/src/test/test_channel.c @@ -0,0 +1,1669 @@ +/* Copyright (c) 2013, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#define TOR_CHANNEL_INTERNAL_ +#define CHANNEL_PRIVATE_ +#include "or.h" +#include "channel.h" +/* For channel_note_destroy_not_pending */ +#include "circuitlist.h" +#include "circuitmux.h" +/* For var_cell_free */ +#include "connection_or.h" +/* For packed_cell stuff */ +#define RELAY_PRIVATE +#include "relay.h" +/* For init/free stuff */ +#include "scheduler.h" + +/* Test suite stuff */ +#include "test.h" +#include "fakechans.h" + +/* This comes from channel.c */ +extern uint64_t estimated_total_queue_size; + +static int test_chan_accept_cells = 0; +static int test_chan_fixed_cells_recved = 0; +static int test_chan_var_cells_recved = 0; +static int test_cells_written = 0; +static int test_destroy_not_pending_calls = 0; +static int test_doesnt_want_writes_count = 0; +static int test_dumpstats_calls = 0; +static int test_has_waiting_cells_count = 0; +static double test_overhead_estimate = 1.0f; +static int test_releases_count = 0; +static circuitmux_t *test_target_cmux = NULL; +static unsigned int test_cmux_cells = 0; +static channel_t *dump_statistics_mock_target = NULL; +static int dump_statistics_mock_matches = 0; + +static void chan_test_channel_dump_statistics_mock( + channel_t *chan, int severity); +static int chan_test_channel_flush_from_first_active_circuit_mock( + channel_t *chan, int max); +static unsigned int chan_test_circuitmux_num_cells_mock(circuitmux_t *cmux); +static void channel_note_destroy_not_pending_mock(channel_t *ch, + circid_t circid); +static void chan_test_cell_handler(channel_t *ch, + cell_t *cell); +static const char * chan_test_describe_transport(channel_t *ch); +static void chan_test_dumpstats(channel_t *ch, int severity); +static void chan_test_var_cell_handler(channel_t *ch, + var_cell_t *var_cell); +static void chan_test_close(channel_t *ch); +static void chan_test_error(channel_t *ch); +static void chan_test_finish_close(channel_t *ch); +static const char * chan_test_get_remote_descr(channel_t *ch, int flags); +static int chan_test_is_canonical(channel_t *ch, int req); +static size_t chan_test_num_bytes_queued(channel_t *ch); +static int chan_test_num_cells_writeable(channel_t *ch); +static int chan_test_write_cell(channel_t *ch, cell_t *cell); +static int chan_test_write_packed_cell(channel_t *ch, + packed_cell_t *packed_cell); +static int chan_test_write_var_cell(channel_t *ch, var_cell_t *var_cell); +static void scheduler_channel_doesnt_want_writes_mock(channel_t *ch); + +static void test_channel_dumpstats(void *arg); +static void test_channel_flush(void *arg); +static void test_channel_flushmux(void *arg); +static void test_channel_incoming(void *arg); +static void test_channel_lifecycle(void *arg); +static void test_channel_multi(void *arg); +static void test_channel_queue_size(void *arg); +static void test_channel_write(void *arg); + +static void +channel_note_destroy_not_pending_mock(channel_t *ch, + circid_t circid) +{ + (void)ch; + (void)circid; + + ++test_destroy_not_pending_calls; +} + +static const char * +chan_test_describe_transport(channel_t *ch) +{ + tt_assert(ch != NULL); + + done: + return "Fake channel for unit tests"; +} + +/** + * Mock for channel_dump_statistics(); if the channel matches the + * target, bump a counter - otherwise ignore. + */ + +static void +chan_test_channel_dump_statistics_mock(channel_t *chan, int severity) +{ + tt_assert(chan != NULL); + + (void)severity; + + if (chan != NULL && chan == dump_statistics_mock_target) { + ++dump_statistics_mock_matches; + } + + done: + return; +} + +/** + * If the target cmux is the cmux for chan, make fake cells up to the + * target number of cells and write them to chan. Otherwise, invoke + * the real channel_flush_from_first_active_circuit(). + */ + +static int +chan_test_channel_flush_from_first_active_circuit_mock(channel_t *chan, + int max) +{ + int result = 0, c = 0; + packed_cell_t *cell = NULL; + + tt_assert(chan != NULL); + if (test_target_cmux != NULL && + test_target_cmux == chan->cmux) { + while (c <= max && test_cmux_cells > 0) { + cell = packed_cell_new(); + channel_write_packed_cell(chan, cell); + ++c; + --test_cmux_cells; + } + result = c; + } else { + result = channel_flush_from_first_active_circuit__real(chan, max); + } + + done: + return result; +} + +/** + * If we have a target cmux set and this matches it, lie about how + * many cells we have according to the number indicated; otherwise + * pass to the real circuitmux_num_cells(). + */ + +static unsigned int +chan_test_circuitmux_num_cells_mock(circuitmux_t *cmux) +{ + unsigned int result = 0; + + tt_assert(cmux != NULL); + if (cmux != NULL) { + if (cmux == test_target_cmux) { + result = test_cmux_cells; + } else { + result = circuitmux_num_cells__real(cmux); + } + } + + done: + + return result; +} + +/* + * Handle an incoming fixed-size cell for unit tests + */ + +static void +chan_test_cell_handler(channel_t *ch, + cell_t *cell) +{ + tt_assert(ch); + tt_assert(cell); + + tor_free(cell); + ++test_chan_fixed_cells_recved; + + done: + return; +} + +/* + * Fake transport-specific stats call + */ + +static void +chan_test_dumpstats(channel_t *ch, int severity) +{ + tt_assert(ch != NULL); + + (void)severity; + + ++test_dumpstats_calls; + + done: + return; +} + +/* + * Handle an incoming variable-size cell for unit tests + */ + +static void +chan_test_var_cell_handler(channel_t *ch, + var_cell_t *var_cell) +{ + tt_assert(ch); + tt_assert(var_cell); + + tor_free(var_cell); + ++test_chan_var_cells_recved; + + done: + return; +} + +static void +chan_test_close(channel_t *ch) +{ + tt_assert(ch); + + done: + return; +} + +/* + * Close a channel through the error path + */ + +static void +chan_test_error(channel_t *ch) +{ + tt_assert(ch); + tt_assert(!(ch->state == CHANNEL_STATE_CLOSING || + ch->state == CHANNEL_STATE_ERROR || + ch->state == CHANNEL_STATE_CLOSED)); + + channel_close_for_error(ch); + + done: + return; +} + +/* + * Finish closing a channel from CHANNEL_STATE_CLOSING + */ + +static void +chan_test_finish_close(channel_t *ch) +{ + tt_assert(ch); + tt_assert(ch->state == CHANNEL_STATE_CLOSING); + + channel_closed(ch); + + done: + return; +} + +static const char * +chan_test_get_remote_descr(channel_t *ch, int flags) +{ + tt_assert(ch); + tt_int_op(flags & ~(GRD_FLAG_ORIGINAL | GRD_FLAG_ADDR_ONLY), ==, 0); + + done: + return "Fake channel for unit tests; no real endpoint"; +} + +static double +chan_test_get_overhead_estimate(channel_t *ch) +{ + tt_assert(ch); + + done: + return test_overhead_estimate; +} + +static int +chan_test_is_canonical(channel_t *ch, int req) +{ + tt_assert(ch != NULL); + tt_assert(req == 0 || req == 1); + + done: + /* Fake channels are always canonical */ + return 1; +} + +static size_t +chan_test_num_bytes_queued(channel_t *ch) +{ + tt_assert(ch); + + done: + return 0; +} + +static int +chan_test_num_cells_writeable(channel_t *ch) +{ + tt_assert(ch); + + done: + return 32; +} + +static int +chan_test_write_cell(channel_t *ch, cell_t *cell) +{ + int rv = 0; + + tt_assert(ch); + tt_assert(cell); + + if (test_chan_accept_cells) { + /* Free the cell and bump the counter */ + tor_free(cell); + ++test_cells_written; + rv = 1; + } + /* else return 0, we didn't accept it */ + + done: + return rv; +} + +static int +chan_test_write_packed_cell(channel_t *ch, + packed_cell_t *packed_cell) +{ + int rv = 0; + + tt_assert(ch); + tt_assert(packed_cell); + + if (test_chan_accept_cells) { + /* Free the cell and bump the counter */ + packed_cell_free(packed_cell); + ++test_cells_written; + rv = 1; + } + /* else return 0, we didn't accept it */ + + done: + return rv; +} + +static int +chan_test_write_var_cell(channel_t *ch, var_cell_t *var_cell) +{ + int rv = 0; + + tt_assert(ch); + tt_assert(var_cell); + + if (test_chan_accept_cells) { + /* Free the cell and bump the counter */ + var_cell_free(var_cell); + ++test_cells_written; + rv = 1; + } + /* else return 0, we didn't accept it */ + + done: + return rv; +} + +/** + * Fill out c with a new fake cell for test suite use + */ + +void +make_fake_cell(cell_t *c) +{ + tt_assert(c != NULL); + + c->circ_id = 1; + c->command = CELL_RELAY; + memset(c->payload, 0, CELL_PAYLOAD_SIZE); + + done: + return; +} + +/** + * Fill out c with a new fake var_cell for test suite use + */ + +void +make_fake_var_cell(var_cell_t *c) +{ + tt_assert(c != NULL); + + c->circ_id = 1; + c->command = CELL_VERSIONS; + c->payload_len = CELL_PAYLOAD_SIZE / 2; + memset(c->payload, 0, c->payload_len); + + done: + return; +} + +/** + * Set up a new fake channel for the test suite + */ + +channel_t * +new_fake_channel(void) +{ + channel_t *chan = tor_malloc_zero(sizeof(channel_t)); + channel_init(chan); + + chan->close = chan_test_close; + chan->get_overhead_estimate = chan_test_get_overhead_estimate; + chan->num_bytes_queued = chan_test_num_bytes_queued; + chan->num_cells_writeable = chan_test_num_cells_writeable; + chan->write_cell = chan_test_write_cell; + chan->write_packed_cell = chan_test_write_packed_cell; + chan->write_var_cell = chan_test_write_var_cell; + chan->state = CHANNEL_STATE_OPEN; + + return chan; +} + +/** + * Counter query for scheduler_channel_has_waiting_cells_mock() + */ + +int +get_mock_scheduler_has_waiting_cells_count(void) +{ + return test_has_waiting_cells_count; +} + +/** + * Mock for scheduler_channel_has_waiting_cells() + */ + +void +scheduler_channel_has_waiting_cells_mock(channel_t *ch) +{ + (void)ch; + + /* Increment counter */ + ++test_has_waiting_cells_count; + + return; +} + +static void +scheduler_channel_doesnt_want_writes_mock(channel_t *ch) +{ + (void)ch; + + /* Increment counter */ + ++test_doesnt_want_writes_count; + + return; +} + +/** + * Counter query for scheduler_release_channel_mock() + */ + +int +get_mock_scheduler_release_channel_count(void) +{ + return test_releases_count; +} + +/** + * Mock for scheduler_release_channel() + */ + +void +scheduler_release_channel_mock(channel_t *ch) +{ + (void)ch; + + /* Increment counter */ + ++test_releases_count; + + return; +} + +/** + * Test for channel_dumpstats() and limited test for + * channel_dump_statistics() + */ + +static void +test_channel_dumpstats(void *arg) +{ + channel_t *ch = NULL; + cell_t *cell = NULL; + int old_count; + + (void)arg; + + /* Mock these for duration of the test */ + MOCK(scheduler_channel_doesnt_want_writes, + scheduler_channel_doesnt_want_writes_mock); + MOCK(scheduler_release_channel, + scheduler_release_channel_mock); + + /* Set up a new fake channel */ + ch = new_fake_channel(); + tt_assert(ch); + ch->cmux = circuitmux_alloc(); + + /* Try to register it */ + channel_register(ch); + tt_assert(ch->registered); + + /* Set up mock */ + dump_statistics_mock_target = ch; + dump_statistics_mock_matches = 0; + MOCK(channel_dump_statistics, + chan_test_channel_dump_statistics_mock); + + /* Call channel_dumpstats() */ + channel_dumpstats(LOG_DEBUG); + + /* Assert that we hit the mock */ + tt_int_op(dump_statistics_mock_matches, ==, 1); + + /* Close the channel */ + channel_mark_for_close(ch); + tt_int_op(ch->state, ==, CHANNEL_STATE_CLOSING); + chan_test_finish_close(ch); + tt_int_op(ch->state, ==, CHANNEL_STATE_CLOSED); + + /* Try again and hit the finished channel */ + channel_dumpstats(LOG_DEBUG); + tt_int_op(dump_statistics_mock_matches, ==, 2); + + channel_run_cleanup(); + ch = NULL; + + /* Now we should hit nothing */ + channel_dumpstats(LOG_DEBUG); + tt_int_op(dump_statistics_mock_matches, ==, 2); + + /* Unmock */ + UNMOCK(channel_dump_statistics); + dump_statistics_mock_target = NULL; + dump_statistics_mock_matches = 0; + + /* Now make another channel */ + ch = new_fake_channel(); + tt_assert(ch); + ch->cmux = circuitmux_alloc(); + channel_register(ch); + tt_assert(ch->registered); + /* Lie about its age so dumpstats gets coverage for rate calculations */ + ch->timestamp_created = time(NULL) - 30; + tt_assert(ch->timestamp_created > 0); + tt_assert(time(NULL) > ch->timestamp_created); + + /* Put cells through it both ways to make the counters non-zero */ + cell = tor_malloc_zero(sizeof(*cell)); + make_fake_cell(cell); + test_chan_accept_cells = 1; + old_count = test_cells_written; + channel_write_cell(ch, cell); + cell = NULL; + tt_int_op(test_cells_written, ==, old_count + 1); + tt_assert(ch->n_bytes_xmitted > 0); + tt_assert(ch->n_cells_xmitted > 0); + + /* Receive path */ + channel_set_cell_handlers(ch, + chan_test_cell_handler, + chan_test_var_cell_handler); + tt_ptr_op(channel_get_cell_handler(ch), ==, chan_test_cell_handler); + tt_ptr_op(channel_get_var_cell_handler(ch), ==, chan_test_var_cell_handler); + cell = tor_malloc_zero(sizeof(cell_t)); + make_fake_cell(cell); + old_count = test_chan_fixed_cells_recved; + channel_queue_cell(ch, cell); + cell = NULL; + tt_int_op(test_chan_fixed_cells_recved, ==, old_count + 1); + tt_assert(ch->n_bytes_recved > 0); + tt_assert(ch->n_cells_recved > 0); + + /* Test channel_dump_statistics */ + ch->describe_transport = chan_test_describe_transport; + ch->dumpstats = chan_test_dumpstats; + ch->get_remote_descr = chan_test_get_remote_descr; + ch->is_canonical = chan_test_is_canonical; + old_count = test_dumpstats_calls; + channel_dump_statistics(ch, LOG_DEBUG); + tt_int_op(test_dumpstats_calls, ==, old_count + 1); + + /* Close the channel */ + channel_mark_for_close(ch); + tt_int_op(ch->state, ==, CHANNEL_STATE_CLOSING); + chan_test_finish_close(ch); + tt_int_op(ch->state, ==, CHANNEL_STATE_CLOSED); + channel_run_cleanup(); + ch = NULL; + + done: + tor_free(cell); + tor_free(ch); + + UNMOCK(scheduler_channel_doesnt_want_writes); + UNMOCK(scheduler_release_channel); + + return; +} + +static void +test_channel_flush(void *arg) +{ + channel_t *ch = NULL; + cell_t *cell = NULL; + packed_cell_t *p_cell = NULL; + var_cell_t *v_cell = NULL; + int init_count; + + (void)arg; + +#ifdef ENABLE_MEMPOOLS + init_cell_pool(); +#endif /* ENABLE_MEMPOOLS */ + + ch = new_fake_channel(); + tt_assert(ch); + + /* Cache the original count */ + init_count = test_cells_written; + + /* Stop accepting so we can queue some */ + test_chan_accept_cells = 0; + + /* Queue a regular cell */ + cell = tor_malloc_zero(sizeof(cell_t)); + make_fake_cell(cell); + channel_write_cell(ch, cell); + /* It should be queued, so assert that we didn't write it */ + tt_int_op(test_cells_written, ==, init_count); + + /* Queue a var cell */ + v_cell = tor_malloc_zero(sizeof(var_cell_t) + CELL_PAYLOAD_SIZE); + make_fake_var_cell(v_cell); + channel_write_var_cell(ch, v_cell); + /* It should be queued, so assert that we didn't write it */ + tt_int_op(test_cells_written, ==, init_count); + + /* Try a packed cell now */ + p_cell = packed_cell_new(); + tt_assert(p_cell); + channel_write_packed_cell(ch, p_cell); + /* It should be queued, so assert that we didn't write it */ + tt_int_op(test_cells_written, ==, init_count); + + /* Now allow writes through again */ + test_chan_accept_cells = 1; + + /* ...and flush */ + channel_flush_cells(ch); + + /* All three should have gone through */ + tt_int_op(test_cells_written, ==, init_count + 3); + + done: + tor_free(ch); +#ifdef ENABLE_MEMPOOLS + free_cell_pool(); +#endif /* ENABLE_MEMPOOLS */ + + return; +} + +/** + * Channel flush tests that require cmux mocking + */ + +static void +test_channel_flushmux(void *arg) +{ + channel_t *ch = NULL; + int old_count, q_len_before, q_len_after; + ssize_t result; + + (void)arg; + +#ifdef ENABLE_MEMPOOLS + init_cell_pool(); +#endif /* ENABLE_MEMPOOLS */ + + /* Install mocks we need for this test */ + MOCK(channel_flush_from_first_active_circuit, + chan_test_channel_flush_from_first_active_circuit_mock); + MOCK(circuitmux_num_cells, + chan_test_circuitmux_num_cells_mock); + + ch = new_fake_channel(); + tt_assert(ch); + ch->cmux = circuitmux_alloc(); + + old_count = test_cells_written; + + test_target_cmux = ch->cmux; + test_cmux_cells = 1; + + /* Enable cell acceptance */ + test_chan_accept_cells = 1; + + result = channel_flush_some_cells(ch, 1); + + tt_int_op(result, ==, 1); + tt_int_op(test_cells_written, ==, old_count + 1); + tt_int_op(test_cmux_cells, ==, 0); + + /* Now try it without accepting to force them into the queue */ + test_chan_accept_cells = 0; + test_cmux_cells = 1; + q_len_before = chan_cell_queue_len(&(ch->outgoing_queue)); + + result = channel_flush_some_cells(ch, 1); + + /* We should not have actually flushed any */ + tt_int_op(result, ==, 0); + tt_int_op(test_cells_written, ==, old_count + 1); + /* But we should have gotten to the fake cellgen loop */ + tt_int_op(test_cmux_cells, ==, 0); + /* ...and we should have a queued cell */ + q_len_after = chan_cell_queue_len(&(ch->outgoing_queue)); + tt_int_op(q_len_after, ==, q_len_before + 1); + + /* Now accept cells again and drain the queue */ + test_chan_accept_cells = 1; + channel_flush_cells(ch); + tt_int_op(test_cells_written, ==, old_count + 2); + tt_int_op(chan_cell_queue_len(&(ch->outgoing_queue)), ==, 0); + + test_target_cmux = NULL; + test_cmux_cells = 0; + + done: + tor_free(ch); + + UNMOCK(channel_flush_from_first_active_circuit); + UNMOCK(circuitmux_num_cells); + + test_chan_accept_cells = 0; + +#ifdef ENABLE_MEMPOOLS + free_cell_pool(); +#endif /* ENABLE_MEMPOOLS */ + + return; +} + +static void +test_channel_incoming(void *arg) +{ + channel_t *ch = NULL; + cell_t *cell = NULL; + var_cell_t *var_cell = NULL; + int old_count; + + (void)arg; + + /* Mock these for duration of the test */ + MOCK(scheduler_channel_doesnt_want_writes, + scheduler_channel_doesnt_want_writes_mock); + MOCK(scheduler_release_channel, + scheduler_release_channel_mock); + + /* Accept cells to lower layer */ + test_chan_accept_cells = 1; + /* Use default overhead factor */ + test_overhead_estimate = 1.0f; + + ch = new_fake_channel(); + tt_assert(ch); + /* Start it off in OPENING */ + ch->state = CHANNEL_STATE_OPENING; + /* We'll need a cmux */ + ch->cmux = circuitmux_alloc(); + + /* Install incoming cell handlers */ + channel_set_cell_handlers(ch, + chan_test_cell_handler, + chan_test_var_cell_handler); + /* Test cell handler getters */ + tt_ptr_op(channel_get_cell_handler(ch), ==, chan_test_cell_handler); + tt_ptr_op(channel_get_var_cell_handler(ch), ==, chan_test_var_cell_handler); + + /* Try to register it */ + channel_register(ch); + tt_assert(ch->registered); + + /* Open it */ + channel_change_state(ch, CHANNEL_STATE_OPEN); + tt_int_op(ch->state, ==, CHANNEL_STATE_OPEN); + + /* Receive a fixed cell */ + cell = tor_malloc_zero(sizeof(cell_t)); + make_fake_cell(cell); + old_count = test_chan_fixed_cells_recved; + channel_queue_cell(ch, cell); + cell = NULL; + tt_int_op(test_chan_fixed_cells_recved, ==, old_count + 1); + + /* Receive a variable-size cell */ + var_cell = tor_malloc_zero(sizeof(var_cell_t) + CELL_PAYLOAD_SIZE); + make_fake_var_cell(var_cell); + old_count = test_chan_var_cells_recved; + channel_queue_var_cell(ch, var_cell); + var_cell = NULL; + tt_int_op(test_chan_var_cells_recved, ==, old_count + 1); + + /* Close it */ + channel_mark_for_close(ch); + tt_int_op(ch->state, ==, CHANNEL_STATE_CLOSING); + chan_test_finish_close(ch); + tt_int_op(ch->state, ==, CHANNEL_STATE_CLOSED); + channel_run_cleanup(); + ch = NULL; + + done: + tor_free(ch); + tor_free(cell); + tor_free(var_cell); + + UNMOCK(scheduler_channel_doesnt_want_writes); + UNMOCK(scheduler_release_channel); + + return; +} + +/** + * Normal channel lifecycle test: + * + * OPENING->OPEN->MAINT->OPEN->CLOSING->CLOSED + */ + +static void +test_channel_lifecycle(void *arg) +{ + channel_t *ch1 = NULL, *ch2 = NULL; + cell_t *cell = NULL; + int old_count, init_doesnt_want_writes_count; + int init_releases_count; + + (void)arg; + + /* Mock these for the whole lifecycle test */ + MOCK(scheduler_channel_doesnt_want_writes, + scheduler_channel_doesnt_want_writes_mock); + MOCK(scheduler_release_channel, + scheduler_release_channel_mock); + + /* Cache some initial counter values */ + init_doesnt_want_writes_count = test_doesnt_want_writes_count; + init_releases_count = test_releases_count; + + /* Accept cells to lower layer */ + test_chan_accept_cells = 1; + /* Use default overhead factor */ + test_overhead_estimate = 1.0f; + + ch1 = new_fake_channel(); + tt_assert(ch1); + /* Start it off in OPENING */ + ch1->state = CHANNEL_STATE_OPENING; + /* We'll need a cmux */ + ch1->cmux = circuitmux_alloc(); + + /* Try to register it */ + channel_register(ch1); + tt_assert(ch1->registered); + + /* Try to write a cell through (should queue) */ + cell = tor_malloc_zero(sizeof(cell_t)); + make_fake_cell(cell); + old_count = test_cells_written; + channel_write_cell(ch1, cell); + tt_int_op(old_count, ==, test_cells_written); + + /* Move it to OPEN and flush */ + channel_change_state(ch1, CHANNEL_STATE_OPEN); + + /* Queue should drain */ + tt_int_op(old_count + 1, ==, test_cells_written); + + /* Get another one */ + ch2 = new_fake_channel(); + tt_assert(ch2); + ch2->state = CHANNEL_STATE_OPENING; + ch2->cmux = circuitmux_alloc(); + + /* Register */ + channel_register(ch2); + tt_assert(ch2->registered); + + /* Check counters */ + tt_int_op(test_doesnt_want_writes_count, ==, init_doesnt_want_writes_count); + tt_int_op(test_releases_count, ==, init_releases_count); + + /* Move ch1 to MAINT */ + channel_change_state(ch1, CHANNEL_STATE_MAINT); + tt_int_op(test_doesnt_want_writes_count, ==, + init_doesnt_want_writes_count + 1); + tt_int_op(test_releases_count, ==, init_releases_count); + + /* Move ch2 to OPEN */ + channel_change_state(ch2, CHANNEL_STATE_OPEN); + tt_int_op(test_doesnt_want_writes_count, ==, + init_doesnt_want_writes_count + 1); + tt_int_op(test_releases_count, ==, init_releases_count); + + /* Move ch1 back to OPEN */ + channel_change_state(ch1, CHANNEL_STATE_OPEN); + tt_int_op(test_doesnt_want_writes_count, ==, + init_doesnt_want_writes_count + 1); + tt_int_op(test_releases_count, ==, init_releases_count); + + /* Mark ch2 for close */ + channel_mark_for_close(ch2); + tt_int_op(ch2->state, ==, CHANNEL_STATE_CLOSING); + tt_int_op(test_doesnt_want_writes_count, ==, + init_doesnt_want_writes_count + 1); + tt_int_op(test_releases_count, ==, init_releases_count + 1); + + /* Shut down channels */ + channel_free_all(); + ch1 = ch2 = NULL; + tt_int_op(test_doesnt_want_writes_count, ==, + init_doesnt_want_writes_count + 1); + /* channel_free() calls scheduler_release_channel() */ + tt_int_op(test_releases_count, ==, init_releases_count + 4); + + done: + tor_free(ch1); + tor_free(ch2); + + UNMOCK(scheduler_channel_doesnt_want_writes); + UNMOCK(scheduler_release_channel); + + return; +} + +/** + * Weird channel lifecycle test: + * + * OPENING->CLOSING->CLOSED + * OPENING->OPEN->CLOSING->ERROR + * OPENING->OPEN->MAINT->CLOSING->CLOSED + * OPENING->OPEN->MAINT->CLOSING->ERROR + */ + +static void +test_channel_lifecycle_2(void *arg) +{ + channel_t *ch = NULL; + + (void)arg; + + /* Mock these for the whole lifecycle test */ + MOCK(scheduler_channel_doesnt_want_writes, + scheduler_channel_doesnt_want_writes_mock); + MOCK(scheduler_release_channel, + scheduler_release_channel_mock); + + /* Accept cells to lower layer */ + test_chan_accept_cells = 1; + /* Use default overhead factor */ + test_overhead_estimate = 1.0f; + + ch = new_fake_channel(); + tt_assert(ch); + /* Start it off in OPENING */ + ch->state = CHANNEL_STATE_OPENING; + /* The full lifecycle test needs a cmux */ + ch->cmux = circuitmux_alloc(); + + /* Try to register it */ + channel_register(ch); + tt_assert(ch->registered); + + /* Try to close it */ + channel_mark_for_close(ch); + tt_int_op(ch->state, ==, CHANNEL_STATE_CLOSING); + + /* Finish closing it */ + chan_test_finish_close(ch); + tt_int_op(ch->state, ==, CHANNEL_STATE_CLOSED); + channel_run_cleanup(); + ch = NULL; + + /* Now try OPENING->OPEN->CLOSING->ERROR */ + ch = new_fake_channel(); + tt_assert(ch); + ch->state = CHANNEL_STATE_OPENING; + ch->cmux = circuitmux_alloc(); + channel_register(ch); + tt_assert(ch->registered); + + /* Finish opening it */ + channel_change_state(ch, CHANNEL_STATE_OPEN); + + /* Error exit from lower layer */ + chan_test_error(ch); + tt_int_op(ch->state, ==, CHANNEL_STATE_CLOSING); + chan_test_finish_close(ch); + tt_int_op(ch->state, ==, CHANNEL_STATE_ERROR); + channel_run_cleanup(); + ch = NULL; + + /* OPENING->OPEN->MAINT->CLOSING->CLOSED close from maintenance state */ + ch = new_fake_channel(); + tt_assert(ch); + ch->state = CHANNEL_STATE_OPENING; + ch->cmux = circuitmux_alloc(); + channel_register(ch); + tt_assert(ch->registered); + + /* Finish opening it */ + channel_change_state(ch, CHANNEL_STATE_OPEN); + tt_int_op(ch->state, ==, CHANNEL_STATE_OPEN); + + /* Go to maintenance state */ + channel_change_state(ch, CHANNEL_STATE_MAINT); + tt_int_op(ch->state, ==, CHANNEL_STATE_MAINT); + + /* Lower layer close */ + channel_mark_for_close(ch); + tt_int_op(ch->state, ==, CHANNEL_STATE_CLOSING); + + /* Finish */ + chan_test_finish_close(ch); + tt_int_op(ch->state, ==, CHANNEL_STATE_CLOSED); + channel_run_cleanup(); + ch = NULL; + + /* + * OPENING->OPEN->MAINT->CLOSING->CLOSED lower-layer close during + * maintenance state + */ + ch = new_fake_channel(); + tt_assert(ch); + ch->state = CHANNEL_STATE_OPENING; + ch->cmux = circuitmux_alloc(); + channel_register(ch); + tt_assert(ch->registered); + + /* Finish opening it */ + channel_change_state(ch, CHANNEL_STATE_OPEN); + tt_int_op(ch->state, ==, CHANNEL_STATE_OPEN); + + /* Go to maintenance state */ + channel_change_state(ch, CHANNEL_STATE_MAINT); + tt_int_op(ch->state, ==, CHANNEL_STATE_MAINT); + + /* Lower layer close */ + channel_close_from_lower_layer(ch); + tt_int_op(ch->state, ==, CHANNEL_STATE_CLOSING); + + /* Finish */ + chan_test_finish_close(ch); + tt_int_op(ch->state, ==, CHANNEL_STATE_CLOSED); + channel_run_cleanup(); + ch = NULL; + + /* OPENING->OPEN->MAINT->CLOSING->ERROR */ + ch = new_fake_channel(); + tt_assert(ch); + ch->state = CHANNEL_STATE_OPENING; + ch->cmux = circuitmux_alloc(); + channel_register(ch); + tt_assert(ch->registered); + + /* Finish opening it */ + channel_change_state(ch, CHANNEL_STATE_OPEN); + tt_int_op(ch->state, ==, CHANNEL_STATE_OPEN); + + /* Go to maintenance state */ + channel_change_state(ch, CHANNEL_STATE_MAINT); + tt_int_op(ch->state, ==, CHANNEL_STATE_MAINT); + + /* Lower layer close */ + chan_test_error(ch); + tt_int_op(ch->state, ==, CHANNEL_STATE_CLOSING); + + /* Finish */ + chan_test_finish_close(ch); + tt_int_op(ch->state, ==, CHANNEL_STATE_ERROR); + channel_run_cleanup(); + ch = NULL; + + /* Shut down channels */ + channel_free_all(); + + done: + tor_free(ch); + + UNMOCK(scheduler_channel_doesnt_want_writes); + UNMOCK(scheduler_release_channel); + + return; +} + +static void +test_channel_multi(void *arg) +{ + channel_t *ch1 = NULL, *ch2 = NULL; + uint64_t global_queue_estimate; + cell_t *cell = NULL; + + (void)arg; + + /* Accept cells to lower layer */ + test_chan_accept_cells = 1; + /* Use default overhead factor */ + test_overhead_estimate = 1.0f; + + ch1 = new_fake_channel(); + tt_assert(ch1); + ch2 = new_fake_channel(); + tt_assert(ch2); + + /* Initial queue size update */ + channel_update_xmit_queue_size(ch1); + tt_int_op(ch1->bytes_queued_for_xmit, ==, 0); + channel_update_xmit_queue_size(ch2); + tt_int_op(ch2->bytes_queued_for_xmit, ==, 0); + global_queue_estimate = channel_get_global_queue_estimate(); + tt_int_op(global_queue_estimate, ==, 0); + + /* Queue some cells, check queue estimates */ + cell = tor_malloc_zero(sizeof(cell_t)); + make_fake_cell(cell); + channel_write_cell(ch1, cell); + + cell = tor_malloc_zero(sizeof(cell_t)); + make_fake_cell(cell); + channel_write_cell(ch2, cell); + + channel_update_xmit_queue_size(ch1); + channel_update_xmit_queue_size(ch2); + tt_int_op(ch1->bytes_queued_for_xmit, ==, 0); + tt_int_op(ch2->bytes_queued_for_xmit, ==, 0); + global_queue_estimate = channel_get_global_queue_estimate(); + tt_int_op(global_queue_estimate, ==, 0); + + /* Stop accepting cells at lower layer */ + test_chan_accept_cells = 0; + + /* Queue some cells and check queue estimates */ + cell = tor_malloc_zero(sizeof(cell_t)); + make_fake_cell(cell); + channel_write_cell(ch1, cell); + + channel_update_xmit_queue_size(ch1); + tt_int_op(ch1->bytes_queued_for_xmit, ==, 512); + global_queue_estimate = channel_get_global_queue_estimate(); + tt_int_op(global_queue_estimate, ==, 512); + + cell = tor_malloc_zero(sizeof(cell_t)); + make_fake_cell(cell); + channel_write_cell(ch2, cell); + + channel_update_xmit_queue_size(ch2); + tt_int_op(ch2->bytes_queued_for_xmit, ==, 512); + global_queue_estimate = channel_get_global_queue_estimate(); + tt_int_op(global_queue_estimate, ==, 1024); + + /* Allow cells through again */ + test_chan_accept_cells = 1; + + /* Flush chan 2 */ + channel_flush_cells(ch2); + + /* Update and check queue sizes */ + channel_update_xmit_queue_size(ch1); + channel_update_xmit_queue_size(ch2); + tt_int_op(ch1->bytes_queued_for_xmit, ==, 512); + tt_int_op(ch2->bytes_queued_for_xmit, ==, 0); + global_queue_estimate = channel_get_global_queue_estimate(); + tt_int_op(global_queue_estimate, ==, 512); + + /* Flush chan 1 */ + channel_flush_cells(ch1); + + /* Update and check queue sizes */ + channel_update_xmit_queue_size(ch1); + channel_update_xmit_queue_size(ch2); + tt_int_op(ch1->bytes_queued_for_xmit, ==, 0); + tt_int_op(ch2->bytes_queued_for_xmit, ==, 0); + global_queue_estimate = channel_get_global_queue_estimate(); + tt_int_op(global_queue_estimate, ==, 0); + + /* Now block again */ + test_chan_accept_cells = 0; + + /* Queue some cells */ + cell = tor_malloc_zero(sizeof(cell_t)); + make_fake_cell(cell); + channel_write_cell(ch1, cell); + + cell = tor_malloc_zero(sizeof(cell_t)); + make_fake_cell(cell); + channel_write_cell(ch2, cell); + + /* Check the estimates */ + channel_update_xmit_queue_size(ch1); + channel_update_xmit_queue_size(ch2); + tt_int_op(ch1->bytes_queued_for_xmit, ==, 512); + tt_int_op(ch2->bytes_queued_for_xmit, ==, 512); + global_queue_estimate = channel_get_global_queue_estimate(); + tt_int_op(global_queue_estimate, ==, 1024); + + /* Now close channel 2; it should be subtracted from the global queue */ + MOCK(scheduler_release_channel, scheduler_release_channel_mock); + channel_mark_for_close(ch2); + UNMOCK(scheduler_release_channel); + + global_queue_estimate = channel_get_global_queue_estimate(); + tt_int_op(global_queue_estimate, ==, 512); + + /* + * Since the fake channels aren't registered, channel_free_all() can't + * see them properly. + */ + MOCK(scheduler_release_channel, scheduler_release_channel_mock); + channel_mark_for_close(ch1); + UNMOCK(scheduler_release_channel); + + global_queue_estimate = channel_get_global_queue_estimate(); + tt_int_op(global_queue_estimate, ==, 0); + + /* Now free everything */ + MOCK(scheduler_release_channel, scheduler_release_channel_mock); + channel_free_all(); + UNMOCK(scheduler_release_channel); + + done: + tor_free(ch1); + tor_free(ch2); + + return; +} + +/** + * Check some hopefully-impossible edge cases in the channel queue we + * can only trigger by doing evil things to the queue directly. + */ + +static void +test_channel_queue_impossible(void *arg) +{ + channel_t *ch = NULL; + cell_t *cell = NULL; + packed_cell_t *packed_cell = NULL; + var_cell_t *var_cell = NULL; + int old_count; + cell_queue_entry_t *q = NULL; + uint64_t global_queue_estimate; + + /* Cache the global queue size (see below) */ + global_queue_estimate = channel_get_global_queue_estimate(); + + (void)arg; + +#ifdef ENABLE_MEMPOOLS + init_cell_pool(); +#endif /* ENABLE_MEMPOOLS */ + + packed_cell = packed_cell_new(); + tt_assert(packed_cell); + + ch = new_fake_channel(); + tt_assert(ch); + + /* We test queueing here; tell it not to accept cells */ + test_chan_accept_cells = 0; + /* ...and keep it from trying to flush the queue */ + ch->state = CHANNEL_STATE_MAINT; + + /* Cache the cell written count */ + old_count = test_cells_written; + + /* Assert that the queue is initially empty */ + tt_int_op(chan_cell_queue_len(&(ch->outgoing_queue)), ==, 0); + + /* Get a fresh cell and write it to the channel*/ + cell = tor_malloc_zero(sizeof(cell_t)); + make_fake_cell(cell); + channel_write_cell(ch, cell); + + /* Now it should be queued */ + tt_int_op(chan_cell_queue_len(&(ch->outgoing_queue)), ==, 1); + q = TOR_SIMPLEQ_FIRST(&(ch->outgoing_queue)); + tt_assert(q); + if (q) { + tt_int_op(q->type, ==, CELL_QUEUE_FIXED); + tt_ptr_op(q->u.fixed.cell, ==, cell); + } + /* Do perverse things to it */ + tor_free(q->u.fixed.cell); + q->u.fixed.cell = NULL; + + /* + * Now change back to open with channel_change_state() and assert that it + * gets thrown away properly. + */ + test_chan_accept_cells = 1; + channel_change_state(ch, CHANNEL_STATE_OPEN); + tt_assert(test_cells_written == old_count); + tt_int_op(chan_cell_queue_len(&(ch->outgoing_queue)), ==, 0); + + /* Same thing but for a var_cell */ + + test_chan_accept_cells = 0; + ch->state = CHANNEL_STATE_MAINT; + var_cell = tor_malloc_zero(sizeof(var_cell_t) + CELL_PAYLOAD_SIZE); + make_fake_var_cell(var_cell); + channel_write_var_cell(ch, var_cell); + + /* Check that it's queued */ + tt_int_op(chan_cell_queue_len(&(ch->outgoing_queue)), ==, 1); + q = TOR_SIMPLEQ_FIRST(&(ch->outgoing_queue)); + tt_assert(q); + if (q) { + tt_int_op(q->type, ==, CELL_QUEUE_VAR); + tt_ptr_op(q->u.var.var_cell, ==, var_cell); + } + + /* Remove the cell from the queue entry */ + tor_free(q->u.var.var_cell); + q->u.var.var_cell = NULL; + + /* Let it drain and check that the bad entry is discarded */ + test_chan_accept_cells = 1; + channel_change_state(ch, CHANNEL_STATE_OPEN); + tt_assert(test_cells_written == old_count); + tt_int_op(chan_cell_queue_len(&(ch->outgoing_queue)), ==, 0); + + /* Same thing with a packed_cell */ + + test_chan_accept_cells = 0; + ch->state = CHANNEL_STATE_MAINT; + packed_cell = packed_cell_new(); + tt_assert(packed_cell); + channel_write_packed_cell(ch, packed_cell); + + /* Check that it's queued */ + tt_int_op(chan_cell_queue_len(&(ch->outgoing_queue)), ==, 1); + q = TOR_SIMPLEQ_FIRST(&(ch->outgoing_queue)); + tt_assert(q); + if (q) { + tt_int_op(q->type, ==, CELL_QUEUE_PACKED); + tt_ptr_op(q->u.packed.packed_cell, ==, packed_cell); + } + + /* Remove the cell from the queue entry */ + packed_cell_free(q->u.packed.packed_cell); + q->u.packed.packed_cell = NULL; + + /* Let it drain and check that the bad entry is discarded */ + test_chan_accept_cells = 1; + channel_change_state(ch, CHANNEL_STATE_OPEN); + tt_assert(test_cells_written == old_count); + tt_int_op(chan_cell_queue_len(&(ch->outgoing_queue)), ==, 0); + + /* Unknown cell type case */ + test_chan_accept_cells = 0; + ch->state = CHANNEL_STATE_MAINT; + cell = tor_malloc_zero(sizeof(cell_t)); + make_fake_cell(cell); + channel_write_cell(ch, cell); + + /* Check that it's queued */ + tt_int_op(chan_cell_queue_len(&(ch->outgoing_queue)), ==, 1); + q = TOR_SIMPLEQ_FIRST(&(ch->outgoing_queue)); + tt_assert(q); + if (q) { + tt_int_op(q->type, ==, CELL_QUEUE_FIXED); + tt_ptr_op(q->u.fixed.cell, ==, cell); + } + /* Clobber it, including the queue entry type */ + tor_free(q->u.fixed.cell); + q->u.fixed.cell = NULL; + q->type = CELL_QUEUE_PACKED + 1; + + /* Let it drain and check that the bad entry is discarded */ + test_chan_accept_cells = 1; + channel_change_state(ch, CHANNEL_STATE_OPEN); + tt_assert(test_cells_written == old_count); + tt_int_op(chan_cell_queue_len(&(ch->outgoing_queue)), ==, 0); + + done: + tor_free(ch); +#ifdef ENABLE_MEMPOOLS + free_cell_pool(); +#endif /* ENABLE_MEMPOOLS */ + + /* + * Doing that meant that we couldn't correctly adjust the queue size + * for the var cell, so manually reset the global queue size estimate + * so the next test doesn't break if we run with --no-fork. + */ + estimated_total_queue_size = global_queue_estimate; + + return; +} + +static void +test_channel_queue_size(void *arg) +{ + channel_t *ch = NULL; + cell_t *cell = NULL; + int n, old_count; + uint64_t global_queue_estimate; + + (void)arg; + + ch = new_fake_channel(); + tt_assert(ch); + + /* Initial queue size update */ + channel_update_xmit_queue_size(ch); + tt_int_op(ch->bytes_queued_for_xmit, ==, 0); + global_queue_estimate = channel_get_global_queue_estimate(); + tt_int_op(global_queue_estimate, ==, 0); + + /* Test the call-through to our fake lower layer */ + n = channel_num_cells_writeable(ch); + /* chan_test_num_cells_writeable() always returns 32 */ + tt_int_op(n, ==, 32); + + /* + * Now we queue some cells and check that channel_num_cells_writeable() + * adjusts properly + */ + + /* tell it not to accept cells */ + test_chan_accept_cells = 0; + /* ...and keep it from trying to flush the queue */ + ch->state = CHANNEL_STATE_MAINT; + + /* Get a fresh cell */ + cell = tor_malloc_zero(sizeof(cell_t)); + make_fake_cell(cell); + + old_count = test_cells_written; + channel_write_cell(ch, cell); + /* Assert that it got queued, not written through, correctly */ + tt_int_op(test_cells_written, ==, old_count); + + /* Now check chan_test_num_cells_writeable() again */ + n = channel_num_cells_writeable(ch); + tt_int_op(n, ==, 0); /* Should return 0 since we're in CHANNEL_STATE_MAINT */ + + /* Update queue size estimates */ + channel_update_xmit_queue_size(ch); + /* One cell, times an overhead factor of 1.0 */ + tt_int_op(ch->bytes_queued_for_xmit, ==, 512); + /* Try a different overhead factor */ + test_overhead_estimate = 0.5f; + /* This one should be ignored since it's below 1.0 */ + channel_update_xmit_queue_size(ch); + tt_int_op(ch->bytes_queued_for_xmit, ==, 512); + /* Now try a larger one */ + test_overhead_estimate = 2.0f; + channel_update_xmit_queue_size(ch); + tt_int_op(ch->bytes_queued_for_xmit, ==, 1024); + /* Go back to 1.0 */ + test_overhead_estimate = 1.0f; + channel_update_xmit_queue_size(ch); + tt_int_op(ch->bytes_queued_for_xmit, ==, 512); + /* Check the global estimate too */ + global_queue_estimate = channel_get_global_queue_estimate(); + tt_int_op(global_queue_estimate, ==, 512); + + /* Go to open */ + old_count = test_cells_written; + channel_change_state(ch, CHANNEL_STATE_OPEN); + + /* + * It should try to write, but we aren't accepting cells right now, so + * it'll requeue + */ + tt_int_op(test_cells_written, ==, old_count); + + /* Check the queue size again */ + channel_update_xmit_queue_size(ch); + tt_int_op(ch->bytes_queued_for_xmit, ==, 512); + global_queue_estimate = channel_get_global_queue_estimate(); + tt_int_op(global_queue_estimate, ==, 512); + + /* + * Now the cell is in the queue, and we're open, so we should get 31 + * writeable cells. + */ + n = channel_num_cells_writeable(ch); + tt_int_op(n, ==, 31); + + /* Accept cells again */ + test_chan_accept_cells = 1; + /* ...and re-process the queue */ + old_count = test_cells_written; + channel_flush_cells(ch); + tt_int_op(test_cells_written, ==, old_count + 1); + + /* Should have 32 writeable now */ + n = channel_num_cells_writeable(ch); + tt_int_op(n, ==, 32); + + /* Should have queue size estimate of zero */ + channel_update_xmit_queue_size(ch); + tt_int_op(ch->bytes_queued_for_xmit, ==, 0); + global_queue_estimate = channel_get_global_queue_estimate(); + tt_int_op(global_queue_estimate, ==, 0); + + /* Okay, now we're done with this one */ + MOCK(scheduler_release_channel, scheduler_release_channel_mock); + channel_mark_for_close(ch); + UNMOCK(scheduler_release_channel); + + done: + tor_free(ch); + + return; +} + +static void +test_channel_write(void *arg) +{ + channel_t *ch = NULL; + cell_t *cell = tor_malloc_zero(sizeof(cell_t)); + packed_cell_t *packed_cell = NULL; + var_cell_t *var_cell = + tor_malloc_zero(sizeof(var_cell_t) + CELL_PAYLOAD_SIZE); + int old_count; + + (void)arg; + +#ifdef ENABLE_MEMPOOLS + init_cell_pool(); +#endif /* ENABLE_MEMPOOLS */ + + packed_cell = packed_cell_new(); + tt_assert(packed_cell); + + ch = new_fake_channel(); + tt_assert(ch); + make_fake_cell(cell); + make_fake_var_cell(var_cell); + + /* Tell it to accept cells */ + test_chan_accept_cells = 1; + + old_count = test_cells_written; + channel_write_cell(ch, cell); + tt_assert(test_cells_written == old_count + 1); + + channel_write_var_cell(ch, var_cell); + tt_assert(test_cells_written == old_count + 2); + + channel_write_packed_cell(ch, packed_cell); + tt_assert(test_cells_written == old_count + 3); + + /* Now we test queueing; tell it not to accept cells */ + test_chan_accept_cells = 0; + /* ...and keep it from trying to flush the queue */ + ch->state = CHANNEL_STATE_MAINT; + + /* Get a fresh cell */ + cell = tor_malloc_zero(sizeof(cell_t)); + make_fake_cell(cell); + + old_count = test_cells_written; + channel_write_cell(ch, cell); + tt_assert(test_cells_written == old_count); + + /* + * Now change back to open with channel_change_state() and assert that it + * gets drained from the queue. + */ + test_chan_accept_cells = 1; + channel_change_state(ch, CHANNEL_STATE_OPEN); + tt_assert(test_cells_written == old_count + 1); + + /* + * Check the note destroy case + */ + cell = tor_malloc_zero(sizeof(cell_t)); + make_fake_cell(cell); + cell->command = CELL_DESTROY; + + /* Set up the mock */ + MOCK(channel_note_destroy_not_pending, + channel_note_destroy_not_pending_mock); + + old_count = test_destroy_not_pending_calls; + channel_write_cell(ch, cell); + tt_assert(test_destroy_not_pending_calls == old_count + 1); + + /* Now send a non-destroy and check we don't call it */ + cell = tor_malloc_zero(sizeof(cell_t)); + make_fake_cell(cell); + channel_write_cell(ch, cell); + tt_assert(test_destroy_not_pending_calls == old_count + 1); + + UNMOCK(channel_note_destroy_not_pending); + + /* + * Now switch it to CLOSING so we can test the discard-cells case + * in the channel_write_*() functions. + */ + MOCK(scheduler_release_channel, scheduler_release_channel_mock); + channel_mark_for_close(ch); + UNMOCK(scheduler_release_channel); + + /* Send cells that will drop in the closing state */ + old_count = test_cells_written; + + cell = tor_malloc_zero(sizeof(cell_t)); + make_fake_cell(cell); + channel_write_cell(ch, cell); + tt_assert(test_cells_written == old_count); + + var_cell = tor_malloc_zero(sizeof(var_cell_t) + CELL_PAYLOAD_SIZE); + make_fake_var_cell(var_cell); + channel_write_var_cell(ch, var_cell); + tt_assert(test_cells_written == old_count); + + packed_cell = packed_cell_new(); + channel_write_packed_cell(ch, packed_cell); + tt_assert(test_cells_written == old_count); + +#ifdef ENABLE_MEMPOOLS + free_cell_pool(); +#endif /* ENABLE_MEMPOOLS */ + + done: + tor_free(ch); + + return; +} + +struct testcase_t channel_tests[] = { + { "dumpstats", test_channel_dumpstats, TT_FORK, NULL, NULL }, + { "flush", test_channel_flush, TT_FORK, NULL, NULL }, + { "flushmux", test_channel_flushmux, TT_FORK, NULL, NULL }, + { "incoming", test_channel_incoming, TT_FORK, NULL, NULL }, + { "lifecycle", test_channel_lifecycle, TT_FORK, NULL, NULL }, + { "lifecycle_2", test_channel_lifecycle_2, TT_FORK, NULL, NULL }, + { "multi", test_channel_multi, TT_FORK, NULL, NULL }, + { "queue_impossible", test_channel_queue_impossible, TT_FORK, NULL, NULL }, + { "queue_size", test_channel_queue_size, TT_FORK, NULL, NULL }, + { "write", test_channel_write, TT_FORK, NULL, NULL }, + END_OF_TESTCASES +}; + diff --git a/src/test/test_channeltls.c b/src/test/test_channeltls.c new file mode 100644 index 0000000000..45e24dfbec --- /dev/null +++ b/src/test/test_channeltls.c @@ -0,0 +1,332 @@ +/* Copyright (c) 2014, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#include <math.h> + +#define TOR_CHANNEL_INTERNAL_ +#include "or.h" +#include "address.h" +#include "buffers.h" +#include "channel.h" +#include "channeltls.h" +#include "connection_or.h" +#include "config.h" +/* For init/free stuff */ +#include "scheduler.h" +#include "tortls.h" + +/* Test suite stuff */ +#include "test.h" +#include "fakechans.h" + +/* The channeltls unit tests */ +static void test_channeltls_create(void *arg); +static void test_channeltls_num_bytes_queued(void *arg); +static void test_channeltls_overhead_estimate(void *arg); + +/* Mocks used by channeltls unit tests */ +static size_t tlschan_buf_datalen_mock(const buf_t *buf); +static or_connection_t * tlschan_connection_or_connect_mock( + const tor_addr_t *addr, + uint16_t port, + const char *digest, + channel_tls_t *tlschan); +static int tlschan_is_local_addr_mock(const tor_addr_t *addr); + +/* Fake close method */ +static void tlschan_fake_close_method(channel_t *chan); + +/* Flags controlling behavior of channeltls unit test mocks */ +static int tlschan_local = 0; +static const buf_t * tlschan_buf_datalen_mock_target = NULL; +static size_t tlschan_buf_datalen_mock_size = 0; + +/* Thing to cast to fake tor_tls_t * to appease assert_connection_ok() */ +static int fake_tortls = 0; /* Bleh... */ + +static void +test_channeltls_create(void *arg) +{ + tor_addr_t test_addr; + channel_t *ch = NULL; + const char test_digest[DIGEST_LEN] = { + 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, + 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10, 0x11, 0x12, 0x13, 0x14 }; + + (void)arg; + + /* Set up a fake address to fake-connect to */ + test_addr.family = AF_INET; + test_addr.addr.in_addr.s_addr = htonl(0x01020304); + + /* For this test we always want the address to be treated as non-local */ + tlschan_local = 0; + /* Install is_local_addr() mock */ + MOCK(is_local_addr, tlschan_is_local_addr_mock); + + /* Install mock for connection_or_connect() */ + MOCK(connection_or_connect, tlschan_connection_or_connect_mock); + + /* Try connecting */ + ch = channel_tls_connect(&test_addr, 567, test_digest); + tt_assert(ch != NULL); + + done: + if (ch) { + MOCK(scheduler_release_channel, scheduler_release_channel_mock); + /* + * Use fake close method that doesn't try to do too much to fake + * orconn + */ + ch->close = tlschan_fake_close_method; + channel_mark_for_close(ch); + tor_free(ch); + UNMOCK(scheduler_release_channel); + } + + UNMOCK(connection_or_connect); + UNMOCK(is_local_addr); + + return; +} + +static void +test_channeltls_num_bytes_queued(void *arg) +{ + tor_addr_t test_addr; + channel_t *ch = NULL; + const char test_digest[DIGEST_LEN] = { + 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, + 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10, 0x11, 0x12, 0x13, 0x14 }; + channel_tls_t *tlschan = NULL; + size_t len; + int fake_outbuf = 0, n; + + (void)arg; + + /* Set up a fake address to fake-connect to */ + test_addr.family = AF_INET; + test_addr.addr.in_addr.s_addr = htonl(0x01020304); + + /* For this test we always want the address to be treated as non-local */ + tlschan_local = 0; + /* Install is_local_addr() mock */ + MOCK(is_local_addr, tlschan_is_local_addr_mock); + + /* Install mock for connection_or_connect() */ + MOCK(connection_or_connect, tlschan_connection_or_connect_mock); + + /* Try connecting */ + ch = channel_tls_connect(&test_addr, 567, test_digest); + tt_assert(ch != NULL); + + /* + * Next, we have to test ch->num_bytes_queued, which is + * channel_tls_num_bytes_queued_method. We can't mock + * connection_get_outbuf_len() directly because it's static INLINE + * in connection.h, but we can mock buf_datalen(). Note that + * if bufferevents ever work, this will break with them enabled. + */ + + tt_assert(ch->num_bytes_queued != NULL); + tlschan = BASE_CHAN_TO_TLS(ch); + tt_assert(tlschan != NULL); + if (TO_CONN(tlschan->conn)->outbuf == NULL) { + /* We need an outbuf to make sure buf_datalen() gets called */ + fake_outbuf = 1; + TO_CONN(tlschan->conn)->outbuf = buf_new(); + } + tlschan_buf_datalen_mock_target = TO_CONN(tlschan->conn)->outbuf; + tlschan_buf_datalen_mock_size = 1024; + MOCK(buf_datalen, tlschan_buf_datalen_mock); + len = ch->num_bytes_queued(ch); + tt_int_op(len, ==, tlschan_buf_datalen_mock_size); + /* + * We also cover num_cells_writeable here; since wide_circ_ids = 0 on + * the fake tlschans, cell_network_size returns 512, and so with + * tlschan_buf_datalen_mock_size == 1024, we should be able to write + * ceil((OR_CONN_HIGHWATER - 1024) / 512) = ceil(OR_CONN_HIGHWATER / 512) + * - 2 cells. + */ + n = ch->num_cells_writeable(ch); + tt_int_op(n, ==, CEIL_DIV(OR_CONN_HIGHWATER, 512) - 2); + UNMOCK(buf_datalen); + tlschan_buf_datalen_mock_target = NULL; + tlschan_buf_datalen_mock_size = 0; + if (fake_outbuf) { + buf_free(TO_CONN(tlschan->conn)->outbuf); + TO_CONN(tlschan->conn)->outbuf = NULL; + } + + done: + if (ch) { + MOCK(scheduler_release_channel, scheduler_release_channel_mock); + /* + * Use fake close method that doesn't try to do too much to fake + * orconn + */ + ch->close = tlschan_fake_close_method; + channel_mark_for_close(ch); + tor_free(ch); + UNMOCK(scheduler_release_channel); + } + + UNMOCK(connection_or_connect); + UNMOCK(is_local_addr); + + return; +} + +static void +test_channeltls_overhead_estimate(void *arg) +{ + tor_addr_t test_addr; + channel_t *ch = NULL; + const char test_digest[DIGEST_LEN] = { + 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, + 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10, 0x11, 0x12, 0x13, 0x14 }; + float r; + channel_tls_t *tlschan = NULL; + + (void)arg; + + /* Set up a fake address to fake-connect to */ + test_addr.family = AF_INET; + test_addr.addr.in_addr.s_addr = htonl(0x01020304); + + /* For this test we always want the address to be treated as non-local */ + tlschan_local = 0; + /* Install is_local_addr() mock */ + MOCK(is_local_addr, tlschan_is_local_addr_mock); + + /* Install mock for connection_or_connect() */ + MOCK(connection_or_connect, tlschan_connection_or_connect_mock); + + /* Try connecting */ + ch = channel_tls_connect(&test_addr, 567, test_digest); + tt_assert(ch != NULL); + + /* First case: silly low ratios should get clamped to 1.0f */ + tlschan = BASE_CHAN_TO_TLS(ch); + tt_assert(tlschan != NULL); + tlschan->conn->bytes_xmitted = 128; + tlschan->conn->bytes_xmitted_by_tls = 64; + r = ch->get_overhead_estimate(ch); + tt_assert(fabsf(r - 1.0f) < 1E-12); + + tlschan->conn->bytes_xmitted_by_tls = 127; + r = ch->get_overhead_estimate(ch); + tt_assert(fabsf(r - 1.0f) < 1E-12); + + /* Now middle of the range */ + tlschan->conn->bytes_xmitted_by_tls = 192; + r = ch->get_overhead_estimate(ch); + tt_assert(fabsf(r - 1.5f) < 1E-12); + + /* Now above the 2.0f clamp */ + tlschan->conn->bytes_xmitted_by_tls = 257; + r = ch->get_overhead_estimate(ch); + tt_assert(fabsf(r - 2.0f) < 1E-12); + + tlschan->conn->bytes_xmitted_by_tls = 512; + r = ch->get_overhead_estimate(ch); + tt_assert(fabsf(r - 2.0f) < 1E-12); + + done: + if (ch) { + MOCK(scheduler_release_channel, scheduler_release_channel_mock); + /* + * Use fake close method that doesn't try to do too much to fake + * orconn + */ + ch->close = tlschan_fake_close_method; + channel_mark_for_close(ch); + tor_free(ch); + UNMOCK(scheduler_release_channel); + } + + UNMOCK(connection_or_connect); + UNMOCK(is_local_addr); + + return; +} + +static size_t +tlschan_buf_datalen_mock(const buf_t *buf) +{ + if (buf != NULL && buf == tlschan_buf_datalen_mock_target) { + return tlschan_buf_datalen_mock_size; + } else { + return buf_datalen__real(buf); + } +} + +static or_connection_t * +tlschan_connection_or_connect_mock(const tor_addr_t *addr, + uint16_t port, + const char *digest, + channel_tls_t *tlschan) +{ + or_connection_t *result = NULL; + + tt_assert(addr != NULL); + tt_assert(port != 0); + tt_assert(digest != NULL); + tt_assert(tlschan != NULL); + + /* Make a fake orconn */ + result = tor_malloc_zero(sizeof(*result)); + result->base_.magic = OR_CONNECTION_MAGIC; + result->base_.state = OR_CONN_STATE_OPEN; + result->base_.type = CONN_TYPE_OR; + result->base_.socket_family = addr->family; + result->base_.address = tor_strdup("<fake>"); + memcpy(&(result->base_.addr), addr, sizeof(tor_addr_t)); + result->base_.port = port; + memcpy(result->identity_digest, digest, DIGEST_LEN); + result->chan = tlschan; + memcpy(&(result->real_addr), addr, sizeof(tor_addr_t)); + result->tls = (tor_tls_t *)((void *)(&fake_tortls)); + + done: + return result; +} + +static void +tlschan_fake_close_method(channel_t *chan) +{ + channel_tls_t *tlschan = NULL; + + tt_assert(chan != NULL); + tt_int_op(chan->magic, ==, TLS_CHAN_MAGIC); + + tlschan = BASE_CHAN_TO_TLS(chan); + tt_assert(tlschan != NULL); + + /* Just free the fake orconn */ + tor_free(tlschan->conn); + + channel_closed(chan); + + done: + return; +} + +static int +tlschan_is_local_addr_mock(const tor_addr_t *addr) +{ + tt_assert(addr != NULL); + + done: + return tlschan_local; +} + +struct testcase_t channeltls_tests[] = { + { "create", test_channeltls_create, TT_FORK, NULL, NULL }, + { "num_bytes_queued", test_channeltls_num_bytes_queued, + TT_FORK, NULL, NULL }, + { "overhead_estimate", test_channeltls_overhead_estimate, + TT_FORK, NULL, NULL }, + END_OF_TESTCASES +}; + diff --git a/src/test/test_circuitmux.c b/src/test/test_circuitmux.c index b8590d6d24..e88d18f061 100644 --- a/src/test/test_circuitmux.c +++ b/src/test/test_circuitmux.c @@ -8,6 +8,7 @@ #include "channel.h" #include "circuitmux.h" #include "relay.h" +#include "scheduler.h" #include "test.h" /* XXXX duplicated function from test_circuitlist.c */ @@ -36,6 +37,8 @@ test_cmux_destroy_cell_queue(void *arg) cell_queue_t *cq = NULL; packed_cell_t *pc = NULL; + scheduler_init(); + #ifdef ENABLE_MEMPOOLS init_cell_pool(); #endif /* ENABLE_MEMPOOLS */ diff --git a/src/test/test_relay.c b/src/test/test_relay.c new file mode 100644 index 0000000000..6907597705 --- /dev/null +++ b/src/test/test_relay.c @@ -0,0 +1,134 @@ +/* Copyright (c) 2014, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#include "or.h" +#define CIRCUITBUILD_PRIVATE +#include "circuitbuild.h" +#define RELAY_PRIVATE +#include "relay.h" +/* For init/free stuff */ +#include "scheduler.h" + +/* Test suite stuff */ +#include "test.h" +#include "fakechans.h" + +static or_circuit_t * new_fake_orcirc(channel_t *nchan, channel_t *pchan); + +static void test_relay_append_cell_to_circuit_queue(void *arg); + +static or_circuit_t * +new_fake_orcirc(channel_t *nchan, channel_t *pchan) +{ + or_circuit_t *orcirc = NULL; + circuit_t *circ = NULL; + + orcirc = tor_malloc_zero(sizeof(*orcirc)); + circ = &(orcirc->base_); + circ->magic = OR_CIRCUIT_MAGIC; + + circ->n_chan = nchan; + circ->n_circ_id = get_unique_circ_id_by_chan(nchan); + circ->n_mux = NULL; /* ?? */ + cell_queue_init(&(circ->n_chan_cells)); + circ->n_hop = NULL; + circ->streams_blocked_on_n_chan = 0; + circ->streams_blocked_on_p_chan = 0; + circ->n_delete_pending = 0; + circ->p_delete_pending = 0; + circ->received_destroy = 0; + circ->state = CIRCUIT_STATE_OPEN; + circ->purpose = CIRCUIT_PURPOSE_OR; + circ->package_window = CIRCWINDOW_START_MAX; + circ->deliver_window = CIRCWINDOW_START_MAX; + circ->n_chan_create_cell = NULL; + + orcirc->p_chan = pchan; + orcirc->p_circ_id = get_unique_circ_id_by_chan(pchan); + cell_queue_init(&(orcirc->p_chan_cells)); + + return orcirc; +} + +static void +test_relay_append_cell_to_circuit_queue(void *arg) +{ + channel_t *nchan = NULL, *pchan = NULL; + or_circuit_t *orcirc = NULL; + cell_t *cell = NULL; + int old_count, new_count; + + (void)arg; + + /* We'll need the cell pool for append_cell_to_circuit_queue() to work */ +#ifdef ENABLE_MEMPOOLS + init_cell_pool(); +#endif /* ENABLE_MEMPOOLS */ + + /* Make fake channels to be nchan and pchan for the circuit */ + nchan = new_fake_channel(); + tt_assert(nchan); + + pchan = new_fake_channel(); + tt_assert(pchan); + + /* We'll need chans with working cmuxes */ + nchan->cmux = circuitmux_alloc(); + pchan->cmux = circuitmux_alloc(); + + /* Make a fake orcirc */ + orcirc = new_fake_orcirc(nchan, pchan); + tt_assert(orcirc); + + /* Make a cell */ + cell = tor_malloc_zero(sizeof(cell_t)); + make_fake_cell(cell); + + MOCK(scheduler_channel_has_waiting_cells, + scheduler_channel_has_waiting_cells_mock); + + /* Append it */ + old_count = get_mock_scheduler_has_waiting_cells_count(); + append_cell_to_circuit_queue(TO_CIRCUIT(orcirc), nchan, cell, + CELL_DIRECTION_OUT, 0); + new_count = get_mock_scheduler_has_waiting_cells_count(); + tt_int_op(new_count, ==, old_count + 1); + + /* Now try the reverse direction */ + old_count = get_mock_scheduler_has_waiting_cells_count(); + append_cell_to_circuit_queue(TO_CIRCUIT(orcirc), pchan, cell, + CELL_DIRECTION_IN, 0); + new_count = get_mock_scheduler_has_waiting_cells_count(); + tt_int_op(new_count, ==, old_count + 1); + + UNMOCK(scheduler_channel_has_waiting_cells); + + /* Get rid of the fake channels */ + MOCK(scheduler_release_channel, scheduler_release_channel_mock); + channel_mark_for_close(nchan); + channel_mark_for_close(pchan); + UNMOCK(scheduler_release_channel); + + /* Shut down channels */ + channel_free_all(); + nchan = pchan = NULL; + + done: + tor_free(orcirc); + if (nchan && nchan->cmux) circuitmux_free(nchan->cmux); + tor_free(nchan); + if (pchan && pchan->cmux) circuitmux_free(pchan->cmux); + tor_free(pchan); +#ifdef ENABLE_MEMPOOLS + free_cell_pool(); +#endif /* ENABLE_MEMPOOLS */ + + return; +} + +struct testcase_t relay_tests[] = { + { "append_cell_to_circuit_queue", test_relay_append_cell_to_circuit_queue, + TT_FORK, NULL, NULL }, + END_OF_TESTCASES +}; + diff --git a/src/test/test_scheduler.c b/src/test/test_scheduler.c new file mode 100644 index 0000000000..da5c4e8fa0 --- /dev/null +++ b/src/test/test_scheduler.c @@ -0,0 +1,763 @@ +/* Copyright (c) 2014, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#include <math.h> + +#include "orconfig.h" + +/* Libevent stuff */ +#ifdef HAVE_EVENT2_EVENT_H +#include <event2/event.h> +#else +#include <event.h> +#endif + +#define TOR_CHANNEL_INTERNAL_ +#define CHANNEL_PRIVATE_ +#include "or.h" +#include "compat_libevent.h" +#include "channel.h" +#define SCHEDULER_PRIVATE_ +#include "scheduler.h" + +/* Test suite stuff */ +#include "test.h" +#include "fakechans.h" + +/* Statics in scheduler.c exposed to the test suite */ +extern smartlist_t *channels_pending; +extern struct event *run_sched_ev; +extern uint64_t queue_heuristic; +extern time_t queue_heuristic_timestamp; + +/* Event base for scheduelr tests */ +static struct event_base *mock_event_base = NULL; + +/* Statics controlling mocks */ +static circuitmux_t *mock_ccm_tgt_1 = NULL; +static circuitmux_t *mock_ccm_tgt_2 = NULL; + +static circuitmux_t *mock_cgp_tgt_1 = NULL; +static const circuitmux_policy_t *mock_cgp_val_1 = NULL; +static circuitmux_t *mock_cgp_tgt_2 = NULL; +static const circuitmux_policy_t *mock_cgp_val_2 = NULL; +static int scheduler_compare_channels_mock_ctr = 0; +static int scheduler_run_mock_ctr = 0; + +static void channel_flush_some_cells_mock_free_all(void); +static void channel_flush_some_cells_mock_set(channel_t *chan, + ssize_t num_cells); + +/* Setup for mock event stuff */ +static void mock_event_free_all(void); +static void mock_event_init(void); + +/* Mocks used by scheduler tests */ +static ssize_t channel_flush_some_cells_mock(channel_t *chan, + ssize_t num_cells); +static int circuitmux_compare_muxes_mock(circuitmux_t *cmux_1, + circuitmux_t *cmux_2); +static const circuitmux_policy_t * circuitmux_get_policy_mock( + circuitmux_t *cmux); +static int scheduler_compare_channels_mock(const void *c1_v, + const void *c2_v); +static void scheduler_run_noop_mock(void); +static struct event_base * tor_libevent_get_base_mock(void); + +/* Scheduler test cases */ +static void test_scheduler_channel_states(void *arg); +static void test_scheduler_compare_channels(void *arg); +static void test_scheduler_initfree(void *arg); +static void test_scheduler_loop(void *arg); +static void test_scheduler_queue_heuristic(void *arg); + +/* Mock event init/free */ + +/* Shamelessly stolen from compat_libevent.c */ +#define V(major, minor, patch) \ + (((major) << 24) | ((minor) << 16) | ((patch) << 8)) + +static void +mock_event_free_all(void) +{ + tt_assert(mock_event_base != NULL); + + if (mock_event_base) { + event_base_free(mock_event_base); + mock_event_base = NULL; + } + + tt_ptr_op(mock_event_base, ==, NULL); + + done: + return; +} + +static void +mock_event_init(void) +{ +#ifdef HAVE_EVENT2_EVENT_H + struct event_config *cfg = NULL; +#endif + + tt_ptr_op(mock_event_base, ==, NULL); + + /* + * Really cut down from tor_libevent_initialize of + * src/common/compat_libevent.c to kill config dependencies + */ + + if (!mock_event_base) { +#ifdef HAVE_EVENT2_EVENT_H + cfg = event_config_new(); +#if LIBEVENT_VERSION_NUMBER >= V(2,0,9) + /* We can enable changelist support with epoll, since we don't give + * Libevent any dup'd fds. This lets us avoid some syscalls. */ + event_config_set_flag(cfg, EVENT_BASE_FLAG_EPOLL_USE_CHANGELIST); +#endif + mock_event_base = event_base_new_with_config(cfg); + event_config_free(cfg); +#else + mock_event_base = event_init(); +#endif + } + + tt_assert(mock_event_base != NULL); + + done: + return; +} + +/* Mocks */ + +typedef struct { + const channel_t *chan; + ssize_t cells; +} flush_mock_channel_t; + +static smartlist_t *chans_for_flush_mock = NULL; + +static void +channel_flush_some_cells_mock_free_all(void) +{ + if (chans_for_flush_mock) { + SMARTLIST_FOREACH_BEGIN(chans_for_flush_mock, + flush_mock_channel_t *, + flush_mock_ch) { + SMARTLIST_DEL_CURRENT(chans_for_flush_mock, flush_mock_ch); + tor_free(flush_mock_ch); + } SMARTLIST_FOREACH_END(flush_mock_ch); + + smartlist_free(chans_for_flush_mock); + chans_for_flush_mock = NULL; + } +} + +static void +channel_flush_some_cells_mock_set(channel_t *chan, ssize_t num_cells) +{ + flush_mock_channel_t *flush_mock_ch = NULL; + + if (!chan) return; + if (num_cells <= 0) return; + + if (!chans_for_flush_mock) { + chans_for_flush_mock = smartlist_new(); + } + + SMARTLIST_FOREACH_BEGIN(chans_for_flush_mock, + flush_mock_channel_t *, + flush_mock_ch) { + if (flush_mock_ch != NULL && flush_mock_ch->chan != NULL) { + if (flush_mock_ch->chan == chan) { + /* Found it */ + flush_mock_ch->cells = num_cells; + break; + } + } else { + /* That shouldn't be there... */ + SMARTLIST_DEL_CURRENT(chans_for_flush_mock, flush_mock_ch); + tor_free(flush_mock_ch); + } + } SMARTLIST_FOREACH_END(flush_mock_ch); + + if (!flush_mock_ch) { + /* The loop didn't find it */ + flush_mock_ch = tor_malloc_zero(sizeof(*flush_mock_ch)); + flush_mock_ch->chan = chan; + flush_mock_ch->cells = num_cells; + smartlist_add(chans_for_flush_mock, flush_mock_ch); + } +} + +static ssize_t +channel_flush_some_cells_mock(channel_t *chan, ssize_t num_cells) +{ + ssize_t flushed = 0, max; + char unlimited = 0; + flush_mock_channel_t *found = NULL; + + tt_assert(chan != NULL); + if (chan) { + if (num_cells < 0) { + num_cells = 0; + unlimited = 1; + } + + /* Check if we have it */ + if (chans_for_flush_mock != NULL) { + SMARTLIST_FOREACH_BEGIN(chans_for_flush_mock, + flush_mock_channel_t *, + flush_mock_ch) { + if (flush_mock_ch != NULL && flush_mock_ch->chan != NULL) { + if (flush_mock_ch->chan == chan) { + /* Found it */ + found = flush_mock_ch; + break; + } + } else { + /* That shouldn't be there... */ + SMARTLIST_DEL_CURRENT(chans_for_flush_mock, flush_mock_ch); + tor_free(flush_mock_ch); + } + } SMARTLIST_FOREACH_END(flush_mock_ch); + + if (found) { + /* We found one */ + if (found->cells < 0) found->cells = 0; + + if (unlimited) max = found->cells; + else max = MIN(found->cells, num_cells); + + flushed += max; + found->cells -= max; + + if (found->cells <= 0) { + smartlist_remove(chans_for_flush_mock, found); + tor_free(found); + } + } + } + } + + done: + return flushed; +} + +static int +circuitmux_compare_muxes_mock(circuitmux_t *cmux_1, + circuitmux_t *cmux_2) +{ + int result = 0; + + tt_assert(cmux_1 != NULL); + tt_assert(cmux_2 != NULL); + + if (cmux_1 != cmux_2) { + if (cmux_1 == mock_ccm_tgt_1 && cmux_2 == mock_ccm_tgt_2) result = -1; + else if (cmux_1 == mock_ccm_tgt_2 && cmux_2 == mock_ccm_tgt_1) { + result = 1; + } else { + if (cmux_1 == mock_ccm_tgt_1 || cmux_1 == mock_ccm_tgt_1) result = -1; + else if (cmux_2 == mock_ccm_tgt_1 || cmux_2 == mock_ccm_tgt_2) { + result = 1; + } else { + result = circuitmux_compare_muxes__real(cmux_1, cmux_2); + } + } + } + /* else result = 0 always */ + + done: + return result; +} + +static const circuitmux_policy_t * +circuitmux_get_policy_mock(circuitmux_t *cmux) +{ + const circuitmux_policy_t *result = NULL; + + tt_assert(cmux != NULL); + if (cmux) { + if (cmux == mock_cgp_tgt_1) result = mock_cgp_val_1; + else if (cmux == mock_cgp_tgt_2) result = mock_cgp_val_2; + else result = circuitmux_get_policy__real(cmux); + } + + done: + return result; +} + +static int +scheduler_compare_channels_mock(const void *c1_v, + const void *c2_v) +{ + uintptr_t p1, p2; + + p1 = (uintptr_t)(c1_v); + p2 = (uintptr_t)(c2_v); + + ++scheduler_compare_channels_mock_ctr; + + if (p1 == p2) return 0; + else if (p1 < p2) return 1; + else return -1; +} + +static void +scheduler_run_noop_mock(void) +{ + ++scheduler_run_mock_ctr; +} + +static struct event_base * +tor_libevent_get_base_mock(void) +{ + return mock_event_base; +} + +/* Test cases */ + +static void +test_scheduler_channel_states(void *arg) +{ + channel_t *ch1 = NULL, *ch2 = NULL; + int old_count; + + (void)arg; + + /* Set up libevent and scheduler */ + + mock_event_init(); + MOCK(tor_libevent_get_base, tor_libevent_get_base_mock); + scheduler_init(); + /* + * Install the compare channels mock so we can test + * scheduler_touch_channel(). + */ + MOCK(scheduler_compare_channels, scheduler_compare_channels_mock); + /* + * Disable scheduler_run so we can just check the state transitions + * without having to make everything it might call work too. + */ + MOCK(scheduler_run, scheduler_run_noop_mock); + + tt_int_op(smartlist_len(channels_pending), ==, 0); + + /* Set up a fake channel */ + ch1 = new_fake_channel(); + tt_assert(ch1); + + /* Start it off in OPENING */ + ch1->state = CHANNEL_STATE_OPENING; + /* We'll need a cmux */ + ch1->cmux = circuitmux_alloc(); + /* Try to register it */ + channel_register(ch1); + tt_assert(ch1->registered); + + /* It should start off in SCHED_CHAN_IDLE */ + tt_int_op(ch1->scheduler_state, ==, SCHED_CHAN_IDLE); + + /* Now get another one */ + ch2 = new_fake_channel(); + tt_assert(ch2); + ch2->state = CHANNEL_STATE_OPENING; + ch2->cmux = circuitmux_alloc(); + channel_register(ch2); + tt_assert(ch2->registered); + + /* Send it to SCHED_CHAN_WAITING_TO_WRITE */ + scheduler_channel_has_waiting_cells(ch1); + tt_int_op(ch1->scheduler_state, ==, SCHED_CHAN_WAITING_TO_WRITE); + + /* This should send it to SCHED_CHAN_PENDING */ + scheduler_channel_wants_writes(ch1); + tt_int_op(ch1->scheduler_state, ==, SCHED_CHAN_PENDING); + tt_int_op(smartlist_len(channels_pending), ==, 1); + + /* Now send ch2 to SCHED_CHAN_WAITING_FOR_CELLS */ + scheduler_channel_wants_writes(ch2); + tt_int_op(ch2->scheduler_state, ==, SCHED_CHAN_WAITING_FOR_CELLS); + + /* Drop ch2 back to idle */ + scheduler_channel_doesnt_want_writes(ch2); + tt_int_op(ch2->scheduler_state, ==, SCHED_CHAN_IDLE); + + /* ...and back to SCHED_CHAN_WAITING_FOR_CELLS */ + scheduler_channel_wants_writes(ch2); + tt_int_op(ch2->scheduler_state, ==, SCHED_CHAN_WAITING_FOR_CELLS); + + /* ...and this should kick ch2 into SCHED_CHAN_PENDING */ + scheduler_channel_has_waiting_cells(ch2); + tt_int_op(ch2->scheduler_state, ==, SCHED_CHAN_PENDING); + tt_int_op(smartlist_len(channels_pending), ==, 2); + + /* This should send ch2 to SCHED_CHAN_WAITING_TO_WRITE */ + scheduler_channel_doesnt_want_writes(ch2); + tt_int_op(ch2->scheduler_state, ==, SCHED_CHAN_WAITING_TO_WRITE); + tt_int_op(smartlist_len(channels_pending), ==, 1); + + /* ...and back to SCHED_CHAN_PENDING */ + scheduler_channel_wants_writes(ch2); + tt_int_op(ch2->scheduler_state, ==, SCHED_CHAN_PENDING); + tt_int_op(smartlist_len(channels_pending), ==, 2); + + /* Now we exercise scheduler_touch_channel */ + old_count = scheduler_compare_channels_mock_ctr; + scheduler_touch_channel(ch1); + tt_assert(scheduler_compare_channels_mock_ctr > old_count); + + /* Close */ + channel_mark_for_close(ch1); + tt_int_op(ch1->state, ==, CHANNEL_STATE_CLOSING); + channel_mark_for_close(ch2); + tt_int_op(ch2->state, ==, CHANNEL_STATE_CLOSING); + channel_closed(ch1); + tt_int_op(ch1->state, ==, CHANNEL_STATE_CLOSED); + ch1 = NULL; + channel_closed(ch2); + tt_int_op(ch2->state, ==, CHANNEL_STATE_CLOSED); + ch2 = NULL; + + /* Shut things down */ + + channel_free_all(); + scheduler_free_all(); + mock_event_free_all(); + + done: + tor_free(ch1); + tor_free(ch2); + + UNMOCK(scheduler_compare_channels); + UNMOCK(scheduler_run); + UNMOCK(tor_libevent_get_base); + + return; +} + +static void +test_scheduler_compare_channels(void *arg) +{ + /* We don't actually need whole fake channels... */ + channel_t c1, c2; + /* ...and some dummy circuitmuxes too */ + circuitmux_t *cm1 = NULL, *cm2 = NULL; + int result; + + (void)arg; + + /* We can't actually see sizeof(circuitmux_t) from here */ + cm1 = tor_malloc_zero(sizeof(void *)); + cm2 = tor_malloc_zero(sizeof(void *)); + + c1.cmux = cm1; + c2.cmux = cm2; + + /* Configure circuitmux_get_policy() mock */ + mock_cgp_tgt_1 = cm1; + /* + * This is to test the different-policies case, which uses the policy + * cast to an intptr_t as an arbitrary but definite thing to compare. + */ + mock_cgp_val_1 = (const circuitmux_policy_t *)(1); + mock_cgp_tgt_2 = cm2; + mock_cgp_val_2 = (const circuitmux_policy_t *)(2); + + MOCK(circuitmux_get_policy, circuitmux_get_policy_mock); + + /* Now set up circuitmux_compare_muxes() mock using cm1/cm2 */ + mock_ccm_tgt_1 = cm1; + mock_ccm_tgt_2 = cm2; + MOCK(circuitmux_compare_muxes, circuitmux_compare_muxes_mock); + + /* Equal-channel case */ + result = scheduler_compare_channels(&c1, &c1); + tt_int_op(result, ==, 0); + + /* Distinct channels, distinct policies */ + result = scheduler_compare_channels(&c1, &c2); + tt_int_op(result, ==, -1); + result = scheduler_compare_channels(&c2, &c1); + tt_int_op(result, ==, 1); + + /* Distinct channels, same policy */ + mock_cgp_val_2 = mock_cgp_val_1; + result = scheduler_compare_channels(&c1, &c2); + tt_int_op(result, ==, -1); + result = scheduler_compare_channels(&c2, &c1); + tt_int_op(result, ==, 1); + + done: + + UNMOCK(circuitmux_compare_muxes); + mock_ccm_tgt_1 = NULL; + mock_ccm_tgt_2 = NULL; + + UNMOCK(circuitmux_get_policy); + mock_cgp_tgt_1 = NULL; + mock_cgp_val_1 = NULL; + mock_cgp_tgt_2 = NULL; + mock_cgp_val_2 = NULL; + + tor_free(cm1); + tor_free(cm2); + + return; +} + +static void +test_scheduler_initfree(void *arg) +{ + (void)arg; + + tt_ptr_op(channels_pending, ==, NULL); + tt_ptr_op(run_sched_ev, ==, NULL); + + mock_event_init(); + MOCK(tor_libevent_get_base, tor_libevent_get_base_mock); + + scheduler_init(); + + tt_assert(channels_pending != NULL); + tt_assert(run_sched_ev != NULL); + + scheduler_free_all(); + + UNMOCK(tor_libevent_get_base); + mock_event_free_all(); + + tt_ptr_op(channels_pending, ==, NULL); + tt_ptr_op(run_sched_ev, ==, NULL); + + done: + return; +} + +static void +test_scheduler_loop(void *arg) +{ + channel_t *ch1 = NULL, *ch2 = NULL; + + (void)arg; + + /* Set up libevent and scheduler */ + + mock_event_init(); + MOCK(tor_libevent_get_base, tor_libevent_get_base_mock); + scheduler_init(); + /* + * Install the compare channels mock so we can test + * scheduler_touch_channel(). + */ + MOCK(scheduler_compare_channels, scheduler_compare_channels_mock); + /* + * Disable scheduler_run so we can just check the state transitions + * without having to make everything it might call work too. + */ + MOCK(scheduler_run, scheduler_run_noop_mock); + + tt_int_op(smartlist_len(channels_pending), ==, 0); + + /* Set up a fake channel */ + ch1 = new_fake_channel(); + tt_assert(ch1); + + /* Start it off in OPENING */ + ch1->state = CHANNEL_STATE_OPENING; + /* We'll need a cmux */ + ch1->cmux = circuitmux_alloc(); + /* Try to register it */ + channel_register(ch1); + tt_assert(ch1->registered); + /* Finish opening it */ + channel_change_state(ch1, CHANNEL_STATE_OPEN); + + /* It should start off in SCHED_CHAN_IDLE */ + tt_int_op(ch1->scheduler_state, ==, SCHED_CHAN_IDLE); + + /* Now get another one */ + ch2 = new_fake_channel(); + tt_assert(ch2); + ch2->state = CHANNEL_STATE_OPENING; + ch2->cmux = circuitmux_alloc(); + channel_register(ch2); + tt_assert(ch2->registered); + /* + * Don't open ch2; then channel_num_cells_writeable() will return + * zero and we'll get coverage of that exception case in scheduler_run() + */ + + tt_int_op(ch1->state, ==, CHANNEL_STATE_OPEN); + tt_int_op(ch2->state, ==, CHANNEL_STATE_OPENING); + + /* Send it to SCHED_CHAN_WAITING_TO_WRITE */ + scheduler_channel_has_waiting_cells(ch1); + tt_int_op(ch1->scheduler_state, ==, SCHED_CHAN_WAITING_TO_WRITE); + + /* This should send it to SCHED_CHAN_PENDING */ + scheduler_channel_wants_writes(ch1); + tt_int_op(ch1->scheduler_state, ==, SCHED_CHAN_PENDING); + tt_int_op(smartlist_len(channels_pending), ==, 1); + + /* Now send ch2 to SCHED_CHAN_WAITING_FOR_CELLS */ + scheduler_channel_wants_writes(ch2); + tt_int_op(ch2->scheduler_state, ==, SCHED_CHAN_WAITING_FOR_CELLS); + + /* Drop ch2 back to idle */ + scheduler_channel_doesnt_want_writes(ch2); + tt_int_op(ch2->scheduler_state, ==, SCHED_CHAN_IDLE); + + /* ...and back to SCHED_CHAN_WAITING_FOR_CELLS */ + scheduler_channel_wants_writes(ch2); + tt_int_op(ch2->scheduler_state, ==, SCHED_CHAN_WAITING_FOR_CELLS); + + /* ...and this should kick ch2 into SCHED_CHAN_PENDING */ + scheduler_channel_has_waiting_cells(ch2); + tt_int_op(ch2->scheduler_state, ==, SCHED_CHAN_PENDING); + tt_int_op(smartlist_len(channels_pending), ==, 2); + + /* + * Now we've got two pending channels and need to fire off + * scheduler_run(); first, unmock it. + */ + + UNMOCK(scheduler_run); + + scheduler_run(); + + /* Now re-mock it */ + MOCK(scheduler_run, scheduler_run_noop_mock); + + /* + * Assert that they're still in the states we left and aren't still + * pending + */ + tt_int_op(ch1->state, ==, CHANNEL_STATE_OPEN); + tt_int_op(ch2->state, ==, CHANNEL_STATE_OPENING); + tt_assert(ch1->scheduler_state != SCHED_CHAN_PENDING); + tt_assert(ch2->scheduler_state != SCHED_CHAN_PENDING); + tt_int_op(smartlist_len(channels_pending), ==, 0); + + /* Now, finish opening ch2, and get both back to pending */ + channel_change_state(ch2, CHANNEL_STATE_OPEN); + scheduler_channel_wants_writes(ch1); + scheduler_channel_wants_writes(ch2); + scheduler_channel_has_waiting_cells(ch1); + scheduler_channel_has_waiting_cells(ch2); + tt_int_op(ch1->state, ==, CHANNEL_STATE_OPEN); + tt_int_op(ch2->state, ==, CHANNEL_STATE_OPEN); + tt_int_op(ch1->scheduler_state, ==, SCHED_CHAN_PENDING); + tt_int_op(ch2->scheduler_state, ==, SCHED_CHAN_PENDING); + tt_int_op(smartlist_len(channels_pending), ==, 2); + + /* Now, set up the channel_flush_some_cells() mock */ + MOCK(channel_flush_some_cells, channel_flush_some_cells_mock); + /* + * 16 cells on ch1 means it'll completely drain into the 32 cells + * fakechan's num_cells_writeable() returns. + */ + channel_flush_some_cells_mock_set(ch1, 16); + /* + * This one should get sent back to pending, since num_cells_writeable() + * will still return non-zero. + */ + channel_flush_some_cells_mock_set(ch2, 48); + + /* + * And re-run the scheduler_run() loop with non-zero returns from + * channel_flush_some_cells() this time. + */ + UNMOCK(scheduler_run); + + scheduler_run(); + + /* Now re-mock it */ + MOCK(scheduler_run, scheduler_run_noop_mock); + + /* + * ch1 should have gone to SCHED_CHAN_WAITING_FOR_CELLS, with 16 flushed + * and 32 writeable. + */ + tt_int_op(ch1->scheduler_state, ==, SCHED_CHAN_WAITING_FOR_CELLS); + /* + * ...ch2 should also have gone to SCHED_CHAN_WAITING_FOR_CELLS, with + * channel_more_to_flush() returning false and channel_num_cells_writeable() + * > 0/ + */ + tt_int_op(ch2->scheduler_state, ==, SCHED_CHAN_WAITING_FOR_CELLS); + + /* Close */ + channel_mark_for_close(ch1); + tt_int_op(ch1->state, ==, CHANNEL_STATE_CLOSING); + channel_mark_for_close(ch2); + tt_int_op(ch2->state, ==, CHANNEL_STATE_CLOSING); + channel_closed(ch1); + tt_int_op(ch1->state, ==, CHANNEL_STATE_CLOSED); + ch1 = NULL; + channel_closed(ch2); + tt_int_op(ch2->state, ==, CHANNEL_STATE_CLOSED); + ch2 = NULL; + + /* Shut things down */ + channel_flush_some_cells_mock_free_all(); + channel_free_all(); + scheduler_free_all(); + mock_event_free_all(); + + done: + tor_free(ch1); + tor_free(ch2); + + UNMOCK(channel_flush_some_cells); + UNMOCK(scheduler_compare_channels); + UNMOCK(scheduler_run); + UNMOCK(tor_libevent_get_base); +} + +static void +test_scheduler_queue_heuristic(void *arg) +{ + time_t now = approx_time(); + uint64_t qh; + + (void)arg; + + queue_heuristic = 0; + queue_heuristic_timestamp = 0; + + /* Not yet inited case */ + scheduler_update_queue_heuristic(now - 180); + tt_int_op(queue_heuristic, ==, 0); + tt_int_op(queue_heuristic_timestamp, ==, now - 180); + + queue_heuristic = 1000000000L; + queue_heuristic_timestamp = now - 120; + + scheduler_update_queue_heuristic(now - 119); + tt_int_op(queue_heuristic, ==, 500000000L); + tt_int_op(queue_heuristic_timestamp, ==, now - 119); + + scheduler_update_queue_heuristic(now - 116); + tt_int_op(queue_heuristic, ==, 62500000L); + tt_int_op(queue_heuristic_timestamp, ==, now - 116); + + qh = scheduler_get_queue_heuristic(); + tt_int_op(qh, ==, 0); + + done: + return; +} + +struct testcase_t scheduler_tests[] = { + { "channel_states", test_scheduler_channel_states, TT_FORK, NULL, NULL }, + { "compare_channels", test_scheduler_compare_channels, + TT_FORK, NULL, NULL }, + { "initfree", test_scheduler_initfree, TT_FORK, NULL, NULL }, + { "loop", test_scheduler_loop, TT_FORK, NULL, NULL }, + { "queue_heuristic", test_scheduler_queue_heuristic, + TT_FORK, NULL, NULL }, + END_OF_TESTCASES +}; + |