summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/or/channel.c59
-rw-r--r--src/or/circuitlist.c2
-rw-r--r--src/or/circuitmux.c73
-rw-r--r--src/or/circuitmux.h7
-rw-r--r--src/or/relay.c19
-rw-r--r--src/or/relay.h2
6 files changed, 141 insertions, 21 deletions
diff --git a/src/or/channel.c b/src/or/channel.c
index 4e9086f2e6..e327bda518 100644
--- a/src/or/channel.c
+++ b/src/or/channel.c
@@ -122,6 +122,8 @@ static cell_queue_entry_t *
cell_queue_entry_new_fixed(cell_t *cell);
static cell_queue_entry_t *
cell_queue_entry_new_var(var_cell_t *var_cell);
+static int is_destroy_cell(channel_t *chan,
+ const cell_queue_entry_t *q, circid_t *circid_out);
/* Functions to maintain the digest map */
static void channel_add_to_digest_map(channel_t *chan);
@@ -1685,6 +1687,13 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
chan->timestamp_last_added_nonpadding = approx_time();
}
+ {
+ circid_t circ_id;
+ if (is_destroy_cell(chan, q, &circ_id)) {
+ channel_note_destroy_not_pending(chan, circ_id);
+ }
+ }
+
/* Can we send it right out? If so, try */
if (TOR_SIMPLEQ_EMPTY(&chan->outgoing_queue) &&
chan->state == CHANNEL_STATE_OPEN) {
@@ -2607,6 +2616,43 @@ channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell)
}
}
+/** DOCDOC */
+static int
+is_destroy_cell(channel_t *chan,
+ const cell_queue_entry_t *q, circid_t *circid_out)
+{
+ *circid_out = 0;
+ switch (q->type) {
+ case CELL_QUEUE_FIXED:
+ if (q->u.fixed.cell->command == CELL_DESTROY) {
+ *circid_out = q->u.fixed.cell->circ_id;
+ return 1;
+ }
+ break;
+ case CELL_QUEUE_VAR:
+ if (q->u.var.var_cell->command == CELL_DESTROY) {
+ *circid_out = q->u.var.var_cell->circ_id;
+ return 1;
+ }
+ break;
+ case CELL_QUEUE_PACKED:
+ if (chan->wide_circ_ids) {
+ if (q->u.packed.packed_cell->body[4] == CELL_DESTROY) {
+ *circid_out = ntohl(get_uint32(q->u.packed.packed_cell->body));
+ return 1;
+ }
+ } else {
+ if (q->u.packed.packed_cell->body[2] == CELL_DESTROY) {
+ *circid_out = ntohs(get_uint16(q->u.packed.packed_cell->body));
+ return 1;
+ }
+ }
+ break;
+ }
+ return 0;
+}
+
+
/**
* Send destroy cell on a channel
*
@@ -2618,25 +2664,20 @@ channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell)
int
channel_send_destroy(circid_t circ_id, channel_t *chan, int reason)
{
- cell_t cell;
-
tor_assert(chan);
/* Check to make sure we can send on this channel first */
if (!(chan->state == CHANNEL_STATE_CLOSING ||
chan->state == CHANNEL_STATE_CLOSED ||
- chan->state == CHANNEL_STATE_ERROR)) {
- memset(&cell, 0, sizeof(cell_t));
- cell.circ_id = circ_id;
- cell.command = CELL_DESTROY;
- cell.payload[0] = (uint8_t) reason;
+ chan->state == CHANNEL_STATE_ERROR) &&
+ chan->cmux) {
+ channel_note_destroy_pending(chan, circ_id);
+ circuitmux_append_destroy_cell(chan, chan->cmux, circ_id, reason);
log_debug(LD_OR,
"Sending destroy (circID %u) on channel %p "
"(global ID " U64_FORMAT ")",
(unsigned)circ_id, chan,
U64_PRINTF_ARG(chan->global_identifier));
-
- channel_write_cell(chan, &cell);
} else {
log_warn(LD_BUG,
"Someone called channel_send_destroy() for circID %u "
diff --git a/src/or/circuitlist.c b/src/or/circuitlist.c
index 3e8caa8252..deb45b7b60 100644
--- a/src/or/circuitlist.c
+++ b/src/or/circuitlist.c
@@ -343,7 +343,7 @@ circuit_set_n_circid_chan(circuit_t *circ, circid_t id,
if (circ->n_delete_pending && old_chan) {
channel_mark_circid_unusable(old_chan, old_id);
- circ->n_delete_pending = 1;
+ circ->n_delete_pending = 0;
}
}
diff --git a/src/or/circuitmux.c b/src/or/circuitmux.c
index 545cfd0650..198e518bd4 100644
--- a/src/or/circuitmux.c
+++ b/src/or/circuitmux.c
@@ -10,6 +10,7 @@
#include "channel.h"
#include "circuitlist.h"
#include "circuitmux.h"
+#include "relay.h"
/*
* Private typedefs for circuitmux.c
@@ -115,6 +116,18 @@ struct circuitmux_s {
*/
struct circuit_t *active_circuits_head, *active_circuits_tail;
+ /** List of queued destroy cells */
+ cell_queue_t destroy_cell_queue;
+ /** Boolean: True iff the last cell to circuitmux_get_first_active_circuit
+ * returned the destroy queue. Used to force alternation between
+ * destroy/non-destroy cells.
+ *
+ * XXXX There is no reason to think that alternating is a particularly good
+ * approach -- it's just designed to prevent destroys from starving other
+ * cells completely.
+ */
+ unsigned int last_cell_was_destroy : 1;
+
/*
* Circuitmux policy; if this is non-NULL, it can override the built-
* in round-robin active circuits behavior. This is how EWMA works in
@@ -508,6 +521,8 @@ circuitmux_free(circuitmux_t *cmux)
tor_free(cmux->chanid_circid_map);
}
+ cell_queue_clear(&cmux->destroy_cell_queue);
+
tor_free(cmux);
}
@@ -816,7 +831,7 @@ circuitmux_num_cells(circuitmux_t *cmux)
{
tor_assert(cmux);
- return cmux->n_cells;
+ return cmux->n_cells + cmux->destroy_cell_queue.n;
}
/**
@@ -1368,16 +1383,36 @@ circuitmux_set_num_cells(circuitmux_t *cmux, circuit_t *circ,
/**
* Pick a circuit to send from, using the active circuits list or a
* circuitmux policy if one is available. This is called from channel.c.
+ *
+ * If we would rather send a destroy cell, return NULL and set
+ * *<b>destroy_queue_out</b> to the destroy queue.
+ *
+ * If we have nothing to send, set *<b>destroy_queue_out</b> to NULL and
+ * return NULL.
*/
circuit_t *
-circuitmux_get_first_active_circuit(circuitmux_t *cmux)
+circuitmux_get_first_active_circuit(circuitmux_t *cmux,
+ cell_queue_t **destroy_queue_out)
{
circuit_t *circ = NULL;
tor_assert(cmux);
+ tor_assert(destroy_queue_out);
+
+ *destroy_queue_out = NULL;
+
+ if (cmux->destroy_cell_queue.n &&
+ (!cmux->last_cell_was_destroy || cmux->n_active_circuits == 0)) {
+ /* We have destroy cells to send, and either we just sent a relay cell,
+ * or we have no relay cells to send. */
- if (cmux->n_active_circuits > 0) {
+ /* XXXX We should let the cmux policy have some say in this eventually. */
+ /* XXXX Alternating is not a terribly brilliant approach here. */
+ *destroy_queue_out = &cmux->destroy_cell_queue;
+
+ cmux->last_cell_was_destroy = 1;
+ } else if (cmux->n_active_circuits > 0) {
/* We also must have a cell available for this to be the case */
tor_assert(cmux->n_cells > 0);
/* Do we have a policy-provided circuit selector? */
@@ -1389,7 +1424,11 @@ circuitmux_get_first_active_circuit(circuitmux_t *cmux)
tor_assert(cmux->active_circuits_head);
circ = cmux->active_circuits_head;
}
- } else tor_assert(cmux->n_cells == 0);
+ cmux->last_cell_was_destroy = 0;
+ } else {
+ tor_assert(cmux->n_cells == 0);
+ tor_assert(cmux->destroy_cell_queue.n == 0);
+ }
return circ;
}
@@ -1743,3 +1782,29 @@ circuitmux_assert_okay_pass_three(circuitmux_t *cmux)
}
}
+/*DOCDOC */
+void
+circuitmux_append_destroy_cell(channel_t *chan,
+ circuitmux_t *cmux,
+ circid_t circ_id,
+ uint8_t reason)
+{
+ cell_t cell;
+ memset(&cell, 0, sizeof(cell_t));
+ cell.circ_id = circ_id;
+ cell.command = CELL_DESTROY;
+ cell.payload[0] = (uint8_t) reason;
+
+ cell_queue_append_packed_copy(&cmux->destroy_cell_queue, &cell,
+ chan->wide_circ_ids, 0);
+
+ /* XXXX Duplicate code from append_cell_to_circuit_queue */
+ if (!channel_has_queued_writes(chan)) {
+ /* There is no data at all waiting to be sent on the outbuf. Add a
+ * cell, so that we can notice when it gets flushed, flushed_some can
+ * get called, and we can start putting more data onto the buffer then.
+ */
+ log_debug(LD_GENERAL, "Primed a buffer.");
+ channel_flush_from_first_active_circuit(chan, 1);
+ }
+}
diff --git a/src/or/circuitmux.h b/src/or/circuitmux.h
index 25644ffab7..da62196b21 100644
--- a/src/or/circuitmux.h
+++ b/src/or/circuitmux.h
@@ -120,7 +120,8 @@ unsigned int circuitmux_num_circuits(circuitmux_t *cmux);
unsigned int circuitmux_num_active_circuits(circuitmux_t *cmux);
/* Channel interface */
-circuit_t * circuitmux_get_first_active_circuit(circuitmux_t *cmux);
+circuit_t * circuitmux_get_first_active_circuit(circuitmux_t *cmux,
+ cell_queue_t **destroy_queue_out);
void circuitmux_notify_xmit_cells(circuitmux_t *cmux, circuit_t *circ,
unsigned int n_cells);
@@ -132,5 +133,9 @@ void circuitmux_clear_num_cells(circuitmux_t *cmux, circuit_t *circ);
void circuitmux_set_num_cells(circuitmux_t *cmux, circuit_t *circ,
unsigned int n_cells);
+void circuitmux_append_destroy_cell(channel_t *chan,
+ circuitmux_t *cmux, circid_t circ_id,
+ uint8_t reason);
+
#endif /* TOR_CIRCUITMUX_H */
diff --git a/src/or/relay.c b/src/or/relay.c
index 0ca3e56fd5..ec860269a6 100644
--- a/src/or/relay.c
+++ b/src/or/relay.c
@@ -2140,11 +2140,11 @@ cell_queue_append(cell_queue_t *queue, packed_cell_t *cell)
/** Append a newly allocated copy of <b>cell</b> to the end of <b>queue</b> */
void
cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell,
- int wide_circ_ids)
+ int wide_circ_ids, int use_stats)
{
packed_cell_t *copy = packed_cell_copy(cell, wide_circ_ids);
/* Remember the time when this cell was put in the queue. */
- if (get_options()->CellStatistics) {
+ if (get_options()->CellStatistics && use_stats) {
struct timeval now;
uint32_t added;
insertion_time_queue_t *it_queue = queue->insertion_times;
@@ -2339,7 +2339,7 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max)
{
circuitmux_t *cmux = NULL;
int n_flushed = 0;
- cell_queue_t *queue;
+ cell_queue_t *queue, *destroy_queue=NULL;
circuit_t *circ;
or_circuit_t *or_circ;
int streams_blocked;
@@ -2352,7 +2352,16 @@ channel_flush_from_first_active_circuit(channel_t *chan, int max)
/* Main loop: pick a circuit, send a cell, update the cmux */
while (n_flushed < max) {
- circ = circuitmux_get_first_active_circuit(cmux);
+ circ = circuitmux_get_first_active_circuit(cmux, &destroy_queue);
+ if (destroy_queue) {
+ /* this code is duplicated from some of the logic below. Ugly! XXXX */
+ tor_assert(destroy_queue->n > 0);
+ cell = cell_queue_pop(destroy_queue);
+ channel_write_packed_cell(chan, cell);
+ cell = NULL;
+ ++n_flushed;
+ continue;
+ }
/* If it returns NULL, no cells left to send */
if (!circ) break;
assert_cmux_ok_paranoid(chan);
@@ -2474,7 +2483,7 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
streams_blocked = circ->streams_blocked_on_p_chan;
}
- cell_queue_append_packed_copy(queue, cell, chan->wide_circ_ids);
+ cell_queue_append_packed_copy(queue, cell, chan->wide_circ_ids, 1);
/* If we have too many cells on the circuit, we should stop reading from
* the edge streams for a while. */
diff --git a/src/or/relay.h b/src/or/relay.h
index 7e59838f95..c4cb935bcc 100644
--- a/src/or/relay.h
+++ b/src/or/relay.h
@@ -47,7 +47,7 @@ void packed_cell_free(packed_cell_t *cell);
void cell_queue_clear(cell_queue_t *queue);
void cell_queue_append(cell_queue_t *queue, packed_cell_t *cell);
void cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell,
- int wide_circ_ids);
+ int wide_circ_ids, int use_stats);
void append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
cell_t *cell, cell_direction_t direction,