diff options
Diffstat (limited to 'src/or/relay.c')
-rw-r--r-- | src/or/relay.c | 774 |
1 files changed, 225 insertions, 549 deletions
diff --git a/src/or/relay.c b/src/or/relay.c index 5f7fcd8b7c..bd99d91dca 100644 --- a/src/or/relay.c +++ b/src/or/relay.c @@ -10,10 +10,10 @@ * receiving from circuits, plus queuing on circuits. **/ -#include <math.h> #define RELAY_PRIVATE #include "or.h" #include "buffers.h" +#include "channel.h" #include "circuitbuild.h" #include "circuitlist.h" #include "config.h" @@ -166,7 +166,7 @@ int circuit_receive_relay_cell(cell_t *cell, circuit_t *circ, cell_direction_t cell_direction) { - or_connection_t *or_conn=NULL; + channel_t *chan = NULL; crypt_path_t *layer_hint=NULL; char recognized=0; int reason; @@ -213,24 +213,24 @@ circuit_receive_relay_cell(cell_t *cell, circuit_t *circ, /* not recognized. pass it on. */ if (cell_direction == CELL_DIRECTION_OUT) { cell->circ_id = circ->n_circ_id; /* switch it */ - or_conn = circ->n_conn; + chan = circ->n_chan; } else if (! CIRCUIT_IS_ORIGIN(circ)) { cell->circ_id = TO_OR_CIRCUIT(circ)->p_circ_id; /* switch it */ - or_conn = TO_OR_CIRCUIT(circ)->p_conn; + chan = TO_OR_CIRCUIT(circ)->p_chan; } else { log_fn(LOG_PROTOCOL_WARN, LD_OR, "Dropping unrecognized inbound cell on origin circuit."); return 0; } - if (!or_conn) { + if (!chan) { // XXXX Can this splice stuff be done more cleanly? if (! CIRCUIT_IS_ORIGIN(circ) && TO_OR_CIRCUIT(circ)->rend_splice && cell_direction == CELL_DIRECTION_OUT) { or_circuit_t *splice = TO_OR_CIRCUIT(circ)->rend_splice; tor_assert(circ->purpose == CIRCUIT_PURPOSE_REND_ESTABLISHED); - tor_assert(splice->_base.purpose == CIRCUIT_PURPOSE_REND_ESTABLISHED); + tor_assert(splice->base_.purpose == CIRCUIT_PURPOSE_REND_ESTABLISHED); cell->circ_id = splice->p_circ_id; cell->command = CELL_RELAY; /* can't be relay_early anyway */ if ((reason = circuit_receive_relay_cell(cell, TO_CIRCUIT(splice), @@ -254,7 +254,7 @@ circuit_receive_relay_cell(cell_t *cell, circuit_t *circ, * we might kill the circ before we relay * the cells. */ - append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction, 0); + append_cell_to_circuit_queue(circ, chan, cell, cell_direction, 0); return 0; } @@ -353,13 +353,13 @@ circuit_package_relay_cell(cell_t *cell, circuit_t *circ, cell_direction_t cell_direction, crypt_path_t *layer_hint, streamid_t on_stream) { - or_connection_t *conn; /* where to send the cell */ + channel_t *chan; /* where to send the cell */ if (cell_direction == CELL_DIRECTION_OUT) { crypt_path_t *thishop; /* counter for repeated crypts */ - conn = circ->n_conn; - if (!CIRCUIT_IS_ORIGIN(circ) || !conn) { - log_warn(LD_BUG,"outgoing relay cell has n_conn==NULL. Dropping."); + chan = circ->n_chan; + if (!CIRCUIT_IS_ORIGIN(circ) || !chan) { + log_warn(LD_BUG,"outgoing relay cell has n_chan==NULL. Dropping."); return 0; /* just drop it */ } @@ -388,14 +388,14 @@ circuit_package_relay_cell(cell_t *cell, circuit_t *circ, return 0; /* just drop it */ } or_circ = TO_OR_CIRCUIT(circ); - conn = or_circ->p_conn; + chan = or_circ->p_chan; relay_set_digest(or_circ->p_digest, cell); if (relay_crypt_one_payload(or_circ->p_crypto, cell->payload, 1) < 0) return -1; } ++stats_n_relay_cells_relayed; - append_cell_to_circuit_queue(circ, conn, cell, cell_direction, on_stream); + append_cell_to_circuit_queue(circ, chan, cell, cell_direction, on_stream); return 0; } @@ -422,7 +422,7 @@ relay_lookup_conn(circuit_t *circ, cell_t *cell, for (tmpconn = TO_ORIGIN_CIRCUIT(circ)->p_streams; tmpconn; tmpconn=tmpconn->next_stream) { if (rh.stream_id == tmpconn->stream_id && - !tmpconn->_base.marked_for_close && + !tmpconn->base_.marked_for_close && tmpconn->cpath_layer == layer_hint) { log_debug(LD_APP,"found conn for stream %d.", rh.stream_id); return tmpconn; @@ -432,7 +432,7 @@ relay_lookup_conn(circuit_t *circ, cell_t *cell, for (tmpconn = TO_OR_CIRCUIT(circ)->n_streams; tmpconn; tmpconn=tmpconn->next_stream) { if (rh.stream_id == tmpconn->stream_id && - !tmpconn->_base.marked_for_close) { + !tmpconn->base_.marked_for_close) { log_debug(LD_EXIT,"found conn for stream %d.", rh.stream_id); if (cell_direction == CELL_DIRECTION_OUT || connection_edge_is_rendezvous_stream(tmpconn)) @@ -442,7 +442,7 @@ relay_lookup_conn(circuit_t *circ, cell_t *cell, for (tmpconn = TO_OR_CIRCUIT(circ)->resolving_streams; tmpconn; tmpconn=tmpconn->next_stream) { if (rh.stream_id == tmpconn->stream_id && - !tmpconn->_base.marked_for_close) { + !tmpconn->base_.marked_for_close) { log_debug(LD_EXIT,"found conn for stream %d.", rh.stream_id); return tmpconn; } @@ -561,9 +561,9 @@ relay_send_command_from_edge(streamid_t stream_id, circuit_t *circ, geoip_change_dirreq_state(circ->dirreq_id, DIRREQ_TUNNELED, DIRREQ_END_CELL_SENT); - if (cell_direction == CELL_DIRECTION_OUT && circ->n_conn) { + if (cell_direction == CELL_DIRECTION_OUT && circ->n_chan) { /* if we're using relaybandwidthrate, this conn wants priority */ - circ->n_conn->client_used = approx_time(); + channel_timestamp_client(circ->n_chan); } if (cell_direction == CELL_DIRECTION_OUT) { @@ -631,16 +631,16 @@ connection_edge_send_command(edge_connection_t *fromconn, tor_assert(fromconn); circ = fromconn->on_circuit; - if (fromconn->_base.marked_for_close) { + if (fromconn->base_.marked_for_close) { log_warn(LD_BUG, "called on conn that's already marked for close at %s:%d.", - fromconn->_base.marked_for_close_file, - fromconn->_base.marked_for_close); + fromconn->base_.marked_for_close_file, + fromconn->base_.marked_for_close); return 0; } if (!circ) { - if (fromconn->_base.type == CONN_TYPE_AP) { + if (fromconn->base_.type == CONN_TYPE_AP) { log_info(LD_APP,"no circ. Closing conn."); connection_mark_unattached_ap(EDGE_TO_ENTRY_CONN(fromconn), END_STREAM_REASON_INTERNAL); @@ -777,8 +777,8 @@ connection_ap_process_end_not_open( circuit_log_path(LOG_INFO,LD_APP,circ); /* Mark this circuit "unusable for new streams". */ /* XXXX024 this is a kludgy way to do this. */ - tor_assert(circ->_base.timestamp_dirty); - circ->_base.timestamp_dirty -= get_options()->MaxCircuitDirtiness; + tor_assert(circ->base_.timestamp_dirty); + circ->base_.timestamp_dirty -= get_options()->MaxCircuitDirtiness; if (conn->chosen_exit_optional) { /* stop wanting a specific exit */ @@ -854,7 +854,7 @@ connection_edge_process_relay_cell_not_open( edge_connection_t *conn, crypt_path_t *layer_hint) { if (rh->command == RELAY_COMMAND_END) { - if (CIRCUIT_IS_ORIGIN(circ) && conn->_base.type == CONN_TYPE_AP) { + if (CIRCUIT_IS_ORIGIN(circ) && conn->base_.type == CONN_TYPE_AP) { return connection_ap_process_end_not_open(rh, cell, TO_ORIGIN_CIRCUIT(circ), EDGE_TO_ENTRY_CONN(conn), @@ -869,18 +869,18 @@ connection_edge_process_relay_cell_not_open( } } - if (conn->_base.type == CONN_TYPE_AP && + if (conn->base_.type == CONN_TYPE_AP && rh->command == RELAY_COMMAND_CONNECTED) { entry_connection_t *entry_conn = EDGE_TO_ENTRY_CONN(conn); tor_assert(CIRCUIT_IS_ORIGIN(circ)); - if (conn->_base.state != AP_CONN_STATE_CONNECT_WAIT) { + if (conn->base_.state != AP_CONN_STATE_CONNECT_WAIT) { log_fn(LOG_PROTOCOL_WARN, LD_APP, "Got 'connected' while not in state connect_wait. Dropping."); return 0; } - conn->_base.state = AP_CONN_STATE_OPEN; + conn->base_.state = AP_CONN_STATE_OPEN; log_info(LD_APP,"'connected' received after %d seconds.", - (int)(time(NULL) - conn->_base.timestamp_lastread)); + (int)(time(NULL) - conn->base_.timestamp_lastread)); if (rh->length >= 4) { uint32_t addr = ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE)); int ttl; @@ -944,13 +944,13 @@ connection_edge_process_relay_cell_not_open( } return 0; } - if (conn->_base.type == CONN_TYPE_AP && + if (conn->base_.type == CONN_TYPE_AP && rh->command == RELAY_COMMAND_RESOLVED) { int ttl; int answer_len; uint8_t answer_type; entry_connection_t *entry_conn = EDGE_TO_ENTRY_CONN(conn); - if (conn->_base.state != AP_CONN_STATE_RESOLVE_WAIT) { + if (conn->base_.state != AP_CONN_STATE_RESOLVE_WAIT) { log_fn(LOG_PROTOCOL_WARN, LD_APP, "Got a 'resolved' cell while " "not in state resolve_wait. Dropping."); return 0; @@ -1001,8 +1001,8 @@ connection_edge_process_relay_cell_not_open( log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL, "Got an unexpected relay command %d, in state %d (%s). Dropping.", - rh->command, conn->_base.state, - conn_state_to_string(conn->_base.type, conn->_base.state)); + rh->command, conn->base_.state, + conn_state_to_string(conn->base_.type, conn->base_.state)); return 0; /* for forward compatibility, don't kill the circuit */ // connection_edge_end(conn, END_STREAM_REASON_TORPROTOCOL); // connection_mark_for_close(conn); @@ -1050,9 +1050,9 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, * conn points to the recognized stream. */ if (conn && !connection_state_is_open(TO_CONN(conn))) { - if (conn->_base.type == CONN_TYPE_EXIT && - (conn->_base.state == EXIT_CONN_STATE_CONNECTING || - conn->_base.state == EXIT_CONN_STATE_RESOLVING) && + if (conn->base_.type == CONN_TYPE_EXIT && + (conn->base_.state == EXIT_CONN_STATE_CONNECTING || + conn->base_.state == EXIT_CONN_STATE_RESOLVING) && rh.command == RELAY_COMMAND_DATA) { /* Allow DATA cells to be delivered to an exit node in state * EXIT_CONN_STATE_CONNECTING or EXIT_CONN_STATE_RESOLVING. @@ -1095,7 +1095,7 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, * and linked. */ static uint64_t next_id = 0; circ->dirreq_id = ++next_id; - TO_CONN(TO_OR_CIRCUIT(circ)->p_conn)->dirreq_id = circ->dirreq_id; + TO_OR_CIRCUIT(circ)->p_chan->dirreq_id = circ->dirreq_id; } return connection_exit_begin_conn(cell, circ); @@ -1152,10 +1152,10 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, } /* XXX add to this log_fn the exit node's nickname? */ log_info(domain,"%d: end cell (%s) for stream %d. Removing stream.", - conn->_base.s, + conn->base_.s, stream_end_reason_to_string(reason), conn->stream_id); - if (conn->_base.type == CONN_TYPE_AP) { + if (conn->base_.type == CONN_TYPE_AP) { entry_connection_t *entry_conn = EDGE_TO_ENTRY_CONN(conn); if (entry_conn->socks_request && !entry_conn->socks_request->has_finished) @@ -1166,7 +1166,7 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, conn->edge_has_sent_end = 1; if (!conn->end_reason) conn->end_reason = reason | END_STREAM_REASON_FLAG_REMOTE; - if (!conn->_base.marked_for_close) { + if (!conn->base_.marked_for_close) { /* only mark it if not already marked. it's possible to * get the 'end' right around when the client hangs up on us. */ connection_mark_and_flush(TO_CONN(conn)); @@ -1175,7 +1175,7 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, case RELAY_COMMAND_EXTEND: { static uint64_t total_n_extend=0, total_nonearly=0; total_n_extend++; - if (conn) { + if (rh.stream_id) { log_fn(LOG_PROTOCOL_WARN, domain, "'extend' cell received for non-zero stream. Dropping."); return 0; @@ -1230,12 +1230,12 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, "'truncate' unsupported at origin. Dropping."); return 0; } - if (circ->n_conn) { - uint8_t trunc_reason = *(uint8_t*)(cell->payload + RELAY_HEADER_SIZE); - circuit_clear_cell_queue(circ, circ->n_conn); - connection_or_send_destroy(circ->n_circ_id, circ->n_conn, - trunc_reason); - circuit_set_n_circid_orconn(circ, 0, NULL); + if (circ->n_chan) { + uint8_t trunc_reason = get_uint8(cell->payload + RELAY_HEADER_SIZE); + circuit_clear_cell_queue(circ, circ->n_chan); + channel_send_destroy(circ->n_circ_id, circ->n_chan, + trunc_reason); + circuit_set_n_circid_chan(circ, 0, NULL); } log_debug(LD_EXIT, "Processed 'truncate', replying."); { @@ -1251,7 +1251,8 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, "'truncated' unsupported at non-origin. Dropping."); return 0; } - circuit_truncated(TO_ORIGIN_CIRCUIT(circ), layer_hint); + circuit_truncated(TO_ORIGIN_CIRCUIT(circ), layer_hint, + get_uint8(cell->payload + RELAY_HEADER_SIZE)); return 0; case RELAY_COMMAND_CONNECTED: if (conn) { @@ -1267,7 +1268,8 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, if (layer_hint) { if (layer_hint->package_window + CIRCWINDOW_INCREMENT > CIRCWINDOW_START_MAX) { - log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL, + /*XXXX024: Downgrade this back to LOG_PROTOCOL_WARN after a while*/ + log_fn(LOG_WARN, LD_PROTOCOL, "Bug/attack: unexpected sendme cell from exit relay. " "Closing circ."); return -END_CIRC_REASON_TORPROTOCOL; @@ -1279,7 +1281,8 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, } else { if (circ->package_window + CIRCWINDOW_INCREMENT > CIRCWINDOW_START_MAX) { - log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL, + /*XXXX024: Downgrade this back to LOG_PROTOCOL_WARN after a while*/ + log_fn(LOG_WARN, LD_PROTOCOL, "Bug/attack: unexpected sendme cell from client. " "Closing circ."); return -END_CIRC_REASON_TORPROTOCOL; @@ -1389,21 +1392,21 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial, size_t bytes_to_process, length; char payload[CELL_PAYLOAD_SIZE]; circuit_t *circ; - const unsigned domain = conn->_base.type == CONN_TYPE_AP ? LD_APP : LD_EXIT; + const unsigned domain = conn->base_.type == CONN_TYPE_AP ? LD_APP : LD_EXIT; int sending_from_optimistic = 0; const int sending_optimistically = - conn->_base.type == CONN_TYPE_AP && - conn->_base.state != AP_CONN_STATE_OPEN; + conn->base_.type == CONN_TYPE_AP && + conn->base_.state != AP_CONN_STATE_OPEN; entry_connection_t *entry_conn = - conn->_base.type == CONN_TYPE_AP ? EDGE_TO_ENTRY_CONN(conn) : NULL; + conn->base_.type == CONN_TYPE_AP ? EDGE_TO_ENTRY_CONN(conn) : NULL; crypt_path_t *cpath_layer = conn->cpath_layer; tor_assert(conn); - if (conn->_base.marked_for_close) { + if (conn->base_.marked_for_close) { log_warn(LD_BUG, "called on conn that's already marked for close at %s:%d.", - conn->_base.marked_for_close_file, conn->_base.marked_for_close); + conn->base_.marked_for_close_file, conn->base_.marked_for_close); return 0; } @@ -1470,7 +1473,7 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial, connection_fetch_from_buf(payload, length, TO_CONN(conn)); } - log_debug(domain,"(%d) Packaging %d bytes (%d waiting).", conn->_base.s, + log_debug(domain,"(%d) Packaging %d bytes (%d waiting).", conn->base_.s, (int)length, (int)connection_get_inbuf_len(TO_CONN(conn))); if (sending_optimistically && !sending_from_optimistic) { @@ -1536,9 +1539,9 @@ connection_edge_consider_sending_sendme(edge_connection_t *conn) } while (conn->deliver_window <= STREAMWINDOW_START - STREAMWINDOW_INCREMENT) { - log_debug(conn->_base.type == CONN_TYPE_AP ?LD_APP:LD_EXIT, + log_debug(conn->base_.type == CONN_TYPE_AP ?LD_APP:LD_EXIT, "Outbuf %d, Queuing stream sendme.", - (int)conn->_base.outbuf_flushlen); + (int)conn->base_.outbuf_flushlen); conn->deliver_window += STREAMWINDOW_INCREMENT; if (connection_edge_send_command(conn, RELAY_COMMAND_SENDME, NULL, 0) < 0) { @@ -1591,10 +1594,10 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, * needed to fill the cell queue. */ int max_to_package = circ->package_window; if (CIRCUIT_IS_ORIGIN(circ)) { - cells_on_queue = circ->n_conn_cells.n; + cells_on_queue = circ->n_chan_cells.n; } else { or_circuit_t *or_circ = TO_OR_CIRCUIT(circ); - cells_on_queue = or_circ->p_conn_cells.n; + cells_on_queue = or_circ->p_chan_cells.n; } if (CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue < max_to_package) max_to_package = CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue; @@ -1627,7 +1630,7 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, /* Activate reading starting from the chosen stream */ for (conn=chosen_stream; conn; conn = conn->next_stream) { /* Start reading for the streams starting from here */ - if (conn->_base.marked_for_close || conn->package_window <= 0) + if (conn->base_.marked_for_close || conn->package_window <= 0) continue; if (!layer_hint || conn->cpath_layer == layer_hint) { connection_start_reading(TO_CONN(conn)); @@ -1638,7 +1641,7 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, } /* Go back and do the ones we skipped, circular-style */ for (conn = first_conn; conn != chosen_stream; conn = conn->next_stream) { - if (conn->_base.marked_for_close || conn->package_window <= 0) + if (conn->base_.marked_for_close || conn->package_window <= 0) continue; if (!layer_hint || conn->cpath_layer == layer_hint) { connection_start_reading(TO_CONN(conn)); @@ -1664,7 +1667,7 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, * package. */ for (conn=first_conn; conn; conn=conn->next_stream) { - if (conn->_base.marked_for_close || conn->package_window <= 0) + if (conn->base_.marked_for_close || conn->package_window <= 0) continue; if (!layer_hint || conn->cpath_layer == layer_hint) { int n = cells_per_conn, r; @@ -1775,10 +1778,10 @@ circuit_consider_sending_sendme(circuit_t *circ, crypt_path_t *layer_hint) } #ifdef ACTIVE_CIRCUITS_PARANOIA -#define assert_active_circuits_ok_paranoid(conn) \ - assert_active_circuits_ok(conn) +#define assert_cmux_ok_paranoid(chan) \ + assert_circuit_mux_okay(chan) #else -#define assert_active_circuits_ok_paranoid(conn) +#define assert_cmux_ok_paranoid(chan) #endif /** The total number of cells we have allocated from the memory pool. */ @@ -1833,12 +1836,19 @@ packed_cell_free_unchecked(packed_cell_t *cell) /** Allocate and return a new packed_cell_t. */ static INLINE packed_cell_t * -packed_cell_alloc(void) +packed_cell_new(void) { ++total_cells_allocated; return mp_pool_get(cell_pool); } +/** Return a packed cell used outside by channel_t lower layer */ +void +packed_cell_free(packed_cell_t *cell) +{ + packed_cell_free_unchecked(cell); +} + /** Log current statistics for cell pool allocation at log level * <b>severity</b>. */ void @@ -1847,10 +1857,10 @@ dump_cell_pool_usage(int severity) circuit_t *c; int n_circs = 0; int n_cells = 0; - for (c = _circuit_get_global_list(); c; c = c->next) { - n_cells += c->n_conn_cells.n; + for (c = circuit_get_global_list_(); c; c = c->next) { + n_cells += c->n_chan_cells.n; if (!CIRCUIT_IS_ORIGIN(c)) - n_cells += TO_OR_CIRCUIT(c)->p_conn_cells.n; + n_cells += TO_OR_CIRCUIT(c)->p_chan_cells.n; ++n_circs; } log(severity, LD_MM, "%d cells allocated on %d circuits. %d cells leaked.", @@ -1862,7 +1872,7 @@ dump_cell_pool_usage(int severity) static INLINE packed_cell_t * packed_cell_copy(const cell_t *cell) { - packed_cell_t *c = packed_cell_alloc(); + packed_cell_t *c = packed_cell_new(); cell_pack(c, cell); c->next = NULL; return c; @@ -1961,363 +1971,63 @@ cell_queue_pop(cell_queue_t *queue) return cell; } -/** Return a pointer to the "next_active_on_{n,p}_conn" pointer of <b>circ</b>, - * depending on whether <b>conn</b> matches n_conn or p_conn. */ -static INLINE circuit_t ** -next_circ_on_conn_p(circuit_t *circ, or_connection_t *conn) -{ - tor_assert(circ); - tor_assert(conn); - if (conn == circ->n_conn) { - return &circ->next_active_on_n_conn; - } else { - or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); - tor_assert(conn == orcirc->p_conn); - return &orcirc->next_active_on_p_conn; - } -} - -/** Return a pointer to the "prev_active_on_{n,p}_conn" pointer of <b>circ</b>, - * depending on whether <b>conn</b> matches n_conn or p_conn. */ -static INLINE circuit_t ** -prev_circ_on_conn_p(circuit_t *circ, or_connection_t *conn) -{ - tor_assert(circ); - tor_assert(conn); - if (conn == circ->n_conn) { - return &circ->prev_active_on_n_conn; - } else { - or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); - tor_assert(conn == orcirc->p_conn); - return &orcirc->prev_active_on_p_conn; - } -} - -/** Helper for sorting cell_ewma_t values in their priority queue. */ -static int -compare_cell_ewma_counts(const void *p1, const void *p2) -{ - const cell_ewma_t *e1=p1, *e2=p2; - if (e1->cell_count < e2->cell_count) - return -1; - else if (e1->cell_count > e2->cell_count) - return 1; - else - return 0; -} - -/** Given a cell_ewma_t, return a pointer to the circuit containing it. */ -static circuit_t * -cell_ewma_to_circuit(cell_ewma_t *ewma) -{ - if (ewma->is_for_p_conn) { - /* This is an or_circuit_t's p_cell_ewma. */ - or_circuit_t *orcirc = SUBTYPE_P(ewma, or_circuit_t, p_cell_ewma); - return TO_CIRCUIT(orcirc); - } else { - /* This is some circuit's n_cell_ewma. */ - return SUBTYPE_P(ewma, circuit_t, n_cell_ewma); - } -} - -/* ==== Functions for scaling cell_ewma_t ==== - - When choosing which cells to relay first, we favor circuits that have been - quiet recently. This gives better latency on connections that aren't - pushing lots of data, and makes the network feel more interactive. - - Conceptually, we take an exponentially weighted mean average of the number - of cells a circuit has sent, and allow active circuits (those with cells to - relay) to send cells in reverse order of their exponentially-weighted mean - average (EWMA) cell count. [That is, a cell sent N seconds ago 'counts' - F^N times as much as a cell sent now, for 0<F<1.0, and we favor the - circuit that has sent the fewest cells] - - If 'double' had infinite precision, we could do this simply by counting a - cell sent at startup as having weight 1.0, and a cell sent N seconds later - as having weight F^-N. This way, we would never need to re-scale - any already-sent cells. - - To prevent double from overflowing, we could count a cell sent now as - having weight 1.0 and a cell sent N seconds ago as having weight F^N. - This, however, would mean we'd need to re-scale *ALL* old circuits every - time we wanted to send a cell. - - So as a compromise, we divide time into 'ticks' (currently, 10-second - increments) and say that a cell sent at the start of a current tick is - worth 1.0, a cell sent N seconds before the start of the current tick is - worth F^N, and a cell sent N seconds after the start of the current tick is - worth F^-N. This way we don't overflow, and we don't need to constantly - rescale. - */ - -/** How long does a tick last (seconds)? */ -#define EWMA_TICK_LEN 10 - -/** The default per-tick scale factor, if it hasn't been overridden by a - * consensus or a configuration setting. zero means "disabled". */ -#define EWMA_DEFAULT_HALFLIFE 0.0 - -/** Given a timeval <b>now</b>, compute the cell_ewma tick in which it occurs - * and the fraction of the tick that has elapsed between the start of the tick - * and <b>now</b>. Return the former and store the latter in - * *<b>remainder_out</b>. - * - * These tick values are not meant to be shared between Tor instances, or used - * for other purposes. */ -static unsigned -cell_ewma_tick_from_timeval(const struct timeval *now, - double *remainder_out) -{ - unsigned res = (unsigned) (now->tv_sec / EWMA_TICK_LEN); - /* rem */ - double rem = (now->tv_sec % EWMA_TICK_LEN) + - ((double)(now->tv_usec)) / 1.0e6; - *remainder_out = rem / EWMA_TICK_LEN; - return res; -} - -/** Compute and return the current cell_ewma tick. */ -unsigned -cell_ewma_get_tick(void) -{ - return ((unsigned)approx_time() / EWMA_TICK_LEN); -} - -/** The per-tick scale factor to be used when computing cell-count EWMA - * values. (A cell sent N ticks before the start of the current tick - * has value ewma_scale_factor ** N.) +/** + * Update the number of cells available on the circuit's n_chan or p_chan's + * circuit mux. */ -static double ewma_scale_factor = 0.1; -/* DOCDOC ewma_enabled */ -static int ewma_enabled = 0; - -/*DOCDOC*/ -#define EPSILON 0.00001 -/*DOCDOC*/ -#define LOG_ONEHALF -0.69314718055994529 - -/** Adjust the global cell scale factor based on <b>options</b> */ -void -cell_ewma_set_scale_factor(const or_options_t *options, - const networkstatus_t *consensus) -{ - int32_t halflife_ms; - double halflife; - const char *source; - if (options && options->CircuitPriorityHalflife >= -EPSILON) { - halflife = options->CircuitPriorityHalflife; - source = "CircuitPriorityHalflife in configuration"; - } else if (consensus && (halflife_ms = networkstatus_get_param( - consensus, "CircuitPriorityHalflifeMsec", - -1, -1, INT32_MAX)) >= 0) { - halflife = ((double)halflife_ms)/1000.0; - source = "CircuitPriorityHalflifeMsec in consensus"; - } else { - halflife = EWMA_DEFAULT_HALFLIFE; - source = "Default value"; - } - - if (halflife <= EPSILON) { - /* The cell EWMA algorithm is disabled. */ - ewma_scale_factor = 0.1; - ewma_enabled = 0; - log_info(LD_OR, - "Disabled cell_ewma algorithm because of value in %s", - source); - } else { - /* convert halflife into halflife-per-tick. */ - halflife /= EWMA_TICK_LEN; - /* compute per-tick scale factor. */ - ewma_scale_factor = exp( LOG_ONEHALF / halflife ); - ewma_enabled = 1; - log_info(LD_OR, - "Enabled cell_ewma algorithm because of value in %s; " - "scale factor is %f per %d seconds", - source, ewma_scale_factor, EWMA_TICK_LEN); - } -} - -/** Return the multiplier necessary to convert the value of a cell sent in - * 'from_tick' to one sent in 'to_tick'. */ -static INLINE double -get_scale_factor(unsigned from_tick, unsigned to_tick) -{ - /* This math can wrap around, but that's okay: unsigned overflow is - well-defined */ - int diff = (int)(to_tick - from_tick); - return pow(ewma_scale_factor, diff); -} - -/** Adjust the cell count of <b>ewma</b> so that it is scaled with respect to - * <b>cur_tick</b> */ -static void -scale_single_cell_ewma(cell_ewma_t *ewma, unsigned cur_tick) -{ - double factor = get_scale_factor(ewma->last_adjusted_tick, cur_tick); - ewma->cell_count *= factor; - ewma->last_adjusted_tick = cur_tick; -} - -/** Adjust the cell count of every active circuit on <b>conn</b> so - * that they are scaled with respect to <b>cur_tick</b> */ -static void -scale_active_circuits(or_connection_t *conn, unsigned cur_tick) -{ - - double factor = get_scale_factor( - conn->active_circuit_pqueue_last_recalibrated, - cur_tick); - /** Ordinarily it isn't okay to change the value of an element in a heap, - * but it's okay here, since we are preserving the order. */ - SMARTLIST_FOREACH(conn->active_circuit_pqueue, cell_ewma_t *, e, { - tor_assert(e->last_adjusted_tick == - conn->active_circuit_pqueue_last_recalibrated); - e->cell_count *= factor; - e->last_adjusted_tick = cur_tick; - }); - conn->active_circuit_pqueue_last_recalibrated = cur_tick; -} - -/** Rescale <b>ewma</b> to the same scale as <b>conn</b>, and add it to - * <b>conn</b>'s priority queue of active circuits */ -static void -add_cell_ewma_to_conn(or_connection_t *conn, cell_ewma_t *ewma) -{ - tor_assert(ewma->heap_index == -1); - scale_single_cell_ewma(ewma, - conn->active_circuit_pqueue_last_recalibrated); - - smartlist_pqueue_add(conn->active_circuit_pqueue, - compare_cell_ewma_counts, - STRUCT_OFFSET(cell_ewma_t, heap_index), - ewma); -} - -/** Remove <b>ewma</b> from <b>conn</b>'s priority queue of active circuits */ -static void -remove_cell_ewma_from_conn(or_connection_t *conn, cell_ewma_t *ewma) -{ - tor_assert(ewma->heap_index != -1); - smartlist_pqueue_remove(conn->active_circuit_pqueue, - compare_cell_ewma_counts, - STRUCT_OFFSET(cell_ewma_t, heap_index), - ewma); -} - -/** Remove and return the first cell_ewma_t from conn's priority queue of - * active circuits. Requires that the priority queue is nonempty. */ -static cell_ewma_t * -pop_first_cell_ewma_from_conn(or_connection_t *conn) -{ - return smartlist_pqueue_pop(conn->active_circuit_pqueue, - compare_cell_ewma_counts, - STRUCT_OFFSET(cell_ewma_t, heap_index)); -} - -/** Add <b>circ</b> to the list of circuits with pending cells on - * <b>conn</b>. No effect if <b>circ</b> is already linked. */ void -make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn) +update_circuit_on_cmux(circuit_t *circ, cell_direction_t direction) { - circuit_t **nextp = next_circ_on_conn_p(circ, conn); - circuit_t **prevp = prev_circ_on_conn_p(circ, conn); - - if (*nextp && *prevp) { - /* Already active. */ - return; - } - - assert_active_circuits_ok_paranoid(conn); + channel_t *chan = NULL; + or_circuit_t *or_circ = NULL; + circuitmux_t *cmux = NULL; - if (! conn->active_circuits) { - conn->active_circuits = circ; - *prevp = *nextp = circ; - } else { - circuit_t *head = conn->active_circuits; - circuit_t *old_tail = *prev_circ_on_conn_p(head, conn); - *next_circ_on_conn_p(old_tail, conn) = circ; - *nextp = head; - *prev_circ_on_conn_p(head, conn) = circ; - *prevp = old_tail; - } + tor_assert(circ); - if (circ->n_conn == conn) { - add_cell_ewma_to_conn(conn, &circ->n_cell_ewma); + /* Okay, get the channel */ + if (direction == CELL_DIRECTION_OUT) { + chan = circ->n_chan; } else { - or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); - tor_assert(conn == orcirc->p_conn); - add_cell_ewma_to_conn(conn, &orcirc->p_cell_ewma); + or_circ = TO_OR_CIRCUIT(circ); + chan = or_circ->p_chan; } - assert_active_circuits_ok_paranoid(conn); -} + tor_assert(chan); + tor_assert(chan->cmux); -/** Remove <b>circ</b> from the list of circuits with pending cells on - * <b>conn</b>. No effect if <b>circ</b> is already unlinked. */ -void -make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn) -{ - circuit_t **nextp = next_circ_on_conn_p(circ, conn); - circuit_t **prevp = prev_circ_on_conn_p(circ, conn); - circuit_t *next = *nextp, *prev = *prevp; + /* Now get the cmux */ + cmux = chan->cmux; - if (!next && !prev) { - /* Already inactive. */ - return; - } - - assert_active_circuits_ok_paranoid(conn); - - tor_assert(next && prev); - tor_assert(*prev_circ_on_conn_p(next, conn) == circ); - tor_assert(*next_circ_on_conn_p(prev, conn) == circ); + /* Cmux sanity check */ + tor_assert(circuitmux_is_circuit_attached(cmux, circ)); + tor_assert(circuitmux_attached_circuit_direction(cmux, circ) == direction); - if (next == circ) { - conn->active_circuits = NULL; - } else { - *prev_circ_on_conn_p(next, conn) = prev; - *next_circ_on_conn_p(prev, conn) = next; - if (conn->active_circuits == circ) - conn->active_circuits = next; - } - *prevp = *nextp = NULL; + assert_cmux_ok_paranoid(chan); - if (circ->n_conn == conn) { - remove_cell_ewma_from_conn(conn, &circ->n_cell_ewma); + /* Update the number of cells we have for the circuit mux */ + if (direction == CELL_DIRECTION_OUT) { + circuitmux_set_num_cells(cmux, circ, circ->n_chan_cells.n); } else { - or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); - tor_assert(conn == orcirc->p_conn); - remove_cell_ewma_from_conn(conn, &orcirc->p_cell_ewma); + circuitmux_set_num_cells(cmux, circ, or_circ->p_chan_cells.n); } - assert_active_circuits_ok_paranoid(conn); + assert_cmux_ok_paranoid(chan); } -/** Remove all circuits from the list of circuits with pending cells on - * <b>conn</b>. */ +/** Remove all circuits from the cmux on <b>chan</b>. */ void -connection_or_unlink_all_active_circs(or_connection_t *orconn) +channel_unlink_all_circuits(channel_t *chan) { - circuit_t *head = orconn->active_circuits; - circuit_t *cur = head; - if (! head) - return; - do { - circuit_t *next = *next_circ_on_conn_p(cur, orconn); - *prev_circ_on_conn_p(cur, orconn) = NULL; - *next_circ_on_conn_p(cur, orconn) = NULL; - cur = next; - } while (cur != head); - orconn->active_circuits = NULL; - - SMARTLIST_FOREACH(orconn->active_circuit_pqueue, cell_ewma_t *, e, - e->heap_index = -1); - smartlist_clear(orconn->active_circuit_pqueue); + tor_assert(chan); + tor_assert(chan->cmux); + + circuitmux_detach_all_circuits(chan->cmux); + chan->num_n_circuits = 0; + chan->num_p_circuits = 0; } /** Block (if <b>block</b> is true) or unblock (if <b>block</b> is false) - * every edge connection that is using <b>circ</b> to write to <b>orconn</b>, + * every edge connection that is using <b>circ</b> to write to <b>chan</b>, * and start or stop reading as appropriate. * * If <b>stream_id</b> is nonzero, block only the edge connection whose @@ -2326,17 +2036,17 @@ connection_or_unlink_all_active_circs(or_connection_t *orconn) * Returns the number of streams whose status we changed. */ static int -set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn, +set_streams_blocked_on_circ(circuit_t *circ, channel_t *chan, int block, streamid_t stream_id) { edge_connection_t *edge = NULL; int n = 0; - if (circ->n_conn == orconn) { - circ->streams_blocked_on_n_conn = block; + if (circ->n_chan == chan) { + circ->streams_blocked_on_n_chan = block; if (CIRCUIT_IS_ORIGIN(circ)) edge = TO_ORIGIN_CIRCUIT(circ)->p_streams; } else { - circ->streams_blocked_on_p_conn = block; + circ->streams_blocked_on_p_chan = block; tor_assert(!CIRCUIT_IS_ORIGIN(circ)); edge = TO_OR_CIRCUIT(circ)->n_streams; } @@ -2371,58 +2081,51 @@ set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn, } /** Pull as many cells as possible (but no more than <b>max</b>) from the - * queue of the first active circuit on <b>conn</b>, and write them to - * <b>conn</b>->outbuf. Return the number of cells written. Advance + * queue of the first active circuit on <b>chan</b>, and write them to + * <b>chan</b>->outbuf. Return the number of cells written. Advance * the active circuit pointer to the next active circuit in the ring. */ int -connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max, - time_t now) +channel_flush_from_first_active_circuit(channel_t *chan, int max) { - int n_flushed; + circuitmux_t *cmux = NULL; + int n_flushed = 0; cell_queue_t *queue; circuit_t *circ; + or_circuit_t *or_circ; int streams_blocked; - - /* The current (hi-res) time */ - struct timeval now_hires; - - /* The EWMA cell counter for the circuit we're flushing. */ - cell_ewma_t *cell_ewma = NULL; - double ewma_increment = -1; - - circ = conn->active_circuits; - if (!circ) return 0; - assert_active_circuits_ok_paranoid(conn); - - /* See if we're doing the ewma circuit selection algorithm. */ - if (ewma_enabled) { - unsigned tick; - double fractional_tick; - tor_gettimeofday_cached(&now_hires); - tick = cell_ewma_tick_from_timeval(&now_hires, &fractional_tick); - - if (tick != conn->active_circuit_pqueue_last_recalibrated) { - scale_active_circuits(conn, tick); + packed_cell_t *cell; + + /* Get the cmux */ + tor_assert(chan); + tor_assert(chan->cmux); + cmux = chan->cmux; + + /* Main loop: pick a circuit, send a cell, update the cmux */ + while (n_flushed < max) { + circ = circuitmux_get_first_active_circuit(cmux); + /* If it returns NULL, no cells left to send */ + if (!circ) break; + assert_cmux_ok_paranoid(chan); + + if (circ->n_chan == chan) { + queue = &circ->n_chan_cells; + streams_blocked = circ->streams_blocked_on_n_chan; + } else { + or_circ = TO_OR_CIRCUIT(circ); + tor_assert(or_circ->p_chan == chan); + queue = &TO_OR_CIRCUIT(circ)->p_chan_cells; + streams_blocked = circ->streams_blocked_on_p_chan; } - ewma_increment = pow(ewma_scale_factor, -fractional_tick); + /* Circuitmux told us this was active, so it should have cells */ + tor_assert(queue->n > 0); - cell_ewma = smartlist_get(conn->active_circuit_pqueue, 0); - circ = cell_ewma_to_circuit(cell_ewma); - } - - if (circ->n_conn == conn) { - queue = &circ->n_conn_cells; - streams_blocked = circ->streams_blocked_on_n_conn; - } else { - queue = &TO_OR_CIRCUIT(circ)->p_conn_cells; - streams_blocked = circ->streams_blocked_on_p_conn; - } - tor_assert(*next_circ_on_conn_p(circ,conn)); - - for (n_flushed = 0; n_flushed < max && queue->head; ) { - packed_cell_t *cell = cell_queue_pop(queue); - tor_assert(*next_circ_on_conn_p(circ,conn)); + /* + * Get just one cell here; once we've sent it, that can change the circuit + * selection, so we have to loop around for another even if this circuit + * has more than one. + */ + cell = cell_queue_pop(queue); /* Calculate the exact time that this cell has spent in the queue. */ if (get_options()->CellStatistics && !CIRCUIT_IS_ORIGIN(circ)) { @@ -2438,8 +2141,8 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max, "Looks like the CellStatistics option was " "recently enabled."); } else { - or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); insertion_time_elem_t *elem = it_queue->first; + or_circ = TO_OR_CIRCUIT(circ); cell_waiting_time = (uint32_t)((flushed * 10L + SECONDS_IN_A_DAY * 1000L - elem->insertion_time * 10L) % @@ -2452,66 +2155,58 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max, it_queue->last = NULL; mp_pool_release(elem); } - orcirc->total_cell_waiting_time += cell_waiting_time; - orcirc->processed_cells++; + or_circ->total_cell_waiting_time += cell_waiting_time; + or_circ->processed_cells++; } } /* If we just flushed our queue and this circuit is used for a * tunneled directory request, possibly advance its state. */ - if (queue->n == 0 && TO_CONN(conn)->dirreq_id) - geoip_change_dirreq_state(TO_CONN(conn)->dirreq_id, + if (queue->n == 0 && chan->dirreq_id) + geoip_change_dirreq_state(chan->dirreq_id, DIRREQ_TUNNELED, DIRREQ_CIRC_QUEUE_FLUSHED); - connection_write_to_buf(cell->body, CELL_NETWORK_SIZE, TO_CONN(conn)); + /* Now send the cell */ + channel_write_packed_cell(chan, cell); + cell = NULL; - packed_cell_free_unchecked(cell); + /* + * Don't packed_cell_free_unchecked(cell) here because the channel will + * do so when it gets out of the channel queue (probably already did, in + * which case that was an immediate double-free bug). + */ + + /* Update the counter */ ++n_flushed; - if (cell_ewma) { - cell_ewma_t *tmp; - cell_ewma->cell_count += ewma_increment; - /* We pop and re-add the cell_ewma_t here, not above, since we need to - * re-add it immediately to keep the priority queue consistent with - * the linked-list implementation */ - tmp = pop_first_cell_ewma_from_conn(conn); - tor_assert(tmp == cell_ewma); - add_cell_ewma_to_conn(conn, cell_ewma); - } - if (!ewma_enabled && circ != conn->active_circuits) { - /* If this happens, the current circuit just got made inactive by - * a call in connection_write_to_buf(). That's nothing to worry about: - * circuit_make_inactive_on_conn() already advanced conn->active_circuits - * for us. - */ - assert_active_circuits_ok_paranoid(conn); - goto done; - } - } - tor_assert(*next_circ_on_conn_p(circ,conn)); - assert_active_circuits_ok_paranoid(conn); - conn->active_circuits = *next_circ_on_conn_p(circ, conn); - /* Is the cell queue low enough to unblock all the streams that are waiting - * to write to this circuit? */ - if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE) - set_streams_blocked_on_circ(circ, conn, 0, 0); /* unblock streams */ + /* + * Now update the cmux; tell it we've just sent a cell, and how many + * we have left. + */ + circuitmux_notify_xmit_cells(cmux, circ, 1); + circuitmux_set_num_cells(cmux, circ, queue->n); + if (queue->n == 0) + log_debug(LD_GENERAL, "Made a circuit inactive."); + + /* Is the cell queue low enough to unblock all the streams that are waiting + * to write to this circuit? */ + if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE) + set_streams_blocked_on_circ(circ, chan, 0, 0); /* unblock streams */ - /* Did we just run out of cells on this circuit's queue? */ - if (queue->n == 0) { - log_debug(LD_GENERAL, "Made a circuit inactive."); - make_circuit_inactive_on_conn(circ, conn); + /* If n_flushed < max still, loop around and pick another circuit */ } - done: - if (n_flushed) - conn->timestamp_last_added_nonpadding = now; + + /* Okay, we're done sending now */ + assert_cmux_ok_paranoid(chan); + return n_flushed; } -/** Add <b>cell</b> to the queue of <b>circ</b> writing to <b>orconn</b> +/** Add <b>cell</b> to the queue of <b>circ</b> writing to <b>chan</b> * transmitting in <b>direction</b>. */ void -append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn, +append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, cell_t *cell, cell_direction_t direction, streamid_t fromstream) { @@ -2521,12 +2216,12 @@ append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn, return; if (direction == CELL_DIRECTION_OUT) { - queue = &circ->n_conn_cells; - streams_blocked = circ->streams_blocked_on_n_conn; + queue = &circ->n_chan_cells; + streams_blocked = circ->streams_blocked_on_n_chan; } else { or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); - queue = &orcirc->p_conn_cells; - streams_blocked = circ->streams_blocked_on_p_conn; + queue = &orcirc->p_chan_cells; + streams_blocked = circ->streams_blocked_on_p_chan; } cell_queue_append_packed_copy(queue, cell); @@ -2534,27 +2229,27 @@ append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn, /* If we have too many cells on the circuit, we should stop reading from * the edge streams for a while. */ if (!streams_blocked && queue->n >= CELL_QUEUE_HIGHWATER_SIZE) - set_streams_blocked_on_circ(circ, orconn, 1, 0); /* block streams */ + set_streams_blocked_on_circ(circ, chan, 1, 0); /* block streams */ if (streams_blocked && fromstream) { /* This edge connection is apparently not blocked; block it. */ - set_streams_blocked_on_circ(circ, orconn, 1, fromstream); + set_streams_blocked_on_circ(circ, chan, 1, fromstream); } + update_circuit_on_cmux(circ, direction); if (queue->n == 1) { - /* This was the first cell added to the queue. We need to make this + /* This was the first cell added to the queue. We just made this * circuit active. */ log_debug(LD_GENERAL, "Made a circuit active."); - make_circuit_active_on_conn(circ, orconn); } - if (! connection_get_outbuf_len(TO_CONN(orconn))) { + if (!channel_has_queued_writes(chan)) { /* There is no data at all waiting to be sent on the outbuf. Add a * cell, so that we can notice when it gets flushed, flushed_some can * get called, and we can start putting more data onto the buffer then. */ log_debug(LD_GENERAL, "Primed a buffer."); - connection_or_flush_from_first_active_circuit(orconn, 1, approx_time()); + channel_flush_from_first_active_circuit(chan, 1); } } @@ -2618,58 +2313,39 @@ decode_address_from_payload(tor_addr_t *addr_out, const uint8_t *payload, return payload + 2 + payload[1]; } -/** Remove all the cells queued on <b>circ</b> for <b>orconn</b>. */ +/** Remove all the cells queued on <b>circ</b> for <b>chan</b>. */ void -circuit_clear_cell_queue(circuit_t *circ, or_connection_t *orconn) +circuit_clear_cell_queue(circuit_t *circ, channel_t *chan) { cell_queue_t *queue; - if (circ->n_conn == orconn) { - queue = &circ->n_conn_cells; + cell_direction_t direction; + + if (circ->n_chan == chan) { + queue = &circ->n_chan_cells; + direction = CELL_DIRECTION_OUT; } else { or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); - tor_assert(orcirc->p_conn == orconn); - queue = &orcirc->p_conn_cells; + tor_assert(orcirc->p_chan == chan); + queue = &orcirc->p_chan_cells; + direction = CELL_DIRECTION_IN; } - if (queue->n) - make_circuit_inactive_on_conn(circ,orconn); - + /* Clear the queue */ cell_queue_clear(queue); + + /* Update the cell counter in the cmux */ + update_circuit_on_cmux(circ, direction); } -/** Fail with an assert if the active circuits ring on <b>orconn</b> is - * corrupt. */ +/** Fail with an assert if the circuit mux on chan is corrupt + */ void -assert_active_circuits_ok(or_connection_t *orconn) +assert_circuit_mux_okay(channel_t *chan) { - circuit_t *head = orconn->active_circuits; - circuit_t *cur = head; - int n = 0; - if (! head) - return; - do { - circuit_t *next = *next_circ_on_conn_p(cur, orconn); - circuit_t *prev = *prev_circ_on_conn_p(cur, orconn); - cell_ewma_t *ewma; - tor_assert(next); - tor_assert(prev); - tor_assert(*next_circ_on_conn_p(prev, orconn) == cur); - tor_assert(*prev_circ_on_conn_p(next, orconn) == cur); - if (orconn == cur->n_conn) { - ewma = &cur->n_cell_ewma; - tor_assert(!ewma->is_for_p_conn); - } else { - ewma = &TO_OR_CIRCUIT(cur)->p_cell_ewma; - tor_assert(ewma->is_for_p_conn); - } - tor_assert(ewma->heap_index != -1); - tor_assert(ewma == smartlist_get(orconn->active_circuit_pqueue, - ewma->heap_index)); - n++; - cur = next; - } while (cur != head); - - tor_assert(n == smartlist_len(orconn->active_circuit_pqueue)); + tor_assert(chan); + tor_assert(chan->cmux); + + circuitmux_assert_okay(chan->cmux); } /** Return 1 if we shouldn't restart reading on this circuit, even if @@ -2679,9 +2355,9 @@ static int circuit_queue_streams_are_blocked(circuit_t *circ) { if (CIRCUIT_IS_ORIGIN(circ)) { - return circ->streams_blocked_on_n_conn; + return circ->streams_blocked_on_n_chan; } else { - return circ->streams_blocked_on_p_conn; + return circ->streams_blocked_on_p_chan; } } |