summaryrefslogtreecommitdiff
path: root/src/or
diff options
context:
space:
mode:
Diffstat (limited to 'src/or')
-rw-r--r--src/or/Makefile.nmake1
-rw-r--r--src/or/buffers.c4
-rw-r--r--src/or/buffers.h2
-rw-r--r--src/or/channel.c477
-rw-r--r--src/or/channel.h87
-rw-r--r--src/or/channeltls.c144
-rw-r--r--src/or/channeltls.h2
-rw-r--r--src/or/circuitbuild.c8
-rw-r--r--src/or/circuitlist.c4
-rw-r--r--src/or/circuitlist.h3
-rw-r--r--src/or/circuitmux.c56
-rw-r--r--src/or/circuitmux.h12
-rw-r--r--src/or/circuitmux_ewma.c58
-rw-r--r--src/or/circuituse.c2
-rw-r--r--src/or/command.c9
-rw-r--r--src/or/config.c81
-rw-r--r--src/or/config.h2
-rw-r--r--src/or/connection.c2
-rw-r--r--src/or/connection_edge.c7
-rw-r--r--src/or/connection_or.c67
-rw-r--r--src/or/connection_or.h8
-rw-r--r--src/or/control.c38
-rw-r--r--src/or/control.h6
-rw-r--r--src/or/directory.c19
-rw-r--r--src/or/include.am2
-rw-r--r--src/or/main.c56
-rw-r--r--src/or/main.h4
-rw-r--r--src/or/nodelist.c2
-rw-r--r--src/or/or.h44
-rw-r--r--src/or/relay.c15
-rw-r--r--src/or/relay.h3
-rw-r--r--src/or/rendclient.c20
-rw-r--r--src/or/rendcommon.c7
-rw-r--r--src/or/rendmid.c7
-rw-r--r--src/or/rendservice.c209
-rw-r--r--src/or/rephist.c216
-rw-r--r--src/or/rephist.h7
-rw-r--r--src/or/router.c23
-rw-r--r--src/or/router.h2
-rw-r--r--src/or/routerlist.c236
-rw-r--r--src/or/scheduler.c709
-rw-r--r--src/or/scheduler.h50
42 files changed, 2141 insertions, 570 deletions
diff --git a/src/or/Makefile.nmake b/src/or/Makefile.nmake
index 523bf3306b..2ac98cd372 100644
--- a/src/or/Makefile.nmake
+++ b/src/or/Makefile.nmake
@@ -63,6 +63,7 @@ LIBTOR_OBJECTS = \
routerlist.obj \
routerparse.obj \
routerset.obj \
+ scheduler.obj \
statefile.obj \
status.obj \
transports.obj
diff --git a/src/or/buffers.c b/src/or/buffers.c
index bd33fe451d..4cdc03bc03 100644
--- a/src/or/buffers.c
+++ b/src/or/buffers.c
@@ -562,8 +562,8 @@ buf_clear(buf_t *buf)
}
/** Return the number of bytes stored in <b>buf</b> */
-size_t
-buf_datalen(const buf_t *buf)
+MOCK_IMPL(size_t,
+buf_datalen, (const buf_t *buf))
{
return buf->datalen;
}
diff --git a/src/or/buffers.h b/src/or/buffers.h
index 9c2c7d0e9d..4687fbefd7 100644
--- a/src/or/buffers.h
+++ b/src/or/buffers.h
@@ -24,7 +24,7 @@ void buf_shrink(buf_t *buf);
size_t buf_shrink_freelists(int free_all);
void buf_dump_freelist_sizes(int severity);
-size_t buf_datalen(const buf_t *buf);
+MOCK_DECL(size_t, buf_datalen, (const buf_t *buf));
size_t buf_allocation(const buf_t *buf);
size_t buf_slack(const buf_t *buf);
diff --git a/src/or/channel.c b/src/or/channel.c
index 5651bb4b27..4826bdd0a7 100644
--- a/src/or/channel.c
+++ b/src/or/channel.c
@@ -13,6 +13,9 @@
#define TOR_CHANNEL_INTERNAL_
+/* This one's for stuff only channel.c and the test suite should see */
+#define CHANNEL_PRIVATE_
+
#include "or.h"
#include "channel.h"
#include "channeltls.h"
@@ -29,29 +32,7 @@
#include "rephist.h"
#include "router.h"
#include "routerlist.h"
-
-/* Cell queue structure */
-
-typedef struct cell_queue_entry_s cell_queue_entry_t;
-struct cell_queue_entry_s {
- TOR_SIMPLEQ_ENTRY(cell_queue_entry_s) next;
- enum {
- CELL_QUEUE_FIXED,
- CELL_QUEUE_VAR,
- CELL_QUEUE_PACKED
- } type;
- union {
- struct {
- cell_t *cell;
- } fixed;
- struct {
- var_cell_t *var_cell;
- } var;
- struct {
- packed_cell_t *packed_cell;
- } packed;
- } u;
-};
+#include "scheduler.h"
/* Global lists of channels */
@@ -76,6 +57,60 @@ static smartlist_t *finished_listeners = NULL;
/* Counter for ID numbers */
static uint64_t n_channels_allocated = 0;
+/*
+ * Channel global byte/cell counters, for statistics and for scheduler high
+ * /low-water marks.
+ */
+
+/*
+ * Total number of cells ever given to any channel with the
+ * channel_write_*_cell() functions.
+ */
+
+static uint64_t n_channel_cells_queued = 0;
+
+/*
+ * Total number of cells ever passed to a channel lower layer with the
+ * write_*_cell() methods.
+ */
+
+static uint64_t n_channel_cells_passed_to_lower_layer = 0;
+
+/*
+ * Current number of cells in all channel queues; should be
+ * n_channel_cells_queued - n_channel_cells_passed_to_lower_layer.
+ */
+
+static uint64_t n_channel_cells_in_queues = 0;
+
+/*
+ * Total number of bytes for all cells ever queued to a channel and
+ * counted in n_channel_cells_queued.
+ */
+
+static uint64_t n_channel_bytes_queued = 0;
+
+/*
+ * Total number of bytes for all cells ever passed to a channel lower layer
+ * and counted in n_channel_cells_passed_to_lower_layer.
+ */
+
+static uint64_t n_channel_bytes_passed_to_lower_layer = 0;
+
+/*
+ * Current number of bytes in all channel queues; should be
+ * n_channel_bytes_queued - n_channel_bytes_passed_to_lower_layer.
+ */
+
+static uint64_t n_channel_bytes_in_queues = 0;
+
+/*
+ * Current total estimated queue size *including lower layer queues and
+ * transmit overhead*
+ */
+
+STATIC uint64_t estimated_total_queue_size = 0;
+
/* Digest->channel map
*
* Similar to the one used in connection_or.c, this maps from the identity
@@ -123,6 +158,8 @@ cell_queue_entry_new_var(var_cell_t *var_cell);
static int is_destroy_cell(channel_t *chan,
const cell_queue_entry_t *q, circid_t *circid_out);
+static void channel_assert_counter_consistency(void);
+
/* Functions to maintain the digest map */
static void channel_add_to_digest_map(channel_t *chan);
static void channel_remove_from_digest_map(channel_t *chan);
@@ -140,6 +177,8 @@ channel_free_list(smartlist_t *channels, int mark_for_close);
static void
channel_listener_free_list(smartlist_t *channels, int mark_for_close);
static void channel_listener_force_free(channel_listener_t *chan_l);
+static size_t channel_get_cell_queue_entry_size(channel_t *chan,
+ cell_queue_entry_t *q);
static void
channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q);
@@ -740,6 +779,9 @@ channel_init(channel_t *chan)
/* It hasn't been open yet. */
chan->has_been_open = 0;
+
+ /* Scheduler state is idle */
+ chan->scheduler_state = SCHED_CHAN_IDLE;
}
/**
@@ -782,6 +824,9 @@ channel_free(channel_t *chan)
"Freeing channel " U64_FORMAT " at %p",
U64_PRINTF_ARG(chan->global_identifier), chan);
+ /* Get this one out of the scheduler */
+ scheduler_release_channel(chan);
+
/*
* Get rid of cmux policy before we do anything, so cmux policies don't
* see channels in weird half-freed states.
@@ -857,6 +902,9 @@ channel_force_free(channel_t *chan)
"Force-freeing channel " U64_FORMAT " at %p",
U64_PRINTF_ARG(chan->global_identifier), chan);
+ /* Get this one out of the scheduler */
+ scheduler_release_channel(chan);
+
/*
* Get rid of cmux policy before we do anything, so cmux policies don't
* see channels in weird half-freed states.
@@ -1639,6 +1687,36 @@ cell_queue_entry_new_var(var_cell_t *var_cell)
}
/**
+ * Ask how big the cell contained in a cell_queue_entry_t is
+ */
+
+static size_t
+channel_get_cell_queue_entry_size(channel_t *chan, cell_queue_entry_t *q)
+{
+ size_t rv = 0;
+
+ tor_assert(chan);
+ tor_assert(q);
+
+ switch (q->type) {
+ case CELL_QUEUE_FIXED:
+ rv = get_cell_network_size(chan->wide_circ_ids);
+ break;
+ case CELL_QUEUE_VAR:
+ rv = get_var_cell_header_size(chan->wide_circ_ids) +
+ (q->u.var.var_cell ? q->u.var.var_cell->payload_len : 0);
+ break;
+ case CELL_QUEUE_PACKED:
+ rv = get_cell_network_size(chan->wide_circ_ids);
+ break;
+ default:
+ tor_assert(1);
+ }
+
+ return rv;
+}
+
+/**
* Write to a channel based on a cell_queue_entry_t
*
* Given a cell_queue_entry_t filled out by the caller, try to send the cell
@@ -1650,6 +1728,7 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
{
int result = 0, sent = 0;
cell_queue_entry_t *tmp = NULL;
+ size_t cell_bytes;
tor_assert(chan);
tor_assert(q);
@@ -1664,6 +1743,9 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
}
}
+ /* For statistical purposes, figure out how big this cell is */
+ cell_bytes = channel_get_cell_queue_entry_size(chan, q);
+
/* Can we send it right out? If so, try */
if (TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue) &&
CHANNEL_IS_OPEN(chan)) {
@@ -1697,6 +1779,13 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
channel_timestamp_drained(chan);
/* Update the counter */
++(chan->n_cells_xmitted);
+ chan->n_bytes_xmitted += cell_bytes;
+ /* Update global counters */
+ ++n_channel_cells_queued;
+ ++n_channel_cells_passed_to_lower_layer;
+ n_channel_bytes_queued += cell_bytes;
+ n_channel_bytes_passed_to_lower_layer += cell_bytes;
+ channel_assert_counter_consistency();
}
}
@@ -1708,6 +1797,14 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
*/
tmp = cell_queue_entry_dup(q);
TOR_SIMPLEQ_INSERT_TAIL(&chan->outgoing_queue, tmp, next);
+ /* Update global counters */
+ ++n_channel_cells_queued;
+ ++n_channel_cells_in_queues;
+ n_channel_bytes_queued += cell_bytes;
+ n_channel_bytes_in_queues += cell_bytes;
+ channel_assert_counter_consistency();
+ /* Update channel queue size */
+ chan->bytes_in_queue += cell_bytes;
/* Try to process the queue? */
if (CHANNEL_IS_OPEN(chan)) channel_flush_cells(chan);
}
@@ -1746,6 +1843,9 @@ channel_write_cell(channel_t *chan, cell_t *cell)
q.type = CELL_QUEUE_FIXED;
q.u.fixed.cell = cell;
channel_write_cell_queue_entry(chan, &q);
+
+ /* Update the queue size estimate */
+ channel_update_xmit_queue_size(chan);
}
/**
@@ -1781,6 +1881,9 @@ channel_write_packed_cell(channel_t *chan, packed_cell_t *packed_cell)
q.type = CELL_QUEUE_PACKED;
q.u.packed.packed_cell = packed_cell;
channel_write_cell_queue_entry(chan, &q);
+
+ /* Update the queue size estimate */
+ channel_update_xmit_queue_size(chan);
}
/**
@@ -1817,6 +1920,9 @@ channel_write_var_cell(channel_t *chan, var_cell_t *var_cell)
q.type = CELL_QUEUE_VAR;
q.u.var.var_cell = var_cell;
channel_write_cell_queue_entry(chan, &q);
+
+ /* Update the queue size estimate */
+ channel_update_xmit_queue_size(chan);
}
/**
@@ -1912,6 +2018,41 @@ channel_change_state(channel_t *chan, channel_state_t to_state)
}
}
+ /*
+ * If we're going to a closed/closing state, we don't need scheduling any
+ * more; in CHANNEL_STATE_MAINT we can't accept writes.
+ */
+ if (to_state == CHANNEL_STATE_CLOSING ||
+ to_state == CHANNEL_STATE_CLOSED ||
+ to_state == CHANNEL_STATE_ERROR) {
+ scheduler_release_channel(chan);
+ } else if (to_state == CHANNEL_STATE_MAINT) {
+ scheduler_channel_doesnt_want_writes(chan);
+ }
+
+ /*
+ * If we're closing, this channel no longer counts toward the global
+ * estimated queue size; if we're open, it now does.
+ */
+ if ((to_state == CHANNEL_STATE_CLOSING ||
+ to_state == CHANNEL_STATE_CLOSED ||
+ to_state == CHANNEL_STATE_ERROR) &&
+ (from_state == CHANNEL_STATE_OPEN ||
+ from_state == CHANNEL_STATE_MAINT)) {
+ estimated_total_queue_size -= chan->bytes_in_queue;
+ }
+
+ /*
+ * If we're opening, this channel now does count toward the global
+ * estimated queue size.
+ */
+ if ((to_state == CHANNEL_STATE_OPEN ||
+ to_state == CHANNEL_STATE_MAINT) &&
+ !(from_state == CHANNEL_STATE_OPEN ||
+ from_state == CHANNEL_STATE_MAINT)) {
+ estimated_total_queue_size += chan->bytes_in_queue;
+ }
+
/* Tell circuits if we opened and stuff */
if (to_state == CHANNEL_STATE_OPEN) {
channel_do_open_actions(chan);
@@ -2027,12 +2168,13 @@ channel_listener_change_state(channel_listener_t *chan_l,
#define MAX_CELLS_TO_GET_FROM_CIRCUITS_FOR_UNLIMITED 256
-ssize_t
-channel_flush_some_cells(channel_t *chan, ssize_t num_cells)
+MOCK_IMPL(ssize_t,
+channel_flush_some_cells, (channel_t *chan, ssize_t num_cells))
{
unsigned int unlimited = 0;
ssize_t flushed = 0;
int num_cells_from_circs, clamped_num_cells;
+ int q_len_before, q_len_after;
tor_assert(chan);
@@ -2058,14 +2200,45 @@ channel_flush_some_cells(channel_t *chan, ssize_t num_cells)
clamped_num_cells = (int)(num_cells - flushed);
}
}
+
+ /*
+ * Keep track of the change in queue size; we have to count cells
+ * channel_flush_from_first_active_circuit() writes out directly,
+ * but not double-count ones we might get later in
+ * channel_flush_some_cells_from_outgoing_queue()
+ */
+ q_len_before = chan_cell_queue_len(&(chan->outgoing_queue));
+
/* Try to get more cells from any active circuits */
num_cells_from_circs = channel_flush_from_first_active_circuit(
chan, clamped_num_cells);
- /* If it claims we got some, process the queue again */
+ q_len_after = chan_cell_queue_len(&(chan->outgoing_queue));
+
+ /*
+ * If it claims we got some, adjust the flushed counter and consider
+ * processing the queue again
+ */
if (num_cells_from_circs > 0) {
- flushed += channel_flush_some_cells_from_outgoing_queue(chan,
- (unlimited ? -1 : num_cells - flushed));
+ /*
+ * Adjust flushed by the number of cells counted in
+ * num_cells_from_circs that didn't go to the cell queue.
+ */
+
+ if (q_len_after > q_len_before) {
+ num_cells_from_circs -= (q_len_after - q_len_before);
+ if (num_cells_from_circs < 0) num_cells_from_circs = 0;
+ }
+
+ flushed += num_cells_from_circs;
+
+ /* Now process the queue if necessary */
+
+ if ((q_len_after > q_len_before) &&
+ (unlimited || (flushed < num_cells))) {
+ flushed += channel_flush_some_cells_from_outgoing_queue(chan,
+ (unlimited ? -1 : num_cells - flushed));
+ }
}
}
}
@@ -2088,6 +2261,8 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
unsigned int unlimited = 0;
ssize_t flushed = 0;
cell_queue_entry_t *q = NULL;
+ size_t cell_size;
+ int free_q = 0, handed_off = 0;
tor_assert(chan);
tor_assert(chan->write_cell);
@@ -2101,8 +2276,12 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
if (CHANNEL_IS_OPEN(chan)) {
while ((unlimited || num_cells > flushed) &&
NULL != (q = TOR_SIMPLEQ_FIRST(&chan->outgoing_queue))) {
+ free_q = 0;
+ handed_off = 0;
if (1) {
+ /* Figure out how big it is for statistical purposes */
+ cell_size = channel_get_cell_queue_entry_size(chan, q);
/*
* Okay, we have a good queue entry, try to give it to the lower
* layer.
@@ -2115,8 +2294,9 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
++flushed;
channel_timestamp_xmit(chan);
++(chan->n_cells_xmitted);
- cell_queue_entry_free(q, 1);
- q = NULL;
+ chan->n_bytes_xmitted += cell_size;
+ free_q = 1;
+ handed_off = 1;
}
/* Else couldn't write it; leave it on the queue */
} else {
@@ -2127,8 +2307,8 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
"(global ID " U64_FORMAT ").",
chan, U64_PRINTF_ARG(chan->global_identifier));
/* Throw it away */
- cell_queue_entry_free(q, 0);
- q = NULL;
+ free_q = 1;
+ handed_off = 0;
}
break;
case CELL_QUEUE_PACKED:
@@ -2138,8 +2318,9 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
++flushed;
channel_timestamp_xmit(chan);
++(chan->n_cells_xmitted);
- cell_queue_entry_free(q, 1);
- q = NULL;
+ chan->n_bytes_xmitted += cell_size;
+ free_q = 1;
+ handed_off = 1;
}
/* Else couldn't write it; leave it on the queue */
} else {
@@ -2150,8 +2331,8 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
"(global ID " U64_FORMAT ").",
chan, U64_PRINTF_ARG(chan->global_identifier));
/* Throw it away */
- cell_queue_entry_free(q, 0);
- q = NULL;
+ free_q = 1;
+ handed_off = 0;
}
break;
case CELL_QUEUE_VAR:
@@ -2161,8 +2342,9 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
++flushed;
channel_timestamp_xmit(chan);
++(chan->n_cells_xmitted);
- cell_queue_entry_free(q, 1);
- q = NULL;
+ chan->n_bytes_xmitted += cell_size;
+ free_q = 1;
+ handed_off = 1;
}
/* Else couldn't write it; leave it on the queue */
} else {
@@ -2173,8 +2355,8 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
"(global ID " U64_FORMAT ").",
chan, U64_PRINTF_ARG(chan->global_identifier));
/* Throw it away */
- cell_queue_entry_free(q, 0);
- q = NULL;
+ free_q = 1;
+ handed_off = 0;
}
break;
default:
@@ -2184,12 +2366,32 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
"(global ID " U64_FORMAT "; ignoring it."
" Someone should fix this.",
q->type, chan, U64_PRINTF_ARG(chan->global_identifier));
- cell_queue_entry_free(q, 0);
- q = NULL;
+ free_q = 1;
+ handed_off = 0;
}
- /* if q got NULLed out, we used it and should remove the queue entry */
- if (!q) TOR_SIMPLEQ_REMOVE_HEAD(&chan->outgoing_queue, next);
+ /*
+ * if free_q is set, we used it and should remove the queue entry;
+ * we have to do the free down here so TOR_SIMPLEQ_REMOVE_HEAD isn't
+ * accessing freed memory
+ */
+ if (free_q) {
+ TOR_SIMPLEQ_REMOVE_HEAD(&chan->outgoing_queue, next);
+ /*
+ * ...and we handed a cell off to the lower layer, so we should
+ * update the counters.
+ */
+ ++n_channel_cells_passed_to_lower_layer;
+ --n_channel_cells_in_queues;
+ n_channel_bytes_passed_to_lower_layer += cell_size;
+ n_channel_bytes_in_queues -= cell_size;
+ channel_assert_counter_consistency();
+ /* Update the channel's queue size too */
+ chan->bytes_in_queue -= cell_size;
+ /* Finally, free q */
+ cell_queue_entry_free(q, handed_off);
+ q = NULL;
+ }
/* No cell removed from list, so we can't go on any further */
else break;
}
@@ -2201,6 +2403,9 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
channel_timestamp_drained(chan);
}
+ /* Update the estimate queue size */
+ channel_update_xmit_queue_size(chan);
+
return flushed;
}
@@ -2511,8 +2716,9 @@ channel_queue_cell(channel_t *chan, cell_t *cell)
/* Timestamp for receiving */
channel_timestamp_recv(chan);
- /* Update the counter */
+ /* Update the counters */
++(chan->n_cells_recved);
+ chan->n_bytes_recved += get_cell_network_size(chan->wide_circ_ids);
/* If we don't need to queue we can just call cell_handler */
if (!need_to_queue) {
@@ -2566,6 +2772,8 @@ channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell)
/* Update the counter */
++(chan->n_cells_recved);
+ chan->n_bytes_recved += get_var_cell_header_size(chan->wide_circ_ids) +
+ var_cell->payload_len;
/* If we don't need to queue we can just call cell_handler */
if (!need_to_queue) {
@@ -2615,6 +2823,19 @@ packed_cell_is_destroy(channel_t *chan,
return 0;
}
+/**
+ * Assert that the global channel stats counters are internally consistent
+ */
+
+static void
+channel_assert_counter_consistency(void)
+{
+ tor_assert(n_channel_cells_queued ==
+ (n_channel_cells_in_queues + n_channel_cells_passed_to_lower_layer));
+ tor_assert(n_channel_bytes_queued ==
+ (n_channel_bytes_in_queues + n_channel_bytes_passed_to_lower_layer));
+}
+
/** DOCDOC */
static int
is_destroy_cell(channel_t *chan,
@@ -2694,6 +2915,19 @@ channel_dumpstats(int severity)
{
if (all_channels && smartlist_len(all_channels) > 0) {
tor_log(severity, LD_GENERAL,
+ "Channels have queued " U64_FORMAT " bytes in " U64_FORMAT " cells, "
+ "and handed " U64_FORMAT " bytes in " U64_FORMAT " cells to the lower"
+ " layer.",
+ U64_PRINTF_ARG(n_channel_bytes_queued),
+ U64_PRINTF_ARG(n_channel_cells_queued),
+ U64_PRINTF_ARG(n_channel_bytes_passed_to_lower_layer),
+ U64_PRINTF_ARG(n_channel_cells_passed_to_lower_layer));
+ tor_log(severity, LD_GENERAL,
+ "There are currently " U64_FORMAT " bytes in " U64_FORMAT " cells "
+ "in channel queues.",
+ U64_PRINTF_ARG(n_channel_bytes_in_queues),
+ U64_PRINTF_ARG(n_channel_cells_in_queues));
+ tor_log(severity, LD_GENERAL,
"Dumping statistics about %d channels:",
smartlist_len(all_channels));
tor_log(severity, LD_GENERAL,
@@ -3163,7 +3397,7 @@ channel_listener_describe_transport(channel_listener_t *chan_l)
/**
* Return the number of entries in <b>queue</b>
*/
-static int
+STATIC int
chan_cell_queue_len(const chan_cell_queue_t *queue)
{
int r = 0;
@@ -3179,8 +3413,8 @@ chan_cell_queue_len(const chan_cell_queue_t *queue)
* Dump statistics for one channel to the log
*/
-void
-channel_dump_statistics(channel_t *chan, int severity)
+MOCK_IMPL(void,
+channel_dump_statistics, (channel_t *chan, int severity))
{
double avg, interval, age;
time_t now = time(NULL);
@@ -3332,12 +3566,22 @@ channel_dump_statistics(channel_t *chan, int severity)
/* Describe counters and rates */
tor_log(severity, LD_GENERAL,
" * Channel " U64_FORMAT " has received "
- U64_FORMAT " cells and transmitted " U64_FORMAT,
+ U64_FORMAT " bytes in " U64_FORMAT " cells and transmitted "
+ U64_FORMAT " bytes in " U64_FORMAT " cells",
U64_PRINTF_ARG(chan->global_identifier),
+ U64_PRINTF_ARG(chan->n_bytes_recved),
U64_PRINTF_ARG(chan->n_cells_recved),
+ U64_PRINTF_ARG(chan->n_bytes_xmitted),
U64_PRINTF_ARG(chan->n_cells_xmitted));
if (now > chan->timestamp_created &&
chan->timestamp_created > 0) {
+ if (chan->n_bytes_recved > 0) {
+ avg = (double)(chan->n_bytes_recved) / age;
+ tor_log(severity, LD_GENERAL,
+ " * Channel " U64_FORMAT " has averaged %f "
+ "bytes received per second",
+ U64_PRINTF_ARG(chan->global_identifier), avg);
+ }
if (chan->n_cells_recved > 0) {
avg = (double)(chan->n_cells_recved) / age;
if (avg >= 1.0) {
@@ -3353,6 +3597,13 @@ channel_dump_statistics(channel_t *chan, int severity)
U64_PRINTF_ARG(chan->global_identifier), interval);
}
}
+ if (chan->n_bytes_xmitted > 0) {
+ avg = (double)(chan->n_bytes_xmitted) / age;
+ tor_log(severity, LD_GENERAL,
+ " * Channel " U64_FORMAT " has averaged %f "
+ "bytes transmitted per second",
+ U64_PRINTF_ARG(chan->global_identifier), avg);
+ }
if (chan->n_cells_xmitted > 0) {
avg = (double)(chan->n_cells_xmitted) / age;
if (avg >= 1.0) {
@@ -3770,6 +4021,50 @@ channel_mark_outgoing(channel_t *chan)
chan->is_incoming = 0;
}
+/************************
+ * Flow control queries *
+ ***********************/
+
+/*
+ * Get the latest estimate for the total queue size of all open channels
+ */
+
+uint64_t
+channel_get_global_queue_estimate(void)
+{
+ return estimated_total_queue_size;
+}
+
+/*
+ * Estimate the number of writeable cells
+ *
+ * Ask the lower layer for an estimate of how many cells it can accept, and
+ * then subtract the length of our outgoing_queue, if any, to produce an
+ * estimate of the number of cells this channel can accept for writes.
+ */
+
+int
+channel_num_cells_writeable(channel_t *chan)
+{
+ int result;
+
+ tor_assert(chan);
+ tor_assert(chan->num_cells_writeable);
+
+ if (chan->state == CHANNEL_STATE_OPEN) {
+ /* Query lower layer */
+ result = chan->num_cells_writeable(chan);
+ /* Subtract cell queue length, if any */
+ result -= chan_cell_queue_len(&chan->outgoing_queue);
+ if (result < 0) result = 0;
+ } else {
+ /* No cells are writeable in any other state */
+ result = 0;
+ }
+
+ return result;
+}
+
/*********************
* Timestamp updates *
********************/
@@ -4172,3 +4467,87 @@ channel_set_circid_type(channel_t *chan,
}
}
+/**
+ * Update the estimated number of bytes queued to transmit for this channel,
+ * and notify the scheduler. The estimate includes both the channel queue and
+ * the queue size reported by the lower layer, and an overhead estimate
+ * optionally provided by the lower layer.
+ */
+
+void
+channel_update_xmit_queue_size(channel_t *chan)
+{
+ uint64_t queued, adj;
+ double overhead;
+
+ tor_assert(chan);
+ tor_assert(chan->num_bytes_queued);
+
+ /*
+ * First, get the number of bytes we have queued without factoring in
+ * lower-layer overhead.
+ */
+ queued = chan->num_bytes_queued(chan) + chan->bytes_in_queue;
+ /* Next, adjust by the overhead factor, if any is available */
+ if (chan->get_overhead_estimate) {
+ overhead = chan->get_overhead_estimate(chan);
+ if (overhead >= 1.0f) {
+ queued *= overhead;
+ } else {
+ /* Ignore silly overhead factors */
+ log_notice(LD_CHANNEL, "Ignoring silly overhead factor %f", overhead);
+ }
+ }
+
+ /* Now, compare to the previous estimate */
+ if (queued > chan->bytes_queued_for_xmit) {
+ adj = queued - chan->bytes_queued_for_xmit;
+ log_debug(LD_CHANNEL,
+ "Increasing queue size for channel " U64_FORMAT " by " U64_FORMAT
+ " from " U64_FORMAT " to " U64_FORMAT,
+ U64_PRINTF_ARG(chan->global_identifier),
+ U64_PRINTF_ARG(adj),
+ U64_PRINTF_ARG(chan->bytes_queued_for_xmit),
+ U64_PRINTF_ARG(queued));
+ /* Update the channel's estimate */
+ chan->bytes_queued_for_xmit = queued;
+
+ /* Update the global queue size estimate if appropriate */
+ if (chan->state == CHANNEL_STATE_OPEN ||
+ chan->state == CHANNEL_STATE_MAINT) {
+ estimated_total_queue_size += adj;
+ log_debug(LD_CHANNEL,
+ "Increasing global queue size by " U64_FORMAT " for channel "
+ U64_FORMAT ", new size is " U64_FORMAT,
+ U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
+ U64_PRINTF_ARG(estimated_total_queue_size));
+ /* Tell the scheduler we're increasing the queue size */
+ scheduler_adjust_queue_size(chan, 1, adj);
+ }
+ } else if (queued < chan->bytes_queued_for_xmit) {
+ adj = chan->bytes_queued_for_xmit - queued;
+ log_debug(LD_CHANNEL,
+ "Decreasing queue size for channel " U64_FORMAT " by " U64_FORMAT
+ " from " U64_FORMAT " to " U64_FORMAT,
+ U64_PRINTF_ARG(chan->global_identifier),
+ U64_PRINTF_ARG(adj),
+ U64_PRINTF_ARG(chan->bytes_queued_for_xmit),
+ U64_PRINTF_ARG(queued));
+ /* Update the channel's estimate */
+ chan->bytes_queued_for_xmit = queued;
+
+ /* Update the global queue size estimate if appropriate */
+ if (chan->state == CHANNEL_STATE_OPEN ||
+ chan->state == CHANNEL_STATE_MAINT) {
+ estimated_total_queue_size -= adj;
+ log_debug(LD_CHANNEL,
+ "Decreasing global queue size by " U64_FORMAT " for channel "
+ U64_FORMAT ", new size is " U64_FORMAT,
+ U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
+ U64_PRINTF_ARG(estimated_total_queue_size));
+ /* Tell the scheduler we're decreasing the queue size */
+ scheduler_adjust_queue_size(chan, -1, adj);
+ }
+ }
+}
+
diff --git a/src/or/channel.h b/src/or/channel.h
index 5a2431b30a..ec7a15f5f1 100644
--- a/src/or/channel.h
+++ b/src/or/channel.h
@@ -57,6 +57,32 @@ struct channel_s {
CHANNEL_CLOSE_FOR_ERROR
} reason_for_closing;
+ /** State variable for use by the scheduler */
+ enum {
+ /*
+ * The channel is not open, or it has a full output buffer but no queued
+ * cells.
+ */
+ SCHED_CHAN_IDLE = 0,
+ /*
+ * The channel has space on its output buffer to write, but no queued
+ * cells.
+ */
+ SCHED_CHAN_WAITING_FOR_CELLS,
+ /*
+ * The scheduler has queued cells but no output buffer space to write.
+ */
+ SCHED_CHAN_WAITING_TO_WRITE,
+ /*
+ * The scheduler has both queued cells and output buffer space, and is
+ * eligible for the scheduler loop.
+ */
+ SCHED_CHAN_PENDING
+ } scheduler_state;
+
+ /** Heap index for use by the scheduler */
+ int sched_heap_idx;
+
/** Timestamps for both cell channels and listeners */
time_t timestamp_created; /* Channel created */
time_t timestamp_active; /* Any activity */
@@ -79,6 +105,11 @@ struct channel_s {
/* Methods implemented by the lower layer */
/**
+ * Ask the lower layer for an estimate of the average overhead for
+ * transmissions on this channel.
+ */
+ double (*get_overhead_estimate)(channel_t *);
+ /*
* Ask the underlying transport what the remote endpoint address is, in
* a tor_addr_t. This is optional and subclasses may leave this NULL.
* If they implement it, they should write the address out to the
@@ -110,7 +141,11 @@ struct channel_s {
int (*matches_extend_info)(channel_t *, extend_info_t *);
/** Check if this channel matches a target address when extending */
int (*matches_target)(channel_t *, const tor_addr_t *);
- /** Write a cell to an open channel */
+ /* Ask the lower layer how many bytes it has queued but not yet sent */
+ size_t (*num_bytes_queued)(channel_t *);
+ /* Ask the lower layer how many cells can be written */
+ int (*num_cells_writeable)(channel_t *);
+ /* Write a cell to an open channel */
int (*write_cell)(channel_t *, cell_t *);
/** Write a packed cell to an open channel */
int (*write_packed_cell)(channel_t *, packed_cell_t *);
@@ -198,8 +233,16 @@ struct channel_s {
uint64_t dirreq_id;
/** Channel counters for cell channels */
- uint64_t n_cells_recved;
- uint64_t n_cells_xmitted;
+ uint64_t n_cells_recved, n_bytes_recved;
+ uint64_t n_cells_xmitted, n_bytes_xmitted;
+
+ /** Our current contribution to the scheduler's total xmit queue */
+ uint64_t bytes_queued_for_xmit;
+
+ /** Number of bytes in this channel's cell queue; does not include
+ * lower-layer queueing.
+ */
+ uint64_t bytes_in_queue;
};
struct channel_listener_s {
@@ -311,6 +354,34 @@ void channel_set_cmux_policy_everywhere(circuitmux_policy_t *pol);
#ifdef TOR_CHANNEL_INTERNAL_
+#ifdef CHANNEL_PRIVATE_
+/* Cell queue structure (here rather than channel.c for test suite use) */
+
+typedef struct cell_queue_entry_s cell_queue_entry_t;
+struct cell_queue_entry_s {
+ TOR_SIMPLEQ_ENTRY(cell_queue_entry_s) next;
+ enum {
+ CELL_QUEUE_FIXED,
+ CELL_QUEUE_VAR,
+ CELL_QUEUE_PACKED
+ } type;
+ union {
+ struct {
+ cell_t *cell;
+ } fixed;
+ struct {
+ var_cell_t *var_cell;
+ } var;
+ struct {
+ packed_cell_t *packed_cell;
+ } packed;
+ } u;
+};
+
+/* Cell queue functions for benefit of test suite */
+STATIC int chan_cell_queue_len(const chan_cell_queue_t *queue);
+#endif
+
/* Channel operations for subclasses and internal use only */
/* Initialize a newly allocated channel - do this first in subclass
@@ -384,7 +455,8 @@ void channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell);
void channel_flush_cells(channel_t *chan);
/* Request from lower layer for more cells if available */
-ssize_t channel_flush_some_cells(channel_t *chan, ssize_t num_cells);
+MOCK_DECL(ssize_t, channel_flush_some_cells,
+ (channel_t *chan, ssize_t num_cells));
/* Query if data available on this channel */
int channel_more_to_flush(channel_t *chan);
@@ -468,7 +540,7 @@ channel_is_in_state(channel_t *chan, channel_state_t state)
*/
const char * channel_describe_transport(channel_t *chan);
-void channel_dump_statistics(channel_t *chan, int severity);
+MOCK_DECL(void, channel_dump_statistics, (channel_t *chan, int severity));
void channel_dump_transport_statistics(channel_t *chan, int severity);
const char * channel_get_actual_remote_descr(channel_t *chan);
const char * channel_get_actual_remote_address(channel_t *chan);
@@ -491,6 +563,7 @@ unsigned int channel_num_circuits(channel_t *chan);
void channel_set_circid_type(channel_t *chan, crypto_pk_t *identity_rcvd,
int consider_identity);
void channel_timestamp_client(channel_t *chan);
+void channel_update_xmit_queue_size(channel_t *chan);
const char * channel_listener_describe_transport(channel_listener_t *chan_l);
void channel_listener_dump_statistics(channel_listener_t *chan_l,
@@ -498,6 +571,10 @@ void channel_listener_dump_statistics(channel_listener_t *chan_l,
void channel_listener_dump_transport_statistics(channel_listener_t *chan_l,
int severity);
+/* Flow control queries */
+uint64_t channel_get_global_queue_estimate(void);
+int channel_num_cells_writeable(channel_t *chan);
+
/* Timestamp queries */
time_t channel_when_created(channel_t *chan);
time_t channel_when_last_active(channel_t *chan);
diff --git a/src/or/channeltls.c b/src/or/channeltls.c
index 47f5d34bf7..b02acdb159 100644
--- a/src/or/channeltls.c
+++ b/src/or/channeltls.c
@@ -25,6 +25,7 @@
#include "relay.h"
#include "router.h"
#include "routerlist.h"
+#include "scheduler.h"
/** How many CELL_PADDING cells have we received, ever? */
uint64_t stats_n_padding_cells_processed = 0;
@@ -54,6 +55,7 @@ static void channel_tls_common_init(channel_tls_t *tlschan);
static void channel_tls_close_method(channel_t *chan);
static const char * channel_tls_describe_transport_method(channel_t *chan);
static void channel_tls_free_method(channel_t *chan);
+static double channel_tls_get_overhead_estimate_method(channel_t *chan);
static int
channel_tls_get_remote_addr_method(channel_t *chan, tor_addr_t *addr_out);
static int
@@ -67,6 +69,8 @@ channel_tls_matches_extend_info_method(channel_t *chan,
extend_info_t *extend_info);
static int channel_tls_matches_target_method(channel_t *chan,
const tor_addr_t *target);
+static int channel_tls_num_cells_writeable_method(channel_t *chan);
+static size_t channel_tls_num_bytes_queued_method(channel_t *chan);
static int channel_tls_write_cell_method(channel_t *chan,
cell_t *cell);
static int channel_tls_write_packed_cell_method(channel_t *chan,
@@ -116,6 +120,7 @@ channel_tls_common_init(channel_tls_t *tlschan)
chan->close = channel_tls_close_method;
chan->describe_transport = channel_tls_describe_transport_method;
chan->free = channel_tls_free_method;
+ chan->get_overhead_estimate = channel_tls_get_overhead_estimate_method;
chan->get_remote_addr = channel_tls_get_remote_addr_method;
chan->get_remote_descr = channel_tls_get_remote_descr_method;
chan->get_transport_name = channel_tls_get_transport_name_method;
@@ -123,6 +128,8 @@ channel_tls_common_init(channel_tls_t *tlschan)
chan->is_canonical = channel_tls_is_canonical_method;
chan->matches_extend_info = channel_tls_matches_extend_info_method;
chan->matches_target = channel_tls_matches_target_method;
+ chan->num_bytes_queued = channel_tls_num_bytes_queued_method;
+ chan->num_cells_writeable = channel_tls_num_cells_writeable_method;
chan->write_cell = channel_tls_write_cell_method;
chan->write_packed_cell = channel_tls_write_packed_cell_method;
chan->write_var_cell = channel_tls_write_var_cell_method;
@@ -435,6 +442,40 @@ channel_tls_free_method(channel_t *chan)
}
/**
+ * Get an estimate of the average TLS overhead for the upper layer
+ */
+
+static double
+channel_tls_get_overhead_estimate_method(channel_t *chan)
+{
+ double overhead = 1.0f;
+ channel_tls_t *tlschan = BASE_CHAN_TO_TLS(chan);
+
+ tor_assert(tlschan);
+ tor_assert(tlschan->conn);
+
+ /* Just return 1.0f if we don't have sensible data */
+ if (tlschan->conn->bytes_xmitted > 0 &&
+ tlschan->conn->bytes_xmitted_by_tls >=
+ tlschan->conn->bytes_xmitted) {
+ overhead = ((double)(tlschan->conn->bytes_xmitted_by_tls)) /
+ ((double)(tlschan->conn->bytes_xmitted));
+
+ /*
+ * Never estimate more than 2.0; otherwise we get silly large estimates
+ * at the very start of a new TLS connection.
+ */
+ if (overhead > 2.0f) overhead = 2.0f;
+ }
+
+ log_debug(LD_CHANNEL,
+ "Estimated overhead ratio for TLS chan " U64_FORMAT " is %f",
+ U64_PRINTF_ARG(chan->global_identifier), overhead);
+
+ return overhead;
+}
+
+/**
* Get the remote address of a channel_tls_t
*
* This implements the get_remote_addr method for channel_tls_t; copy the
@@ -673,6 +714,53 @@ channel_tls_matches_target_method(channel_t *chan,
}
/**
+ * Tell the upper layer how many bytes we have queued and not yet
+ * sent.
+ */
+
+static size_t
+channel_tls_num_bytes_queued_method(channel_t *chan)
+{
+ channel_tls_t *tlschan = BASE_CHAN_TO_TLS(chan);
+
+ tor_assert(tlschan);
+ tor_assert(tlschan->conn);
+
+ return connection_get_outbuf_len(TO_CONN(tlschan->conn));
+}
+
+/**
+ * Tell the upper layer how many cells we can accept to write
+ *
+ * This implements the num_cells_writeable method for channel_tls_t; it
+ * returns an estimate of the number of cells we can accept with
+ * channel_tls_write_*_cell().
+ */
+
+static int
+channel_tls_num_cells_writeable_method(channel_t *chan)
+{
+ size_t outbuf_len;
+ ssize_t n;
+ channel_tls_t *tlschan = BASE_CHAN_TO_TLS(chan);
+ size_t cell_network_size;
+
+ tor_assert(tlschan);
+ tor_assert(tlschan->conn);
+
+ cell_network_size = get_cell_network_size(tlschan->conn->wide_circ_ids);
+ outbuf_len = connection_get_outbuf_len(TO_CONN(tlschan->conn));
+ /* Get the number of cells */
+ n = CEIL_DIV(OR_CONN_HIGHWATER - outbuf_len, cell_network_size);
+ if (n < 0) n = 0;
+#if SIZEOF_SIZE_T > SIZEOF_INT
+ if (n > INT_MAX) n = INT_MAX;
+#endif
+
+ return (int)n;
+}
+
+/**
* Write a cell to a channel_tls_t
*
* This implements the write_cell method for channel_tls_t; given a
@@ -867,6 +955,10 @@ channel_tls_handle_state_change_on_orconn(channel_tls_t *chan,
* CHANNEL_STATE_MAINT on this.
*/
channel_change_state(base_chan, CHANNEL_STATE_OPEN);
+ /* We might have just become writeable; check and tell the scheduler */
+ if (connection_or_num_cells_writeable(conn) > 0) {
+ scheduler_channel_wants_writes(base_chan);
+ }
} else {
/*
* Not open, so from CHANNEL_STATE_OPEN we go to CHANNEL_STATE_MAINT,
@@ -878,58 +970,6 @@ channel_tls_handle_state_change_on_orconn(channel_tls_t *chan,
}
}
-/**
- * Flush cells from a channel_tls_t
- *
- * Try to flush up to about num_cells cells, and return how many we flushed.
- */
-
-ssize_t
-channel_tls_flush_some_cells(channel_tls_t *chan, ssize_t num_cells)
-{
- ssize_t flushed = 0;
-
- tor_assert(chan);
-
- if (flushed >= num_cells) goto done;
-
- /*
- * If channel_tls_t ever buffers anything below the channel_t layer, flush
- * that first here.
- */
-
- flushed += channel_flush_some_cells(TLS_CHAN_TO_BASE(chan),
- num_cells - flushed);
-
- /*
- * If channel_tls_t ever buffers anything below the channel_t layer, check
- * how much we actually got and push it on down here.
- */
-
- done:
- return flushed;
-}
-
-/**
- * Check if a channel_tls_t has anything to flush
- *
- * Return true if there is any more to flush on this channel (cells in queue
- * or active circuits).
- */
-
-int
-channel_tls_more_to_flush(channel_tls_t *chan)
-{
- tor_assert(chan);
-
- /*
- * If channel_tls_t ever buffers anything below channel_t, the
- * check for that should go here first.
- */
-
- return channel_more_to_flush(TLS_CHAN_TO_BASE(chan));
-}
-
#ifdef KEEP_TIMING_STATS
/**
diff --git a/src/or/channeltls.h b/src/or/channeltls.h
index 06be6fa555..133ad43bb4 100644
--- a/src/or/channeltls.h
+++ b/src/or/channeltls.h
@@ -40,8 +40,6 @@ channel_t * channel_tls_to_base(channel_tls_t *tlschan);
channel_tls_t * channel_tls_from_base(channel_t *chan);
/* Things for connection_or.c to call back into */
-ssize_t channel_tls_flush_some_cells(channel_tls_t *chan, ssize_t num_cells);
-int channel_tls_more_to_flush(channel_tls_t *chan);
void channel_tls_handle_cell(cell_t *cell, or_connection_t *conn);
void channel_tls_handle_state_change_on_orconn(channel_tls_t *chan,
or_connection_t *conn,
diff --git a/src/or/circuitbuild.c b/src/or/circuitbuild.c
index 847a0c0112..faddc08e03 100644
--- a/src/or/circuitbuild.c
+++ b/src/or/circuitbuild.c
@@ -14,6 +14,7 @@
#include "or.h"
#include "channel.h"
#include "circpathbias.h"
+#define CIRCUITBUILD_PRIVATE
#include "circuitbuild.h"
#include "circuitlist.h"
#include "circuitstats.h"
@@ -943,9 +944,9 @@ circuit_send_next_onion_skin(origin_circuit_t *circ)
circuit_rep_hist_note_result(circ);
circuit_has_opened(circ); /* do other actions as necessary */
- if (!can_complete_circuit && !circ->build_state->onehop_tunnel) {
+ if (!have_completed_a_circuit() && !circ->build_state->onehop_tunnel) {
const or_options_t *options = get_options();
- can_complete_circuit=1;
+ note_that_we_completed_a_circuit();
/* FFFF Log a count of known routers here */
log_notice(LD_GENERAL,
"Tor has successfully opened a circuit. "
@@ -1033,7 +1034,8 @@ circuit_note_clock_jumped(int seconds_elapsed)
seconds_elapsed >=0 ? "forward" : "backward");
control_event_general_status(LOG_WARN, "CLOCK_JUMPED TIME=%d",
seconds_elapsed);
- can_complete_circuit=0; /* so it'll log when it works again */
+ /* so we log when it works again */
+ note_that_we_maybe_cant_complete_circuits();
control_event_client_status(severity, "CIRCUIT_NOT_ESTABLISHED REASON=%s",
"CLOCK_JUMPED");
circuit_mark_all_unused_circs();
diff --git a/src/or/circuitlist.c b/src/or/circuitlist.c
index fad79c116d..affb015177 100644
--- a/src/or/circuitlist.c
+++ b/src/or/circuitlist.c
@@ -302,8 +302,8 @@ channel_note_destroy_pending(channel_t *chan, circid_t id)
/** Called to indicate that a DESTROY is no longer pending on <b>chan</b> with
* circuit ID <b>id</b> -- typically, because it has been sent. */
-void
-channel_note_destroy_not_pending(channel_t *chan, circid_t id)
+MOCK_IMPL(void, channel_note_destroy_not_pending,
+ (channel_t *chan, circid_t id))
{
circuit_t *circ = circuit_get_by_circid_channel_even_if_marked(id,chan);
if (circ) {
diff --git a/src/or/circuitlist.h b/src/or/circuitlist.h
index addaa725d4..ea1076d53f 100644
--- a/src/or/circuitlist.h
+++ b/src/or/circuitlist.h
@@ -72,7 +72,8 @@ void circuit_free_all(void);
void circuits_handle_oom(size_t current_allocation);
void channel_note_destroy_pending(channel_t *chan, circid_t id);
-void channel_note_destroy_not_pending(channel_t *chan, circid_t id);
+MOCK_DECL(void, channel_note_destroy_not_pending,
+ (channel_t *chan, circid_t id));
#ifdef CIRCUITLIST_PRIVATE
STATIC void circuit_free(circuit_t *circ);
diff --git a/src/or/circuitmux.c b/src/or/circuitmux.c
index 663711c6c0..443dad0a54 100644
--- a/src/or/circuitmux.c
+++ b/src/or/circuitmux.c
@@ -621,8 +621,8 @@ circuitmux_clear_policy(circuitmux_t *cmux)
* Return the policy currently installed on a circuitmux_t
*/
-const circuitmux_policy_t *
-circuitmux_get_policy(circuitmux_t *cmux)
+MOCK_IMPL(const circuitmux_policy_t *,
+circuitmux_get_policy, (circuitmux_t *cmux))
{
tor_assert(cmux);
@@ -896,8 +896,8 @@ circuitmux_num_cells_for_circuit(circuitmux_t *cmux, circuit_t *circ)
* Query total number of available cells on a circuitmux
*/
-unsigned int
-circuitmux_num_cells(circuitmux_t *cmux)
+MOCK_IMPL(unsigned int,
+circuitmux_num_cells, (circuitmux_t *cmux))
{
tor_assert(cmux);
@@ -1951,3 +1951,51 @@ circuitmux_count_queued_destroy_cells(const channel_t *chan,
return n_destroy_cells;
}
+/**
+ * Compare cmuxes to see which is more preferred; return < 0 if
+ * cmux_1 has higher priority (i.e., cmux_1 < cmux_2 in the scheduler's
+ * sort order), > 0 if cmux_2 has higher priority, or 0 if they are
+ * equally preferred.
+ *
+ * If the cmuxes have different cmux policies or the policy does not
+ * support the cmp_cmux method, return 0.
+ */
+
+MOCK_IMPL(int,
+circuitmux_compare_muxes, (circuitmux_t *cmux_1, circuitmux_t *cmux_2))
+{
+ const circuitmux_policy_t *policy;
+
+ tor_assert(cmux_1);
+ tor_assert(cmux_2);
+
+ if (cmux_1 == cmux_2) {
+ /* Equivalent because they're the same cmux */
+ return 0;
+ }
+
+ if (cmux_1->policy && cmux_2->policy) {
+ if (cmux_1->policy == cmux_2->policy) {
+ policy = cmux_1->policy;
+
+ if (policy->cmp_cmux) {
+ /* Okay, we can compare! */
+ return policy->cmp_cmux(cmux_1, cmux_1->policy_data,
+ cmux_2, cmux_2->policy_data);
+ } else {
+ /*
+ * Equivalent because the policy doesn't know how to compare between
+ * muxes.
+ */
+ return 0;
+ }
+ } else {
+ /* Equivalent because they have different policies */
+ return 0;
+ }
+ } else {
+ /* Equivalent because one or both are missing a policy */
+ return 0;
+ }
+}
+
diff --git a/src/or/circuitmux.h b/src/or/circuitmux.h
index eade2486a2..53092cd66c 100644
--- a/src/or/circuitmux.h
+++ b/src/or/circuitmux.h
@@ -57,6 +57,9 @@ struct circuitmux_policy_s {
/* Choose a circuit */
circuit_t * (*pick_active_circuit)(circuitmux_t *cmux,
circuitmux_policy_data_t *pol_data);
+ /* Optional: channel comparator for use by the scheduler */
+ int (*cmp_cmux)(circuitmux_t *cmux_1, circuitmux_policy_data_t *pol_data_1,
+ circuitmux_t *cmux_2, circuitmux_policy_data_t *pol_data_2);
};
/*
@@ -105,7 +108,8 @@ void circuitmux_free(circuitmux_t *cmux);
/* Policy control */
void circuitmux_clear_policy(circuitmux_t *cmux);
-const circuitmux_policy_t * circuitmux_get_policy(circuitmux_t *cmux);
+MOCK_DECL(const circuitmux_policy_t *,
+ circuitmux_get_policy, (circuitmux_t *cmux));
void circuitmux_set_policy(circuitmux_t *cmux,
const circuitmux_policy_t *pol);
@@ -117,7 +121,7 @@ int circuitmux_is_circuit_attached(circuitmux_t *cmux, circuit_t *circ);
int circuitmux_is_circuit_active(circuitmux_t *cmux, circuit_t *circ);
unsigned int circuitmux_num_cells_for_circuit(circuitmux_t *cmux,
circuit_t *circ);
-unsigned int circuitmux_num_cells(circuitmux_t *cmux);
+MOCK_DECL(unsigned int, circuitmux_num_cells, (circuitmux_t *cmux));
unsigned int circuitmux_num_circuits(circuitmux_t *cmux);
unsigned int circuitmux_num_active_circuits(circuitmux_t *cmux);
@@ -148,5 +152,9 @@ void circuitmux_append_destroy_cell(channel_t *chan,
void circuitmux_mark_destroyed_circids_usable(circuitmux_t *cmux,
channel_t *chan);
+/* Optional interchannel comparisons for scheduling */
+MOCK_DECL(int, circuitmux_compare_muxes,
+ (circuitmux_t *cmux_1, circuitmux_t *cmux_2));
+
#endif /* TOR_CIRCUITMUX_H */
diff --git a/src/or/circuitmux_ewma.c b/src/or/circuitmux_ewma.c
index 49d899e5e7..0d7d6ef197 100644
--- a/src/or/circuitmux_ewma.c
+++ b/src/or/circuitmux_ewma.c
@@ -187,6 +187,9 @@ ewma_notify_xmit_cells(circuitmux_t *cmux,
static circuit_t *
ewma_pick_active_circuit(circuitmux_t *cmux,
circuitmux_policy_data_t *pol_data);
+static int
+ewma_cmp_cmux(circuitmux_t *cmux_1, circuitmux_policy_data_t *pol_data_1,
+ circuitmux_t *cmux_2, circuitmux_policy_data_t *pol_data_2);
/*** EWMA global variables ***/
@@ -209,7 +212,8 @@ circuitmux_policy_t ewma_policy = {
/*.notify_circ_inactive =*/ ewma_notify_circ_inactive,
/*.notify_set_n_cells =*/ NULL, /* EWMA doesn't need this */
/*.notify_xmit_cells =*/ ewma_notify_xmit_cells,
- /*.pick_active_circuit =*/ ewma_pick_active_circuit
+ /*.pick_active_circuit =*/ ewma_pick_active_circuit,
+ /*.cmp_cmux =*/ ewma_cmp_cmux
};
/*** EWMA method implementations using the below EWMA helper functions ***/
@@ -453,6 +457,58 @@ ewma_pick_active_circuit(circuitmux_t *cmux,
return circ;
}
+/**
+ * Compare two EWMA cmuxes, and return -1, 0 or 1 to indicate which should
+ * be more preferred - see circuitmux_compare_muxes() of circuitmux.c.
+ */
+
+static int
+ewma_cmp_cmux(circuitmux_t *cmux_1, circuitmux_policy_data_t *pol_data_1,
+ circuitmux_t *cmux_2, circuitmux_policy_data_t *pol_data_2)
+{
+ ewma_policy_data_t *p1 = NULL, *p2 = NULL;
+ cell_ewma_t *ce1 = NULL, *ce2 = NULL;
+
+ tor_assert(cmux_1);
+ tor_assert(pol_data_1);
+ tor_assert(cmux_2);
+ tor_assert(pol_data_2);
+
+ p1 = TO_EWMA_POL_DATA(pol_data_1);
+ p2 = TO_EWMA_POL_DATA(pol_data_1);
+
+ if (p1 != p2) {
+ /* Get the head cell_ewma_t from each queue */
+ if (smartlist_len(p1->active_circuit_pqueue) > 0) {
+ ce1 = smartlist_get(p1->active_circuit_pqueue, 0);
+ }
+
+ if (smartlist_len(p2->active_circuit_pqueue) > 0) {
+ ce2 = smartlist_get(p2->active_circuit_pqueue, 0);
+ }
+
+ /* Got both of them? */
+ if (ce1 != NULL && ce2 != NULL) {
+ /* Pick whichever one has the better best circuit */
+ return compare_cell_ewma_counts(ce1, ce2);
+ } else {
+ if (ce1 != NULL ) {
+ /* We only have a circuit on cmux_1, so prefer it */
+ return -1;
+ } else if (ce2 != NULL) {
+ /* We only have a circuit on cmux_2, so prefer it */
+ return 1;
+ } else {
+ /* No circuits at all; no preference */
+ return 0;
+ }
+ }
+ } else {
+ /* We got identical params */
+ return 0;
+ }
+}
+
/** Helper for sorting cell_ewma_t values in their priority queue. */
static int
compare_cell_ewma_counts(const void *p1, const void *p2)
diff --git a/src/or/circuituse.c b/src/or/circuituse.c
index ad4a3a546d..90571360de 100644
--- a/src/or/circuituse.c
+++ b/src/or/circuituse.c
@@ -2012,7 +2012,7 @@ circuit_get_open_circ_or_launch(entry_connection_t *conn,
circ->rend_data = rend_data_dup(ENTRY_TO_EDGE_CONN(conn)->rend_data);
if (circ->base_.purpose == CIRCUIT_PURPOSE_C_ESTABLISH_REND &&
circ->base_.state == CIRCUIT_STATE_OPEN)
- rend_client_rendcirc_has_opened(circ);
+ circuit_has_opened(circ);
}
}
} /* endif (!circ) */
diff --git a/src/or/command.c b/src/or/command.c
index 268c495371..8e214bf0a4 100644
--- a/src/or/command.c
+++ b/src/or/command.c
@@ -438,6 +438,7 @@ command_process_created_cell(cell_t *cell, channel_t *chan)
static void
command_process_relay_cell(cell_t *cell, channel_t *chan)
{
+ const or_options_t *options = get_options();
circuit_t *circ;
int reason, direction;
@@ -511,6 +512,14 @@ command_process_relay_cell(cell_t *cell, channel_t *chan)
direction==CELL_DIRECTION_OUT?"forward":"backward");
circuit_mark_for_close(circ, -reason);
}
+
+ /* If this is a cell in an RP circuit, count it as part of the
+ hidden service stats */
+ if (options->HiddenServiceStatistics &&
+ !CIRCUIT_IS_ORIGIN(circ) &&
+ TO_OR_CIRCUIT(circ)->circuit_carries_hs_traffic_stats) {
+ rep_hist_seen_new_rp_cell();
+ }
}
/** Process a 'destroy' <b>cell</b> that just arrived from
diff --git a/src/or/config.c b/src/or/config.c
index 4b8c6834e9..701d8977ac 100644
--- a/src/or/config.c
+++ b/src/or/config.c
@@ -43,6 +43,7 @@
#include "util.h"
#include "routerlist.h"
#include "routerset.h"
+#include "scheduler.h"
#include "statefile.h"
#include "transports.h"
#include "ext_orport.h"
@@ -267,6 +268,7 @@ static config_var_t option_vars_[] = {
VAR("HiddenServicePort", LINELIST_S, RendConfigLines, NULL),
VAR("HiddenServiceVersion",LINELIST_S, RendConfigLines, NULL),
VAR("HiddenServiceAuthorizeClient",LINELIST_S,RendConfigLines, NULL),
+ V(HiddenServiceStatistics, BOOL, "0"),
V(HidServAuth, LINELIST, NULL),
V(CloseHSClientCircuitsImmediatelyOnTimeout, BOOL, "0"),
V(CloseHSServiceRendCircuitsImmediatelyOnTimeout, BOOL, "0"),
@@ -368,6 +370,9 @@ static config_var_t option_vars_[] = {
V(ServerDNSSearchDomains, BOOL, "0"),
V(ServerDNSTestAddresses, CSV,
"www.google.com,www.mit.edu,www.yahoo.com,www.slashdot.org"),
+ V(SchedulerLowWaterMark__, MEMUNIT, "100 MB"),
+ V(SchedulerHighWaterMark__, MEMUNIT, "101 MB"),
+ V(SchedulerMaxFlushCells__, UINT, "1000"),
V(ShutdownWaitLength, INTERVAL, "30 seconds"),
V(SocksListenAddress, LINELIST, NULL),
V(SocksPolicy, LINELIST, NULL),
@@ -377,7 +382,7 @@ static config_var_t option_vars_[] = {
OBSOLETE("StrictEntryNodes"),
OBSOLETE("StrictExitNodes"),
V(StrictNodes, BOOL, "0"),
- V(Support022HiddenServices, AUTOBOOL, "auto"),
+ OBSOLETE("Support022HiddenServices"),
V(TestSocks, BOOL, "0"),
V(TokenBucketRefillInterval, MSEC_INTERVAL, "100 msec"),
V(Tor2webMode, BOOL, "0"),
@@ -824,19 +829,22 @@ add_default_trusted_dir_authorities(dirinfo_type_t type)
"moria1 orport=9101 "
"v3ident=D586D18309DED4CD6D57C18FDB97EFA96D330566 "
"128.31.0.39:9131 9695 DFC3 5FFE B861 329B 9F1A B04C 4639 7020 CE31",
- "tor26 orport=443 v3ident=14C131DFC5C6F93646BE72FA1401C02A8DF2E8B4 "
+ "tor26 orport=443 "
+ "v3ident=14C131DFC5C6F93646BE72FA1401C02A8DF2E8B4 "
"86.59.21.38:80 847B 1F85 0344 D787 6491 A548 92F9 0493 4E4E B85D",
- "dizum orport=443 v3ident=E8A9C45EDE6D711294FADF8E7951F4DE6CA56B58 "
+ "dizum orport=443 "
+ "v3ident=E8A9C45EDE6D711294FADF8E7951F4DE6CA56B58 "
"194.109.206.212:80 7EA6 EAD6 FD83 083C 538F 4403 8BBF A077 587D D755",
- "Tonga orport=443 bridge 82.94.251.203:80 "
- "4A0C CD2D DC79 9508 3D73 F5D6 6710 0C8A 5831 F16D",
+ "Tonga orport=443 bridge "
+ "82.94.251.203:80 4A0C CD2D DC79 9508 3D73 F5D6 6710 0C8A 5831 F16D",
"gabelmoo orport=443 "
"v3ident=ED03BB616EB2F60BEC80151114BB25CEF515B226 "
"131.188.40.189:80 F204 4413 DAC2 E02E 3D6B CF47 35A1 9BCA 1DE9 7281",
"dannenberg orport=443 "
"v3ident=585769C78764D58426B8B52B6651A5A71137189A "
"193.23.244.244:80 7BE6 83E6 5D48 1413 21C5 ED92 F075 C553 64AC 7123",
- "urras orport=80 v3ident=80550987E1D626E3EBA5E5E75A458DE0626D088C "
+ "urras orport=80 "
+ "v3ident=80550987E1D626E3EBA5E5E75A458DE0626D088C "
"208.83.223.34:443 0AD3 FA88 4D18 F89E EA2D 89C0 1937 9E0E 7FD9 4417",
"maatuska orport=80 "
"v3ident=49015F787433103580E3B66A1707A00E60F2D15B "
@@ -846,7 +854,7 @@ add_default_trusted_dir_authorities(dirinfo_type_t type)
"154.35.32.5:80 CF6D 0AAF B385 BE71 B8E1 11FC 5CFF 4B47 9237 33BC",
"longclaw orport=443 "
"v3ident=23D15D965BC35114467363C165C4F724B64B4F66 "
- "202.85.227.202:80 74A9 1064 6BCE EFBC D2E8 74FC 1DC9 9743 0F96 8145",
+ "199.254.238.52:80 74A9 1064 6BCE EFBC D2E8 74FC 1DC9 9743 0F96 8145",
NULL
};
for (i=0; authorities[i]; i++) {
@@ -1042,6 +1050,14 @@ options_act_reversible(const or_options_t *old_options, char **msg)
if (running_tor && !libevent_initialized) {
init_libevent(options);
libevent_initialized = 1;
+
+ /*
+ * Initialize the scheduler - this has to come after
+ * options_init_from_torrc() sets up libevent - why yes, that seems
+ * completely sensible to hide the libevent setup in the option parsing
+ * code! It also needs to happen before init_keys(), so it needs to
+ * happen here too. How yucky. */
+ scheduler_init();
}
/* Adjust the port configuration so we can launch listeners. */
@@ -1071,6 +1087,8 @@ options_act_reversible(const or_options_t *old_options, char **msg)
"non-control network connections. Shutting down all existing "
"connections.");
connection_mark_all_noncontrol_connections();
+ /* We can't complete circuits until the network is re-enabled. */
+ note_that_we_maybe_cant_complete_circuits();
}
}
@@ -1521,6 +1539,12 @@ options_act(const or_options_t *old_options)
return -1;
}
+ /* Set up scheduler thresholds */
+ scheduler_set_watermarks((uint32_t)options->SchedulerLowWaterMark__,
+ (uint32_t)options->SchedulerHighWaterMark__,
+ (options->SchedulerMaxFlushCells__ > 0) ?
+ options->SchedulerMaxFlushCells__ : 1000);
+
/* Set up accounting */
if (accounting_parse_options(options, 0)<0) {
log_warn(LD_CONFIG,"Error in accounting options");
@@ -1668,7 +1692,7 @@ options_act(const or_options_t *old_options)
if (server_mode(options) && !server_mode(old_options)) {
ip_address_changed(0);
- if (can_complete_circuit || !any_predicted_circuits(time(NULL)))
+ if (have_completed_a_circuit() || !any_predicted_circuits(time(NULL)))
inform_testing_reachability();
}
cpuworkers_rotate();
@@ -1691,6 +1715,7 @@ options_act(const or_options_t *old_options)
if (options->CellStatistics || options->DirReqStatistics ||
options->EntryStatistics || options->ExitPortStatistics ||
options->ConnDirectionStatistics ||
+ options->HiddenServiceStatistics ||
options->BridgeAuthoritativeDir) {
time_t now = time(NULL);
int print_notice = 0;
@@ -1699,6 +1724,7 @@ options_act(const or_options_t *old_options)
if (!public_server_mode(options)) {
options->CellStatistics = 0;
options->EntryStatistics = 0;
+ options->HiddenServiceStatistics = 0;
options->ExitPortStatistics = 0;
}
@@ -1744,6 +1770,11 @@ options_act(const or_options_t *old_options)
options->ConnDirectionStatistics) {
rep_hist_conn_stats_init(now);
}
+ if ((!old_options || !old_options->HiddenServiceStatistics) &&
+ options->HiddenServiceStatistics) {
+ log_info(LD_CONFIG, "Configured to measure hidden service statistics.");
+ rep_hist_hs_stats_init(now);
+ }
if ((!old_options || !old_options->BridgeAuthoritativeDir) &&
options->BridgeAuthoritativeDir) {
rep_hist_desc_stats_init(now);
@@ -1755,6 +1786,8 @@ options_act(const or_options_t *old_options)
"data directory in 24 hours from now.");
}
+ /* If we used to have statistics enabled but we just disabled them,
+ stop gathering them. */
if (old_options && old_options->CellStatistics &&
!options->CellStatistics)
rep_hist_buffer_stats_term();
@@ -1764,6 +1797,9 @@ options_act(const or_options_t *old_options)
if (old_options && old_options->EntryStatistics &&
!options->EntryStatistics)
geoip_entry_stats_term();
+ if (old_options && old_options->HiddenServiceStatistics &&
+ !options->HiddenServiceStatistics)
+ rep_hist_hs_stats_term();
if (old_options && old_options->ExitPortStatistics &&
!options->ExitPortStatistics)
rep_hist_exit_stats_term();
@@ -2264,8 +2300,8 @@ resolve_my_address(int warn_severity, const or_options_t *options,
/** Return true iff <b>addr</b> is judged to be on the same network as us, or
* on a private network.
*/
-int
-is_local_addr(const tor_addr_t *addr)
+MOCK_IMPL(int,
+is_local_addr, (const tor_addr_t *addr))
{
if (tor_addr_is_internal(addr, 0))
return 1;
@@ -2574,20 +2610,24 @@ options_validate(or_options_t *old_options, or_options_t *options,
if (!strcasecmp(options->TransProxyType, "default")) {
options->TransProxyType_parsed = TPT_DEFAULT;
} else if (!strcasecmp(options->TransProxyType, "pf-divert")) {
-#ifndef __OpenBSD__
- REJECT("pf-divert is a OpenBSD-specific feature.");
+#if !defined(__OpenBSD__) && !defined( DARWIN )
+ /* Later versions of OS X have pf */
+ REJECT("pf-divert is a OpenBSD-specific "
+ "and OS X/Darwin-specific feature.");
#else
options->TransProxyType_parsed = TPT_PF_DIVERT;
#endif
} else if (!strcasecmp(options->TransProxyType, "tproxy")) {
-#ifndef __linux__
+#if !defined(__linux__)
REJECT("TPROXY is a Linux-specific feature.");
#else
options->TransProxyType_parsed = TPT_TPROXY;
#endif
} else if (!strcasecmp(options->TransProxyType, "ipfw")) {
-#ifndef __FreeBSD__
- REJECT("ipfw is a FreeBSD-specific feature.");
+#if !defined(__FreeBSD__) && !defined( DARWIN )
+ /* Earlier versions of OS X have ipfw */
+ REJECT("ipfw is a FreeBSD-specific"
+ "and OS X/Darwin-specific feature.");
#else
options->TransProxyType_parsed = TPT_IPFW;
#endif
@@ -2618,6 +2658,17 @@ options_validate(or_options_t *old_options, or_options_t *options,
routerset_union(options->ExcludeExitNodesUnion_,options->ExcludeNodes);
}
+ if (options->SchedulerLowWaterMark__ == 0 ||
+ options->SchedulerLowWaterMark__ > UINT32_MAX) {
+ log_warn(LD_GENERAL, "Bad SchedulerLowWaterMark__ option");
+ return -1;
+ } else if (options->SchedulerHighWaterMark__ <=
+ options->SchedulerLowWaterMark__ ||
+ options->SchedulerHighWaterMark__ > UINT32_MAX) {
+ log_warn(LD_GENERAL, "Bad SchedulerHighWaterMark option");
+ return -1;
+ }
+
if (options->NodeFamilies) {
options->NodeFamilySets = smartlist_new();
for (cl = options->NodeFamilies; cl; cl = cl->next) {
diff --git a/src/or/config.h b/src/or/config.h
index 6cc81ab948..133b472eb2 100644
--- a/src/or/config.h
+++ b/src/or/config.h
@@ -33,7 +33,7 @@ void reset_last_resolved_addr(void);
int resolve_my_address(int warn_severity, const or_options_t *options,
uint32_t *addr_out,
const char **method_out, char **hostname_out);
-int is_local_addr(const tor_addr_t *addr);
+MOCK_DECL(int, is_local_addr, (const tor_addr_t *addr));
void options_init(or_options_t *options);
#define OPTIONS_DUMP_MINIMAL 1
diff --git a/src/or/connection.c b/src/or/connection.c
index 16b359d5ed..d6edc4ab91 100644
--- a/src/or/connection.c
+++ b/src/or/connection.c
@@ -3838,6 +3838,8 @@ connection_handle_write_impl(connection_t *conn, int force)
tor_tls_get_n_raw_bytes(or_conn->tls, &n_read, &n_written);
log_debug(LD_GENERAL, "After TLS write of %d: %ld read, %ld written",
result, (long)n_read, (long)n_written);
+ or_conn->bytes_xmitted += result;
+ or_conn->bytes_xmitted_by_tls += n_written;
/* So we notice bytes were written even on error */
/* XXXX024 This cast is safe since we can never write INT_MAX bytes in a
* single set of TLS operations. But it looks kinda ugly. If we refactor
diff --git a/src/or/connection_edge.c b/src/or/connection_edge.c
index 14b391180e..a90ca00883 100644
--- a/src/or/connection_edge.c
+++ b/src/or/connection_edge.c
@@ -744,8 +744,9 @@ connection_ap_fail_onehop(const char *failed_digest,
/* we don't know the digest; have to compare addr:port */
tor_addr_t addr;
if (!build_state || !build_state->chosen_exit ||
- !entry_conn->socks_request || !entry_conn->socks_request->address)
+ !entry_conn->socks_request) {
continue;
+ }
if (tor_addr_parse(&addr, entry_conn->socks_request->address)<0 ||
!tor_addr_eq(&build_state->chosen_exit->addr, &addr) ||
build_state->chosen_exit->port != entry_conn->socks_request->port)
@@ -2461,7 +2462,7 @@ connection_exit_begin_conn(cell_t *cell, circuit_t *circ)
relay_header_unpack(&rh, cell->payload);
if (rh.length > RELAY_PAYLOAD_SIZE)
- return -1;
+ return -END_CIRC_REASON_TORPROTOCOL;
/* Note: we have to use relay_send_command_from_edge here, not
* connection_edge_end or connection_edge_send_command, since those require
@@ -2479,7 +2480,7 @@ connection_exit_begin_conn(cell_t *cell, circuit_t *circ)
r = begin_cell_parse(cell, &bcell, &end_reason);
if (r < -1) {
- return -1;
+ return -END_CIRC_REASON_TORPROTOCOL;
} else if (r == -1) {
tor_free(bcell.address);
relay_send_end_cell_from_edge(rh.stream_id, circ, end_reason, NULL);
diff --git a/src/or/connection_or.c b/src/or/connection_or.c
index e26e0bcf2d..2232a1b565 100644
--- a/src/or/connection_or.c
+++ b/src/or/connection_or.c
@@ -38,6 +38,8 @@
#include "router.h"
#include "routerlist.h"
#include "ext_orport.h"
+#include "scheduler.h"
+
#ifdef USE_BUFFEREVENTS
#include <event2/bufferevent_ssl.h>
#endif
@@ -574,48 +576,51 @@ connection_or_process_inbuf(or_connection_t *conn)
return ret;
}
-/** When adding cells to an OR connection's outbuf, keep adding until the
- * outbuf is at least this long, or we run out of cells. */
-#define OR_CONN_HIGHWATER (32*1024)
-
-/** Add cells to an OR connection's outbuf whenever the outbuf's data length
- * drops below this size. */
-#define OR_CONN_LOWWATER (16*1024)
-
/** Called whenever we have flushed some data on an or_conn: add more data
* from active circuits. */
int
connection_or_flushed_some(or_connection_t *conn)
{
- size_t datalen, temp;
- ssize_t n, flushed;
- size_t cell_network_size = get_cell_network_size(conn->wide_circ_ids);
+ size_t datalen;
+
+ /* The channel will want to update its estimated queue size */
+ channel_update_xmit_queue_size(TLS_CHAN_TO_BASE(conn->chan));
/* If we're under the low water mark, add cells until we're just over the
* high water mark. */
datalen = connection_get_outbuf_len(TO_CONN(conn));
if (datalen < OR_CONN_LOWWATER) {
- while ((conn->chan) && channel_tls_more_to_flush(conn->chan)) {
- /* Compute how many more cells we want at most */
- n = CEIL_DIV(OR_CONN_HIGHWATER - datalen, cell_network_size);
- /* Bail out if we don't want any more */
- if (n <= 0) break;
- /* We're still here; try to flush some more cells */
- flushed = channel_tls_flush_some_cells(conn->chan, n);
- /* Bail out if it says it didn't flush anything */
- if (flushed <= 0) break;
- /* How much in the outbuf now? */
- temp = connection_get_outbuf_len(TO_CONN(conn));
- /* Bail out if we didn't actually increase the outbuf size */
- if (temp <= datalen) break;
- /* Update datalen for the next iteration */
- datalen = temp;
- }
+ /* Let the scheduler know */
+ scheduler_channel_wants_writes(TLS_CHAN_TO_BASE(conn->chan));
}
return 0;
}
+/** This is for channeltls.c to ask how many cells we could accept if
+ * they were available. */
+ssize_t
+connection_or_num_cells_writeable(or_connection_t *conn)
+{
+ size_t datalen, cell_network_size;
+ ssize_t n = 0;
+
+ tor_assert(conn);
+
+ /*
+ * If we're under the high water mark, we're potentially
+ * writeable; note this is different from the calculation above
+ * used to trigger when to start writing after we've stopped.
+ */
+ datalen = connection_get_outbuf_len(TO_CONN(conn));
+ if (datalen < OR_CONN_HIGHWATER) {
+ cell_network_size = get_cell_network_size(conn->wide_circ_ids);
+ n = CEIL_DIV(OR_CONN_HIGHWATER - datalen, cell_network_size);
+ }
+
+ return n;
+}
+
/** Connection <b>conn</b> has finished writing and has no bytes left on
* its outbuf.
*
@@ -1167,10 +1172,10 @@ connection_or_notify_error(or_connection_t *conn,
*
* Return the launched conn, or NULL if it failed.
*/
-or_connection_t *
-connection_or_connect(const tor_addr_t *_addr, uint16_t port,
- const char *id_digest,
- channel_tls_t *chan)
+
+MOCK_IMPL(or_connection_t *,
+connection_or_connect, (const tor_addr_t *_addr, uint16_t port,
+ const char *id_digest, channel_tls_t *chan))
{
or_connection_t *conn;
const or_options_t *options = get_options();
diff --git a/src/or/connection_or.h b/src/or/connection_or.h
index 1ceabdaa4b..b82896e26d 100644
--- a/src/or/connection_or.h
+++ b/src/or/connection_or.h
@@ -24,6 +24,7 @@ void connection_or_set_bad_connections(const char *digest, int force);
void connection_or_block_renegotiation(or_connection_t *conn);
int connection_or_reached_eof(or_connection_t *conn);
int connection_or_process_inbuf(or_connection_t *conn);
+ssize_t connection_or_num_cells_writeable(or_connection_t *conn);
int connection_or_flushed_some(or_connection_t *conn);
int connection_or_finished_flushing(or_connection_t *conn);
int connection_or_finished_connecting(or_connection_t *conn);
@@ -36,9 +37,10 @@ void connection_or_connect_failed(or_connection_t *conn,
int reason, const char *msg);
void connection_or_notify_error(or_connection_t *conn,
int reason, const char *msg);
-or_connection_t *connection_or_connect(const tor_addr_t *addr, uint16_t port,
- const char *id_digest,
- channel_tls_t *chan);
+MOCK_DECL(or_connection_t *,
+ connection_or_connect,
+ (const tor_addr_t *addr, uint16_t port,
+ const char *id_digest, channel_tls_t *chan));
void connection_or_close_normally(or_connection_t *orconn, int flush);
void connection_or_close_for_error(or_connection_t *orconn, int flush);
diff --git a/src/or/control.c b/src/or/control.c
index e3f913177b..dc67588d6a 100644
--- a/src/or/control.c
+++ b/src/or/control.c
@@ -1263,6 +1263,7 @@ static const struct signal_t signal_table[] = {
{ SIGTERM, "INT" },
{ SIGNEWNYM, "NEWNYM" },
{ SIGCLEARDNSCACHE, "CLEARDNSCACHE"},
+ { SIGHEARTBEAT, "HEARTBEAT"},
{ 0, NULL },
};
@@ -2015,7 +2016,7 @@ getinfo_helper_events(control_connection_t *control_conn,
/* Note that status/ is not a catch-all for events; there's only supposed
* to be a status GETINFO if there's a corresponding STATUS event. */
if (!strcmp(question, "status/circuit-established")) {
- *answer = tor_strdup(can_complete_circuit ? "1" : "0");
+ *answer = tor_strdup(have_completed_a_circuit() ? "1" : "0");
} else if (!strcmp(question, "status/enough-dir-info")) {
*answer = tor_strdup(router_have_minimum_dir_info() ? "1" : "0");
} else if (!strcmp(question, "status/good-server-descriptor") ||
@@ -4454,6 +4455,9 @@ control_event_signal(uintptr_t signal)
case SIGCLEARDNSCACHE:
signal_string = "CLEARDNSCACHE";
break;
+ case SIGHEARTBEAT:
+ signal_string = "HEARTBEAT";
+ break;
default:
log_warn(LD_BUG, "Unrecognized signal %lu in control_event_signal",
(unsigned long)signal);
@@ -5096,20 +5100,30 @@ control_event_hs_descriptor_requested(const rend_data_t *rend_query,
void
control_event_hs_descriptor_receive_end(const char *action,
const rend_data_t *rend_query,
- const char *id_digest)
+ const char *id_digest,
+ const char *reason)
{
+ char *reason_field = NULL;
+
if (!action || !rend_query || !id_digest) {
log_warn(LD_BUG, "Called with action==%p, rend_query==%p, "
"id_digest==%p", action, rend_query, id_digest);
return;
}
+ if (reason) {
+ tor_asprintf(&reason_field, " REASON=%s", reason);
+ }
+
send_control_event(EVENT_HS_DESC, ALL_FORMATS,
- "650 HS_DESC %s %s %s %s\r\n",
+ "650 HS_DESC %s %s %s %s%s\r\n",
action,
rend_query->onion_address,
rend_auth_type_to_string(rend_query->auth_type),
- node_describe_longname_by_id(id_digest));
+ node_describe_longname_by_id(id_digest),
+ reason_field ? reason_field : "");
+
+ tor_free(reason_field);
}
/** send HS_DESC RECEIVED event
@@ -5125,23 +5139,27 @@ control_event_hs_descriptor_received(const rend_data_t *rend_query,
rend_query, id_digest);
return;
}
- control_event_hs_descriptor_receive_end("RECEIVED", rend_query, id_digest);
+ control_event_hs_descriptor_receive_end("RECEIVED", rend_query,
+ id_digest, NULL);
}
-/** send HS_DESC FAILED event
- *
- * called when request for hidden service descriptor returned failure.
+/** Send HS_DESC event to inform controller that query <b>rend_query</b>
+ * failed to retrieve hidden service descriptor identified by
+ * <b>id_digest</b>. If <b>reason</b> is not NULL, add it to REASON=
+ * field.
*/
void
control_event_hs_descriptor_failed(const rend_data_t *rend_query,
- const char *id_digest)
+ const char *id_digest,
+ const char *reason)
{
if (!rend_query || !id_digest) {
log_warn(LD_BUG, "Called with rend_query==%p, id_digest==%p",
rend_query, id_digest);
return;
}
- control_event_hs_descriptor_receive_end("FAILED", rend_query, id_digest);
+ control_event_hs_descriptor_receive_end("FAILED", rend_query,
+ id_digest, reason);
}
/** Free any leftover allocated memory of the control.c subsystem. */
diff --git a/src/or/control.h b/src/or/control.h
index 0c92d5503d..f62084b931 100644
--- a/src/or/control.h
+++ b/src/or/control.h
@@ -108,11 +108,13 @@ void control_event_hs_descriptor_requested(const rend_data_t *rend_query,
const char *hs_dir);
void control_event_hs_descriptor_receive_end(const char *action,
const rend_data_t *rend_query,
- const char *hs_dir);
+ const char *hs_dir,
+ const char *reason);
void control_event_hs_descriptor_received(const rend_data_t *rend_query,
const char *hs_dir);
void control_event_hs_descriptor_failed(const rend_data_t *rend_query,
- const char *hs_dir);
+ const char *hs_dir,
+ const char *reason);
void control_free_all(void);
diff --git a/src/or/directory.c b/src/or/directory.c
index df9e7f8ad3..cca4b54e24 100644
--- a/src/or/directory.c
+++ b/src/or/directory.c
@@ -2073,9 +2073,10 @@ connection_dir_client_reached_eof(dir_connection_t *conn)
}
if (conn->base_.purpose == DIR_PURPOSE_FETCH_RENDDESC_V2) {
- #define SEND_HS_DESC_FAILED_EVENT() ( \
+ #define SEND_HS_DESC_FAILED_EVENT(reason) ( \
control_event_hs_descriptor_failed(conn->rend_data, \
- conn->identity_digest) )
+ conn->identity_digest, \
+ reason) )
tor_assert(conn->rend_data);
log_info(LD_REND,"Received rendezvous descriptor (size %d, status %d "
"(%s))",
@@ -2090,7 +2091,7 @@ connection_dir_client_reached_eof(dir_connection_t *conn)
"Retrying at another directory.");
/* We'll retry when connection_about_to_close_connection()
* cleans this dir conn up. */
- SEND_HS_DESC_FAILED_EVENT();
+ SEND_HS_DESC_FAILED_EVENT("BAD_DESC");
break;
case RCS_OKAY:
default:
@@ -2109,14 +2110,14 @@ connection_dir_client_reached_eof(dir_connection_t *conn)
* connection_about_to_close_connection() cleans this conn up. */
log_info(LD_REND,"Fetching v2 rendezvous descriptor failed: "
"Retrying at another directory.");
- SEND_HS_DESC_FAILED_EVENT();
+ SEND_HS_DESC_FAILED_EVENT("NOT_FOUND");
break;
case 400:
log_warn(LD_REND, "Fetching v2 rendezvous descriptor failed: "
"http status 400 (%s). Dirserver didn't like our "
"v2 rendezvous query? Retrying at another directory.",
escaped(reason));
- SEND_HS_DESC_FAILED_EVENT();
+ SEND_HS_DESC_FAILED_EVENT("QUERY_REJECTED");
break;
default:
log_warn(LD_REND, "Fetching v2 rendezvous descriptor failed: "
@@ -2125,7 +2126,7 @@ connection_dir_client_reached_eof(dir_connection_t *conn)
"Retrying at another directory.",
status_code, escaped(reason), conn->base_.address,
conn->base_.port);
- SEND_HS_DESC_FAILED_EVENT();
+ SEND_HS_DESC_FAILED_EVENT("UNEXPECTED");
break;
}
}
@@ -2209,8 +2210,10 @@ connection_dir_process_inbuf(dir_connection_t *conn)
}
if (connection_get_inbuf_len(TO_CONN(conn)) > MAX_DIRECTORY_OBJECT_SIZE) {
- log_warn(LD_HTTP, "Too much data received from directory connection: "
- "denial of service attempt, or you need to upgrade?");
+ log_warn(LD_HTTP,
+ "Too much data received from directory connection (%s): "
+ "denial of service attempt, or you need to upgrade?",
+ conn->base_.address);
connection_mark_for_close(TO_CONN(conn));
return -1;
}
diff --git a/src/or/include.am b/src/or/include.am
index 0f53f007f0..643f7ce001 100644
--- a/src/or/include.am
+++ b/src/or/include.am
@@ -74,6 +74,7 @@ LIBTOR_A_SOURCES = \
src/or/routerlist.c \
src/or/routerparse.c \
src/or/routerset.c \
+ src/or/scheduler.c \
src/or/statefile.c \
src/or/status.c \
src/or/onion_ntor.c \
@@ -179,6 +180,7 @@ ORHEADERS = \
src/or/routerlist.h \
src/or/routerset.h \
src/or/routerparse.h \
+ src/or/scheduler.h \
src/or/statefile.h \
src/or/status.h
diff --git a/src/or/main.c b/src/or/main.c
index 5a4e0a3e2d..160bfa00e0 100644
--- a/src/or/main.c
+++ b/src/or/main.c
@@ -53,6 +53,7 @@
#include "router.h"
#include "routerlist.h"
#include "routerparse.h"
+#include "scheduler.h"
#include "statefile.h"
#include "status.h"
#include "util_process.h"
@@ -150,7 +151,7 @@ static int called_loop_once = 0;
* any longer (a big time jump happened, when we notice our directory is
* heinously out-of-date, etc.
*/
-int can_complete_circuit=0;
+static int can_complete_circuits = 0;
/** How often do we check for router descriptors that we should download
* when we have too little directory info? */
@@ -171,11 +172,11 @@ int quiet_level = 0;
/********* END VARIABLES ************/
/****************************************************************************
-*
-* This section contains accessors and other methods on the connection_array
-* variables (which are global within this file and unavailable outside it).
-*
-****************************************************************************/
+ *
+ * This section contains accessors and other methods on the connection_array
+ * variables (which are global within this file and unavailable outside it).
+ *
+ ****************************************************************************/
#if 0 && defined(USE_BUFFEREVENTS)
static void
@@ -223,6 +224,31 @@ set_buffer_lengths_to_zero(tor_socket_t s)
}
#endif
+/** Return 1 if we have successfully built a circuit, and nothing has changed
+ * to make us think that maybe we can't.
+ */
+int
+have_completed_a_circuit(void)
+{
+ return can_complete_circuits;
+}
+
+/** Note that we have successfully built a circuit, so that reachability
+ * testing and introduction points and so on may be attempted. */
+void
+note_that_we_completed_a_circuit(void)
+{
+ can_complete_circuits = 1;
+}
+
+/** Note that something has happened (like a clock jump, or DisableNetwork) to
+ * make us think that maybe we can't complete circuits. */
+void
+note_that_we_maybe_cant_complete_circuits(void)
+{
+ can_complete_circuits = 0;
+}
+
/** Add <b>conn</b> to the array of connections that we can poll on. The
* connection's socket must be set; the connection starts out
* non-reading and non-writing.
@@ -999,7 +1025,7 @@ directory_info_has_arrived(time_t now, int from_cache)
}
if (server_mode(options) && !net_is_disabled() && !from_cache &&
- (can_complete_circuit || !any_predicted_circuits(now)))
+ (have_completed_a_circuit() || !any_predicted_circuits(now)))
consider_testing_reachability(1, 1);
}
@@ -1358,6 +1384,11 @@ run_scheduled_events(time_t now)
if (next_write && next_write < next_time_to_write_stats_files)
next_time_to_write_stats_files = next_write;
}
+ if (options->HiddenServiceStatistics) {
+ time_t next_write = rep_hist_hs_stats_write(time_to_write_stats_files);
+ if (next_write && next_write < next_time_to_write_stats_files)
+ next_time_to_write_stats_files = next_write;
+ }
if (options->ExitPortStatistics) {
time_t next_write = rep_hist_exit_stats_write(time_to_write_stats_files);
if (next_write && next_write < next_time_to_write_stats_files)
@@ -1436,7 +1467,7 @@ run_scheduled_events(time_t now)
/* also, check religiously for reachability, if it's within the first
* 20 minutes of our uptime. */
if (is_server &&
- (can_complete_circuit || !any_predicted_circuits(now)) &&
+ (have_completed_a_circuit() || !any_predicted_circuits(now)) &&
!we_are_hibernating()) {
if (stats_n_seconds_working < TIMEOUT_UNTIL_UNREACHABILITY_COMPLAINT) {
consider_testing_reachability(1, dirport_reachability_count==0);
@@ -1549,7 +1580,7 @@ run_scheduled_events(time_t now)
circuit_close_all_marked();
/* 7. And upload service descriptors if necessary. */
- if (can_complete_circuit && !net_is_disabled()) {
+ if (have_completed_a_circuit() && !net_is_disabled()) {
rend_consider_services_upload(now);
rend_consider_descriptor_republication();
}
@@ -1680,7 +1711,7 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg)
if (server_mode(options) &&
!net_is_disabled() &&
seconds_elapsed > 0 &&
- can_complete_circuit &&
+ have_completed_a_circuit() &&
stats_n_seconds_working / TIMEOUT_UNTIL_UNREACHABILITY_COMPLAINT !=
(stats_n_seconds_working+seconds_elapsed) /
TIMEOUT_UNTIL_UNREACHABILITY_COMPLAINT) {
@@ -2137,6 +2168,10 @@ process_signal(uintptr_t sig)
addressmap_clear_transient();
control_event_signal(sig);
break;
+ case SIGHEARTBEAT:
+ log_heartbeat(time(NULL));
+ control_event_signal(sig);
+ break;
}
}
@@ -2553,6 +2588,7 @@ tor_free_all(int postfork)
channel_tls_free_all();
channel_free_all();
connection_free_all();
+ scheduler_free_all();
buf_shrink_freelists(1);
memarea_clear_freelist();
nodelist_free_all();
diff --git a/src/or/main.h b/src/or/main.h
index e918517b82..7d98983100 100644
--- a/src/or/main.h
+++ b/src/or/main.h
@@ -12,7 +12,9 @@
#ifndef TOR_MAIN_H
#define TOR_MAIN_H
-extern int can_complete_circuit;
+int have_completed_a_circuit(void);
+void note_that_we_completed_a_circuit(void);
+void note_that_we_maybe_cant_complete_circuits(void);
int connection_add_impl(connection_t *conn, int is_connecting);
#define connection_add(conn) connection_add_impl((conn), 0)
diff --git a/src/or/nodelist.c b/src/or/nodelist.c
index 53abc820f5..e0e01ec190 100644
--- a/src/or/nodelist.c
+++ b/src/or/nodelist.c
@@ -1562,7 +1562,7 @@ update_router_have_minimum_dir_info(void)
* is back up and usable, and b) disable some activities that Tor
* should only do while circuits are working, like reachability tests
* and fetching bridge descriptors only over circuits. */
- can_complete_circuit = 0;
+ note_that_we_maybe_cant_complete_circuits();
control_event_client_status(LOG_NOTICE, "NOT_ENOUGH_DIR_INFO");
}
diff --git a/src/or/or.h b/src/or/or.h
index 5ebe7bfac3..ee86697fd8 100644
--- a/src/or/or.h
+++ b/src/or/or.h
@@ -119,6 +119,7 @@
* conflict with system-defined signals. */
#define SIGNEWNYM 129
#define SIGCLEARDNSCACHE 130
+#define SIGHEARTBEAT 131
#if (SIZEOF_CELL_T != 0)
/* On Irix, stdlib.h defines a cell_t type, so we need to make sure
@@ -1430,6 +1431,18 @@ typedef struct or_handshake_state_t {
/** Length of Extended ORPort connection identifier. */
#define EXT_OR_CONN_ID_LEN DIGEST_LEN /* 20 */
+/*
+ * OR_CONN_HIGHWATER and OR_CONN_LOWWATER moved from connection_or.c so
+ * channeltls.c can see them too.
+ */
+
+/** When adding cells to an OR connection's outbuf, keep adding until the
+ * outbuf is at least this long, or we run out of cells. */
+#define OR_CONN_HIGHWATER (32*1024)
+
+/** Add cells to an OR connection's outbuf whenever the outbuf's data length
+ * drops below this size. */
+#define OR_CONN_LOWWATER (16*1024)
/** Subtype of connection_t for an "OR connection" -- that is, one that speaks
* cells over TLS. */
@@ -1521,6 +1534,12 @@ typedef struct or_connection_t {
/** Last emptied write token bucket in msec since midnight; only used if
* TB_EMPTY events are enabled. */
uint32_t write_emptied_time;
+
+ /*
+ * Count the number of bytes flushed out on this orconn, and the number of
+ * bytes TLS actually sent - used for overhead estimation for scheduling.
+ */
+ uint64_t bytes_xmitted, bytes_xmitted_by_tls;
} or_connection_t;
/** Subtype of connection_t for an "edge connection" -- that is, an entry (ap)
@@ -3185,6 +3204,10 @@ typedef struct or_circuit_t {
/** True iff this circuit was made with a CREATE_FAST cell. */
unsigned int is_first_hop : 1;
+ /** If set, this circuit carries HS traffic. Consider it in any HS
+ * statistics. */
+ unsigned int circuit_carries_hs_traffic_stats : 1;
+
/** Number of cells that were removed from circuit queue; reset every
* time when writing buffer stats to disk. */
uint32_t processed_cells;
@@ -3942,6 +3965,10 @@ typedef struct {
/** If true, the user wants us to collect statistics as entry node. */
int EntryStatistics;
+ /** If true, the user wants us to collect statistics as hidden service
+ * directory, introduction point, or rendezvous point. */
+ int HiddenServiceStatistics;
+
/** If true, include statistics file contents in extra-info documents. */
int ExtraInfoStatistics;
@@ -4229,9 +4256,18 @@ typedef struct {
/** How long (seconds) do we keep a guard before picking a new one? */
int GuardLifetime;
- /** Should we send the timestamps that pre-023 hidden services want? */
- int Support022HiddenServices;
-
+ /** Low-water mark for global scheduler - start sending when estimated
+ * queued size falls below this threshold.
+ */
+ uint64_t SchedulerLowWaterMark__;
+ /** High-water mark for global scheduler - stop sending when estimated
+ * queued size exceeds this threshold.
+ */
+ uint64_t SchedulerHighWaterMark__;
+ /** Flush size for global scheduler - flush this many cells at a time
+ * when sending.
+ */
+ int SchedulerMaxFlushCells__;
} or_options_t;
/** Persistent state for an onion router, as saved to disk. */
@@ -5027,7 +5063,7 @@ typedef enum was_router_added_t {
ROUTER_WAS_NOT_WANTED = -6,
/* Router descriptor was rejected because it was older than
* OLD_ROUTER_DESC_MAX_AGE. */
- ROUTER_WAS_TOO_OLD = -7,
+ ROUTER_WAS_TOO_OLD = -7, /* note contrast with 'NOT_NEW' */
} was_router_added_t;
/********************************* routerparse.c ************************/
diff --git a/src/or/relay.c b/src/or/relay.c
index 05c7b3c955..b95e5841e7 100644
--- a/src/or/relay.c
+++ b/src/or/relay.c
@@ -39,6 +39,7 @@
#include "router.h"
#include "routerlist.h"
#include "routerparse.h"
+#include "scheduler.h"
static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell,
cell_direction_t cell_direction,
@@ -2591,8 +2592,8 @@ packed_cell_get_circid(const packed_cell_t *cell, int wide_circ_ids)
* queue of the first active circuit on <b>chan</b>, and write them to
* <b>chan</b>-&gt;outbuf. Return the number of cells written. Advance
* the active circuit pointer to the next active circuit in the ring. */
-int
-channel_flush_from_first_active_circuit(channel_t *chan, int max)
+MOCK_IMPL(int,
+channel_flush_from_first_active_circuit, (channel_t *chan, int max))
{
circuitmux_t *cmux = NULL;
int n_flushed = 0;
@@ -2868,14 +2869,8 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
log_debug(LD_GENERAL, "Made a circuit active.");
}
- if (!channel_has_queued_writes(chan)) {
- /* There is no data at all waiting to be sent on the outbuf. Add a
- * cell, so that we can notice when it gets flushed, flushed_some can
- * get called, and we can start putting more data onto the buffer then.
- */
- log_debug(LD_GENERAL, "Primed a buffer.");
- channel_flush_from_first_active_circuit(chan, 1);
- }
+ /* New way: mark this as having waiting cells for the scheduler */
+ scheduler_channel_has_waiting_cells(chan);
}
/** Append an encoded value of <b>addr</b> to <b>payload_out</b>, which must
diff --git a/src/or/relay.h b/src/or/relay.h
index 73c399154d..351516aada 100644
--- a/src/or/relay.h
+++ b/src/or/relay.h
@@ -64,7 +64,8 @@ void append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
cell_t *cell, cell_direction_t direction,
streamid_t fromstream);
void channel_unlink_all_circuits(channel_t *chan, smartlist_t *detached_out);
-int channel_flush_from_first_active_circuit(channel_t *chan, int max);
+MOCK_DECL(int, channel_flush_from_first_active_circuit,
+ (channel_t *chan, int max));
void assert_circuit_mux_okay(channel_t *chan);
void update_circuit_on_cmux_(circuit_t *circ, cell_direction_t direction,
const char *file, int lineno);
diff --git a/src/or/rendclient.c b/src/or/rendclient.c
index 5e5a09e41f..f351ae7161 100644
--- a/src/or/rendclient.c
+++ b/src/or/rendclient.c
@@ -130,16 +130,6 @@ rend_client_reextend_intro_circuit(origin_circuit_t *circ)
return result;
}
-/** Return true iff we should send timestamps in our INTRODUCE1 cells */
-static int
-rend_client_should_send_timestamp(void)
-{
- if (get_options()->Support022HiddenServices >= 0)
- return get_options()->Support022HiddenServices;
-
- return networkstatus_get_param(NULL, "Support022HiddenServices", 1, 0, 1);
-}
-
/** Called when we're trying to connect an ap conn; sends an INTRODUCE1 cell
* down introcirc if possible.
*/
@@ -251,14 +241,8 @@ rend_client_send_introduction(origin_circuit_t *introcirc,
REND_DESC_COOKIE_LEN);
v3_shift += 2+REND_DESC_COOKIE_LEN;
}
- if (rend_client_should_send_timestamp()) {
- uint32_t now = (uint32_t)time(NULL);
- now += 300;
- now -= now % 600;
- set_uint32(tmp+v3_shift+1, htonl(now));
- } else {
- set_uint32(tmp+v3_shift+1, 0);
- }
+ /* Once this held a timestamp. */
+ set_uint32(tmp+v3_shift+1, 0);
v3_shift += 4;
} /* if version 2 only write version number */
else if (entry->parsed->protocols & (1<<2)) {
diff --git a/src/or/rendcommon.c b/src/or/rendcommon.c
index df74b745a2..e779ecfe90 100644
--- a/src/or/rendcommon.c
+++ b/src/or/rendcommon.c
@@ -924,6 +924,7 @@ rend_cache_lookup_v2_desc_as_dir(const char *desc_id, const char **desc)
rend_cache_store_status_t
rend_cache_store_v2_desc_as_dir(const char *desc)
{
+ const or_options_t *options = get_options();
rend_service_descriptor_t *parsed;
char desc_id[DIGEST_LEN];
char *intro_content;
@@ -1003,6 +1004,12 @@ rend_cache_store_v2_desc_as_dir(const char *desc)
log_info(LD_REND, "Successfully stored service descriptor with desc ID "
"'%s' and len %d.",
safe_str(desc_id_base32), (int)encoded_size);
+
+ /* Statistics: Note down this potentially new HS. */
+ if (options->HiddenServiceStatistics) {
+ rep_hist_stored_maybe_new_hs(e->parsed->pk);
+ }
+
number_stored++;
goto advance;
skip:
diff --git a/src/or/rendmid.c b/src/or/rendmid.c
index 6a701e7a77..1c56471b8c 100644
--- a/src/or/rendmid.c
+++ b/src/or/rendmid.c
@@ -281,6 +281,7 @@ int
rend_mid_rendezvous(or_circuit_t *circ, const uint8_t *request,
size_t request_len)
{
+ const or_options_t *options = get_options();
or_circuit_t *rend_circ;
char hexid[9];
int reason = END_CIRC_REASON_INTERNAL;
@@ -316,6 +317,12 @@ rend_mid_rendezvous(or_circuit_t *circ, const uint8_t *request,
goto err;
}
+ /* Statistics: Mark this circuit as an RP circuit so that we collect
+ stats from it. */
+ if (options->HiddenServiceStatistics) {
+ circ->circuit_carries_hs_traffic_stats = 1;
+ }
+
/* Send the RENDEZVOUS2 cell to Alice. */
if (relay_send_command_from_edge(0, TO_CIRCUIT(rend_circ),
RELAY_COMMAND_RENDEZVOUS2,
diff --git a/src/or/rendservice.c b/src/or/rendservice.c
index 3fed540e84..196145e210 100644
--- a/src/or/rendservice.c
+++ b/src/or/rendservice.c
@@ -16,6 +16,7 @@
#include "circuituse.h"
#include "config.h"
#include "directory.h"
+#include "main.h"
#include "networkstatus.h"
#include "nodelist.h"
#include "rendclient.h"
@@ -372,101 +373,101 @@ rend_config_services(const or_options_t *options, int validate_only)
if (!strcasecmp(line->key, "HiddenServiceDir")) {
if (service) { /* register the one we just finished parsing */
if (validate_only)
- rend_service_free(service);
- else
- rend_add_service(service);
- }
- service = tor_malloc_zero(sizeof(rend_service_t));
- service->directory = tor_strdup(line->value);
- service->ports = smartlist_new();
- service->intro_period_started = time(NULL);
- service->n_intro_points_wanted = NUM_INTRO_POINTS_DEFAULT;
- continue;
- }
- if (!service) {
- log_warn(LD_CONFIG, "%s with no preceding HiddenServiceDir directive",
- line->key);
- rend_service_free(service);
- return -1;
- }
- if (!strcasecmp(line->key, "HiddenServicePort")) {
- portcfg = parse_port_config(line->value);
- if (!portcfg) {
- rend_service_free(service);
- return -1;
- }
- smartlist_add(service->ports, portcfg);
- } else if (!strcasecmp(line->key,
- "HiddenServiceDirGroupReadable")) {
- service->dir_group_readable = (int)tor_parse_long(line->value,
- 10, 0, 1, &ok, NULL);
- if (!ok) {
- log_warn(LD_CONFIG,
- "HiddenServiceDirGroupReadable should be 0 or 1, not %s",
- line->value);
- rend_service_free(service);
- return -1;
- }
- log_info(LD_CONFIG,
- "HiddenServiceDirGroupReadable=%d for %s",
- service->dir_group_readable, service->directory);
- } else if (!strcasecmp(line->key, "HiddenServiceAuthorizeClient")) {
- /* Parse auth type and comma-separated list of client names and add a
- * rend_authorized_client_t for each client to the service's list
- * of authorized clients. */
- smartlist_t *type_names_split, *clients;
- const char *authname;
- int num_clients;
- if (service->auth_type != REND_NO_AUTH) {
- log_warn(LD_CONFIG, "Got multiple HiddenServiceAuthorizeClient "
- "lines for a single service.");
- rend_service_free(service);
- return -1;
- }
- type_names_split = smartlist_new();
- smartlist_split_string(type_names_split, line->value, " ", 0, 2);
- if (smartlist_len(type_names_split) < 1) {
- log_warn(LD_BUG, "HiddenServiceAuthorizeClient has no value. This "
- "should have been prevented when parsing the "
- "configuration.");
- smartlist_free(type_names_split);
- rend_service_free(service);
- return -1;
- }
- authname = smartlist_get(type_names_split, 0);
- if (!strcasecmp(authname, "basic")) {
- service->auth_type = REND_BASIC_AUTH;
- } else if (!strcasecmp(authname, "stealth")) {
- service->auth_type = REND_STEALTH_AUTH;
- } else {
- log_warn(LD_CONFIG, "HiddenServiceAuthorizeClient contains "
- "unrecognized auth-type '%s'. Only 'basic' or 'stealth' "
- "are recognized.",
- (char *) smartlist_get(type_names_split, 0));
- SMARTLIST_FOREACH(type_names_split, char *, cp, tor_free(cp));
- smartlist_free(type_names_split);
- rend_service_free(service);
- return -1;
- }
- service->clients = smartlist_new();
- if (smartlist_len(type_names_split) < 2) {
- log_warn(LD_CONFIG, "HiddenServiceAuthorizeClient contains "
- "auth-type '%s', but no client names.",
- service->auth_type == REND_BASIC_AUTH ? "basic" : "stealth");
- SMARTLIST_FOREACH(type_names_split, char *, cp, tor_free(cp));
- smartlist_free(type_names_split);
- continue;
- }
- clients = smartlist_new();
- smartlist_split_string(clients, smartlist_get(type_names_split, 1),
- ",", SPLIT_SKIP_SPACE, 0);
- SMARTLIST_FOREACH(type_names_split, char *, cp, tor_free(cp));
- smartlist_free(type_names_split);
- /* Remove duplicate client names. */
- num_clients = smartlist_len(clients);
- smartlist_sort_strings(clients);
- smartlist_uniq_strings(clients);
- if (smartlist_len(clients) < num_clients) {
+ rend_service_free(service);
+ else
+ rend_add_service(service);
+ }
+ service = tor_malloc_zero(sizeof(rend_service_t));
+ service->directory = tor_strdup(line->value);
+ service->ports = smartlist_new();
+ service->intro_period_started = time(NULL);
+ service->n_intro_points_wanted = NUM_INTRO_POINTS_DEFAULT;
+ continue;
+ }
+ if (!service) {
+ log_warn(LD_CONFIG, "%s with no preceding HiddenServiceDir directive",
+ line->key);
+ rend_service_free(service);
+ return -1;
+ }
+ if (!strcasecmp(line->key, "HiddenServicePort")) {
+ portcfg = parse_port_config(line->value);
+ if (!portcfg) {
+ rend_service_free(service);
+ return -1;
+ }
+ smartlist_add(service->ports, portcfg);
+ } else if (!strcasecmp(line->key,
+ "HiddenServiceDirGroupReadable")) {
+ service->dir_group_readable = (int)tor_parse_long(line->value,
+ 10, 0, 1, &ok, NULL);
+ if (!ok) {
+ log_warn(LD_CONFIG,
+ "HiddenServiceDirGroupReadable should be 0 or 1, not %s",
+ line->value);
+ rend_service_free(service);
+ return -1;
+ }
+ log_info(LD_CONFIG,
+ "HiddenServiceDirGroupReadable=%d for %s",
+ service->dir_group_readable, service->directory);
+ } else if (!strcasecmp(line->key, "HiddenServiceAuthorizeClient")) {
+ /* Parse auth type and comma-separated list of client names and add a
+ * rend_authorized_client_t for each client to the service's list
+ * of authorized clients. */
+ smartlist_t *type_names_split, *clients;
+ const char *authname;
+ int num_clients;
+ if (service->auth_type != REND_NO_AUTH) {
+ log_warn(LD_CONFIG, "Got multiple HiddenServiceAuthorizeClient "
+ "lines for a single service.");
+ rend_service_free(service);
+ return -1;
+ }
+ type_names_split = smartlist_new();
+ smartlist_split_string(type_names_split, line->value, " ", 0, 2);
+ if (smartlist_len(type_names_split) < 1) {
+ log_warn(LD_BUG, "HiddenServiceAuthorizeClient has no value. This "
+ "should have been prevented when parsing the "
+ "configuration.");
+ smartlist_free(type_names_split);
+ rend_service_free(service);
+ return -1;
+ }
+ authname = smartlist_get(type_names_split, 0);
+ if (!strcasecmp(authname, "basic")) {
+ service->auth_type = REND_BASIC_AUTH;
+ } else if (!strcasecmp(authname, "stealth")) {
+ service->auth_type = REND_STEALTH_AUTH;
+ } else {
+ log_warn(LD_CONFIG, "HiddenServiceAuthorizeClient contains "
+ "unrecognized auth-type '%s'. Only 'basic' or 'stealth' "
+ "are recognized.",
+ (char *) smartlist_get(type_names_split, 0));
+ SMARTLIST_FOREACH(type_names_split, char *, cp, tor_free(cp));
+ smartlist_free(type_names_split);
+ rend_service_free(service);
+ return -1;
+ }
+ service->clients = smartlist_new();
+ if (smartlist_len(type_names_split) < 2) {
+ log_warn(LD_CONFIG, "HiddenServiceAuthorizeClient contains "
+ "auth-type '%s', but no client names.",
+ service->auth_type == REND_BASIC_AUTH ? "basic" : "stealth");
+ SMARTLIST_FOREACH(type_names_split, char *, cp, tor_free(cp));
+ smartlist_free(type_names_split);
+ continue;
+ }
+ clients = smartlist_new();
+ smartlist_split_string(clients, smartlist_get(type_names_split, 1),
+ ",", SPLIT_SKIP_SPACE, 0);
+ SMARTLIST_FOREACH(type_names_split, char *, cp, tor_free(cp));
+ smartlist_free(type_names_split);
+ /* Remove duplicate client names. */
+ num_clients = smartlist_len(clients);
+ smartlist_sort_strings(clients);
+ smartlist_uniq_strings(clients);
+ if (smartlist_len(clients) < num_clients) {
log_info(LD_CONFIG, "HiddenServiceAuthorizeClient contains %d "
"duplicate client name(s); removing.",
num_clients - smartlist_len(clients));
@@ -530,6 +531,16 @@ rend_config_services(const or_options_t *options, int validate_only)
}
}
if (service) {
+ cpd_check_t check_opts = CPD_CHECK_MODE_ONLY;
+ if (service->dir_group_readable) {
+ check_opts |= CPD_GROUP_READ;
+ }
+
+ if (check_private_dir(service->directory, check_opts, options->User) < 0) {
+ rend_service_free(service);
+ return -1;
+ }
+
if (validate_only) {
rend_service_free(service);
} else {
@@ -737,7 +748,7 @@ rend_service_load_keys(rend_service_t *s)
s->directory);
return -1;
}
- s->private_key = init_key_from_file(fname, 1, LOG_ERR);
+ s->private_key = init_key_from_file(fname, 1, LOG_ERR, 0);
if (!s->private_key)
return -1;
@@ -3072,8 +3083,12 @@ rend_services_introduce(void)
const or_options_t *options = get_options();
/* List of nodes we need to _exclude_ when choosing a new node to establish
* an intro point to. */
- smartlist_t *exclude_nodes = smartlist_new();
+ smartlist_t *exclude_nodes;
+
+ if (!have_completed_a_circuit())
+ return;
+ exclude_nodes = smartlist_new();
now = time(NULL);
for (i=0; i < smartlist_len(rend_service_list); ++i) {
diff --git a/src/or/rephist.c b/src/or/rephist.c
index f1e882729b..a190fc8c0a 100644
--- a/src/or/rephist.c
+++ b/src/or/rephist.c
@@ -2908,11 +2908,227 @@ rep_hist_log_circuit_handshake_stats(time_t now)
memset(onion_handshakes_requested, 0, sizeof(onion_handshakes_requested));
}
+/* Hidden service statistics section */
+
+/** Start of the current hidden service stats interval or 0 if we're
+ * not collecting hidden service statistics. */
+static time_t start_of_hs_stats_interval;
+
+/** Carries the various hidden service statistics, and any other
+ * information needed. */
+typedef struct hs_stats_t {
+ /** How many relay cells have we seen as rendezvous points? */
+ int64_t rp_relay_cells_seen;
+
+ /** Set of unique public key digests we've seen this stat period
+ * (could also be implemented as sorted smartlist). */
+ digestmap_t *onions_seen_this_period;
+} hs_stats_t;
+
+/** Our statistics structure singleton. */
+static hs_stats_t *hs_stats = NULL;
+
+/** Allocate, initialize and return an hs_stats_t structure. */
+static hs_stats_t *
+hs_stats_new(void)
+{
+ hs_stats_t * hs_stats = tor_malloc_zero(sizeof(hs_stats_t));
+ hs_stats->onions_seen_this_period = digestmap_new();
+
+ return hs_stats;
+}
+
+/** Free an hs_stats_t structure. */
+static void
+hs_stats_free(hs_stats_t *hs_stats)
+{
+ if (!hs_stats) {
+ return;
+ }
+
+ digestmap_free(hs_stats->onions_seen_this_period, NULL);
+ tor_free(hs_stats);
+}
+
+/** Initialize hidden service statistics. */
+void
+rep_hist_hs_stats_init(time_t now)
+{
+ if (!hs_stats) {
+ hs_stats = hs_stats_new();
+ }
+
+ start_of_hs_stats_interval = now;
+}
+
+/** Clear history of hidden service statistics and set the measurement
+ * interval start to <b>now</b>. */
+static void
+rep_hist_reset_hs_stats(time_t now)
+{
+ if (!hs_stats) {
+ hs_stats = hs_stats_new();
+ }
+
+ hs_stats->rp_relay_cells_seen = 0;
+
+ digestmap_free(hs_stats->onions_seen_this_period, NULL);
+ hs_stats->onions_seen_this_period = digestmap_new();
+
+ start_of_hs_stats_interval = now;
+}
+
+/** Stop collecting hidden service stats in a way that we can re-start
+ * doing so in rep_hist_buffer_stats_init(). */
+void
+rep_hist_hs_stats_term(void)
+{
+ rep_hist_reset_hs_stats(0);
+}
+
+/** We saw a new HS relay cell, Count it! */
+void
+rep_hist_seen_new_rp_cell(void)
+{
+ if (!hs_stats) {
+ return; // We're not collecting stats
+ }
+
+ hs_stats->rp_relay_cells_seen++;
+}
+
+/** As HSDirs, we saw another hidden service with public key
+ * <b>pubkey</b>. Check whether we have counted it before, if not
+ * count it now! */
+void
+rep_hist_stored_maybe_new_hs(const crypto_pk_t *pubkey)
+{
+ char pubkey_hash[DIGEST_LEN];
+
+ if (!hs_stats) {
+ return; // We're not collecting stats
+ }
+
+ /* Get the digest of the pubkey which will be used to detect whether
+ we've seen this hidden service before or not. */
+ if (crypto_pk_get_digest(pubkey, pubkey_hash) < 0) {
+ /* This fail should not happen; key has been validated by
+ descriptor parsing code first. */
+ return;
+ }
+
+ /* Check if this is the first time we've seen this hidden
+ service. If it is, count it as new. */
+ if (!digestmap_get(hs_stats->onions_seen_this_period,
+ pubkey_hash)) {
+ digestmap_set(hs_stats->onions_seen_this_period,
+ pubkey_hash, (void*)(uintptr_t)1);
+ }
+}
+
+/* The number of cells that are supposed to be hidden from the adversary
+ * by adding noise from the Laplace distribution. This value, divided by
+ * EPSILON, is Laplace parameter b. */
+#define REND_CELLS_DELTA_F 2048
+/* Security parameter for obfuscating number of cells with a value between
+ * 0 and 1. Smaller values obfuscate observations more, but at the same
+ * time make statistics less usable. */
+#define REND_CELLS_EPSILON 0.3
+/* The number of cells that are supposed to be hidden from the adversary
+ * by rounding up to the next multiple of this number. */
+#define REND_CELLS_BIN_SIZE 1024
+/* The number of service identities that are supposed to be hidden from
+ * the adversary by adding noise from the Laplace distribution. This
+ * value, divided by EPSILON, is Laplace parameter b. */
+#define ONIONS_SEEN_DELTA_F 8
+/* Security parameter for obfuscating number of service identities with a
+ * value between 0 and 1. Smaller values obfuscate observations more, but
+ * at the same time make statistics less usable. */
+#define ONIONS_SEEN_EPSILON 0.3
+/* The number of service identities that are supposed to be hidden from
+ * the adversary by rounding up to the next multiple of this number. */
+#define ONIONS_SEEN_BIN_SIZE 8
+
+/** Allocate and return a string containing hidden service stats that
+ * are meant to be placed in the extra-info descriptor. */
+static char *
+rep_hist_format_hs_stats(time_t now)
+{
+ char t[ISO_TIME_LEN+1];
+ char *hs_stats_string;
+ int64_t obfuscated_cells_seen;
+ int64_t obfuscated_onions_seen;
+
+ obfuscated_cells_seen = round_int64_to_next_multiple_of(
+ hs_stats->rp_relay_cells_seen,
+ REND_CELLS_BIN_SIZE);
+ obfuscated_cells_seen = add_laplace_noise(obfuscated_cells_seen,
+ crypto_rand_double(),
+ REND_CELLS_DELTA_F, REND_CELLS_EPSILON);
+ obfuscated_onions_seen = round_int64_to_next_multiple_of(digestmap_size(
+ hs_stats->onions_seen_this_period),
+ ONIONS_SEEN_BIN_SIZE);
+ obfuscated_onions_seen = add_laplace_noise(obfuscated_onions_seen,
+ crypto_rand_double(), ONIONS_SEEN_DELTA_F,
+ ONIONS_SEEN_EPSILON);
+
+ format_iso_time(t, now);
+ tor_asprintf(&hs_stats_string, "hidserv-stats-end %s (%d s)\n"
+ "hidserv-rend-relayed-cells "I64_FORMAT" delta_f=%d "
+ "epsilon=%.2f bin_size=%d\n"
+ "hidserv-dir-onions-seen "I64_FORMAT" delta_f=%d "
+ "epsilon=%.2f bin_size=%d\n",
+ t, (unsigned) (now - start_of_hs_stats_interval),
+ I64_PRINTF_ARG(obfuscated_cells_seen), REND_CELLS_DELTA_F,
+ REND_CELLS_EPSILON, REND_CELLS_BIN_SIZE,
+ I64_PRINTF_ARG(obfuscated_onions_seen),
+ ONIONS_SEEN_DELTA_F,
+ ONIONS_SEEN_EPSILON, ONIONS_SEEN_BIN_SIZE);
+
+ return hs_stats_string;
+}
+
+/** If 24 hours have passed since the beginning of the current HS
+ * stats period, write buffer stats to $DATADIR/stats/hidserv-stats
+ * (possibly overwriting an existing file) and reset counters. Return
+ * when we would next want to write buffer stats or 0 if we never want to
+ * write. */
+time_t
+rep_hist_hs_stats_write(time_t now)
+{
+ char *str = NULL;
+
+ if (!start_of_hs_stats_interval) {
+ return 0; /* Not initialized. */
+ }
+
+ if (start_of_hs_stats_interval + WRITE_STATS_INTERVAL > now) {
+ goto done; /* Not ready to write */
+ }
+
+ /* Generate history string. */
+ str = rep_hist_format_hs_stats(now);
+
+ /* Reset HS history. */
+ rep_hist_reset_hs_stats(now);
+
+ /* Try to write to disk. */
+ if (!check_or_create_data_subdir("stats")) {
+ write_to_data_subdir("stats", "hidserv-stats", str,
+ "hidden service stats");
+ }
+
+ done:
+ tor_free(str);
+ return start_of_hs_stats_interval + WRITE_STATS_INTERVAL;
+}
+
/** Free all storage held by the OR/link history caches, by the
* bandwidth history arrays, by the port history, or by statistics . */
void
rep_hist_free_all(void)
{
+ hs_stats_free(hs_stats);
digestmap_free(history_map, free_or_history);
tor_free(read_array);
tor_free(write_array);
diff --git a/src/or/rephist.h b/src/or/rephist.h
index d853fe2e00..8fd1599513 100644
--- a/src/or/rephist.h
+++ b/src/or/rephist.h
@@ -99,6 +99,13 @@ void rep_hist_note_circuit_handshake_requested(uint16_t type);
void rep_hist_note_circuit_handshake_assigned(uint16_t type);
void rep_hist_log_circuit_handshake_stats(time_t now);
+void rep_hist_hs_stats_init(time_t now);
+void rep_hist_hs_stats_term(void);
+time_t rep_hist_hs_stats_write(time_t now);
+char *rep_hist_get_hs_stats_string(void);
+void rep_hist_seen_new_rp_cell(void);
+void rep_hist_stored_maybe_new_hs(const crypto_pk_t *pubkey);
+
void rep_hist_free_all(void);
#endif
diff --git a/src/or/router.c b/src/or/router.c
index 01838b4b3e..7119a29d68 100644
--- a/src/or/router.c
+++ b/src/or/router.c
@@ -392,10 +392,12 @@ log_new_relay_greeting(void)
/** Try to read an RSA key from <b>fname</b>. If <b>fname</b> doesn't exist
* and <b>generate</b> is true, create a new RSA key and save it in
* <b>fname</b>. Return the read/created key, or NULL on error. Log all
- * errors at level <b>severity</b>.
+ * errors at level <b>severity</b>. If <b>log_greeting</b> is non-zero and a
+ * new key was created, log_new_relay_greeting() is called.
*/
crypto_pk_t *
-init_key_from_file(const char *fname, int generate, int severity)
+init_key_from_file(const char *fname, int generate, int severity,
+ int log_greeting)
{
crypto_pk_t *prkey = NULL;
@@ -433,7 +435,9 @@ init_key_from_file(const char *fname, int generate, int severity)
goto error;
}
log_info(LD_GENERAL, "Generated key seems valid");
- log_new_relay_greeting();
+ if (log_greeting) {
+ log_new_relay_greeting();
+ }
if (crypto_pk_write_private_key_to_filename(prkey, fname)) {
tor_log(severity, LD_FS,
"Couldn't write generated key to \"%s\".", fname);
@@ -545,7 +549,7 @@ load_authority_keyset(int legacy, crypto_pk_t **key_out,
fname = get_datadir_fname2("keys",
legacy ? "legacy_signing_key" : "authority_signing_key");
- signing_key = init_key_from_file(fname, 0, LOG_INFO);
+ signing_key = init_key_from_file(fname, 0, LOG_INFO, 0);
if (!signing_key) {
log_warn(LD_DIR, "No version 3 directory key found in %s", fname);
goto done;
@@ -828,7 +832,7 @@ init_keys(void)
/* 1b. Read identity key. Make it if none is found. */
keydir = get_datadir_fname2("keys", "secret_id_key");
log_info(LD_GENERAL,"Reading/making identity key \"%s\"...",keydir);
- prkey = init_key_from_file(keydir, 1, LOG_ERR);
+ prkey = init_key_from_file(keydir, 1, LOG_ERR, 1);
tor_free(keydir);
if (!prkey) return -1;
set_server_identity_key(prkey);
@@ -851,7 +855,7 @@ init_keys(void)
/* 2. Read onion key. Make it if none is found. */
keydir = get_datadir_fname2("keys", "secret_onion_key");
log_info(LD_GENERAL,"Reading/making onion key \"%s\"...",keydir);
- prkey = init_key_from_file(keydir, 1, LOG_ERR);
+ prkey = init_key_from_file(keydir, 1, LOG_ERR, 1);
tor_free(keydir);
if (!prkey) return -1;
set_onion_key(prkey);
@@ -876,7 +880,7 @@ init_keys(void)
keydir = get_datadir_fname2("keys", "secret_onion_key.old");
if (!lastonionkey && file_status(keydir) == FN_FILE) {
- prkey = init_key_from_file(keydir, 1, LOG_ERR); /* XXXX Why 1? */
+ prkey = init_key_from_file(keydir, 1, LOG_ERR, 0); /* XXXX Why 1? */
if (prkey)
lastonionkey = prkey;
}
@@ -2654,6 +2658,11 @@ extrainfo_dump_to_string(char **s_out, extrainfo_t *extrainfo,
"dirreq-stats-end", now, &contents) > 0) {
smartlist_add(chunks, contents);
}
+ if (options->HiddenServiceStatistics &&
+ load_stats_file("stats"PATH_SEPARATOR"hidserv-stats",
+ "hidserv-stats-end", now, &contents) > 0) {
+ smartlist_add(chunks, contents);
+ }
if (options->EntryStatistics &&
load_stats_file("stats"PATH_SEPARATOR"entry-stats",
"entry-stats-end", now, &contents) > 0) {
diff --git a/src/or/router.h b/src/or/router.h
index 16d3845be1..b5d7f11053 100644
--- a/src/or/router.h
+++ b/src/or/router.h
@@ -29,7 +29,7 @@ crypto_pk_t *get_my_v3_legacy_signing_key(void);
void dup_onion_keys(crypto_pk_t **key, crypto_pk_t **last);
void rotate_onion_key(void);
crypto_pk_t *init_key_from_file(const char *fname, int generate,
- int severity);
+ int severity, int log_greeting);
void v3_authority_check_key_expiry(void);
di_digest256_map_t *construct_ntor_key_map(void);
diff --git a/src/or/routerlist.c b/src/or/routerlist.c
index 0ebdac6851..8379bc80b3 100644
--- a/src/or/routerlist.c
+++ b/src/or/routerlist.c
@@ -2030,9 +2030,10 @@ compute_weighted_bandwidths(const smartlist_t *sl,
if (Wg < 0 || Wm < 0 || We < 0 || Wd < 0 || Wgb < 0 || Wmb < 0 || Wdb < 0
|| Web < 0) {
log_debug(LD_CIRC,
- "Got negative bandwidth weights. Defaulting to old selection"
+ "Got negative bandwidth weights. Defaulting to naive selection"
" algorithm.");
- return -1; // Use old algorithm.
+ Wg = Wm = We = Wd = weight_scale;
+ Wgb = Wmb = Web = Wdb = weight_scale;
}
Wg /= weight_scale;
@@ -2048,6 +2049,7 @@ compute_weighted_bandwidths(const smartlist_t *sl,
bandwidths = tor_calloc(smartlist_len(sl), sizeof(u64_dbl_t));
// Cycle through smartlist and total the bandwidth.
+ static int warned_missing_bw = 0;
SMARTLIST_FOREACH_BEGIN(sl, const node_t *, node) {
int is_exit = 0, is_guard = 0, is_dir = 0, this_bw = 0;
double weight = 1;
@@ -2056,15 +2058,18 @@ compute_weighted_bandwidths(const smartlist_t *sl,
is_dir = node_is_dir(node);
if (node->rs) {
if (!node->rs->has_bandwidth) {
- tor_free(bandwidths);
/* This should never happen, unless all the authorites downgrade
* to 0.2.0 or rogue routerstatuses get inserted into our consensus. */
- log_warn(LD_BUG,
- "Consensus is not listing bandwidths. Defaulting back to "
- "old router selection algorithm.");
- return -1;
+ if (! warned_missing_bw) {
+ log_warn(LD_BUG,
+ "Consensus is missing some bandwidths. Using a naive "
+ "router selection algorithm");
+ warned_missing_bw = 1;
+ }
+ this_bw = 30000; /* Chosen arbitrarily */
+ } else {
+ this_bw = kb_to_bytes(node->rs->bandwidth_kb);
}
- this_bw = kb_to_bytes(node->rs->bandwidth_kb);
} else if (node->ri) {
/* bridge or other descriptor not in our consensus */
this_bw = bridge_get_advertised_bandwidth_bounded(node->ri);
@@ -2141,226 +2146,13 @@ frac_nodes_with_descriptors(const smartlist_t *sl,
return present / total;
}
-/** Helper function:
- * choose a random node_t element of smartlist <b>sl</b>, weighted by
- * the advertised bandwidth of each element.
- *
- * If <b>rule</b>==WEIGHT_FOR_EXIT. we're picking an exit node: consider all
- * nodes' bandwidth equally regardless of their Exit status, since there may
- * be some in the list because they exit to obscure ports. If
- * <b>rule</b>==NO_WEIGHTING, we're picking a non-exit node: weight
- * exit-node's bandwidth less depending on the smallness of the fraction of
- * Exit-to-total bandwidth. If <b>rule</b>==WEIGHT_FOR_GUARD, we're picking a
- * guard node: consider all guard's bandwidth equally. Otherwise, weight
- * guards proportionally less.
- */
-static const node_t *
-smartlist_choose_node_by_bandwidth(const smartlist_t *sl,
- bandwidth_weight_rule_t rule)
-{
- unsigned int i;
- u64_dbl_t *bandwidths;
- int is_exit;
- int is_guard;
- int is_fast;
- double total_nonexit_bw = 0, total_exit_bw = 0;
- double total_nonguard_bw = 0, total_guard_bw = 0;
- double exit_weight;
- double guard_weight;
- int n_unknown = 0;
- bitarray_t *fast_bits;
- bitarray_t *exit_bits;
- bitarray_t *guard_bits;
-
- // This function does not support WEIGHT_FOR_DIR
- // or WEIGHT_FOR_MID
- if (rule == WEIGHT_FOR_DIR || rule == WEIGHT_FOR_MID) {
- rule = NO_WEIGHTING;
- }
-
- /* Can't choose exit and guard at same time */
- tor_assert(rule == NO_WEIGHTING ||
- rule == WEIGHT_FOR_EXIT ||
- rule == WEIGHT_FOR_GUARD);
-
- if (smartlist_len(sl) == 0) {
- log_info(LD_CIRC,
- "Empty routerlist passed in to old node selection for rule %s",
- bandwidth_weight_rule_to_string(rule));
- return NULL;
- }
-
- /* First count the total bandwidth weight, and make a list
- * of each value. We use UINT64_MAX to indicate "unknown". */
- bandwidths = tor_calloc(smartlist_len(sl), sizeof(u64_dbl_t));
- fast_bits = bitarray_init_zero(smartlist_len(sl));
- exit_bits = bitarray_init_zero(smartlist_len(sl));
- guard_bits = bitarray_init_zero(smartlist_len(sl));
-
- /* Iterate over all the routerinfo_t or routerstatus_t, and */
- SMARTLIST_FOREACH_BEGIN(sl, const node_t *, node) {
- /* first, learn what bandwidth we think i has */
- int is_known = 1;
- uint32_t this_bw = 0;
- i = node_sl_idx;
-
- is_exit = node_is_good_exit(node);
- is_guard = node->is_possible_guard;
- if (node->rs) {
- if (node->rs->has_bandwidth) {
- this_bw = kb_to_bytes(node->rs->bandwidth_kb);
- } else { /* guess */
- is_known = 0;
- }
- } else if (node->ri) {
- /* Must be a bridge if we're willing to use it */
- this_bw = bridge_get_advertised_bandwidth_bounded(node->ri);
- }
-
- if (is_exit)
- bitarray_set(exit_bits, i);
- if (is_guard)
- bitarray_set(guard_bits, i);
- if (node->is_fast)
- bitarray_set(fast_bits, i);
-
- if (is_known) {
- bandwidths[i].dbl = this_bw;
- if (is_guard)
- total_guard_bw += this_bw;
- else
- total_nonguard_bw += this_bw;
- if (is_exit)
- total_exit_bw += this_bw;
- else
- total_nonexit_bw += this_bw;
- } else {
- ++n_unknown;
- bandwidths[i].dbl = -1.0;
- }
- } SMARTLIST_FOREACH_END(node);
-
-#define EPSILON .1
-
- /* Now, fill in the unknown values. */
- if (n_unknown) {
- int32_t avg_fast, avg_slow;
- if (total_exit_bw+total_nonexit_bw < EPSILON) {
- /* if there's some bandwidth, there's at least one known router,
- * so no worries about div by 0 here */
- int n_known = smartlist_len(sl)-n_unknown;
- avg_fast = avg_slow = (int32_t)
- ((total_exit_bw+total_nonexit_bw)/((uint64_t) n_known));
- } else {
- avg_fast = 40000;
- avg_slow = 20000;
- }
- for (i=0; i<(unsigned)smartlist_len(sl); ++i) {
- if (bandwidths[i].dbl >= 0.0)
- continue;
- is_fast = bitarray_is_set(fast_bits, i);
- is_exit = bitarray_is_set(exit_bits, i);
- is_guard = bitarray_is_set(guard_bits, i);
- bandwidths[i].dbl = is_fast ? avg_fast : avg_slow;
- if (is_exit)
- total_exit_bw += bandwidths[i].dbl;
- else
- total_nonexit_bw += bandwidths[i].dbl;
- if (is_guard)
- total_guard_bw += bandwidths[i].dbl;
- else
- total_nonguard_bw += bandwidths[i].dbl;
- }
- }
-
- /* If there's no bandwidth at all, pick at random. */
- if (total_exit_bw+total_nonexit_bw < EPSILON) {
- tor_free(bandwidths);
- tor_free(fast_bits);
- tor_free(exit_bits);
- tor_free(guard_bits);
- return smartlist_choose(sl);
- }
-
- /* Figure out how to weight exits and guards */
- {
- double all_bw = U64_TO_DBL(total_exit_bw+total_nonexit_bw);
- double exit_bw = U64_TO_DBL(total_exit_bw);
- double guard_bw = U64_TO_DBL(total_guard_bw);
- /*
- * For detailed derivation of this formula, see
- * http://archives.seul.org/or/dev/Jul-2007/msg00056.html
- */
- if (rule == WEIGHT_FOR_EXIT || total_exit_bw<EPSILON)
- exit_weight = 1.0;
- else
- exit_weight = 1.0 - all_bw/(3.0*exit_bw);
-
- if (rule == WEIGHT_FOR_GUARD || total_guard_bw<EPSILON)
- guard_weight = 1.0;
- else
- guard_weight = 1.0 - all_bw/(3.0*guard_bw);
-
- if (exit_weight <= 0.0)
- exit_weight = 0.0;
-
- if (guard_weight <= 0.0)
- guard_weight = 0.0;
-
- for (i=0; i < (unsigned)smartlist_len(sl); i++) {
- tor_assert(bandwidths[i].dbl >= 0.0);
-
- is_exit = bitarray_is_set(exit_bits, i);
- is_guard = bitarray_is_set(guard_bits, i);
- if (is_exit && is_guard)
- bandwidths[i].dbl *= exit_weight * guard_weight;
- else if (is_guard)
- bandwidths[i].dbl *= guard_weight;
- else if (is_exit)
- bandwidths[i].dbl *= exit_weight;
- }
- }
-
-#if 0
- log_debug(LD_CIRC, "Total weighted bw = "U64_FORMAT
- ", exit bw = "U64_FORMAT
- ", nonexit bw = "U64_FORMAT", exit weight = %f "
- "(for exit == %d)"
- ", guard bw = "U64_FORMAT
- ", nonguard bw = "U64_FORMAT", guard weight = %f "
- "(for guard == %d)",
- U64_PRINTF_ARG(total_bw),
- U64_PRINTF_ARG(total_exit_bw), U64_PRINTF_ARG(total_nonexit_bw),
- exit_weight, (int)(rule == WEIGHT_FOR_EXIT),
- U64_PRINTF_ARG(total_guard_bw), U64_PRINTF_ARG(total_nonguard_bw),
- guard_weight, (int)(rule == WEIGHT_FOR_GUARD));
-#endif
-
- scale_array_elements_to_u64(bandwidths, smartlist_len(sl), NULL);
-
- {
- int idx = choose_array_element_by_weight(bandwidths,
- smartlist_len(sl));
- tor_free(bandwidths);
- tor_free(fast_bits);
- tor_free(exit_bits);
- tor_free(guard_bits);
- return idx < 0 ? NULL : smartlist_get(sl, idx);
- }
-}
-
/** Choose a random element of status list <b>sl</b>, weighted by
* the advertised bandwidth of each node */
const node_t *
node_sl_choose_by_bandwidth(const smartlist_t *sl,
bandwidth_weight_rule_t rule)
{ /*XXXX MOVE */
- const node_t *ret;
- if ((ret = smartlist_choose_node_by_bandwidth_weights(sl, rule))) {
- return ret;
- } else {
- return smartlist_choose_node_by_bandwidth(sl, rule);
- }
+ return smartlist_choose_node_by_bandwidth_weights(sl, rule);
}
/** Return a random running node from the nodelist. Never
diff --git a/src/or/scheduler.c b/src/or/scheduler.c
new file mode 100644
index 0000000000..d1a15aacb2
--- /dev/null
+++ b/src/or/scheduler.c
@@ -0,0 +1,709 @@
+/* * Copyright (c) 2013, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file scheduler.c
+ * \brief Relay scheduling system
+ **/
+
+#include "or.h"
+
+#define TOR_CHANNEL_INTERNAL_ /* For channel_flush_some_cells() */
+#include "channel.h"
+
+#include "compat_libevent.h"
+#define SCHEDULER_PRIVATE_
+#include "scheduler.h"
+
+#ifdef HAVE_EVENT2_EVENT_H
+#include <event2/event.h>
+#else
+#include <event.h>
+#endif
+
+/*
+ * Scheduler high/low watermarks
+ */
+
+static uint32_t sched_q_low_water = 16384;
+static uint32_t sched_q_high_water = 32768;
+
+/*
+ * Maximum cells to flush in a single call to channel_flush_some_cells();
+ * setting this low means more calls, but too high and we could overshoot
+ * sched_q_high_water.
+ */
+
+static uint32_t sched_max_flush_cells = 16;
+
+/*
+ * Write scheduling works by keeping track of which channels can
+ * accept cells, and have cells to write. From the scheduler's perspective,
+ * a channel can be in four possible states:
+ *
+ * 1.) Not open for writes, no cells to send
+ * - Not much to do here, and the channel will have scheduler_state ==
+ * SCHED_CHAN_IDLE
+ * - Transitions from:
+ * - Open for writes/has cells by simultaneously draining all circuit
+ * queues and filling the output buffer.
+ * - Transitions to:
+ * - Not open for writes/has cells by arrival of cells on an attached
+ * circuit (this would be driven from append_cell_to_circuit_queue())
+ * - Open for writes/no cells by a channel type specific path;
+ * driven from connection_or_flushed_some() for channel_tls_t.
+ *
+ * 2.) Open for writes, no cells to send
+ * - Not much here either; this will be the state an idle but open channel
+ * can be expected to settle in. It will have scheduler_state ==
+ * SCHED_CHAN_WAITING_FOR_CELLS
+ * - Transitions from:
+ * - Not open for writes/no cells by flushing some of the output
+ * buffer.
+ * - Open for writes/has cells by the scheduler moving cells from
+ * circuit queues to channel output queue, but not having enough
+ * to fill the output queue.
+ * - Transitions to:
+ * - Open for writes/has cells by arrival of new cells on an attached
+ * circuit, in append_cell_to_circuit_queue()
+ *
+ * 3.) Not open for writes, cells to send
+ * - This is the state of a busy circuit limited by output bandwidth;
+ * cells have piled up in the circuit queues waiting to be relayed.
+ * The channel will have scheduler_state == SCHED_CHAN_WAITING_TO_WRITE.
+ * - Transitions from:
+ * - Not open for writes/no cells by arrival of cells on an attached
+ * circuit
+ * - Open for writes/has cells by filling an output buffer without
+ * draining all cells from attached circuits
+ * - Transitions to:
+ * - Opens for writes/has cells by draining some of the output buffer
+ * via the connection_or_flushed_some() path (for channel_tls_t).
+ *
+ * 4.) Open for writes, cells to send
+ * - This connection is ready to relay some cells and waiting for
+ * the scheduler to choose it. The channel will have scheduler_state ==
+ * SCHED_CHAN_PENDING.
+ * - Transitions from:
+ * - Not open for writes/has cells by the connection_or_flushed_some()
+ * path
+ * - Open for writes/no cells by the append_cell_to_circuit_queue()
+ * path
+ * - Transitions to:
+ * - Not open for writes/no cells by draining all circuit queues and
+ * simultaneously filling the output buffer.
+ * - Not open for writes/has cells by writing enough cells to fill the
+ * output buffer
+ * - Open for writes/no cells by draining all attached circuit queues
+ * without also filling the output buffer
+ *
+ * Other event-driven parts of the code move channels between these scheduling
+ * states by calling scheduler functions; the scheduler only runs on open-for-
+ * writes/has-cells channels and is the only path for those to transition to
+ * other states. The scheduler_run() function gives us the opportunity to do
+ * scheduling work, and is called from other scheduler functions whenever a
+ * state transition occurs, and periodically from the main event loop.
+ */
+
+/* Scheduler global data structures */
+
+/*
+ * We keep a list of channels that are pending - i.e, have cells to write
+ * and can accept them to send. The enum scheduler_state in channel_t
+ * is reserved for our use.
+ */
+
+/* Pqueue of channels that can write and have cells (pending work) */
+STATIC smartlist_t *channels_pending = NULL;
+
+/*
+ * This event runs the scheduler from its callback, and is manually
+ * activated whenever a channel enters open for writes/cells to send.
+ */
+
+STATIC struct event *run_sched_ev = NULL;
+
+/*
+ * Queue heuristic; this is not the queue size, but an 'effective queuesize'
+ * that ages out contributions from stalled channels.
+ */
+
+STATIC uint64_t queue_heuristic = 0;
+
+/*
+ * Timestamp for last queue heuristic update
+ */
+
+STATIC time_t queue_heuristic_timestamp = 0;
+
+/* Scheduler static function declarations */
+
+static void scheduler_evt_callback(evutil_socket_t fd,
+ short events, void *arg);
+static int scheduler_more_work(void);
+static void scheduler_retrigger(void);
+#if 0
+static void scheduler_trigger(void);
+#endif
+
+/* Scheduler function implementations */
+
+/** Free everything and shut down the scheduling system */
+
+void
+scheduler_free_all(void)
+{
+ log_debug(LD_SCHED, "Shutting down scheduler");
+
+ if (run_sched_ev) {
+ event_del(run_sched_ev);
+ tor_event_free(run_sched_ev);
+ run_sched_ev = NULL;
+ }
+
+ if (channels_pending) {
+ smartlist_free(channels_pending);
+ channels_pending = NULL;
+ }
+}
+
+/**
+ * Comparison function to use when sorting pending channels
+ */
+
+MOCK_IMPL(STATIC int,
+scheduler_compare_channels, (const void *c1_v, const void *c2_v))
+{
+ channel_t *c1 = NULL, *c2 = NULL;
+ /* These are a workaround for -Wbad-function-cast throwing a fit */
+ const circuitmux_policy_t *p1, *p2;
+ uintptr_t p1_i, p2_i;
+
+ tor_assert(c1_v);
+ tor_assert(c2_v);
+
+ c1 = (channel_t *)(c1_v);
+ c2 = (channel_t *)(c2_v);
+
+ tor_assert(c1);
+ tor_assert(c2);
+
+ if (c1 != c2) {
+ if (circuitmux_get_policy(c1->cmux) ==
+ circuitmux_get_policy(c2->cmux)) {
+ /* Same cmux policy, so use the mux comparison */
+ return circuitmux_compare_muxes(c1->cmux, c2->cmux);
+ } else {
+ /*
+ * Different policies; not important to get this edge case perfect
+ * because the current code never actually gives different channels
+ * different cmux policies anyway. Just use this arbitrary but
+ * definite choice.
+ */
+ p1 = circuitmux_get_policy(c1->cmux);
+ p2 = circuitmux_get_policy(c2->cmux);
+ p1_i = (uintptr_t)p1;
+ p2_i = (uintptr_t)p2;
+
+ return (p1_i < p2_i) ? -1 : 1;
+ }
+ } else {
+ /* c1 == c2, so always equal */
+ return 0;
+ }
+}
+
+/*
+ * Scheduler event callback; this should get triggered once per event loop
+ * if any scheduling work was created during the event loop.
+ */
+
+static void
+scheduler_evt_callback(evutil_socket_t fd, short events, void *arg)
+{
+ (void)fd;
+ (void)events;
+ (void)arg;
+ log_debug(LD_SCHED, "Scheduler event callback called");
+
+ tor_assert(run_sched_ev);
+
+ /* Run the scheduler */
+ scheduler_run();
+
+ /* Do we have more work to do? */
+ if (scheduler_more_work()) scheduler_retrigger();
+}
+
+/** Mark a channel as no longer ready to accept writes */
+
+MOCK_IMPL(void,
+scheduler_channel_doesnt_want_writes,(channel_t *chan))
+{
+ tor_assert(chan);
+
+ tor_assert(channels_pending);
+
+ /* If it's already in pending, we can put it in waiting_to_write */
+ if (chan->scheduler_state == SCHED_CHAN_PENDING) {
+ /*
+ * It's in channels_pending, so it shouldn't be in any of
+ * the other lists. It can't write any more, so it goes to
+ * channels_waiting_to_write.
+ */
+ smartlist_pqueue_remove(channels_pending,
+ scheduler_compare_channels,
+ STRUCT_OFFSET(channel_t, sched_heap_idx),
+ chan);
+ chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
+ log_debug(LD_SCHED,
+ "Channel " U64_FORMAT " at %p went from pending "
+ "to waiting_to_write",
+ U64_PRINTF_ARG(chan->global_identifier), chan);
+ } else {
+ /*
+ * It's not in pending, so it can't become waiting_to_write; it's
+ * either not in any of the lists (nothing to do) or it's already in
+ * waiting_for_cells (remove it, can't write any more).
+ */
+ if (chan->scheduler_state == SCHED_CHAN_WAITING_FOR_CELLS) {
+ chan->scheduler_state = SCHED_CHAN_IDLE;
+ log_debug(LD_SCHED,
+ "Channel " U64_FORMAT " at %p left waiting_for_cells",
+ U64_PRINTF_ARG(chan->global_identifier), chan);
+ }
+ }
+}
+
+/** Mark a channel as having waiting cells */
+
+MOCK_IMPL(void,
+scheduler_channel_has_waiting_cells,(channel_t *chan))
+{
+ int became_pending = 0;
+
+ tor_assert(chan);
+ tor_assert(channels_pending);
+
+ /* First, check if this one also writeable */
+ if (chan->scheduler_state == SCHED_CHAN_WAITING_FOR_CELLS) {
+ /*
+ * It's in channels_waiting_for_cells, so it shouldn't be in any of
+ * the other lists. It has waiting cells now, so it goes to
+ * channels_pending.
+ */
+ chan->scheduler_state = SCHED_CHAN_PENDING;
+ smartlist_pqueue_add(channels_pending,
+ scheduler_compare_channels,
+ STRUCT_OFFSET(channel_t, sched_heap_idx),
+ chan);
+ log_debug(LD_SCHED,
+ "Channel " U64_FORMAT " at %p went from waiting_for_cells "
+ "to pending",
+ U64_PRINTF_ARG(chan->global_identifier), chan);
+ became_pending = 1;
+ } else {
+ /*
+ * It's not in waiting_for_cells, so it can't become pending; it's
+ * either not in any of the lists (we add it to waiting_to_write)
+ * or it's already in waiting_to_write or pending (we do nothing)
+ */
+ if (!(chan->scheduler_state == SCHED_CHAN_WAITING_TO_WRITE ||
+ chan->scheduler_state == SCHED_CHAN_PENDING)) {
+ chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
+ log_debug(LD_SCHED,
+ "Channel " U64_FORMAT " at %p entered waiting_to_write",
+ U64_PRINTF_ARG(chan->global_identifier), chan);
+ }
+ }
+
+ /*
+ * If we made a channel pending, we potentially have scheduling work
+ * to do.
+ */
+ if (became_pending) scheduler_retrigger();
+}
+
+/** Set up the scheduling system */
+
+void
+scheduler_init(void)
+{
+ log_debug(LD_SCHED, "Initting scheduler");
+
+ tor_assert(!run_sched_ev);
+ run_sched_ev = tor_event_new(tor_libevent_get_base(), -1,
+ 0, scheduler_evt_callback, NULL);
+
+ channels_pending = smartlist_new();
+ queue_heuristic = 0;
+ queue_heuristic_timestamp = approx_time();
+}
+
+/** Check if there's more scheduling work */
+
+static int
+scheduler_more_work(void)
+{
+ tor_assert(channels_pending);
+
+ return ((scheduler_get_queue_heuristic() < sched_q_low_water) &&
+ ((smartlist_len(channels_pending) > 0))) ? 1 : 0;
+}
+
+/** Retrigger the scheduler in a way safe to use from the callback */
+
+static void
+scheduler_retrigger(void)
+{
+ tor_assert(run_sched_ev);
+ event_active(run_sched_ev, EV_TIMEOUT, 1);
+}
+
+/** Notify the scheduler of a channel being closed */
+
+MOCK_IMPL(void,
+scheduler_release_channel,(channel_t *chan))
+{
+ tor_assert(chan);
+ tor_assert(channels_pending);
+
+ if (chan->scheduler_state == SCHED_CHAN_PENDING) {
+ smartlist_pqueue_remove(channels_pending,
+ scheduler_compare_channels,
+ STRUCT_OFFSET(channel_t, sched_heap_idx),
+ chan);
+ }
+
+ chan->scheduler_state = SCHED_CHAN_IDLE;
+}
+
+/** Run the scheduling algorithm if necessary */
+
+MOCK_IMPL(void,
+scheduler_run, (void))
+{
+ int n_cells, n_chans_before, n_chans_after;
+ uint64_t q_len_before, q_heur_before, q_len_after, q_heur_after;
+ ssize_t flushed, flushed_this_time;
+ smartlist_t *to_readd = NULL;
+ channel_t *chan = NULL;
+
+ log_debug(LD_SCHED, "We have a chance to run the scheduler");
+
+ if (scheduler_get_queue_heuristic() < sched_q_low_water) {
+ n_chans_before = smartlist_len(channels_pending);
+ q_len_before = channel_get_global_queue_estimate();
+ q_heur_before = scheduler_get_queue_heuristic();
+
+ while (scheduler_get_queue_heuristic() <= sched_q_high_water &&
+ smartlist_len(channels_pending) > 0) {
+ /* Pop off a channel */
+ chan = smartlist_pqueue_pop(channels_pending,
+ scheduler_compare_channels,
+ STRUCT_OFFSET(channel_t, sched_heap_idx));
+ tor_assert(chan);
+
+ /* Figure out how many cells we can write */
+ n_cells = channel_num_cells_writeable(chan);
+ if (n_cells > 0) {
+ log_debug(LD_SCHED,
+ "Scheduler saw pending channel " U64_FORMAT " at %p with "
+ "%d cells writeable",
+ U64_PRINTF_ARG(chan->global_identifier), chan, n_cells);
+
+ flushed = 0;
+ while (flushed < n_cells &&
+ scheduler_get_queue_heuristic() <= sched_q_high_water) {
+ flushed_this_time =
+ channel_flush_some_cells(chan,
+ MIN(sched_max_flush_cells,
+ (size_t) n_cells - flushed));
+ if (flushed_this_time <= 0) break;
+ flushed += flushed_this_time;
+ }
+
+ if (flushed < n_cells) {
+ /* We ran out of cells to flush */
+ chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
+ log_debug(LD_SCHED,
+ "Channel " U64_FORMAT " at %p "
+ "entered waiting_for_cells from pending",
+ U64_PRINTF_ARG(chan->global_identifier),
+ chan);
+ } else {
+ /* The channel may still have some cells */
+ if (channel_more_to_flush(chan)) {
+ /* The channel goes to either pending or waiting_to_write */
+ if (channel_num_cells_writeable(chan) > 0) {
+ /* Add it back to pending later */
+ if (!to_readd) to_readd = smartlist_new();
+ smartlist_add(to_readd, chan);
+ log_debug(LD_SCHED,
+ "Channel " U64_FORMAT " at %p "
+ "is still pending",
+ U64_PRINTF_ARG(chan->global_identifier),
+ chan);
+ } else {
+ /* It's waiting to be able to write more */
+ chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
+ log_debug(LD_SCHED,
+ "Channel " U64_FORMAT " at %p "
+ "entered waiting_to_write from pending",
+ U64_PRINTF_ARG(chan->global_identifier),
+ chan);
+ }
+ } else {
+ /* No cells left; it can go to idle or waiting_for_cells */
+ if (channel_num_cells_writeable(chan) > 0) {
+ /*
+ * It can still accept writes, so it goes to
+ * waiting_for_cells
+ */
+ chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
+ log_debug(LD_SCHED,
+ "Channel " U64_FORMAT " at %p "
+ "entered waiting_for_cells from pending",
+ U64_PRINTF_ARG(chan->global_identifier),
+ chan);
+ } else {
+ /*
+ * We exactly filled up the output queue with all available
+ * cells; go to idle.
+ */
+ chan->scheduler_state = SCHED_CHAN_IDLE;
+ log_debug(LD_SCHED,
+ "Channel " U64_FORMAT " at %p "
+ "become idle from pending",
+ U64_PRINTF_ARG(chan->global_identifier),
+ chan);
+ }
+ }
+ }
+
+ log_debug(LD_SCHED,
+ "Scheduler flushed %d cells onto pending channel "
+ U64_FORMAT " at %p",
+ (int)flushed, U64_PRINTF_ARG(chan->global_identifier),
+ chan);
+ } else {
+ log_info(LD_SCHED,
+ "Scheduler saw pending channel " U64_FORMAT " at %p with "
+ "no cells writeable",
+ U64_PRINTF_ARG(chan->global_identifier), chan);
+ /* Put it back to WAITING_TO_WRITE */
+ chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
+ }
+ }
+
+ /* Readd any channels we need to */
+ if (to_readd) {
+ SMARTLIST_FOREACH_BEGIN(to_readd, channel_t *, chan) {
+ chan->scheduler_state = SCHED_CHAN_PENDING;
+ smartlist_pqueue_add(channels_pending,
+ scheduler_compare_channels,
+ STRUCT_OFFSET(channel_t, sched_heap_idx),
+ chan);
+ } SMARTLIST_FOREACH_END(chan);
+ smartlist_free(to_readd);
+ }
+
+ n_chans_after = smartlist_len(channels_pending);
+ q_len_after = channel_get_global_queue_estimate();
+ q_heur_after = scheduler_get_queue_heuristic();
+ log_debug(LD_SCHED,
+ "Scheduler handled %d of %d pending channels, queue size from "
+ U64_FORMAT " to " U64_FORMAT ", queue heuristic from "
+ U64_FORMAT " to " U64_FORMAT,
+ n_chans_before - n_chans_after, n_chans_before,
+ U64_PRINTF_ARG(q_len_before), U64_PRINTF_ARG(q_len_after),
+ U64_PRINTF_ARG(q_heur_before), U64_PRINTF_ARG(q_heur_after));
+ }
+}
+
+/** Trigger the scheduling event so we run the scheduler later */
+
+#if 0
+static void
+scheduler_trigger(void)
+{
+ log_debug(LD_SCHED, "Triggering scheduler event");
+
+ tor_assert(run_sched_ev);
+
+ event_add(run_sched_ev, EV_TIMEOUT, 1);
+}
+#endif
+
+/** Mark a channel as ready to accept writes */
+
+void
+scheduler_channel_wants_writes(channel_t *chan)
+{
+ int became_pending = 0;
+
+ tor_assert(chan);
+ tor_assert(channels_pending);
+
+ /* If it's already in waiting_to_write, we can put it in pending */
+ if (chan->scheduler_state == SCHED_CHAN_WAITING_TO_WRITE) {
+ /*
+ * It can write now, so it goes to channels_pending.
+ */
+ smartlist_pqueue_add(channels_pending,
+ scheduler_compare_channels,
+ STRUCT_OFFSET(channel_t, sched_heap_idx),
+ chan);
+ chan->scheduler_state = SCHED_CHAN_PENDING;
+ log_debug(LD_SCHED,
+ "Channel " U64_FORMAT " at %p went from waiting_to_write "
+ "to pending",
+ U64_PRINTF_ARG(chan->global_identifier), chan);
+ became_pending = 1;
+ } else {
+ /*
+ * It's not in SCHED_CHAN_WAITING_TO_WRITE, so it can't become pending;
+ * it's either idle and goes to WAITING_FOR_CELLS, or it's a no-op.
+ */
+ if (!(chan->scheduler_state == SCHED_CHAN_WAITING_FOR_CELLS ||
+ chan->scheduler_state == SCHED_CHAN_PENDING)) {
+ chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
+ log_debug(LD_SCHED,
+ "Channel " U64_FORMAT " at %p entered waiting_for_cells",
+ U64_PRINTF_ARG(chan->global_identifier), chan);
+ }
+ }
+
+ /*
+ * If we made a channel pending, we potentially have scheduling work
+ * to do.
+ */
+ if (became_pending) scheduler_retrigger();
+}
+
+/**
+ * Notify the scheduler that a channel's position in the pqueue may have
+ * changed
+ */
+
+void
+scheduler_touch_channel(channel_t *chan)
+{
+ tor_assert(chan);
+
+ if (chan->scheduler_state == SCHED_CHAN_PENDING) {
+ /* Remove and re-add it */
+ smartlist_pqueue_remove(channels_pending,
+ scheduler_compare_channels,
+ STRUCT_OFFSET(channel_t, sched_heap_idx),
+ chan);
+ smartlist_pqueue_add(channels_pending,
+ scheduler_compare_channels,
+ STRUCT_OFFSET(channel_t, sched_heap_idx),
+ chan);
+ }
+ /* else no-op, since it isn't in the queue */
+}
+
+/**
+ * Notify the scheduler of a queue size adjustment, to recalculate the
+ * queue heuristic.
+ */
+
+void
+scheduler_adjust_queue_size(channel_t *chan, char dir, uint64_t adj)
+{
+ time_t now = approx_time();
+
+ log_debug(LD_SCHED,
+ "Queue size adjustment by %s" U64_FORMAT " for channel "
+ U64_FORMAT,
+ (dir >= 0) ? "+" : "-",
+ U64_PRINTF_ARG(adj),
+ U64_PRINTF_ARG(chan->global_identifier));
+
+ /* Get the queue heuristic up to date */
+ scheduler_update_queue_heuristic(now);
+
+ /* Adjust as appropriate */
+ if (dir >= 0) {
+ /* Increasing it */
+ queue_heuristic += adj;
+ } else {
+ /* Decreasing it */
+ if (queue_heuristic > adj) queue_heuristic -= adj;
+ else queue_heuristic = 0;
+ }
+
+ log_debug(LD_SCHED,
+ "Queue heuristic is now " U64_FORMAT,
+ U64_PRINTF_ARG(queue_heuristic));
+}
+
+/**
+ * Query the current value of the queue heuristic
+ */
+
+STATIC uint64_t
+scheduler_get_queue_heuristic(void)
+{
+ time_t now = approx_time();
+
+ scheduler_update_queue_heuristic(now);
+
+ return queue_heuristic;
+}
+
+/**
+ * Adjust the queue heuristic value to the present time
+ */
+
+STATIC void
+scheduler_update_queue_heuristic(time_t now)
+{
+ time_t diff;
+
+ if (queue_heuristic_timestamp == 0) {
+ /*
+ * Nothing we can sensibly do; must not have been initted properly.
+ * Oh well.
+ */
+ queue_heuristic_timestamp = now;
+ } else if (queue_heuristic_timestamp < now) {
+ diff = now - queue_heuristic_timestamp;
+ /*
+ * This is a simple exponential age-out; the other proposed alternative
+ * was a linear age-out using the bandwidth history in rephist.c; I'm
+ * going with this out of concern that if an adversary can jam the
+ * scheduler long enough, it would cause the bandwidth to drop to
+ * zero and render the aging mechanism ineffective thereafter.
+ */
+ if (0 <= diff && diff < 64) queue_heuristic >>= diff;
+ else queue_heuristic = 0;
+
+ queue_heuristic_timestamp = now;
+
+ log_debug(LD_SCHED,
+ "Queue heuristic is now " U64_FORMAT,
+ U64_PRINTF_ARG(queue_heuristic));
+ }
+ /* else no update needed, or time went backward */
+}
+
+/**
+ * Set scheduler watermarks and flush size
+ */
+
+void
+scheduler_set_watermarks(uint32_t lo, uint32_t hi, uint32_t max_flush)
+{
+ /* Sanity assertions - caller should ensure these are true */
+ tor_assert(lo > 0);
+ tor_assert(hi > lo);
+ tor_assert(max_flush > 0);
+
+ sched_q_low_water = lo;
+ sched_q_high_water = hi;
+ sched_max_flush_cells = max_flush;
+}
+
diff --git a/src/or/scheduler.h b/src/or/scheduler.h
new file mode 100644
index 0000000000..404776b18b
--- /dev/null
+++ b/src/or/scheduler.h
@@ -0,0 +1,50 @@
+/* * Copyright (c) 2013, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file scheduler.h
+ * \brief Header file for scheduler.c
+ **/
+
+#ifndef TOR_SCHEDULER_H
+#define TOR_SCHEDULER_H
+
+#include "or.h"
+#include "channel.h"
+#include "testsupport.h"
+
+/* Global-visibility scheduler functions */
+
+/* Set up and shut down the scheduler from main.c */
+void scheduler_free_all(void);
+void scheduler_init(void);
+MOCK_DECL(void, scheduler_run, (void));
+
+/* Mark channels as having cells or wanting/not wanting writes */
+MOCK_DECL(void,scheduler_channel_doesnt_want_writes,(channel_t *chan));
+MOCK_DECL(void,scheduler_channel_has_waiting_cells,(channel_t *chan));
+void scheduler_channel_wants_writes(channel_t *chan);
+
+/* Notify the scheduler of a channel being closed */
+MOCK_DECL(void,scheduler_release_channel,(channel_t *chan));
+
+/* Notify scheduler of queue size adjustments */
+void scheduler_adjust_queue_size(channel_t *chan, char dir, uint64_t adj);
+
+/* Notify scheduler that a channel's queue position may have changed */
+void scheduler_touch_channel(channel_t *chan);
+
+/* Adjust the watermarks from config file*/
+void scheduler_set_watermarks(uint32_t lo, uint32_t hi, uint32_t max_flush);
+
+/* Things only scheduler.c and its test suite should see */
+
+#ifdef SCHEDULER_PRIVATE_
+MOCK_DECL(STATIC int, scheduler_compare_channels,
+ (const void *c1_v, const void *c2_v));
+STATIC uint64_t scheduler_get_queue_heuristic(void);
+STATIC void scheduler_update_queue_heuristic(time_t now);
+#endif
+
+#endif /* !defined(TOR_SCHEDULER_H) */
+