summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorNick Mathewson <nickm@torproject.org>2007-04-21 17:26:12 +0000
committerNick Mathewson <nickm@torproject.org>2007-04-21 17:26:12 +0000
commit648065fcb4604bcd8abaa86ae1d8c8ef767631df (patch)
treebc0d6a488ee0bb317cdb7bca18a0113b519462b2 /src
parent227b2e0226445d0c4f39572f51f55451f9fa90f3 (diff)
downloadtor-648065fcb4604bcd8abaa86ae1d8c8ef767631df.tar.gz
tor-648065fcb4604bcd8abaa86ae1d8c8ef767631df.zip
r12763@Kushana: nickm | 2007-04-20 18:42:58 -0400
Initial version of code to stop using socket pairs for linked connections. Superficially, it seems to work, but it probably needs a lot more testing and attention. svn:r9995
Diffstat (limited to 'src')
-rw-r--r--src/or/buffers.c26
-rw-r--r--src/or/connection.c128
-rw-r--r--src/or/connection_edge.c64
-rw-r--r--src/or/directory.c11
-rw-r--r--src/or/main.c201
-rw-r--r--src/or/or.h22
-rw-r--r--src/or/test.c36
7 files changed, 376 insertions, 112 deletions
diff --git a/src/or/buffers.c b/src/or/buffers.c
index 2caec58603..40f8a557ce 100644
--- a/src/or/buffers.c
+++ b/src/or/buffers.c
@@ -817,6 +817,32 @@ fetch_from_buf(char *string, size_t string_len, buf_t *buf)
return buf->datalen;
}
+/** Move up to <b>buf_flushlen</b> bytes from <b>buf_in</b> to <b>buf_out</b>.
+ * Return the number of bytes actually copied.
+ */
+int
+move_buf_to_buf(buf_t *buf_out, buf_t *buf_in, size_t *buf_flushlen)
+{
+ char b[4096];
+ size_t cp, len;
+ len = *buf_flushlen;
+ if (len > buf_in->datalen)
+ len = buf_in->datalen;
+
+ cp = len; /* Remember the number of bytes we intend to copy. */
+ while (len) {
+ /* This isn't the most efficient implementation one could imagine, since
+ * it does two copies instead of 1, but I kinda doubt that this will be
+ * critical path. */
+ size_t n = len > sizeof(b) ? sizeof(b) : len;
+ fetch_from_buf(b, n, buf_in);
+ write_to_buf(b, n, buf_out);
+ len -= n;
+ }
+ *buf_flushlen -= cp;
+ return cp;
+}
+
/** There is a (possibly incomplete) http statement on <b>buf</b>, of the
* form "\%s\\r\\n\\r\\n\%s", headers, body. (body may contain nuls.)
* If a) the headers include a Content-Length field and all bytes in
diff --git a/src/or/connection.c b/src/or/connection.c
index 165d1290bf..fd981e5e8a 100644
--- a/src/or/connection.c
+++ b/src/or/connection.c
@@ -113,6 +113,7 @@ conn_state_to_string(int type, int state)
case DIR_CONN_STATE_CONNECTING: return "connecting";
case DIR_CONN_STATE_CLIENT_SENDING: return "client sending";
case DIR_CONN_STATE_CLIENT_READING: return "client reading";
+ case DIR_CONN_STATE_CLIENT_FINISHED: return "client finished";
case DIR_CONN_STATE_SERVER_COMMAND_WAIT: return "waiting for command";
case DIR_CONN_STATE_SERVER_WRITING: return "writing";
}
@@ -212,9 +213,22 @@ connection_new(int type)
return conn;
}
+/** Create a link between <b>conn_a</b> and <b>conn_b</b> */
+void
+connection_link_connections(connection_t *conn_a, connection_t *conn_b)
+{
+ tor_assert(conn_a->s < 0);
+ tor_assert(conn_b->s < 0);
+
+ conn_a->linked = 1;
+ conn_b->linked = 1;
+ conn_a->linked_conn = conn_b;
+ conn_b->linked_conn = conn_a;
+}
+
/** Tell libevent that we don't care about <b>conn</b> any more. */
void
-connection_unregister(connection_t *conn)
+connection_unregister_events(connection_t *conn)
{
if (conn->read_event) {
if (event_del(conn->read_event))
@@ -260,6 +274,17 @@ _connection_free(connection_t *conn)
break;
}
+ if (conn->linked) {
+ int severity = buf_datalen(conn->inbuf)+buf_datalen(conn->outbuf)
+ ? LOG_NOTICE : LOG_INFO;
+ log_fn(severity, LD_GENERAL, "Freeing linked %s connection [%s] with %d "
+ "bytes on inbuf, %d on outbuf.",
+ conn_type_to_string(conn->type),
+ conn_state_to_string(conn->type, conn->state),
+ (int)buf_datalen(conn->inbuf), (int)buf_datalen(conn->outbuf));
+ // tor_assert(!buf_datalen(conn->outbuf)); /*XXXX020 remove me.*/
+ }
+
if (!connection_is_listener(conn)) {
buf_free(conn->inbuf);
buf_free(conn->outbuf);
@@ -325,6 +350,15 @@ connection_free(connection_t *conn)
tor_assert(conn);
tor_assert(!connection_is_on_closeable_list(conn));
tor_assert(!connection_in_array(conn));
+ if (conn->linked_conn) {
+ log_err(LD_BUG, "Called with conn->linked_conn still set.");
+ tor_fragile_assert();
+ conn->linked_conn->linked_conn = NULL;
+ if (! conn->linked_conn->marked_for_close &&
+ conn->linked_conn->reading_from_linked_conn)
+ connection_start_reading(conn->linked_conn);
+ conn->linked_conn = NULL;
+ }
if (connection_speaks_cells(conn)) {
if (conn->state == OR_CONN_STATE_OPEN)
directory_set_dirty();
@@ -336,7 +370,7 @@ connection_free(connection_t *conn)
TO_CONTROL_CONN(conn)->event_mask = 0;
control_update_global_event_mask();
}
- connection_unregister(conn);
+ connection_unregister_events(conn);
_connection_free(conn);
}
@@ -486,7 +520,7 @@ void
connection_close_immediate(connection_t *conn)
{
assert_connection_ok(conn,0);
- if (conn->s < 0) {
+ if (conn->s < 0 && !conn->linked) {
log_err(LD_BUG,"Attempt to close already-closed connection.");
tor_fragile_assert();
return;
@@ -498,9 +532,10 @@ connection_close_immediate(connection_t *conn)
(int)conn->outbuf_flushlen);
}
- connection_unregister(conn);
+ connection_unregister_events(conn);
- tor_close_socket(conn->s);
+ if (conn->s >= 0)
+ tor_close_socket(conn->s);
conn->s = -1;
if (!connection_is_listener(conn)) {
buf_clear(conn->outbuf);
@@ -529,6 +564,12 @@ _connection_mark_for_close(connection_t *conn, int line, const char *file)
conn->marked_for_close_file = file;
add_connection_to_closeable_list(conn);
+#if 0
+ /* XXXX020 Actually, I don't think this is right. */
+ if (conn->linked_conn && !conn->linked_conn->marked_for_close)
+ _connection_mark_for_close(conn->linked_conn, line, file);
+#endif
+
/* in case we're going to be held-open-til-flushed, reset
* the number of seconds since last successful write, so
* we get our whole 15 seconds */
@@ -1101,11 +1142,14 @@ retry_all_listeners(int force, smartlist_t *replaced_conns,
/** Return 1 if we should apply rate limiting to <b>conn</b>,
* and 0 otherwise. Right now this just checks if it's an internal
- * IP address. */
+ * IP address or an internal connection. */
static int
connection_is_rate_limited(connection_t *conn)
{
- return !is_internal_IP(conn->addr, 0);
+ if (conn->linked || is_internal_IP(conn->addr, 0))
+ return 0;
+ else
+ return 1;
}
extern int global_read_bucket, global_write_bucket;
@@ -1483,6 +1527,7 @@ int
connection_handle_read(connection_t *conn)
{
int max_to_read=-1, try_to_read;
+ size_t before, n_read = 0;
if (conn->marked_for_close)
return 0; /* do nothing */
@@ -1505,6 +1550,8 @@ connection_handle_read(connection_t *conn)
loop_again:
try_to_read = max_to_read;
tor_assert(!conn->marked_for_close);
+
+ before = buf_datalen(conn->inbuf);
if (connection_read_to_buf(conn, &max_to_read) < 0) {
/* There's a read error; kill the connection.*/
connection_close_immediate(conn); /* Don't flush; connection is dead. */
@@ -1517,6 +1564,7 @@ loop_again:
connection_mark_for_close(conn);
return -1;
}
+ n_read += buf_datalen(conn->inbuf) - before;
if (CONN_IS_EDGE(conn) && try_to_read != max_to_read) {
/* instruct it not to try to package partial cells. */
if (connection_process_inbuf(conn, 0) < 0) {
@@ -1533,6 +1581,27 @@ loop_again:
connection_process_inbuf(conn, 1) < 0) {
return -1;
}
+ if (conn->linked_conn) {
+ /* The other side's handle_write will never actually get called, so
+ * we need to invoke the appropriate callbacks ourself. */
+ connection_t *linked = conn->linked_conn;
+ /* XXXX020 Do we need to ensure that this stuff is called even if
+ * conn dies in a way that causes us to return -1 earlier? */
+
+ if (n_read) {
+ /* Probably a no-op, but hey. */
+ connection_buckets_decrement(linked, time(NULL), 0, n_read);
+
+ if (connection_flushed_some(linked) < 0)
+ connection_mark_for_close(linked);
+ if (!connection_wants_to_flush(linked))
+ connection_finished_flushing(linked);
+ }
+
+ if (!buf_datalen(linked->outbuf) && conn->active_on_link)
+ connection_stop_reading_from_linked_conn(conn);
+ }
+ /* If we hit the EOF, call connection_reached_eof. */
if (!conn->marked_for_close &&
conn->inbuf_reached_eof &&
connection_reached_eof(conn) < 0) {
@@ -1541,9 +1610,9 @@ loop_again:
return 0;
}
-/** Pull in new bytes from conn-\>s onto conn-\>inbuf, either
- * directly or via TLS. Reduce the token buckets by the number of
- * bytes read.
+/** Pull in new bytes from conn-\>s or conn-\>linked_conn onto conn-\>inbuf,
+ * either directly or via TLS. Reduce the token buckets by the number of bytes
+ * read.
*
* If *max_to_read is -1, then decide it ourselves, else go with the
* value passed to us. When returning, if it's changed, subtract the
@@ -1633,7 +1702,24 @@ connection_read_to_buf(connection_t *conn, int *max_to_read)
tor_tls_get_n_raw_bytes(or_conn->tls, &n_read, &n_written);
log_debug(LD_GENERAL, "After TLS read of %d: %ld read, %ld written",
result, (long)n_read, (long)n_written);
+ } else if (conn->linked) {
+ if (conn->linked_conn) {
+ result = move_buf_to_buf(conn->inbuf, conn->linked_conn->outbuf,
+ &conn->linked_conn->outbuf_flushlen);
+ } else {
+ result = 0;
+ }
+ //log_notice(LD_GENERAL, "Moved %d bytes on an internal link!", result);
+ /* If the other side has disappeared, or if it's been marked for close and
+ * we flushed its outbuf, then we should set our inbuf_reached_eof. */
+ if (!conn->linked_conn ||
+ (conn->linked_conn->marked_for_close &&
+ buf_datalen(conn->linked_conn->outbuf) == 0))
+ conn->inbuf_reached_eof = 1;
+
+ n_read = (size_t) result;
} else {
+ /* !connection_speaks_cells, !conn->linked_conn. */
int reached_eof = 0;
CONN_LOG_PROTECT(conn,
result = read_to_buf(conn->s, at_most, conn->inbuf, &reached_eof));
@@ -1687,7 +1773,7 @@ connection_fetch_from_buf(char *string, size_t len, connection_t *conn)
int
connection_wants_to_flush(connection_t *conn)
{
- return conn->outbuf_flushlen;
+ return conn->outbuf_flushlen > 0;
}
/** Are there too many bytes on edge connection <b>conn</b>'s outbuf to
@@ -2203,6 +2289,19 @@ connection_state_is_connecting(connection_t *conn)
return 0;
}
+/** DOCDOC */
+int
+connection_should_read_from_linked_conn(connection_t *conn)
+{
+ if (conn->linked && conn->reading_from_linked_conn) {
+ if (! conn->linked_conn ||
+ (conn->linked_conn->writing_to_linked_conn &&
+ buf_datalen(conn->linked_conn->outbuf)))
+ return 1;
+ }
+ return 0;
+}
+
/** Allocates a base64'ed authenticator for use in http or https
* auth, based on the input string <b>authenticator</b>. Returns it
* if success, else returns NULL. */
@@ -2433,6 +2532,13 @@ assert_connection_ok(connection_t *conn, time_t now)
break;
}
+ if (conn->linked_conn) {
+ tor_assert(conn->linked_conn->linked_conn == conn);
+ tor_assert(conn->linked != 0);
+ }
+ if (conn->linked)
+ tor_assert(conn->s < 0);
+
if (conn->outbuf_flushlen > 0) {
tor_assert(connection_is_writing(conn) || conn->wants_to_write ||
conn->edge_blocked_on_circ);
diff --git a/src/or/connection_edge.c b/src/or/connection_edge.c
index 6583b97f1f..0e5aa8fd72 100644
--- a/src/or/connection_edge.c
+++ b/src/or/connection_edge.c
@@ -1860,32 +1860,21 @@ connection_ap_handshake_send_resolve(edge_connection_t *ap_conn)
* and call connection_ap_handshake_attach_circuit(conn) on it.
*
* Return the other end of the socketpair, or -1 if error.
+ *
+ * DOCDOC The above is now wrong; we use links.
+ * DOCDOC start_reading
*/
-int
+edge_connection_t *
connection_ap_make_bridge(char *address, uint16_t port,
const char *digest, int command)
{
- int fd[2];
edge_connection_t *conn;
- int err;
-
- log_info(LD_APP,"Making AP bridge to %s:%d ...",safe_str(address),port);
-
- if ((err = tor_socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) < 0) {
- log_warn(LD_NET,
- "Couldn't construct socketpair (%s). Network down? Delaying.",
- tor_socket_strerror(-err));
- return -1;
- }
-
- tor_assert(fd[0] >= 0);
- tor_assert(fd[1] >= 0);
- set_socket_nonblocking(fd[0]);
- set_socket_nonblocking(fd[1]);
+ log_notice(LD_APP,"Making internal anonymized tunnel to %s:%d ...",
+ safe_str(address),port); /* XXXX020 Downgrade back to info. */
conn = TO_EDGE_CONN(connection_new(CONN_TYPE_AP));
- conn->_base.s = fd[0];
+ conn->_base.linked = 1; /* so that we can add it safely below. */
/* populate conn->socks_request */
@@ -1903,28 +1892,25 @@ connection_ap_make_bridge(char *address, uint16_t port,
digest, DIGEST_LEN);
}
- conn->_base.address = tor_strdup("(local bridge)");
+ conn->_base.address = tor_strdup("(local bridge)"); /*XXXX020 no "bridge"*/
conn->_base.addr = 0;
conn->_base.port = 0;
if (connection_add(TO_CONN(conn)) < 0) { /* no space, forget it */
- connection_free(TO_CONN(conn)); /* this closes fd[0] */
- tor_close_socket(fd[1]);
- return -1;
+ connection_free(TO_CONN(conn));
+ return NULL;
}
conn->_base.state = AP_CONN_STATE_CIRCUIT_WAIT;
- connection_start_reading(TO_CONN(conn));
/* attaching to a dirty circuit is fine */
if (connection_ap_handshake_attach_circuit(conn) < 0) {
connection_mark_unattached_ap(conn, END_STREAM_REASON_CANT_ATTACH);
- tor_close_socket(fd[1]);
- return -1;
+ return NULL;
}
log_info(LD_APP,"... AP bridge created and connected.");
- return fd[1];
+ return conn;
}
/** Send an answer to an AP connection that has requested a DNS lookup
@@ -2406,37 +2392,19 @@ connection_exit_connect(edge_connection_t *edge_conn)
* back an end cell for). Return -(some circuit end reason) if the circuit
* needs to be torn down. Either connects exit_conn, frees it, or marks it,
* as appropriate.
+ *
+ * DOCDOC no longer uses socketpair
*/
static int
connection_exit_connect_dir(edge_connection_t *exit_conn)
{
- int fd[2];
- int err;
dir_connection_t *dir_conn = NULL;
- log_info(LD_EXIT, "Opening dir bridge");
-
- if ((err = tor_socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) < 0) {
- log_warn(LD_NET,
- "Couldn't construct socketpair (%s). "
- "Network down? Out of sockets?",
- tor_socket_strerror(-err));
- connection_edge_end(exit_conn, END_STREAM_REASON_RESOURCELIMIT);
- connection_free(TO_CONN(exit_conn));
- return 0;
- }
-
- tor_assert(fd[0] >= 0);
- tor_assert(fd[1] >= 0);
+ log_info(LD_EXIT, "Opening local connection for anonymized directory exit");
- set_socket_nonblocking(fd[0]);
- set_socket_nonblocking(fd[1]);
-
- exit_conn->_base.s = fd[0];
exit_conn->_base.state = EXIT_CONN_STATE_OPEN;
dir_conn = TO_DIR_CONN(connection_new(CONN_TYPE_DIR));
- dir_conn->_base.s = fd[1];
dir_conn->_base.addr = 0x7f000001;
dir_conn->_base.port = 0;
@@ -2445,6 +2413,8 @@ connection_exit_connect_dir(edge_connection_t *exit_conn)
dir_conn->_base.purpose = DIR_PURPOSE_SERVER;
dir_conn->_base.state = DIR_CONN_STATE_SERVER_COMMAND_WAIT;
+ connection_link_connections(TO_CONN(dir_conn), TO_CONN(exit_conn));
+
if (connection_add(TO_CONN(exit_conn))<0) {
connection_edge_end(exit_conn, END_STREAM_REASON_RESOURCELIMIT);
connection_free(TO_CONN(exit_conn));
diff --git a/src/or/directory.c b/src/or/directory.c
index 2c1261bd32..67bfcf8e95 100644
--- a/src/or/directory.c
+++ b/src/or/directory.c
@@ -451,22 +451,25 @@ directory_initiate_command(const char *address, uint32_t addr,
error indicates broken link in windowsland. */
}
} else { /* we want to connect via tor */
+ edge_connection_t *linked_conn;
/* make an AP connection
* populate it and add it at the right state
* socketpair and hook up both sides
*/
conn->dirconn_direct = 0;
- conn->_base.s =
+ linked_conn =
connection_ap_make_bridge(conn->_base.address, conn->_base.port,
digest,
private_connection ?
SOCKS_COMMAND_CONNECT :
SOCKS_COMMAND_CONNECT_DIR);
- if (conn->_base.s < 0) {
+ if (!linked_conn) {
log_warn(LD_NET,"Making AP bridge to dirserver failed.");
connection_mark_for_close(TO_CONN(conn));
+ connection_mark_for_close(TO_CONN(linked_conn));
return;
}
+ connection_link_connections(TO_CONN(conn), TO_CONN(linked_conn));
if (connection_add(TO_CONN(conn)) < 0) {
log_warn(LD_NET,"Unable to add AP bridge to dirserver.");
@@ -478,6 +481,7 @@ directory_initiate_command(const char *address, uint32_t addr,
directory_send_command(conn, purpose, 0, resource,
payload, payload_len);
connection_watch_events(TO_CONN(conn), EV_READ | EV_WRITE);
+ connection_start_reading(TO_CONN(linked_conn));
}
}
@@ -1297,7 +1301,8 @@ connection_dir_reached_eof(dir_connection_t *conn)
{
int retval;
if (conn->_base.state != DIR_CONN_STATE_CLIENT_READING) {
- log_info(LD_HTTP,"conn reached eof, not reading. Closing.");
+ log_info(LD_HTTP,"conn reached eof, not reading. [state=%d] Closing.",
+ conn->_base.state);
connection_close_immediate(TO_CONN(conn)); /* error: give up on flushing */
connection_mark_for_close(TO_CONN(conn));
return -1;
diff --git a/src/or/main.c b/src/or/main.c
index f15b18243b..9e96afce3a 100644
--- a/src/or/main.c
+++ b/src/or/main.c
@@ -74,6 +74,10 @@ static connection_t *connection_array[MAXCONNECTIONS+1] =
/** List of connections that have been marked for close and need to be freed
* and removed from connection_array. */
static smartlist_t *closeable_connection_lst = NULL;
+/** DOCDOC */
+static smartlist_t *active_linked_connection_lst = NULL;
+/** DOCDOC */
+static int called_loop_once = 0;
static int n_conns=0; /**< Number of connections currently active. */
@@ -155,7 +159,7 @@ int
connection_add(connection_t *conn)
{
tor_assert(conn);
- tor_assert(conn->s >= 0);
+ tor_assert(conn->s >= 0 || conn->linked);
tor_assert(conn->conn_array_index == -1); /* can only connection_add once */
if (n_conns == MAXCONNECTIONS) {
@@ -198,13 +202,12 @@ connection_remove(connection_t *conn)
tor_assert(conn->conn_array_index >= 0);
current_index = conn->conn_array_index;
+ connection_unregister_events(conn); /* This is redundant, but cheap. */
if (current_index == n_conns-1) { /* this is the end */
n_conns--;
return 0;
}
- connection_unregister(conn);
-
/* replace this one with the one at the end */
n_conns--;
connection_array[current_index] = connection_array[n_conns];
@@ -213,23 +216,31 @@ connection_remove(connection_t *conn)
return 0;
}
-/** If it's an edge conn, remove it from the list
+/** If <b>conn</b> is an edge conn, remove it from the list
* of conn's on this circuit. If it's not on an edge,
* flush and send destroys for all circuits on this conn.
*
- * If <b>remove</b> is non-zero, then remove it from the
- * connection_array and closeable_connection_lst.
+ * Remove it from connection_array (if applicable) and
+ * from closeable_connection_list.
*
* Then free it.
*/
static void
-connection_unlink(connection_t *conn, int remove)
+connection_unlink(connection_t *conn)
{
connection_about_to_close_connection(conn);
- if (remove) {
+ if (conn->conn_array_index >= 0) {
connection_remove(conn);
}
+ if (conn->linked_conn) {
+ conn->linked_conn->linked_conn = NULL;
+ if (! conn->linked_conn->marked_for_close &&
+ conn->linked_conn->reading_from_linked_conn)
+ connection_start_reading(conn->linked_conn);
+ conn->linked_conn = NULL;
+ }
smartlist_remove(closeable_connection_lst, conn);
+ smartlist_remove(active_linked_connection_lst, conn);
if (conn->type == CONN_TYPE_EXIT) {
assert_connection_edge_not_dns_pending(TO_EDGE_CONN(conn));
}
@@ -286,16 +297,23 @@ get_connection_array(connection_t ***array, int *n)
void
connection_watch_events(connection_t *conn, short events)
{
- int r;
+ int r = 0;
tor_assert(conn);
tor_assert(conn->read_event);
tor_assert(conn->write_event);
- if (events & EV_READ) {
- r = event_add(conn->read_event, NULL);
+ if (conn->linked) {
+ if (events & EV_READ)
+ connection_start_reading(conn);
+ else
+ connection_stop_reading(conn);
} else {
- r = event_del(conn->read_event);
+ if (events & EV_READ) {
+ r = event_add(conn->read_event, NULL);
+ } else {
+ r = event_del(conn->read_event);
+ }
}
if (r<0)
@@ -305,10 +323,17 @@ connection_watch_events(connection_t *conn, short events)
conn->s, (events & EV_READ)?"":"un",
tor_socket_strerror(tor_socket_errno(conn->s)));
- if (events & EV_WRITE) {
- r = event_add(conn->write_event, NULL);
+ if (conn->linked) {
+ if (events & EV_WRITE)
+ connection_start_writing(conn);
+ else
+ connection_stop_writing(conn);
} else {
- r = event_del(conn->write_event);
+ if (events & EV_WRITE) {
+ r = event_add(conn->write_event, NULL);
+ } else {
+ r = event_del(conn->write_event);
+ }
}
if (r<0)
@@ -325,7 +350,8 @@ connection_is_reading(connection_t *conn)
{
tor_assert(conn);
- return conn->read_event && event_pending(conn->read_event, EV_READ, NULL);
+ return conn->reading_from_linked_conn ||
+ (conn->read_event && event_pending(conn->read_event, EV_READ, NULL));
}
/** Tell the main loop to stop notifying <b>conn</b> of any read events. */
@@ -335,12 +361,16 @@ connection_stop_reading(connection_t *conn)
tor_assert(conn);
tor_assert(conn->read_event);
- log_debug(LD_NET,"entering.");
- if (event_del(conn->read_event))
- log_warn(LD_NET, "Error from libevent setting read event state for %d "
- "to unwatched: %s",
- conn->s,
- tor_socket_strerror(tor_socket_errno(conn->s)));
+ if (conn->linked) {
+ conn->reading_from_linked_conn = 0;
+ connection_stop_reading_from_linked_conn(conn);
+ } else {
+ if (event_del(conn->read_event))
+ log_warn(LD_NET, "Error from libevent setting read event state for %d "
+ "to unwatched: %s",
+ conn->s,
+ tor_socket_strerror(tor_socket_errno(conn->s)));
+ }
}
/** Tell the main loop to start notifying <b>conn</b> of any read events. */
@@ -350,11 +380,17 @@ connection_start_reading(connection_t *conn)
tor_assert(conn);
tor_assert(conn->read_event);
- if (event_add(conn->read_event, NULL))
- log_warn(LD_NET, "Error from libevent setting read event state for %d "
- "to watched: %s",
- conn->s,
- tor_socket_strerror(tor_socket_errno(conn->s)));
+ if (conn->linked) {
+ conn->reading_from_linked_conn = 1;
+ if (connection_should_read_from_linked_conn(conn))
+ connection_start_reading_from_linked_conn(conn);
+ } else {
+ if (event_add(conn->read_event, NULL))
+ log_warn(LD_NET, "Error from libevent setting read event state for %d "
+ "to watched: %s",
+ conn->s,
+ tor_socket_strerror(tor_socket_errno(conn->s)));
+ }
}
/** Return true iff <b>conn</b> is listening for write events. */
@@ -363,7 +399,8 @@ connection_is_writing(connection_t *conn)
{
tor_assert(conn);
- return conn->write_event && event_pending(conn->write_event, EV_WRITE, NULL);
+ return conn->writing_to_linked_conn ||
+ (conn->write_event && event_pending(conn->write_event, EV_WRITE, NULL));
}
/** Tell the main loop to stop notifying <b>conn</b> of any write events. */
@@ -373,11 +410,17 @@ connection_stop_writing(connection_t *conn)
tor_assert(conn);
tor_assert(conn->write_event);
- if (event_del(conn->write_event))
- log_warn(LD_NET, "Error from libevent setting write event state for %d "
- "to unwatched: %s",
- conn->s,
- tor_socket_strerror(tor_socket_errno(conn->s)));
+ if (conn->linked) {
+ conn->writing_to_linked_conn = 0;
+ if (conn->linked_conn)
+ connection_stop_reading_from_linked_conn(conn->linked_conn);
+ } else {
+ if (event_del(conn->write_event))
+ log_warn(LD_NET, "Error from libevent setting write event state for %d "
+ "to unwatched: %s",
+ conn->s,
+ tor_socket_strerror(tor_socket_errno(conn->s)));
+ }
}
/** Tell the main loop to start notifying <b>conn</b> of any write events. */
@@ -387,11 +430,58 @@ connection_start_writing(connection_t *conn)
tor_assert(conn);
tor_assert(conn->write_event);
- if (event_add(conn->write_event, NULL))
- log_warn(LD_NET, "Error from libevent setting write event state for %d "
- "to watched: %s",
- conn->s,
- tor_socket_strerror(tor_socket_errno(conn->s)));
+ if (conn->linked) {
+ conn->writing_to_linked_conn = 1;
+ if (conn->linked_conn &&
+ connection_should_read_from_linked_conn(conn->linked_conn))
+ connection_start_reading_from_linked_conn(conn->linked_conn);
+ } else {
+ if (event_add(conn->write_event, NULL))
+ log_warn(LD_NET, "Error from libevent setting write event state for %d "
+ "to watched: %s",
+ conn->s,
+ tor_socket_strerror(tor_socket_errno(conn->s)));
+ }
+}
+
+/** DOCDOC*/
+void
+connection_start_reading_from_linked_conn(connection_t *conn)
+{
+ tor_assert(conn);
+ tor_assert(conn->linked == 1);
+
+ if (!conn->active_on_link) {
+ conn->active_on_link = 1;
+ smartlist_add(active_linked_connection_lst, conn);
+ if (!called_loop_once) {
+ /* This is the first event on the list; we won't be in LOOP_ONCE mode,
+ * so we need to make sure that the event_loop() actually exits at the
+ * end of its run through the current connections and
+ * lets us activate read events for linked connections. */
+ struct timeval tv = { 0, 0 };
+ event_loopexit(&tv);
+ }
+ } else {
+ tor_assert(smartlist_isin(active_linked_connection_lst, conn));
+ }
+}
+
+/** DOCDOC*/
+void
+connection_stop_reading_from_linked_conn(connection_t *conn)
+{
+ tor_assert(conn);
+ tor_assert(conn->linked == 1);
+
+ if (conn->active_on_link) {
+ conn->active_on_link = 0;
+ /* XXXX020 maybe we should keep an index here so we can smartlist_del
+ * cleanly. */
+ smartlist_remove(active_linked_connection_lst, conn);
+ } else {
+ tor_assert(!smartlist_isin(active_linked_connection_lst, conn));
+ }
}
/** Close all connections that have been scheduled to get closed */
@@ -402,7 +492,7 @@ close_closeable_connections(void)
for (i = 0; i < smartlist_len(closeable_connection_lst); ) {
connection_t *conn = smartlist_get(closeable_connection_lst, i);
if (conn->conn_array_index < 0) {
- connection_unlink(conn, 0); /* blow it away right now */
+ connection_unlink(conn); /* blow it away right now */
} else {
if (!conn_close_if_marked(conn->conn_array_index))
++i;
@@ -500,7 +590,7 @@ conn_close_if_marked(int i)
assert_all_pending_dns_resolves_ok();
log_debug(LD_NET,"Cleaning up connection (fd %d).",conn->s);
- if (conn->s >= 0 && connection_wants_to_flush(conn)) {
+ if ((conn->s >= 0 || conn->linked_conn) && connection_wants_to_flush(conn)) {
/* s == -1 means it's an incomplete edge connection, or that the socket
* has already been closed as unflushable. */
int sz = connection_bucket_write_limit(conn);
@@ -512,7 +602,21 @@ conn_close_if_marked(int i)
conn->s, conn_type_to_string(conn->type), conn->state,
(int)conn->outbuf_flushlen,
conn->marked_for_close_file, conn->marked_for_close);
- if (connection_speaks_cells(conn)) {
+ if (conn->linked_conn) {
+ retval = move_buf_to_buf(conn->linked_conn->inbuf, conn->outbuf,
+ &conn->outbuf_flushlen);
+ if (retval >= 0) {
+ /* The linked conn will notice that it has data when it notices that
+ * we're gone. */
+ connection_start_reading_from_linked_conn(conn->linked_conn);
+ }
+ /* XXXX020 Downgrade to debug. */
+ log_info(LD_GENERAL, "Flushed last %d bytes from a linked conn; "
+ "%d left; flushlen %d; wants-to-flush==%d", retval,
+ (int)buf_datalen(conn->outbuf),
+ (int)conn->outbuf_flushlen,
+ connection_wants_to_flush(conn));
+ } else if (connection_speaks_cells(conn)) {
if (conn->state == OR_CONN_STATE_OPEN) {
retval = flush_buf_tls(TO_OR_CONN(conn)->tls, conn->outbuf, sz,
&conn->outbuf_flushlen);
@@ -553,7 +657,7 @@ conn_close_if_marked(int i)
conn->marked_for_close);
}
}
- connection_unlink(conn, 1); /* unlink, remove, free */
+ connection_unlink(conn); /* unlink, remove, free */
return 1;
}
@@ -1270,8 +1374,14 @@ do_main_loop(void)
/* Make it easier to tell whether libevent failure is our fault or not. */
errno = 0;
#endif
- /* poll until we have an event, or the second ends */
- loop_result = event_dispatch();
+ /* All active linked conns should get their read events activated. */
+ SMARTLIST_FOREACH(active_linked_connection_lst, connection_t *, conn,
+ event_active(conn->read_event, EV_READ, 1));
+ called_loop_once = smartlist_len(active_linked_connection_lst) ? 1 : 0;
+
+ /* poll until we have an event, or the second ends, or until we have
+ * some active linked connections to triggger events for. */
+ loop_result = event_loop(called_loop_once ? EVLOOP_ONCE : 0);
/* let catch() handle things like ^c, and otherwise don't worry about it */
if (loop_result < 0) {
@@ -1601,6 +1711,8 @@ tor_init(int argc, char *argv[])
time_of_process_start = time(NULL);
if (!closeable_connection_lst)
closeable_connection_lst = smartlist_create();
+ if (!active_linked_connection_lst)
+ active_linked_connection_lst = smartlist_create();
/* Initialize the history structures. */
rep_hist_init();
/* Initialize the service cache. */
@@ -1673,6 +1785,7 @@ tor_free_all(int postfork)
tor_tls_free_all();
/* stuff in main.c */
smartlist_free(closeable_connection_lst);
+ smartlist_free(active_linked_connection_lst);
tor_free(timeout_event);
/* Stuff in util.c */
escaped(NULL);
diff --git a/src/or/or.h b/src/or/or.h
index 1fbcb1b7aa..a090f2e0ce 100644
--- a/src/or/or.h
+++ b/src/or/or.h
@@ -748,6 +748,7 @@ typedef struct connection_t {
/* The next fields are all one-bit booleans. Some are only applicable
* to connection subtypes, but we hold them here anyway, to save space.
* (Currently, they all fit into a single byte.) */
+ /*XXXX020 rename wants_to_*; the names are misleading. */
unsigned wants_to_read:1; /**< Boolean: should we start reading again once
* the bandwidth throttler allows it? */
unsigned wants_to_write:1; /**< Boolean: should we start writing again once
@@ -771,7 +772,7 @@ typedef struct connection_t {
unsigned int chosen_exit_optional:1;
int s; /**< Our socket; -1 if this connection is closed, or has no
- * sockets. */
+ * socket. */
int conn_array_index; /**< Index into the global connection array. */
struct event *read_event; /**< Libevent event structure. */
struct event *write_event; /**< Libevent event structure. */
@@ -797,6 +798,13 @@ typedef struct connection_t {
* we marked for close? */
char *address; /**< FQDN (or IP) of the guy on the other end.
* strdup into this, because free_connection frees it. */
+ /** Annother connection that's connected to this one in lieu of a socket. */
+ struct connection_t *linked_conn;
+ /* XXXX020 NM move these up to the other 1-bit flags. */
+ unsigned int linked:1; /**< True if there is, or has been, a linked_conn. */
+ unsigned int reading_from_linked_conn:1; /**DOCDOC*/
+ unsigned int writing_to_linked_conn:1; /**DOCDOC*/
+ unsigned int active_on_link:1; /**DOCDOC*/
} connection_t;
@@ -1967,6 +1975,7 @@ int flush_buf_tls(tor_tls_t *tls, buf_t *buf, size_t sz, size_t *buf_flushlen);
int write_to_buf(const char *string, size_t string_len, buf_t *buf);
int write_to_buf_zlib(buf_t *buf, tor_zlib_state_t *state,
const char *data, size_t data_len, int done);
+int move_buf_to_buf(buf_t *buf_out, buf_t *buf_in, size_t *buf_flushlen);
int fetch_from_buf(char *string, size_t string_len, buf_t *buf);
int fetch_from_buf_http(buf_t *buf,
char **headers_out, size_t max_headerlen,
@@ -2163,7 +2172,8 @@ const char *conn_type_to_string(int type);
const char *conn_state_to_string(int type, int state);
connection_t *connection_new(int type);
-void connection_unregister(connection_t *conn);
+void connection_link_connections(connection_t *conn_a, connection_t *conn_b);
+void connection_unregister_events(connection_t *conn);
void connection_free(connection_t *conn);
void connection_free_all(void);
void connection_about_to_close_connection(connection_t *conn);
@@ -2227,6 +2237,7 @@ connection_t *connection_get_by_type_state_rendquery(int type, int state,
int connection_is_listener(connection_t *conn);
int connection_state_is_open(connection_t *conn);
int connection_state_is_connecting(connection_t *conn);
+int connection_should_read_from_linked_conn(connection_t *conn);
char *alloc_http_authenticator(const char *authenticator);
@@ -2252,8 +2263,8 @@ int connection_edge_finished_connecting(edge_connection_t *conn);
int connection_ap_handshake_send_begin(edge_connection_t *ap_conn);
int connection_ap_handshake_send_resolve(edge_connection_t *ap_conn);
-int connection_ap_make_bridge(char *address, uint16_t port,
- const char *digest, int command);
+edge_connection_t *connection_ap_make_bridge(char *address, uint16_t port,
+ const char *digest, int command);
void connection_ap_handshake_socks_reply(edge_connection_t *conn, char *reply,
size_t replylen,
int endreason);
@@ -2580,6 +2591,9 @@ int connection_is_writing(connection_t *conn);
void connection_stop_writing(connection_t *conn);
void connection_start_writing(connection_t *conn);
+void connection_stop_reading_from_linked_conn(connection_t *conn);
+void connection_start_reading_from_linked_conn(connection_t *conn);
+
void directory_all_unreachable(time_t now);
void directory_info_has_arrived(time_t now, int from_cache);
diff --git a/src/or/test.c b/src/or/test.c
index a03f17134e..21916c8a0e 100644
--- a/src/or/test.c
+++ b/src/or/test.c
@@ -111,9 +111,10 @@ test_buffers(void)
char str[256];
char str2[256];
- buf_t *buf;
+ buf_t *buf, *buf2;
int j;
+ size_t r;
/****
* buf_new
@@ -218,6 +219,37 @@ test_buffers(void)
test_memeq(str2, str, 255);
}
+ /* Move from buf to buf. */
+ buf_free(buf);
+ buf = buf_new_with_capacity(4096);
+ buf2 = buf_new_with_capacity(4096);
+ for (j=0;j<100;++j)
+ write_to_buf(str, 255, buf);
+ test_eq(buf_datalen(buf), 25500);
+ for (j=0;j<100;++j) {
+ r = 10;
+ move_buf_to_buf(buf2, buf, &r);
+ test_eq(r, 0);
+ }
+ test_eq(buf_datalen(buf), 24500);
+ test_eq(buf_datalen(buf2), 1000);
+ for (j=0;j<3;++j) {
+ fetch_from_buf(str2, 255, buf2);
+ test_memeq(str2, str, 255);
+ }
+ r = 8192; /*big move*/
+ move_buf_to_buf(buf2, buf, &r);
+ test_eq(r, 0);
+ r = 30000; /* incomplete move */
+ move_buf_to_buf(buf2, buf, &r);
+ test_eq(r, 13692);
+ for (j=0;j<97;++j) {
+ fetch_from_buf(str2, 255, buf2);
+ test_memeq(str2, str, 255);
+ }
+ buf_free(buf);
+ buf_free(buf2);
+
#if 0
{
int s;
@@ -285,8 +317,6 @@ test_buffers(void)
test_eq(eof, 1);
}
#endif
-
- buf_free(buf);
}
static void