aboutsummaryrefslogtreecommitdiff
path: root/src/or/connection.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/or/connection.c')
-rw-r--r--src/or/connection.c501
1 files changed, 467 insertions, 34 deletions
diff --git a/src/or/connection.c b/src/or/connection.c
index 5054909df9..50acec32a3 100644
--- a/src/or/connection.c
+++ b/src/or/connection.c
@@ -36,6 +36,10 @@
#include "router.h"
#include "routerparse.h"
+#ifdef USE_BUFFEREVENTS
+#include <event2/event.h>
+#endif
+
static connection_t *connection_create_listener(
const struct sockaddr *listensockaddr,
socklen_t listensocklen, int type,
@@ -45,8 +49,10 @@ static void connection_init(time_t now, connection_t *conn, int type,
static int connection_init_accepted_conn(connection_t *conn,
uint8_t listener_type);
static int connection_handle_listener_read(connection_t *conn, int new_type);
+#ifndef USE_BUFFEREVENTS
static int connection_bucket_should_increase(int bucket,
or_connection_t *conn);
+#endif
static int connection_finished_flushing(connection_t *conn);
static int connection_flushed_some(connection_t *conn);
static int connection_finished_connecting(connection_t *conn);
@@ -183,6 +189,26 @@ conn_state_to_string(int type, int state)
return buf;
}
+#ifdef USE_BUFFEREVENTS
+/** Return true iff the connection's type is one that can use a
+ bufferevent-based implementation. */
+int
+connection_type_uses_bufferevent(connection_t *conn)
+{
+ switch (conn->type) {
+ case CONN_TYPE_AP:
+ case CONN_TYPE_EXIT:
+ case CONN_TYPE_DIR:
+ case CONN_TYPE_CONTROL:
+ case CONN_TYPE_OR:
+ case CONN_TYPE_CPUWORKER:
+ return 1;
+ default:
+ return 0;
+ }
+}
+#endif
+
/** Allocate and return a new dir_connection_t, initialized as by
* connection_init(). */
dir_connection_t *
@@ -308,10 +334,13 @@ connection_init(time_t now, connection_t *conn, int type, int socket_family)
conn->type = type;
conn->socket_family = socket_family;
- if (!connection_is_listener(conn)) { /* listeners never use their buf */
+#ifndef USE_BUFFEREVENTS
+ if (!connection_is_listener(conn)) {
+ /* listeners never use their buf */
conn->inbuf = buf_new();
conn->outbuf = buf_new();
}
+#endif
conn->timestamp_created = now;
conn->timestamp_lastread = now;
@@ -377,7 +406,8 @@ _connection_free(connection_t *conn)
"bytes on inbuf, %d on outbuf.",
conn_type_to_string(conn->type),
conn_state_to_string(conn->type, conn->state),
- (int)buf_datalen(conn->inbuf), (int)buf_datalen(conn->outbuf));
+ (int)connection_get_inbuf_len(conn),
+ (int)connection_get_outbuf_len(conn));
}
if (!connection_is_listener(conn)) {
@@ -424,6 +454,15 @@ _connection_free(connection_t *conn)
tor_free(conn->read_event); /* Probably already freed by connection_free. */
tor_free(conn->write_event); /* Probably already freed by connection_free. */
+ IF_HAS_BUFFEREVENT(conn, {
+ /* This was a workaround to handle bugs in some old versions of libevent
+ * where callbacks can occur after calling bufferevent_free(). Setting
+ * the callbacks to NULL prevented this. It shouldn't be necessary any
+ * more, but let's not tempt fate for now. */
+ bufferevent_setcb(conn->bufev, NULL, NULL, NULL, NULL);
+ bufferevent_free(conn->bufev);
+ conn->bufev = NULL;
+ });
if (conn->type == CONN_TYPE_DIR) {
dir_connection_t *dir_conn = TO_DIR_CONN(conn);
@@ -450,6 +489,11 @@ _connection_free(connection_t *conn)
log_warn(LD_BUG, "called on OR conn with non-zeroed identity_digest");
connection_or_remove_from_identity_map(TO_OR_CONN(conn));
}
+#ifdef USE_BUFFEREVENTS
+ if (conn->type == CONN_TYPE_OR && TO_OR_CONN(conn)->bucket_cfg) {
+ ev_token_bucket_cfg_free(TO_OR_CONN(conn)->bucket_cfg);
+ }
+#endif
memset(mem, 0xCC, memlen); /* poison memory */
tor_free(mem);
@@ -675,10 +719,9 @@ connection_close_immediate(connection_t *conn)
conn->s = -1;
if (conn->linked)
conn->linked_conn_is_closed = 1;
- if (!connection_is_listener(conn)) {
+ if (conn->outbuf)
buf_clear(conn->outbuf);
- conn->outbuf_flushlen = 0;
- }
+ conn->outbuf_flushlen = 0;
}
/** Mark <b>conn</b> to be closed next time we loop through
@@ -1322,7 +1365,7 @@ connection_connect(connection_t *conn, const char *address,
int s, inprogress = 0;
char addrbuf[256];
struct sockaddr *dest_addr;
- socklen_t dest_addr_len;
+ int dest_addr_len;
or_options_t *options = get_options();
int protocol_family;
@@ -1380,7 +1423,7 @@ connection_connect(connection_t *conn, const char *address,
make_socket_reuseable(s);
- if (connect(s, dest_addr, dest_addr_len) < 0) {
+ if (connect(s, dest_addr, (socklen_t)dest_addr_len) < 0) {
int e = tor_socket_errno(s);
if (!ERRNO_IS_CONN_EINPROGRESS(e)) {
/* yuck. kill it. */
@@ -1405,7 +1448,7 @@ connection_connect(connection_t *conn, const char *address,
escaped_safe_str_client(address),
port, inprogress?"in progress":"established", s);
conn->s = s;
- if (connection_add(conn) < 0) /* no space, forget it */
+ if (connection_add_connecting(conn) < 0) /* no space, forget it */
return -1;
return inprogress ? 0 : 1;
}
@@ -1638,6 +1681,19 @@ connection_send_socks5_connect(connection_t *conn)
conn->proxy_state = PROXY_SOCKS5_WANT_CONNECT_OK;
}
+/** DOCDOC */
+static int
+connection_fetch_from_buf_socks_client(connection_t *conn,
+ int state, char **reason)
+{
+ IF_HAS_BUFFEREVENT(conn, {
+ struct evbuffer *input = bufferevent_get_input(conn->bufev);
+ return fetch_from_evbuffer_socks_client(input, state, reason);
+ }) ELSE_IF_NO_BUFFEREVENT {
+ return fetch_from_buf_socks_client(conn->inbuf, state, reason);
+ }
+}
+
/** Call this from connection_*_process_inbuf() to advance the proxy
* handshake.
*
@@ -1665,17 +1721,17 @@ connection_read_proxy_handshake(connection_t *conn)
break;
case PROXY_SOCKS4_WANT_CONNECT_OK:
- ret = fetch_from_buf_socks_client(conn->inbuf,
- conn->proxy_state,
- &reason);
+ ret = connection_fetch_from_buf_socks_client(conn,
+ conn->proxy_state,
+ &reason);
if (ret == 1)
conn->proxy_state = PROXY_CONNECTED;
break;
case PROXY_SOCKS5_WANT_AUTH_METHOD_NONE:
- ret = fetch_from_buf_socks_client(conn->inbuf,
- conn->proxy_state,
- &reason);
+ ret = connection_fetch_from_buf_socks_client(conn,
+ conn->proxy_state,
+ &reason);
/* no auth needed, do connect */
if (ret == 1) {
connection_send_socks5_connect(conn);
@@ -1684,9 +1740,9 @@ connection_read_proxy_handshake(connection_t *conn)
break;
case PROXY_SOCKS5_WANT_AUTH_METHOD_RFC1929:
- ret = fetch_from_buf_socks_client(conn->inbuf,
- conn->proxy_state,
- &reason);
+ ret = connection_fetch_from_buf_socks_client(conn,
+ conn->proxy_state,
+ &reason);
/* send auth if needed, otherwise do connect */
if (ret == 1) {
@@ -1721,9 +1777,9 @@ connection_read_proxy_handshake(connection_t *conn)
break;
case PROXY_SOCKS5_WANT_AUTH_RFC1929_OK:
- ret = fetch_from_buf_socks_client(conn->inbuf,
- conn->proxy_state,
- &reason);
+ ret = connection_fetch_from_buf_socks_client(conn,
+ conn->proxy_state,
+ &reason);
/* send the connect request */
if (ret == 1) {
connection_send_socks5_connect(conn);
@@ -1732,9 +1788,9 @@ connection_read_proxy_handshake(connection_t *conn)
break;
case PROXY_SOCKS5_WANT_CONNECT_OK:
- ret = fetch_from_buf_socks_client(conn->inbuf,
- conn->proxy_state,
- &reason);
+ ret = connection_fetch_from_buf_socks_client(conn,
+ conn->proxy_state,
+ &reason);
if (ret == 1)
conn->proxy_state = PROXY_CONNECTED;
break;
@@ -2003,14 +2059,20 @@ retry_all_listeners(smartlist_t *replaced_conns,
static int
connection_is_rate_limited(connection_t *conn)
{
- if (conn->linked || /* internal connection */
- tor_addr_family(&conn->addr) == AF_UNSPEC || /* no address */
- tor_addr_is_internal(&conn->addr, 0)) /* internal address */
- return 0;
+ or_options_t *options = get_options();
+ if (conn->linked)
+ return 0; /* Internal connection */
+ else if (! options->CountPrivateBandwidth &&
+ (tor_addr_family(&conn->addr) == AF_UNSPEC || /* no address */
+ tor_addr_is_internal(&conn->addr, 0)))
+ return 0; /* Internal address */
else
return 1;
}
+#ifdef USE_BUFFEREVENTS
+static struct bufferevent_rate_limit_group *global_rate_limit = NULL;
+#else
extern int global_read_bucket, global_write_bucket;
extern int global_relayed_read_bucket, global_relayed_write_bucket;
@@ -2018,11 +2080,13 @@ extern int global_relayed_read_bucket, global_relayed_write_bucket;
* we are likely to run dry again this second, so be stingy with the
* tokens we just put in. */
static int write_buckets_empty_last_second = 0;
+#endif
/** How many seconds of no active local circuits will make the
* connection revert to the "relayed" bandwidth class? */
#define CLIENT_IDLE_TIME_FOR_PRIORITY 30
+#ifndef USE_BUFFEREVENTS
/** Return 1 if <b>conn</b> should use tokens from the "relayed"
* bandwidth rates, else 0. Currently, only OR conns with bandwidth
* class 1, and directory conns that are serving data out, count.
@@ -2133,6 +2197,20 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
return connection_bucket_round_robin(base, priority,
global_bucket, conn_bucket);
}
+#else
+static ssize_t
+connection_bucket_read_limit(connection_t *conn, time_t now)
+{
+ (void) now;
+ return bufferevent_get_max_to_read(conn->bufev);
+}
+ssize_t
+connection_bucket_write_limit(connection_t *conn, time_t now)
+{
+ (void) now;
+ return bufferevent_get_max_to_write(conn->bufev);
+}
+#endif
/** Return 1 if the global write buckets are low enough that we
* shouldn't send <b>attempt</b> bytes of low-priority directory stuff
@@ -2157,8 +2235,12 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
int
global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
{
+#ifdef USE_BUFFEREVENTS
+ ssize_t smaller_bucket = bufferevent_get_max_to_write(conn->bufev);
+#else
int smaller_bucket = global_write_bucket < global_relayed_write_bucket ?
global_write_bucket : global_relayed_write_bucket;
+#endif
if (authdir_mode(get_options()) && priority>1)
return 0; /* there's always room to answer v2 if we're an auth dir */
@@ -2168,8 +2250,10 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
if (smaller_bucket < (int)attempt)
return 1; /* not enough space no matter the priority */
+#ifndef USE_BUFFEREVENTS
if (write_buckets_empty_last_second)
return 1; /* we're already hitting our limits, no more please */
+#endif
if (priority == 1) { /* old-style v1 query */
/* Could we handle *two* of these requests within the next two seconds? */
@@ -2185,6 +2269,7 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
return 0;
}
+#ifndef USE_BUFFEREVENTS
/** We just read <b>num_read</b> and wrote <b>num_written</b> bytes
* onto <b>conn</b>. Decrement buckets appropriately. */
static void
@@ -2212,6 +2297,11 @@ connection_buckets_decrement(connection_t *conn, time_t now,
if (!connection_is_rate_limited(conn))
return; /* local IPs are free */
+
+ if (conn->type == CONN_TYPE_OR)
+ rep_hist_note_or_conn_bytes(conn->global_identifier, num_read,
+ num_written, now);
+
if (num_read > 0) {
rep_hist_note_bytes_read(num_read, now);
}
@@ -2428,6 +2518,88 @@ connection_bucket_should_increase(int bucket, or_connection_t *conn)
return 1;
}
+#else
+
+static void
+connection_buckets_decrement(connection_t *conn, time_t now,
+ size_t num_read, size_t num_written)
+{
+ (void) conn;
+ (void) now;
+ (void) num_read;
+ (void) num_written;
+ /* Libevent does this for us. */
+}
+void
+connection_bucket_refill(int seconds_elapsed, time_t now)
+{
+ (void) seconds_elapsed;
+ (void) now;
+ /* Libevent does this for us. */
+}
+void
+connection_bucket_init(void)
+{
+ or_options_t *options = get_options();
+ const struct timeval *tick = tor_libevent_get_one_tick_timeout();
+ struct ev_token_bucket_cfg *bucket_cfg;
+
+ uint64_t rate, burst;
+ if (options->RelayBandwidthRate) {
+ rate = options->RelayBandwidthRate;
+ burst = options->RelayBandwidthBurst;
+ } else {
+ rate = options->BandwidthRate;
+ burst = options->BandwidthBurst;
+ }
+
+ rate /= TOR_LIBEVENT_TICKS_PER_SECOND;
+ bucket_cfg = ev_token_bucket_cfg_new((uint32_t)rate, (uint32_t)burst,
+ (uint32_t)rate, (uint32_t)burst,
+ tick);
+
+ if (!global_rate_limit) {
+ global_rate_limit =
+ bufferevent_rate_limit_group_new(tor_libevent_get_base(), bucket_cfg);
+ } else {
+ bufferevent_rate_limit_group_set_cfg(global_rate_limit, bucket_cfg);
+ }
+ ev_token_bucket_cfg_free(bucket_cfg);
+}
+
+void
+connection_get_rate_limit_totals(uint64_t *read_out, uint64_t *written_out)
+{
+ if (global_rate_limit == NULL) {
+ *read_out = *written_out = 0;
+ } else {
+ bufferevent_rate_limit_group_get_totals(
+ global_rate_limit, read_out, written_out);
+ }
+}
+
+/** DOCDOC */
+void
+connection_enable_rate_limiting(connection_t *conn)
+{
+ if (conn->bufev) {
+ if (!global_rate_limit)
+ connection_bucket_init();
+ bufferevent_add_to_rate_limit_group(conn->bufev, global_rate_limit);
+ }
+}
+
+static void
+connection_consider_empty_write_buckets(connection_t *conn)
+{
+ (void) conn;
+}
+static void
+connection_consider_empty_read_buckets(connection_t *conn)
+{
+ (void) conn;
+}
+#endif
/** Read bytes from conn-\>s and process them.
*
@@ -2682,7 +2854,7 @@ connection_read_to_buf(connection_t *conn, ssize_t *max_to_read,
}
if (n_read > 0) {
- /* change *max_to_read */
+ /* change *max_to_read */
*max_to_read = at_most - n_read;
/* Update edge_conn->n_read */
@@ -2714,11 +2886,202 @@ connection_read_to_buf(connection_t *conn, ssize_t *max_to_read,
return 0;
}
+#ifdef USE_BUFFEREVENTS
+/* XXXX These generic versions could be simplified by making them
+ type-specific */
+
+/** Callback: Invoked whenever bytes are added to or drained from an input
+ * evbuffer. Used to track the number of bytes read. */
+static void
+evbuffer_inbuf_callback(struct evbuffer *buf,
+ const struct evbuffer_cb_info *info, void *arg)
+{
+ connection_t *conn = arg;
+ (void) buf;
+ /* XXXX These need to get real counts on the non-nested TLS case. - NM */
+ if (info->n_added) {
+ time_t now = approx_time();
+ conn->timestamp_lastread = now;
+ connection_buckets_decrement(conn, now, info->n_added, 0);
+ connection_consider_empty_read_buckets(conn);
+ if (conn->type == CONN_TYPE_AP) {
+ edge_connection_t *edge_conn = TO_EDGE_CONN(conn);
+ /*XXXX022 check for overflow*/
+ edge_conn->n_read += (int)info->n_added;
+ }
+ }
+}
+
+/** Callback: Invoked whenever bytes are added to or drained from an output
+ * evbuffer. Used to track the number of bytes written. */
+static void
+evbuffer_outbuf_callback(struct evbuffer *buf,
+ const struct evbuffer_cb_info *info, void *arg)
+{
+ connection_t *conn = arg;
+ (void)buf;
+ if (info->n_deleted) {
+ time_t now = approx_time();
+ conn->timestamp_lastwritten = now;
+ connection_buckets_decrement(conn, now, 0, info->n_deleted);
+ connection_consider_empty_write_buckets(conn);
+ if (conn->type == CONN_TYPE_AP) {
+ edge_connection_t *edge_conn = TO_EDGE_CONN(conn);
+ /*XXXX022 check for overflow*/
+ edge_conn->n_written += (int)info->n_deleted;
+ }
+ }
+}
+
+/** Callback: invoked whenever a bufferevent has read data. */
+void
+connection_handle_read_cb(struct bufferevent *bufev, void *arg)
+{
+ connection_t *conn = arg;
+ (void) bufev;
+ if (!conn->marked_for_close)
+ if (connection_process_inbuf(conn, 1)<0) /* XXXX Always 1? */
+ connection_mark_for_close(conn);
+}
+
+/** Callback: invoked whenever a bufferevent has written data. */
+void
+connection_handle_write_cb(struct bufferevent *bufev, void *arg)
+{
+ connection_t *conn = arg;
+ struct evbuffer *output;
+ if (connection_flushed_some(conn)<0) {
+ connection_mark_for_close(conn);
+ return;
+ }
+
+ output = bufferevent_get_output(bufev);
+ if (!evbuffer_get_length(output)) {
+ connection_finished_flushing(conn);
+ if (conn->marked_for_close && conn->hold_open_until_flushed) {
+ conn->hold_open_until_flushed = 0;
+ if (conn->linked) {
+ /* send eof */
+ bufferevent_flush(conn->bufev, EV_WRITE, BEV_FINISHED);
+ }
+ }
+ }
+}
+
+/** Callback: invoked whenever a bufferevent has had an event (like a
+ * connection, or an eof, or an error) occur. */
+void
+connection_handle_event_cb(struct bufferevent *bufev, short event, void *arg)
+{
+ connection_t *conn = arg;
+ (void) bufev;
+ if (event & BEV_EVENT_CONNECTED) {
+ tor_assert(connection_state_is_connecting(conn));
+ if (connection_finished_connecting(conn)<0)
+ return;
+ }
+ if (event & BEV_EVENT_EOF) {
+ if (!conn->marked_for_close) {
+ conn->inbuf_reached_eof = 1;
+ if (connection_reached_eof(conn)<0)
+ return;
+ }
+ }
+ if (event & BEV_EVENT_ERROR) {
+ int socket_error = evutil_socket_geterror(conn->s);
+ if (conn->type == CONN_TYPE_OR &&
+ conn->state == OR_CONN_STATE_CONNECTING) {
+ connection_or_connect_failed(TO_OR_CONN(conn),
+ errno_to_orconn_end_reason(socket_error),
+ tor_socket_strerror(socket_error));
+ } else if (CONN_IS_EDGE(conn)) {
+ edge_connection_t *edge_conn = TO_EDGE_CONN(conn);
+ if (!edge_conn->edge_has_sent_end)
+ connection_edge_end_errno(edge_conn);
+ if (edge_conn->socks_request) /* broken, don't send a socks reply back */
+ edge_conn->socks_request->has_finished = 1;
+ }
+ connection_close_immediate(conn); /* Connection is dead. */
+ if (!conn->marked_for_close)
+ connection_mark_for_close(conn);
+ }
+}
+
+/** Set up the generic callbacks for the bufferevent on <b>conn</b>. */
+void
+connection_configure_bufferevent_callbacks(connection_t *conn)
+{
+ struct bufferevent *bufev;
+ struct evbuffer *input, *output;
+ tor_assert(conn->bufev);
+ bufev = conn->bufev;
+ bufferevent_setcb(bufev,
+ connection_handle_read_cb,
+ connection_handle_write_cb,
+ connection_handle_event_cb,
+ conn);
+
+ input = bufferevent_get_input(bufev);
+ output = bufferevent_get_output(bufev);
+ evbuffer_add_cb(input, evbuffer_inbuf_callback, conn);
+ evbuffer_add_cb(output, evbuffer_outbuf_callback, conn);
+}
+#endif
+
/** A pass-through to fetch_from_buf. */
int
connection_fetch_from_buf(char *string, size_t len, connection_t *conn)
{
- return fetch_from_buf(string, len, conn->inbuf);
+ IF_HAS_BUFFEREVENT(conn, {
+ /* XXX overflow -seb */
+ return (int)bufferevent_read(conn->bufev, string, len);
+ }) ELSE_IF_NO_BUFFEREVENT {
+ return fetch_from_buf(string, len, conn->inbuf);
+ }
+}
+
+/** As fetch_from_buf_line(), but read from a connection's input buffer. */
+int
+connection_fetch_from_buf_line(connection_t *conn, char *data,
+ size_t *data_len)
+{
+ IF_HAS_BUFFEREVENT(conn, {
+ int r;
+ size_t eol_len=0;
+ struct evbuffer *input = bufferevent_get_input(conn->bufev);
+ struct evbuffer_ptr ptr =
+ evbuffer_search_eol(input, NULL, &eol_len, EVBUFFER_EOL_LF);
+ if (ptr.pos == -1)
+ return 0; /* No EOL found. */
+ if ((size_t)ptr.pos+eol_len >= *data_len) {
+ return -1; /* Too long */
+ }
+ *data_len = ptr.pos+eol_len;
+ r = evbuffer_remove(input, data, ptr.pos+eol_len);
+ tor_assert(r >= 0);
+ data[ptr.pos+eol_len] = '\0';
+ return 1;
+ }) ELSE_IF_NO_BUFFEREVENT {
+ return fetch_from_buf_line(conn->inbuf, data, data_len);
+ }
+}
+
+/** As fetch_from_buf_http, but fetches from a conncetion's input buffer_t or
+ * its bufferevent as appropriate. */
+int
+connection_fetch_from_buf_http(connection_t *conn,
+ char **headers_out, size_t max_headerlen,
+ char **body_out, size_t *body_used,
+ size_t max_bodylen, int force_complete)
+{
+ IF_HAS_BUFFEREVENT(conn, {
+ struct evbuffer *input = bufferevent_get_input(conn->bufev);
+ return fetch_from_evbuffer_http(input, headers_out, max_headerlen,
+ body_out, body_used, max_bodylen, force_complete);
+ }) ELSE_IF_NO_BUFFEREVENT {
+ return fetch_from_buf_http(conn->inbuf, headers_out, max_headerlen,
+ body_out, body_used, max_bodylen, force_complete);
+ }
}
/** Return conn-\>outbuf_flushlen: how many bytes conn wants to flush
@@ -2839,6 +3202,7 @@ connection_handle_write_impl(connection_t *conn, int force)
/* If we just flushed the last bytes, check if this tunneled dir
* request is done. */
+ /* XXXX move this to flushed_some or finished_flushing -NM */
if (buf_datalen(conn->outbuf) == 0 && conn->dirreq_id)
geoip_change_dirreq_state(conn->dirreq_id, DIRREQ_TUNNELED,
DIRREQ_OR_CONN_BUFFER_FLUSHED);
@@ -2894,6 +3258,7 @@ connection_handle_write_impl(connection_t *conn, int force)
if (n_written && conn->type == CONN_TYPE_AP) {
edge_connection_t *edge_conn = TO_EDGE_CONN(conn);
+
/* Check for overflow: */
if (PREDICT_LIKELY(UINT32_MAX - edge_conn->n_written > n_written))
edge_conn->n_written += (int)n_written;
@@ -2970,6 +3335,22 @@ _connection_write_to_buf_impl(const char *string, size_t len,
if (conn->marked_for_close && !conn->hold_open_until_flushed)
return;
+ IF_HAS_BUFFEREVENT(conn, {
+ if (zlib) {
+ int done = zlib < 0;
+ r = write_to_evbuffer_zlib(bufferevent_get_output(conn->bufev),
+ TO_DIR_CONN(conn)->zlib_state,
+ string, len, done);
+ } else {
+ r = bufferevent_write(conn->bufev, string, len);
+ }
+ if (r < 0) {
+ /* XXXX mark for close? */
+ log_warn(LD_NET, "bufferevent_write failed! That shouldn't happen.");
+ }
+ return;
+ });
+
old_datalen = buf_datalen(conn->outbuf);
if (zlib) {
dir_connection_t *dir_conn = TO_DIR_CONN(conn);
@@ -2996,7 +3377,13 @@ _connection_write_to_buf_impl(const char *string, size_t len,
return;
}
- connection_start_writing(conn);
+ /* If we receive optimistic data in the EXIT_CONN_STATE_RESOLVING
+ * state, we don't want to try to write it right away, since
+ * conn->write_event won't be set yet. Otherwise, write data from
+ * this conn as the socket is available. */
+ if (conn->write_event) {
+ connection_start_writing(conn);
+ }
if (zlib) {
conn->outbuf_flushlen += buf_datalen(conn->outbuf) - old_datalen;
} else {
@@ -3142,6 +3529,32 @@ connection_get_by_type_state_rendquery(int type, int state,
return NULL;
}
+/** Return a directory connection (if any one exists) that is fetching
+ * the item described by <b>state</b>/<b>resource</b> */
+dir_connection_t *
+connection_dir_get_by_purpose_and_resource(int purpose,
+ const char *resource)
+{
+ smartlist_t *conns = get_connection_array();
+
+ SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) {
+ dir_connection_t *dirconn;
+ if (conn->type != CONN_TYPE_DIR || conn->marked_for_close ||
+ conn->purpose != purpose)
+ continue;
+ dirconn = TO_DIR_CONN(conn);
+ if (dirconn->requested_resource == NULL) {
+ if (resource == NULL)
+ return dirconn;
+ } else if (resource) {
+ if (0 == strcmp(resource, dirconn->requested_resource))
+ return dirconn;
+ }
+ } SMARTLIST_FOREACH_END(conn);
+
+ return NULL;
+}
+
/** Return an open, non-marked connection of a given type and purpose, or NULL
* if no such connection exists. */
connection_t *
@@ -3383,6 +3796,9 @@ connection_finished_flushing(connection_t *conn)
// log_fn(LOG_DEBUG,"entered. Socket %u.", conn->s);
+ IF_HAS_NO_BUFFEREVENT(conn)
+ connection_stop_writing(conn);
+
switch (conn->type) {
case CONN_TYPE_OR:
return connection_or_finished_flushing(TO_OR_CONN(conn));
@@ -3508,6 +3924,16 @@ assert_connection_ok(connection_t *conn, time_t now)
tor_assert(conn);
tor_assert(conn->type >= _CONN_TYPE_MIN);
tor_assert(conn->type <= _CONN_TYPE_MAX);
+
+#ifdef USE_BUFFEREVENTS
+ if (conn->bufev) {
+ tor_assert(conn->read_event == NULL);
+ tor_assert(conn->write_event == NULL);
+ tor_assert(conn->inbuf == NULL);
+ tor_assert(conn->outbuf == NULL);
+ }
+#endif
+
switch (conn->type) {
case CONN_TYPE_OR:
tor_assert(conn->magic == OR_CONNECTION_MAGIC);
@@ -3535,8 +3961,15 @@ assert_connection_ok(connection_t *conn, time_t now)
tor_assert(conn->s < 0);
if (conn->outbuf_flushlen > 0) {
- tor_assert(connection_is_writing(conn) || conn->write_blocked_on_bw ||
- (CONN_IS_EDGE(conn) && TO_EDGE_CONN(conn)->edge_blocked_on_circ));
+ /* With optimistic data, we may have queued data in
+ * EXIT_CONN_STATE_RESOLVING while the conn is not yet marked to writing.
+ * */
+ tor_assert((conn->type == CONN_TYPE_EXIT &&
+ conn->state == EXIT_CONN_STATE_RESOLVING) ||
+ connection_is_writing(conn) ||
+ conn->write_blocked_on_bw ||
+ (CONN_IS_EDGE(conn) &&
+ TO_EDGE_CONN(conn)->edge_blocked_on_circ));
}
if (conn->hold_open_until_flushed)
@@ -3546,10 +3979,10 @@ assert_connection_ok(connection_t *conn, time_t now)
* marked_for_close. */
/* buffers */
- if (!connection_is_listener(conn)) {
+ if (conn->inbuf)
assert_buf_ok(conn->inbuf);
+ if (conn->outbuf)
assert_buf_ok(conn->outbuf);
- }
if (conn->type == CONN_TYPE_OR) {
or_connection_t *or_conn = TO_OR_CONN(conn);