summaryrefslogtreecommitdiff
path: root/src/or/main.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/or/main.c')
-rw-r--r--src/or/main.c201
1 files changed, 184 insertions, 17 deletions
diff --git a/src/or/main.c b/src/or/main.c
index 477a274d54..adc2d3dae3 100644
--- a/src/or/main.c
+++ b/src/or/main.c
@@ -56,6 +56,10 @@
#include <event.h>
#endif
+#ifdef USE_BUFFEREVENTS
+#include <event2/bufferevent.h>
+#endif
+
void evdns_shutdown(int);
/********* PROTOTYPES **********/
@@ -72,6 +76,7 @@ static int connection_should_read_from_linked_conn(connection_t *conn);
/********* START VARIABLES **********/
+#ifndef USE_BUFFEREVENTS
int global_read_bucket; /**< Max number of bytes I can read this second. */
int global_write_bucket; /**< Max number of bytes I can write this second. */
@@ -79,13 +84,17 @@ int global_write_bucket; /**< Max number of bytes I can write this second. */
int global_relayed_read_bucket;
/** Max number of relayed (bandwidth class 1) bytes I can write this second. */
int global_relayed_write_bucket;
-
/** What was the read bucket before the last second_elapsed_callback() call?
* (used to determine how many bytes we've read). */
static int stats_prev_global_read_bucket;
/** What was the write bucket before the last second_elapsed_callback() call?
* (used to determine how many bytes we've written). */
static int stats_prev_global_write_bucket;
+#else
+static uint64_t stats_prev_n_read = 0;
+static uint64_t stats_prev_n_written = 0;
+#endif
+
/* XXX we might want to keep stats about global_relayed_*_bucket too. Or not.*/
/** How many bytes have we read since we started the process? */
static uint64_t stats_n_bytes_read = 0;
@@ -151,12 +160,38 @@ int can_complete_circuit=0;
*
****************************************************************************/
+#ifdef USE_BUFFEREVENTS
+static void
+free_old_inbuf(connection_t *conn)
+{
+ if (! conn->inbuf)
+ return;
+
+ tor_assert(conn->outbuf);
+ tor_assert(buf_datalen(conn->inbuf) == 0);
+ tor_assert(buf_datalen(conn->outbuf) == 0);
+ buf_free(conn->inbuf);
+ buf_free(conn->outbuf);
+ conn->inbuf = conn->outbuf = NULL;
+
+ if (conn->read_event) {
+ event_del(conn->read_event);
+ tor_event_free(conn->read_event);
+ }
+ if (conn->write_event) {
+ event_del(conn->read_event);
+ tor_event_free(conn->write_event);
+ }
+ conn->read_event = conn->write_event = NULL;
+}
+#endif
+
/** Add <b>conn</b> to the array of connections that we can poll on. The
* connection's socket must be set; the connection starts out
* non-reading and non-writing.
*/
int
-connection_add(connection_t *conn)
+connection_add_impl(connection_t *conn, int is_connecting)
{
tor_assert(conn);
tor_assert(conn->s >= 0 ||
@@ -168,11 +203,59 @@ connection_add(connection_t *conn)
conn->conn_array_index = smartlist_len(connection_array);
smartlist_add(connection_array, conn);
- if (conn->s >= 0 || conn->linked) {
+#ifdef USE_BUFFEREVENTS
+ if (connection_type_uses_bufferevent(conn)) {
+ if (conn->s >= 0 && !conn->linked) {
+ conn->bufev = bufferevent_socket_new(
+ tor_libevent_get_base(),
+ conn->s,
+ BEV_OPT_DEFER_CALLBACKS);
+ /* XXXX CHECK FOR NULL RETURN! */
+ if (is_connecting) {
+ /* Put the bufferevent into a "connecting" state so that we'll get
+ * a "connected" event callback on successful write. */
+ bufferevent_socket_connect(conn->bufev, NULL, 0);
+ }
+ connection_configure_bufferevent_callbacks(conn);
+ } else if (conn->linked && conn->linked_conn &&
+ connection_type_uses_bufferevent(conn->linked_conn)) {
+ tor_assert(conn->s < 0);
+ if (!conn->bufev) {
+ struct bufferevent *pair[2] = { NULL, NULL };
+ /* XXXX CHECK FOR ERROR RETURN! */
+ bufferevent_pair_new(tor_libevent_get_base(),
+ BEV_OPT_DEFER_CALLBACKS,
+ pair);
+ tor_assert(pair[0]);
+ conn->bufev = pair[0];
+ conn->linked_conn->bufev = pair[1];
+ } /* else the other side already was added, and got a bufferevent_pair */
+ connection_configure_bufferevent_callbacks(conn);
+ }
+
+ if (conn->bufev && conn->inbuf) {
+ /* XXX Instead we should assert that there is no inbuf, once we
+ * have linked connections using bufferevents. */
+ free_old_inbuf(conn);
+ }
+
+ if (conn->linked_conn && conn->linked_conn->bufev &&
+ conn->linked_conn->inbuf) {
+ /* XXX Instead we should assert that there is no inbuf, once we
+ * have linked connections using bufferevents. */
+ free_old_inbuf(conn->linked_conn);
+ }
+ }
+#else
+ (void) is_connecting;
+#endif
+
+ if (!HAS_BUFFEREVENT(conn) && (conn->s >= 0 || conn->linked)) {
conn->read_event = tor_event_new(tor_libevent_get_base(),
conn->s, EV_READ|EV_PERSIST, conn_read_callback, conn);
conn->write_event = tor_event_new(tor_libevent_get_base(),
conn->s, EV_WRITE|EV_PERSIST, conn_write_callback, conn);
+ /* XXXX CHECK FOR NULL RETURN! */
}
log_debug(LD_NET,"new conn type %s, socket %d, address %s, n_conns %d.",
@@ -196,6 +279,12 @@ connection_unregister_events(connection_t *conn)
log_warn(LD_BUG, "Error removing write event for %d", conn->s);
tor_free(conn->write_event);
}
+#ifdef USE_BUFFEREVENTS
+ if (conn->bufev) {
+ bufferevent_free(conn->bufev);
+ conn->bufev = NULL;
+ }
+#endif
if (conn->dns_server_port) {
dnsserv_close_listener(conn);
}
@@ -310,6 +399,17 @@ get_connection_array(void)
void
connection_watch_events(connection_t *conn, watchable_events_t events)
{
+ IF_HAS_BUFFEREVENT(conn, {
+ short ev = ((short)events) & (EV_READ|EV_WRITE);
+ short old_ev = bufferevent_get_enabled(conn->bufev);
+ if ((ev & ~old_ev) != 0) {
+ bufferevent_enable(conn->bufev, ev);
+ }
+ if ((old_ev & ~ev) != 0) {
+ bufferevent_disable(conn->bufev, old_ev & ~ev);
+ }
+ return;
+ });
if (events & READ_EVENT)
connection_start_reading(conn);
else
@@ -327,6 +427,9 @@ connection_is_reading(connection_t *conn)
{
tor_assert(conn);
+ IF_HAS_BUFFEREVENT(conn,
+ return (bufferevent_get_enabled(conn->bufev) & EV_READ) != 0;
+ );
return conn->reading_from_linked_conn ||
(conn->read_event && event_pending(conn->read_event, EV_READ, NULL));
}
@@ -336,6 +439,12 @@ void
connection_stop_reading(connection_t *conn)
{
tor_assert(conn);
+
+ IF_HAS_BUFFEREVENT(conn, {
+ bufferevent_disable(conn->bufev, EV_READ);
+ return;
+ });
+
tor_assert(conn->read_event);
if (conn->linked) {
@@ -355,6 +464,12 @@ void
connection_start_reading(connection_t *conn)
{
tor_assert(conn);
+
+ IF_HAS_BUFFEREVENT(conn, {
+ bufferevent_enable(conn->bufev, EV_READ);
+ return;
+ });
+
tor_assert(conn->read_event);
if (conn->linked) {
@@ -376,6 +491,10 @@ connection_is_writing(connection_t *conn)
{
tor_assert(conn);
+ IF_HAS_BUFFEREVENT(conn,
+ return (bufferevent_get_enabled(conn->bufev) & EV_WRITE) != 0;
+ );
+
return conn->writing_to_linked_conn ||
(conn->write_event && event_pending(conn->write_event, EV_WRITE, NULL));
}
@@ -385,6 +504,12 @@ void
connection_stop_writing(connection_t *conn)
{
tor_assert(conn);
+
+ IF_HAS_BUFFEREVENT(conn, {
+ bufferevent_disable(conn->bufev, EV_WRITE);
+ return;
+ });
+
tor_assert(conn->write_event);
if (conn->linked) {
@@ -405,6 +530,12 @@ void
connection_start_writing(connection_t *conn)
{
tor_assert(conn);
+
+ IF_HAS_BUFFEREVENT(conn, {
+ bufferevent_enable(conn->bufev, EV_WRITE);
+ return;
+ });
+
tor_assert(conn->write_event);
if (conn->linked) {
@@ -590,7 +721,20 @@ conn_close_if_marked(int i)
assert_connection_ok(conn, now);
/* assert_all_pending_dns_resolves_ok(); */
+#ifdef USE_BUFFEREVENTS
+ if (conn->bufev && conn->hold_open_until_flushed) {
+ if (conn->linked) {
+ /* We need to do this explicitly so that the linked connection
+ * notices that there was an EOF. */
+ bufferevent_flush(conn->bufev, EV_WRITE, BEV_FINISHED);
+ }
+ if (evbuffer_get_length(bufferevent_get_output(conn->bufev)))
+ return 0;
+ }
+#endif
+
log_debug(LD_NET,"Cleaning up connection (fd %d).",conn->s);
+ IF_HAS_BUFFEREVENT(conn, goto unlink);
if ((conn->s >= 0 || conn->linked_conn) && connection_wants_to_flush(conn)) {
/* s == -1 means it's an incomplete edge connection, or that the socket
* has already been closed as unflushable. */
@@ -613,8 +757,8 @@ conn_close_if_marked(int i)
}
log_debug(LD_GENERAL, "Flushed last %d bytes from a linked conn; "
"%d left; flushlen %d; wants-to-flush==%d", retval,
- (int)buf_datalen(conn->outbuf),
- (int)conn->outbuf_flushlen,
+ (int)connection_get_outbuf_len(conn),
+ (int)conn->outbuf_flushlen,
connection_wants_to_flush(conn));
} else if (connection_speaks_cells(conn)) {
if (conn->state == OR_CONN_STATE_OPEN) {
@@ -651,13 +795,17 @@ conn_close_if_marked(int i)
"something is wrong with your network connection, or "
"something is wrong with theirs. "
"(fd %d, type %s, state %d, marked at %s:%d).",
- (int)buf_datalen(conn->outbuf),
+ (int)connection_get_outbuf_len(conn),
escaped_safe_str_client(conn->address),
conn->s, conn_type_to_string(conn->type), conn->state,
conn->marked_for_close_file,
conn->marked_for_close);
}
}
+
+#ifdef USE_BUFFEREVENTS
+ unlink:
+#endif
connection_unlink(conn); /* unlink, remove, free */
return 1;
}
@@ -744,7 +892,8 @@ run_connection_housekeeping(int i, time_t now)
int past_keepalive =
now >= conn->timestamp_lastwritten + options->KeepalivePeriod;
- if (conn->outbuf && !buf_datalen(conn->outbuf) && conn->type == CONN_TYPE_OR)
+ if (conn->outbuf && !connection_get_outbuf_len(conn) &&
+ conn->type == CONN_TYPE_OR)
TO_OR_CONN(conn)->timestamp_lastempty = now;
if (conn->marked_for_close) {
@@ -764,7 +913,7 @@ run_connection_housekeeping(int i, time_t now)
/* This check is temporary; it's to let us know whether we should consider
* parsing partial serverdesc responses. */
if (conn->purpose == DIR_PURPOSE_FETCH_SERVERDESC &&
- buf_datalen(conn->inbuf)>=1024) {
+ connection_get_inbuf_len(conn) >= 1024) {
log_info(LD_DIR,"Trying to extract information from wedged server desc "
"download.");
connection_dir_reached_eof(TO_DIR_CONN(conn));
@@ -781,7 +930,11 @@ run_connection_housekeeping(int i, time_t now)
the connection or send a keepalive, depending. */
or_conn = TO_OR_CONN(conn);
+#ifdef USE_BUFFEREVENTS
+ tor_assert(conn->bufev);
+#else
tor_assert(conn->outbuf);
+#endif
if (or_conn->is_bad_for_new_circs && !or_conn->n_circuits) {
/* It's bad for new circuits, and has no unmarked circuits on it:
@@ -803,13 +956,12 @@ run_connection_housekeeping(int i, time_t now)
connection_mark_for_close(conn);
}
} else if (we_are_hibernating() && !or_conn->n_circuits &&
- !buf_datalen(conn->outbuf)) {
+ !connection_get_outbuf_len(conn)) {
/* We're hibernating, there's no circuits, and nothing to flush.*/
log_info(LD_OR,"Expiring non-used OR connection to fd %d (%s:%d) "
"[Hibernating or exiting].",
conn->s,conn->address, conn->port);
- connection_mark_for_close(conn);
- conn->hold_open_until_flushed = 1;
+ connection_mark_and_flush(conn);
} else if (!or_conn->n_circuits &&
now >= or_conn->timestamp_last_added_nonpadding +
IDLE_OR_CONN_TIMEOUT) {
@@ -817,7 +969,6 @@ run_connection_housekeeping(int i, time_t now)
"[idle %d].", conn->s,conn->address, conn->port,
(int)(now - or_conn->timestamp_last_added_nonpadding));
connection_mark_for_close(conn);
- conn->hold_open_until_flushed = 1;
} else if (
now >= or_conn->timestamp_lastempty + options->KeepalivePeriod*10 &&
now >= conn->timestamp_lastwritten + options->KeepalivePeriod*10) {
@@ -825,10 +976,10 @@ run_connection_housekeeping(int i, time_t now)
"Expiring stuck OR connection to fd %d (%s:%d). (%d bytes to "
"flush; %d seconds since last write)",
conn->s, conn->address, conn->port,
- (int)buf_datalen(conn->outbuf),
+ (int)connection_get_outbuf_len(conn),
(int)(now-conn->timestamp_lastwritten));
connection_mark_for_close(conn);
- } else if (past_keepalive && !buf_datalen(conn->outbuf)) {
+ } else if (past_keepalive && !connection_get_outbuf_len(conn)) {
/* send a padding cell */
log_fn(LOG_DEBUG,LD_OR,"Sending keepalive to (%s:%d)",
conn->address, conn->port);
@@ -1249,6 +1400,9 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg)
size_t bytes_written;
size_t bytes_read;
int seconds_elapsed;
+#ifdef USE_BUFFEREVENTS
+ uint64_t cur_read,cur_written;
+#endif
or_options_t *options = get_options();
(void)timer;
(void)arg;
@@ -1260,9 +1414,15 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg)
update_approx_time(now);
/* the second has rolled over. check more stuff. */
+ seconds_elapsed = current_second ? (int)(now - current_second) : 0;
+#ifdef USE_BUFFEREVENTS
+ connection_get_rate_limit_totals(&cur_read, &cur_written);
+ bytes_written = (size_t)(cur_written - stats_prev_n_written);
+ bytes_read = (size_t)(cur_read - stats_prev_n_read);
+#else
bytes_written = stats_prev_global_write_bucket - global_write_bucket;
bytes_read = stats_prev_global_read_bucket - global_read_bucket;
- seconds_elapsed = current_second ? (int)(now - current_second) : 0;
+#endif
stats_n_bytes_read += bytes_read;
stats_n_bytes_written += bytes_written;
if (accounting_is_enabled(options) && seconds_elapsed >= 0)
@@ -1272,8 +1432,13 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg)
if (seconds_elapsed > 0)
connection_bucket_refill(seconds_elapsed, now);
+#ifdef USE_BUFFEREVENTS
+ stats_prev_n_written = cur_written;
+ stats_prev_n_read = cur_read;
+#else
stats_prev_global_read_bucket = global_read_bucket;
stats_prev_global_write_bucket = global_write_bucket;
+#endif
if (server_mode(options) &&
!we_are_hibernating() &&
@@ -1474,8 +1639,10 @@ do_main_loop(void)
/* Set up our buckets */
connection_bucket_init();
+#ifndef USE_BUFFEREVENTS
stats_prev_global_read_bucket = global_read_bucket;
stats_prev_global_write_bucket = global_write_bucket;
+#endif
/* initialize the bootstrap status events to know we're starting up */
control_event_bootstrap(BOOTSTRAP_STATUS_STARTING, 0);
@@ -1719,13 +1886,13 @@ dumpstats(int severity)
log(severity,LD_GENERAL,
"Conn %d: %d bytes waiting on inbuf (len %d, last read %d secs ago)",
i,
- (int)buf_datalen(conn->inbuf),
+ (int)connection_get_inbuf_len(conn),
(int)buf_allocation(conn->inbuf),
(int)(now - conn->timestamp_lastread));
log(severity,LD_GENERAL,
"Conn %d: %d bytes waiting on outbuf "
"(len %d, last written %d secs ago)",i,
- (int)buf_datalen(conn->outbuf),
+ (int)connection_get_outbuf_len(conn),
(int)buf_allocation(conn->outbuf),
(int)(now - conn->timestamp_lastwritten));
if (conn->type == CONN_TYPE_OR) {