summaryrefslogtreecommitdiff
path: root/src/or/channel.c
diff options
context:
space:
mode:
authorAndrea Shepard <andrea@torproject.org>2013-11-05 00:30:02 -0800
committerAndrea Shepard <andrea@torproject.org>2014-09-30 22:49:03 -0700
commit8852a1794cfa9eb5dae494f5d85242d8fd6955fc (patch)
tree266daad8c0e03603a5e8e473cafaa8c2166d7832 /src/or/channel.c
parent7674308f622904bc5a4cff5e7cb4d0f22fbf84d7 (diff)
downloadtor-8852a1794cfa9eb5dae494f5d85242d8fd6955fc.tar.gz
tor-8852a1794cfa9eb5dae494f5d85242d8fd6955fc.zip
Track total queue size per channel, with overhead estimates, and global queue total
Diffstat (limited to 'src/or/channel.c')
-rw-r--r--src/or/channel.c126
1 files changed, 126 insertions, 0 deletions
diff --git a/src/or/channel.c b/src/or/channel.c
index 0caebfb55c..f729a17be9 100644
--- a/src/or/channel.c
+++ b/src/or/channel.c
@@ -124,6 +124,13 @@ static uint64_t n_channel_bytes_passed_to_lower_layer = 0;
static uint64_t n_channel_bytes_in_queues = 0;
+/*
+ * Current total estimated queue size *including lower layer queues and
+ * transmit overhead*
+ */
+
+static uint64_t estimated_total_queue_size = 0;
+
/* Digest->channel map
*
* Similar to the one used in connection_or.c, this maps from the identity
@@ -1840,6 +1847,8 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
n_channel_bytes_queued += cell_bytes;
n_channel_bytes_in_queues += cell_bytes;
channel_assert_counter_consistency();
+ /* Update channel queue size */
+ chan->bytes_in_queue += cell_bytes;
/* Try to process the queue? */
if (chan->state == CHANNEL_STATE_OPEN) channel_flush_cells(chan);
}
@@ -1878,6 +1887,9 @@ channel_write_cell(channel_t *chan, cell_t *cell)
q.type = CELL_QUEUE_FIXED;
q.u.fixed.cell = cell;
channel_write_cell_queue_entry(chan, &q);
+
+ /* Update the queue size estimate */
+ channel_update_xmit_queue_size(chan);
}
/**
@@ -1913,6 +1925,9 @@ channel_write_packed_cell(channel_t *chan, packed_cell_t *packed_cell)
q.type = CELL_QUEUE_PACKED;
q.u.packed.packed_cell = packed_cell;
channel_write_cell_queue_entry(chan, &q);
+
+ /* Update the queue size estimate */
+ channel_update_xmit_queue_size(chan);
}
/**
@@ -1949,6 +1964,9 @@ channel_write_var_cell(channel_t *chan, var_cell_t *var_cell)
q.type = CELL_QUEUE_VAR;
q.u.var.var_cell = var_cell;
channel_write_cell_queue_entry(chan, &q);
+
+ /* Update the queue size estimate */
+ channel_update_xmit_queue_size(chan);
}
/**
@@ -2056,6 +2074,29 @@ channel_change_state(channel_t *chan, channel_state_t to_state)
scheduler_channel_doesnt_want_writes(chan);
}
+ /*
+ * If we're closing, this channel no longer counts toward the global
+ * estimated queue size; if we're open, it now does.
+ */
+ if ((to_state == CHANNEL_STATE_CLOSING ||
+ to_state == CHANNEL_STATE_CLOSED ||
+ to_state == CHANNEL_STATE_ERROR) &&
+ (from_state == CHANNEL_STATE_OPEN ||
+ from_state == CHANNEL_STATE_MAINT)) {
+ estimated_total_queue_size -= chan->bytes_in_queue;
+ }
+
+ /*
+ * If we're opening, this channel now does count toward the global
+ * estimated queue size.
+ */
+ if ((to_state == CHANNEL_STATE_OPEN ||
+ to_state == CHANNEL_STATE_MAINT) &&
+ !(from_state == CHANNEL_STATE_OPEN ||
+ from_state == CHANNEL_STATE_MAINT)) {
+ estimated_total_queue_size += chan->bytes_in_queue;
+ }
+
/* Tell circuits if we opened and stuff */
if (to_state == CHANNEL_STATE_OPEN) {
channel_do_open_actions(chan);
@@ -2350,6 +2391,8 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
n_channel_bytes_passed_to_lower_layer += cell_size;
n_channel_bytes_in_queues -= cell_size;
channel_assert_counter_consistency();
+ /* Update the channel's queue size too */
+ chan->bytes_in_queue -= cell_size;
}
/* No cell removed from list, so we can't go on any further */
else break;
@@ -2362,6 +2405,9 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
channel_timestamp_drained(chan);
}
+ /* Update the estimate queue size */
+ channel_update_xmit_queue_size(chan);
+
return flushed;
}
@@ -4421,3 +4467,83 @@ channel_set_circid_type(channel_t *chan,
}
}
+/**
+ * Update the estimated number of bytes queued to transmit for this channel,
+ * and notify the scheduler. The estimate includes both the channel queue and
+ * the queue size reported by the lower layer, and an overhead estimate
+ * optionally provided by the lower layer.
+ */
+
+void
+channel_update_xmit_queue_size(channel_t *chan)
+{
+ uint64_t queued, adj;
+ double overhead;
+
+ tor_assert(chan);
+ tor_assert(chan->num_bytes_queued);
+
+ /*
+ * First, get the number of bytes we have queued without factoring in
+ * lower-layer overhead.
+ */
+ queued = chan->num_bytes_queued(chan) + chan->bytes_in_queue;
+ /* Next, adjust by the overhead factor, if any is available */
+ if (chan->get_overhead_estimate) {
+ overhead = chan->get_overhead_estimate(chan);
+ if (overhead >= 1.0f) {
+ queued *= overhead;
+ } else {
+ /* Ignore silly overhead factors */
+ log_notice(LD_CHANNEL, "Ignoring silly overhead factor %f", overhead);
+ }
+ }
+
+ /* Now, compare to the previous estimate */
+ if (queued > chan->bytes_queued_for_xmit) {
+ adj = queued - chan->bytes_queued_for_xmit;
+ log_debug(LD_CHANNEL,
+ "Increasing queue size for channel " U64_FORMAT " by " U64_FORMAT
+ " from " U64_FORMAT " to " U64_FORMAT,
+ U64_PRINTF_ARG(chan->global_identifier),
+ U64_PRINTF_ARG(adj),
+ U64_PRINTF_ARG(chan->bytes_queued_for_xmit),
+ U64_PRINTF_ARG(queued));
+ /* Update the channel's estimate */
+ chan->bytes_queued_for_xmit = queued;
+
+ /* Update the global queue size estimate if appropriate */
+ if (chan->state == CHANNEL_STATE_OPEN ||
+ chan->state == CHANNEL_STATE_MAINT) {
+ estimated_total_queue_size += adj;
+ log_debug(LD_CHANNEL,
+ "Increasing global queue size by " U64_FORMAT " for channel "
+ U64_FORMAT ", new size is " U64_FORMAT,
+ U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
+ U64_PRINTF_ARG(estimated_total_queue_size));
+ }
+ } else if (queued < chan->bytes_queued_for_xmit) {
+ adj = chan->bytes_queued_for_xmit - queued;
+ log_debug(LD_CHANNEL,
+ "Decreasing queue size for channel " U64_FORMAT " by " U64_FORMAT
+ " from " U64_FORMAT " to " U64_FORMAT,
+ U64_PRINTF_ARG(chan->global_identifier),
+ U64_PRINTF_ARG(adj),
+ U64_PRINTF_ARG(chan->bytes_queued_for_xmit),
+ U64_PRINTF_ARG(queued));
+ /* Update the channel's estimate */
+ chan->bytes_queued_for_xmit = queued;
+
+ /* Update the global queue size estimate if appropriate */
+ if (chan->state == CHANNEL_STATE_OPEN ||
+ chan->state == CHANNEL_STATE_MAINT) {
+ estimated_total_queue_size -= adj;
+ log_debug(LD_CHANNEL,
+ "Decreasing global queue size by " U64_FORMAT " for channel "
+ U64_FORMAT ", new size is " U64_FORMAT,
+ U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
+ U64_PRINTF_ARG(estimated_total_queue_size));
+ }
+ }
+}
+