diff options
author | Nick Mathewson <nickm@torproject.org> | 2007-04-21 17:26:12 +0000 |
---|---|---|
committer | Nick Mathewson <nickm@torproject.org> | 2007-04-21 17:26:12 +0000 |
commit | 648065fcb4604bcd8abaa86ae1d8c8ef767631df (patch) | |
tree | bc0d6a488ee0bb317cdb7bca18a0113b519462b2 /src | |
parent | 227b2e0226445d0c4f39572f51f55451f9fa90f3 (diff) | |
download | tor-648065fcb4604bcd8abaa86ae1d8c8ef767631df.tar.gz tor-648065fcb4604bcd8abaa86ae1d8c8ef767631df.zip |
r12763@Kushana: nickm | 2007-04-20 18:42:58 -0400
Initial version of code to stop using socket pairs for linked connections. Superficially, it seems to work, but it probably needs a lot more testing and attention.
svn:r9995
Diffstat (limited to 'src')
-rw-r--r-- | src/or/buffers.c | 26 | ||||
-rw-r--r-- | src/or/connection.c | 128 | ||||
-rw-r--r-- | src/or/connection_edge.c | 64 | ||||
-rw-r--r-- | src/or/directory.c | 11 | ||||
-rw-r--r-- | src/or/main.c | 201 | ||||
-rw-r--r-- | src/or/or.h | 22 | ||||
-rw-r--r-- | src/or/test.c | 36 |
7 files changed, 376 insertions, 112 deletions
diff --git a/src/or/buffers.c b/src/or/buffers.c index 2caec58603..40f8a557ce 100644 --- a/src/or/buffers.c +++ b/src/or/buffers.c @@ -817,6 +817,32 @@ fetch_from_buf(char *string, size_t string_len, buf_t *buf) return buf->datalen; } +/** Move up to <b>buf_flushlen</b> bytes from <b>buf_in</b> to <b>buf_out</b>. + * Return the number of bytes actually copied. + */ +int +move_buf_to_buf(buf_t *buf_out, buf_t *buf_in, size_t *buf_flushlen) +{ + char b[4096]; + size_t cp, len; + len = *buf_flushlen; + if (len > buf_in->datalen) + len = buf_in->datalen; + + cp = len; /* Remember the number of bytes we intend to copy. */ + while (len) { + /* This isn't the most efficient implementation one could imagine, since + * it does two copies instead of 1, but I kinda doubt that this will be + * critical path. */ + size_t n = len > sizeof(b) ? sizeof(b) : len; + fetch_from_buf(b, n, buf_in); + write_to_buf(b, n, buf_out); + len -= n; + } + *buf_flushlen -= cp; + return cp; +} + /** There is a (possibly incomplete) http statement on <b>buf</b>, of the * form "\%s\\r\\n\\r\\n\%s", headers, body. (body may contain nuls.) * If a) the headers include a Content-Length field and all bytes in diff --git a/src/or/connection.c b/src/or/connection.c index 165d1290bf..fd981e5e8a 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -113,6 +113,7 @@ conn_state_to_string(int type, int state) case DIR_CONN_STATE_CONNECTING: return "connecting"; case DIR_CONN_STATE_CLIENT_SENDING: return "client sending"; case DIR_CONN_STATE_CLIENT_READING: return "client reading"; + case DIR_CONN_STATE_CLIENT_FINISHED: return "client finished"; case DIR_CONN_STATE_SERVER_COMMAND_WAIT: return "waiting for command"; case DIR_CONN_STATE_SERVER_WRITING: return "writing"; } @@ -212,9 +213,22 @@ connection_new(int type) return conn; } +/** Create a link between <b>conn_a</b> and <b>conn_b</b> */ +void +connection_link_connections(connection_t *conn_a, connection_t *conn_b) +{ + tor_assert(conn_a->s < 0); + tor_assert(conn_b->s < 0); + + conn_a->linked = 1; + conn_b->linked = 1; + conn_a->linked_conn = conn_b; + conn_b->linked_conn = conn_a; +} + /** Tell libevent that we don't care about <b>conn</b> any more. */ void -connection_unregister(connection_t *conn) +connection_unregister_events(connection_t *conn) { if (conn->read_event) { if (event_del(conn->read_event)) @@ -260,6 +274,17 @@ _connection_free(connection_t *conn) break; } + if (conn->linked) { + int severity = buf_datalen(conn->inbuf)+buf_datalen(conn->outbuf) + ? LOG_NOTICE : LOG_INFO; + log_fn(severity, LD_GENERAL, "Freeing linked %s connection [%s] with %d " + "bytes on inbuf, %d on outbuf.", + conn_type_to_string(conn->type), + conn_state_to_string(conn->type, conn->state), + (int)buf_datalen(conn->inbuf), (int)buf_datalen(conn->outbuf)); + // tor_assert(!buf_datalen(conn->outbuf)); /*XXXX020 remove me.*/ + } + if (!connection_is_listener(conn)) { buf_free(conn->inbuf); buf_free(conn->outbuf); @@ -325,6 +350,15 @@ connection_free(connection_t *conn) tor_assert(conn); tor_assert(!connection_is_on_closeable_list(conn)); tor_assert(!connection_in_array(conn)); + if (conn->linked_conn) { + log_err(LD_BUG, "Called with conn->linked_conn still set."); + tor_fragile_assert(); + conn->linked_conn->linked_conn = NULL; + if (! conn->linked_conn->marked_for_close && + conn->linked_conn->reading_from_linked_conn) + connection_start_reading(conn->linked_conn); + conn->linked_conn = NULL; + } if (connection_speaks_cells(conn)) { if (conn->state == OR_CONN_STATE_OPEN) directory_set_dirty(); @@ -336,7 +370,7 @@ connection_free(connection_t *conn) TO_CONTROL_CONN(conn)->event_mask = 0; control_update_global_event_mask(); } - connection_unregister(conn); + connection_unregister_events(conn); _connection_free(conn); } @@ -486,7 +520,7 @@ void connection_close_immediate(connection_t *conn) { assert_connection_ok(conn,0); - if (conn->s < 0) { + if (conn->s < 0 && !conn->linked) { log_err(LD_BUG,"Attempt to close already-closed connection."); tor_fragile_assert(); return; @@ -498,9 +532,10 @@ connection_close_immediate(connection_t *conn) (int)conn->outbuf_flushlen); } - connection_unregister(conn); + connection_unregister_events(conn); - tor_close_socket(conn->s); + if (conn->s >= 0) + tor_close_socket(conn->s); conn->s = -1; if (!connection_is_listener(conn)) { buf_clear(conn->outbuf); @@ -529,6 +564,12 @@ _connection_mark_for_close(connection_t *conn, int line, const char *file) conn->marked_for_close_file = file; add_connection_to_closeable_list(conn); +#if 0 + /* XXXX020 Actually, I don't think this is right. */ + if (conn->linked_conn && !conn->linked_conn->marked_for_close) + _connection_mark_for_close(conn->linked_conn, line, file); +#endif + /* in case we're going to be held-open-til-flushed, reset * the number of seconds since last successful write, so * we get our whole 15 seconds */ @@ -1101,11 +1142,14 @@ retry_all_listeners(int force, smartlist_t *replaced_conns, /** Return 1 if we should apply rate limiting to <b>conn</b>, * and 0 otherwise. Right now this just checks if it's an internal - * IP address. */ + * IP address or an internal connection. */ static int connection_is_rate_limited(connection_t *conn) { - return !is_internal_IP(conn->addr, 0); + if (conn->linked || is_internal_IP(conn->addr, 0)) + return 0; + else + return 1; } extern int global_read_bucket, global_write_bucket; @@ -1483,6 +1527,7 @@ int connection_handle_read(connection_t *conn) { int max_to_read=-1, try_to_read; + size_t before, n_read = 0; if (conn->marked_for_close) return 0; /* do nothing */ @@ -1505,6 +1550,8 @@ connection_handle_read(connection_t *conn) loop_again: try_to_read = max_to_read; tor_assert(!conn->marked_for_close); + + before = buf_datalen(conn->inbuf); if (connection_read_to_buf(conn, &max_to_read) < 0) { /* There's a read error; kill the connection.*/ connection_close_immediate(conn); /* Don't flush; connection is dead. */ @@ -1517,6 +1564,7 @@ loop_again: connection_mark_for_close(conn); return -1; } + n_read += buf_datalen(conn->inbuf) - before; if (CONN_IS_EDGE(conn) && try_to_read != max_to_read) { /* instruct it not to try to package partial cells. */ if (connection_process_inbuf(conn, 0) < 0) { @@ -1533,6 +1581,27 @@ loop_again: connection_process_inbuf(conn, 1) < 0) { return -1; } + if (conn->linked_conn) { + /* The other side's handle_write will never actually get called, so + * we need to invoke the appropriate callbacks ourself. */ + connection_t *linked = conn->linked_conn; + /* XXXX020 Do we need to ensure that this stuff is called even if + * conn dies in a way that causes us to return -1 earlier? */ + + if (n_read) { + /* Probably a no-op, but hey. */ + connection_buckets_decrement(linked, time(NULL), 0, n_read); + + if (connection_flushed_some(linked) < 0) + connection_mark_for_close(linked); + if (!connection_wants_to_flush(linked)) + connection_finished_flushing(linked); + } + + if (!buf_datalen(linked->outbuf) && conn->active_on_link) + connection_stop_reading_from_linked_conn(conn); + } + /* If we hit the EOF, call connection_reached_eof. */ if (!conn->marked_for_close && conn->inbuf_reached_eof && connection_reached_eof(conn) < 0) { @@ -1541,9 +1610,9 @@ loop_again: return 0; } -/** Pull in new bytes from conn-\>s onto conn-\>inbuf, either - * directly or via TLS. Reduce the token buckets by the number of - * bytes read. +/** Pull in new bytes from conn-\>s or conn-\>linked_conn onto conn-\>inbuf, + * either directly or via TLS. Reduce the token buckets by the number of bytes + * read. * * If *max_to_read is -1, then decide it ourselves, else go with the * value passed to us. When returning, if it's changed, subtract the @@ -1633,7 +1702,24 @@ connection_read_to_buf(connection_t *conn, int *max_to_read) tor_tls_get_n_raw_bytes(or_conn->tls, &n_read, &n_written); log_debug(LD_GENERAL, "After TLS read of %d: %ld read, %ld written", result, (long)n_read, (long)n_written); + } else if (conn->linked) { + if (conn->linked_conn) { + result = move_buf_to_buf(conn->inbuf, conn->linked_conn->outbuf, + &conn->linked_conn->outbuf_flushlen); + } else { + result = 0; + } + //log_notice(LD_GENERAL, "Moved %d bytes on an internal link!", result); + /* If the other side has disappeared, or if it's been marked for close and + * we flushed its outbuf, then we should set our inbuf_reached_eof. */ + if (!conn->linked_conn || + (conn->linked_conn->marked_for_close && + buf_datalen(conn->linked_conn->outbuf) == 0)) + conn->inbuf_reached_eof = 1; + + n_read = (size_t) result; } else { + /* !connection_speaks_cells, !conn->linked_conn. */ int reached_eof = 0; CONN_LOG_PROTECT(conn, result = read_to_buf(conn->s, at_most, conn->inbuf, &reached_eof)); @@ -1687,7 +1773,7 @@ connection_fetch_from_buf(char *string, size_t len, connection_t *conn) int connection_wants_to_flush(connection_t *conn) { - return conn->outbuf_flushlen; + return conn->outbuf_flushlen > 0; } /** Are there too many bytes on edge connection <b>conn</b>'s outbuf to @@ -2203,6 +2289,19 @@ connection_state_is_connecting(connection_t *conn) return 0; } +/** DOCDOC */ +int +connection_should_read_from_linked_conn(connection_t *conn) +{ + if (conn->linked && conn->reading_from_linked_conn) { + if (! conn->linked_conn || + (conn->linked_conn->writing_to_linked_conn && + buf_datalen(conn->linked_conn->outbuf))) + return 1; + } + return 0; +} + /** Allocates a base64'ed authenticator for use in http or https * auth, based on the input string <b>authenticator</b>. Returns it * if success, else returns NULL. */ @@ -2433,6 +2532,13 @@ assert_connection_ok(connection_t *conn, time_t now) break; } + if (conn->linked_conn) { + tor_assert(conn->linked_conn->linked_conn == conn); + tor_assert(conn->linked != 0); + } + if (conn->linked) + tor_assert(conn->s < 0); + if (conn->outbuf_flushlen > 0) { tor_assert(connection_is_writing(conn) || conn->wants_to_write || conn->edge_blocked_on_circ); diff --git a/src/or/connection_edge.c b/src/or/connection_edge.c index 6583b97f1f..0e5aa8fd72 100644 --- a/src/or/connection_edge.c +++ b/src/or/connection_edge.c @@ -1860,32 +1860,21 @@ connection_ap_handshake_send_resolve(edge_connection_t *ap_conn) * and call connection_ap_handshake_attach_circuit(conn) on it. * * Return the other end of the socketpair, or -1 if error. + * + * DOCDOC The above is now wrong; we use links. + * DOCDOC start_reading */ -int +edge_connection_t * connection_ap_make_bridge(char *address, uint16_t port, const char *digest, int command) { - int fd[2]; edge_connection_t *conn; - int err; - - log_info(LD_APP,"Making AP bridge to %s:%d ...",safe_str(address),port); - - if ((err = tor_socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) < 0) { - log_warn(LD_NET, - "Couldn't construct socketpair (%s). Network down? Delaying.", - tor_socket_strerror(-err)); - return -1; - } - - tor_assert(fd[0] >= 0); - tor_assert(fd[1] >= 0); - set_socket_nonblocking(fd[0]); - set_socket_nonblocking(fd[1]); + log_notice(LD_APP,"Making internal anonymized tunnel to %s:%d ...", + safe_str(address),port); /* XXXX020 Downgrade back to info. */ conn = TO_EDGE_CONN(connection_new(CONN_TYPE_AP)); - conn->_base.s = fd[0]; + conn->_base.linked = 1; /* so that we can add it safely below. */ /* populate conn->socks_request */ @@ -1903,28 +1892,25 @@ connection_ap_make_bridge(char *address, uint16_t port, digest, DIGEST_LEN); } - conn->_base.address = tor_strdup("(local bridge)"); + conn->_base.address = tor_strdup("(local bridge)"); /*XXXX020 no "bridge"*/ conn->_base.addr = 0; conn->_base.port = 0; if (connection_add(TO_CONN(conn)) < 0) { /* no space, forget it */ - connection_free(TO_CONN(conn)); /* this closes fd[0] */ - tor_close_socket(fd[1]); - return -1; + connection_free(TO_CONN(conn)); + return NULL; } conn->_base.state = AP_CONN_STATE_CIRCUIT_WAIT; - connection_start_reading(TO_CONN(conn)); /* attaching to a dirty circuit is fine */ if (connection_ap_handshake_attach_circuit(conn) < 0) { connection_mark_unattached_ap(conn, END_STREAM_REASON_CANT_ATTACH); - tor_close_socket(fd[1]); - return -1; + return NULL; } log_info(LD_APP,"... AP bridge created and connected."); - return fd[1]; + return conn; } /** Send an answer to an AP connection that has requested a DNS lookup @@ -2406,37 +2392,19 @@ connection_exit_connect(edge_connection_t *edge_conn) * back an end cell for). Return -(some circuit end reason) if the circuit * needs to be torn down. Either connects exit_conn, frees it, or marks it, * as appropriate. + * + * DOCDOC no longer uses socketpair */ static int connection_exit_connect_dir(edge_connection_t *exit_conn) { - int fd[2]; - int err; dir_connection_t *dir_conn = NULL; - log_info(LD_EXIT, "Opening dir bridge"); - - if ((err = tor_socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) < 0) { - log_warn(LD_NET, - "Couldn't construct socketpair (%s). " - "Network down? Out of sockets?", - tor_socket_strerror(-err)); - connection_edge_end(exit_conn, END_STREAM_REASON_RESOURCELIMIT); - connection_free(TO_CONN(exit_conn)); - return 0; - } - - tor_assert(fd[0] >= 0); - tor_assert(fd[1] >= 0); + log_info(LD_EXIT, "Opening local connection for anonymized directory exit"); - set_socket_nonblocking(fd[0]); - set_socket_nonblocking(fd[1]); - - exit_conn->_base.s = fd[0]; exit_conn->_base.state = EXIT_CONN_STATE_OPEN; dir_conn = TO_DIR_CONN(connection_new(CONN_TYPE_DIR)); - dir_conn->_base.s = fd[1]; dir_conn->_base.addr = 0x7f000001; dir_conn->_base.port = 0; @@ -2445,6 +2413,8 @@ connection_exit_connect_dir(edge_connection_t *exit_conn) dir_conn->_base.purpose = DIR_PURPOSE_SERVER; dir_conn->_base.state = DIR_CONN_STATE_SERVER_COMMAND_WAIT; + connection_link_connections(TO_CONN(dir_conn), TO_CONN(exit_conn)); + if (connection_add(TO_CONN(exit_conn))<0) { connection_edge_end(exit_conn, END_STREAM_REASON_RESOURCELIMIT); connection_free(TO_CONN(exit_conn)); diff --git a/src/or/directory.c b/src/or/directory.c index 2c1261bd32..67bfcf8e95 100644 --- a/src/or/directory.c +++ b/src/or/directory.c @@ -451,22 +451,25 @@ directory_initiate_command(const char *address, uint32_t addr, error indicates broken link in windowsland. */ } } else { /* we want to connect via tor */ + edge_connection_t *linked_conn; /* make an AP connection * populate it and add it at the right state * socketpair and hook up both sides */ conn->dirconn_direct = 0; - conn->_base.s = + linked_conn = connection_ap_make_bridge(conn->_base.address, conn->_base.port, digest, private_connection ? SOCKS_COMMAND_CONNECT : SOCKS_COMMAND_CONNECT_DIR); - if (conn->_base.s < 0) { + if (!linked_conn) { log_warn(LD_NET,"Making AP bridge to dirserver failed."); connection_mark_for_close(TO_CONN(conn)); + connection_mark_for_close(TO_CONN(linked_conn)); return; } + connection_link_connections(TO_CONN(conn), TO_CONN(linked_conn)); if (connection_add(TO_CONN(conn)) < 0) { log_warn(LD_NET,"Unable to add AP bridge to dirserver."); @@ -478,6 +481,7 @@ directory_initiate_command(const char *address, uint32_t addr, directory_send_command(conn, purpose, 0, resource, payload, payload_len); connection_watch_events(TO_CONN(conn), EV_READ | EV_WRITE); + connection_start_reading(TO_CONN(linked_conn)); } } @@ -1297,7 +1301,8 @@ connection_dir_reached_eof(dir_connection_t *conn) { int retval; if (conn->_base.state != DIR_CONN_STATE_CLIENT_READING) { - log_info(LD_HTTP,"conn reached eof, not reading. Closing."); + log_info(LD_HTTP,"conn reached eof, not reading. [state=%d] Closing.", + conn->_base.state); connection_close_immediate(TO_CONN(conn)); /* error: give up on flushing */ connection_mark_for_close(TO_CONN(conn)); return -1; diff --git a/src/or/main.c b/src/or/main.c index f15b18243b..9e96afce3a 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -74,6 +74,10 @@ static connection_t *connection_array[MAXCONNECTIONS+1] = /** List of connections that have been marked for close and need to be freed * and removed from connection_array. */ static smartlist_t *closeable_connection_lst = NULL; +/** DOCDOC */ +static smartlist_t *active_linked_connection_lst = NULL; +/** DOCDOC */ +static int called_loop_once = 0; static int n_conns=0; /**< Number of connections currently active. */ @@ -155,7 +159,7 @@ int connection_add(connection_t *conn) { tor_assert(conn); - tor_assert(conn->s >= 0); + tor_assert(conn->s >= 0 || conn->linked); tor_assert(conn->conn_array_index == -1); /* can only connection_add once */ if (n_conns == MAXCONNECTIONS) { @@ -198,13 +202,12 @@ connection_remove(connection_t *conn) tor_assert(conn->conn_array_index >= 0); current_index = conn->conn_array_index; + connection_unregister_events(conn); /* This is redundant, but cheap. */ if (current_index == n_conns-1) { /* this is the end */ n_conns--; return 0; } - connection_unregister(conn); - /* replace this one with the one at the end */ n_conns--; connection_array[current_index] = connection_array[n_conns]; @@ -213,23 +216,31 @@ connection_remove(connection_t *conn) return 0; } -/** If it's an edge conn, remove it from the list +/** If <b>conn</b> is an edge conn, remove it from the list * of conn's on this circuit. If it's not on an edge, * flush and send destroys for all circuits on this conn. * - * If <b>remove</b> is non-zero, then remove it from the - * connection_array and closeable_connection_lst. + * Remove it from connection_array (if applicable) and + * from closeable_connection_list. * * Then free it. */ static void -connection_unlink(connection_t *conn, int remove) +connection_unlink(connection_t *conn) { connection_about_to_close_connection(conn); - if (remove) { + if (conn->conn_array_index >= 0) { connection_remove(conn); } + if (conn->linked_conn) { + conn->linked_conn->linked_conn = NULL; + if (! conn->linked_conn->marked_for_close && + conn->linked_conn->reading_from_linked_conn) + connection_start_reading(conn->linked_conn); + conn->linked_conn = NULL; + } smartlist_remove(closeable_connection_lst, conn); + smartlist_remove(active_linked_connection_lst, conn); if (conn->type == CONN_TYPE_EXIT) { assert_connection_edge_not_dns_pending(TO_EDGE_CONN(conn)); } @@ -286,16 +297,23 @@ get_connection_array(connection_t ***array, int *n) void connection_watch_events(connection_t *conn, short events) { - int r; + int r = 0; tor_assert(conn); tor_assert(conn->read_event); tor_assert(conn->write_event); - if (events & EV_READ) { - r = event_add(conn->read_event, NULL); + if (conn->linked) { + if (events & EV_READ) + connection_start_reading(conn); + else + connection_stop_reading(conn); } else { - r = event_del(conn->read_event); + if (events & EV_READ) { + r = event_add(conn->read_event, NULL); + } else { + r = event_del(conn->read_event); + } } if (r<0) @@ -305,10 +323,17 @@ connection_watch_events(connection_t *conn, short events) conn->s, (events & EV_READ)?"":"un", tor_socket_strerror(tor_socket_errno(conn->s))); - if (events & EV_WRITE) { - r = event_add(conn->write_event, NULL); + if (conn->linked) { + if (events & EV_WRITE) + connection_start_writing(conn); + else + connection_stop_writing(conn); } else { - r = event_del(conn->write_event); + if (events & EV_WRITE) { + r = event_add(conn->write_event, NULL); + } else { + r = event_del(conn->write_event); + } } if (r<0) @@ -325,7 +350,8 @@ connection_is_reading(connection_t *conn) { tor_assert(conn); - return conn->read_event && event_pending(conn->read_event, EV_READ, NULL); + return conn->reading_from_linked_conn || + (conn->read_event && event_pending(conn->read_event, EV_READ, NULL)); } /** Tell the main loop to stop notifying <b>conn</b> of any read events. */ @@ -335,12 +361,16 @@ connection_stop_reading(connection_t *conn) tor_assert(conn); tor_assert(conn->read_event); - log_debug(LD_NET,"entering."); - if (event_del(conn->read_event)) - log_warn(LD_NET, "Error from libevent setting read event state for %d " - "to unwatched: %s", - conn->s, - tor_socket_strerror(tor_socket_errno(conn->s))); + if (conn->linked) { + conn->reading_from_linked_conn = 0; + connection_stop_reading_from_linked_conn(conn); + } else { + if (event_del(conn->read_event)) + log_warn(LD_NET, "Error from libevent setting read event state for %d " + "to unwatched: %s", + conn->s, + tor_socket_strerror(tor_socket_errno(conn->s))); + } } /** Tell the main loop to start notifying <b>conn</b> of any read events. */ @@ -350,11 +380,17 @@ connection_start_reading(connection_t *conn) tor_assert(conn); tor_assert(conn->read_event); - if (event_add(conn->read_event, NULL)) - log_warn(LD_NET, "Error from libevent setting read event state for %d " - "to watched: %s", - conn->s, - tor_socket_strerror(tor_socket_errno(conn->s))); + if (conn->linked) { + conn->reading_from_linked_conn = 1; + if (connection_should_read_from_linked_conn(conn)) + connection_start_reading_from_linked_conn(conn); + } else { + if (event_add(conn->read_event, NULL)) + log_warn(LD_NET, "Error from libevent setting read event state for %d " + "to watched: %s", + conn->s, + tor_socket_strerror(tor_socket_errno(conn->s))); + } } /** Return true iff <b>conn</b> is listening for write events. */ @@ -363,7 +399,8 @@ connection_is_writing(connection_t *conn) { tor_assert(conn); - return conn->write_event && event_pending(conn->write_event, EV_WRITE, NULL); + return conn->writing_to_linked_conn || + (conn->write_event && event_pending(conn->write_event, EV_WRITE, NULL)); } /** Tell the main loop to stop notifying <b>conn</b> of any write events. */ @@ -373,11 +410,17 @@ connection_stop_writing(connection_t *conn) tor_assert(conn); tor_assert(conn->write_event); - if (event_del(conn->write_event)) - log_warn(LD_NET, "Error from libevent setting write event state for %d " - "to unwatched: %s", - conn->s, - tor_socket_strerror(tor_socket_errno(conn->s))); + if (conn->linked) { + conn->writing_to_linked_conn = 0; + if (conn->linked_conn) + connection_stop_reading_from_linked_conn(conn->linked_conn); + } else { + if (event_del(conn->write_event)) + log_warn(LD_NET, "Error from libevent setting write event state for %d " + "to unwatched: %s", + conn->s, + tor_socket_strerror(tor_socket_errno(conn->s))); + } } /** Tell the main loop to start notifying <b>conn</b> of any write events. */ @@ -387,11 +430,58 @@ connection_start_writing(connection_t *conn) tor_assert(conn); tor_assert(conn->write_event); - if (event_add(conn->write_event, NULL)) - log_warn(LD_NET, "Error from libevent setting write event state for %d " - "to watched: %s", - conn->s, - tor_socket_strerror(tor_socket_errno(conn->s))); + if (conn->linked) { + conn->writing_to_linked_conn = 1; + if (conn->linked_conn && + connection_should_read_from_linked_conn(conn->linked_conn)) + connection_start_reading_from_linked_conn(conn->linked_conn); + } else { + if (event_add(conn->write_event, NULL)) + log_warn(LD_NET, "Error from libevent setting write event state for %d " + "to watched: %s", + conn->s, + tor_socket_strerror(tor_socket_errno(conn->s))); + } +} + +/** DOCDOC*/ +void +connection_start_reading_from_linked_conn(connection_t *conn) +{ + tor_assert(conn); + tor_assert(conn->linked == 1); + + if (!conn->active_on_link) { + conn->active_on_link = 1; + smartlist_add(active_linked_connection_lst, conn); + if (!called_loop_once) { + /* This is the first event on the list; we won't be in LOOP_ONCE mode, + * so we need to make sure that the event_loop() actually exits at the + * end of its run through the current connections and + * lets us activate read events for linked connections. */ + struct timeval tv = { 0, 0 }; + event_loopexit(&tv); + } + } else { + tor_assert(smartlist_isin(active_linked_connection_lst, conn)); + } +} + +/** DOCDOC*/ +void +connection_stop_reading_from_linked_conn(connection_t *conn) +{ + tor_assert(conn); + tor_assert(conn->linked == 1); + + if (conn->active_on_link) { + conn->active_on_link = 0; + /* XXXX020 maybe we should keep an index here so we can smartlist_del + * cleanly. */ + smartlist_remove(active_linked_connection_lst, conn); + } else { + tor_assert(!smartlist_isin(active_linked_connection_lst, conn)); + } } /** Close all connections that have been scheduled to get closed */ @@ -402,7 +492,7 @@ close_closeable_connections(void) for (i = 0; i < smartlist_len(closeable_connection_lst); ) { connection_t *conn = smartlist_get(closeable_connection_lst, i); if (conn->conn_array_index < 0) { - connection_unlink(conn, 0); /* blow it away right now */ + connection_unlink(conn); /* blow it away right now */ } else { if (!conn_close_if_marked(conn->conn_array_index)) ++i; @@ -500,7 +590,7 @@ conn_close_if_marked(int i) assert_all_pending_dns_resolves_ok(); log_debug(LD_NET,"Cleaning up connection (fd %d).",conn->s); - if (conn->s >= 0 && connection_wants_to_flush(conn)) { + if ((conn->s >= 0 || conn->linked_conn) && connection_wants_to_flush(conn)) { /* s == -1 means it's an incomplete edge connection, or that the socket * has already been closed as unflushable. */ int sz = connection_bucket_write_limit(conn); @@ -512,7 +602,21 @@ conn_close_if_marked(int i) conn->s, conn_type_to_string(conn->type), conn->state, (int)conn->outbuf_flushlen, conn->marked_for_close_file, conn->marked_for_close); - if (connection_speaks_cells(conn)) { + if (conn->linked_conn) { + retval = move_buf_to_buf(conn->linked_conn->inbuf, conn->outbuf, + &conn->outbuf_flushlen); + if (retval >= 0) { + /* The linked conn will notice that it has data when it notices that + * we're gone. */ + connection_start_reading_from_linked_conn(conn->linked_conn); + } + /* XXXX020 Downgrade to debug. */ + log_info(LD_GENERAL, "Flushed last %d bytes from a linked conn; " + "%d left; flushlen %d; wants-to-flush==%d", retval, + (int)buf_datalen(conn->outbuf), + (int)conn->outbuf_flushlen, + connection_wants_to_flush(conn)); + } else if (connection_speaks_cells(conn)) { if (conn->state == OR_CONN_STATE_OPEN) { retval = flush_buf_tls(TO_OR_CONN(conn)->tls, conn->outbuf, sz, &conn->outbuf_flushlen); @@ -553,7 +657,7 @@ conn_close_if_marked(int i) conn->marked_for_close); } } - connection_unlink(conn, 1); /* unlink, remove, free */ + connection_unlink(conn); /* unlink, remove, free */ return 1; } @@ -1270,8 +1374,14 @@ do_main_loop(void) /* Make it easier to tell whether libevent failure is our fault or not. */ errno = 0; #endif - /* poll until we have an event, or the second ends */ - loop_result = event_dispatch(); + /* All active linked conns should get their read events activated. */ + SMARTLIST_FOREACH(active_linked_connection_lst, connection_t *, conn, + event_active(conn->read_event, EV_READ, 1)); + called_loop_once = smartlist_len(active_linked_connection_lst) ? 1 : 0; + + /* poll until we have an event, or the second ends, or until we have + * some active linked connections to triggger events for. */ + loop_result = event_loop(called_loop_once ? EVLOOP_ONCE : 0); /* let catch() handle things like ^c, and otherwise don't worry about it */ if (loop_result < 0) { @@ -1601,6 +1711,8 @@ tor_init(int argc, char *argv[]) time_of_process_start = time(NULL); if (!closeable_connection_lst) closeable_connection_lst = smartlist_create(); + if (!active_linked_connection_lst) + active_linked_connection_lst = smartlist_create(); /* Initialize the history structures. */ rep_hist_init(); /* Initialize the service cache. */ @@ -1673,6 +1785,7 @@ tor_free_all(int postfork) tor_tls_free_all(); /* stuff in main.c */ smartlist_free(closeable_connection_lst); + smartlist_free(active_linked_connection_lst); tor_free(timeout_event); /* Stuff in util.c */ escaped(NULL); diff --git a/src/or/or.h b/src/or/or.h index 1fbcb1b7aa..a090f2e0ce 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -748,6 +748,7 @@ typedef struct connection_t { /* The next fields are all one-bit booleans. Some are only applicable * to connection subtypes, but we hold them here anyway, to save space. * (Currently, they all fit into a single byte.) */ + /*XXXX020 rename wants_to_*; the names are misleading. */ unsigned wants_to_read:1; /**< Boolean: should we start reading again once * the bandwidth throttler allows it? */ unsigned wants_to_write:1; /**< Boolean: should we start writing again once @@ -771,7 +772,7 @@ typedef struct connection_t { unsigned int chosen_exit_optional:1; int s; /**< Our socket; -1 if this connection is closed, or has no - * sockets. */ + * socket. */ int conn_array_index; /**< Index into the global connection array. */ struct event *read_event; /**< Libevent event structure. */ struct event *write_event; /**< Libevent event structure. */ @@ -797,6 +798,13 @@ typedef struct connection_t { * we marked for close? */ char *address; /**< FQDN (or IP) of the guy on the other end. * strdup into this, because free_connection frees it. */ + /** Annother connection that's connected to this one in lieu of a socket. */ + struct connection_t *linked_conn; + /* XXXX020 NM move these up to the other 1-bit flags. */ + unsigned int linked:1; /**< True if there is, or has been, a linked_conn. */ + unsigned int reading_from_linked_conn:1; /**DOCDOC*/ + unsigned int writing_to_linked_conn:1; /**DOCDOC*/ + unsigned int active_on_link:1; /**DOCDOC*/ } connection_t; @@ -1967,6 +1975,7 @@ int flush_buf_tls(tor_tls_t *tls, buf_t *buf, size_t sz, size_t *buf_flushlen); int write_to_buf(const char *string, size_t string_len, buf_t *buf); int write_to_buf_zlib(buf_t *buf, tor_zlib_state_t *state, const char *data, size_t data_len, int done); +int move_buf_to_buf(buf_t *buf_out, buf_t *buf_in, size_t *buf_flushlen); int fetch_from_buf(char *string, size_t string_len, buf_t *buf); int fetch_from_buf_http(buf_t *buf, char **headers_out, size_t max_headerlen, @@ -2163,7 +2172,8 @@ const char *conn_type_to_string(int type); const char *conn_state_to_string(int type, int state); connection_t *connection_new(int type); -void connection_unregister(connection_t *conn); +void connection_link_connections(connection_t *conn_a, connection_t *conn_b); +void connection_unregister_events(connection_t *conn); void connection_free(connection_t *conn); void connection_free_all(void); void connection_about_to_close_connection(connection_t *conn); @@ -2227,6 +2237,7 @@ connection_t *connection_get_by_type_state_rendquery(int type, int state, int connection_is_listener(connection_t *conn); int connection_state_is_open(connection_t *conn); int connection_state_is_connecting(connection_t *conn); +int connection_should_read_from_linked_conn(connection_t *conn); char *alloc_http_authenticator(const char *authenticator); @@ -2252,8 +2263,8 @@ int connection_edge_finished_connecting(edge_connection_t *conn); int connection_ap_handshake_send_begin(edge_connection_t *ap_conn); int connection_ap_handshake_send_resolve(edge_connection_t *ap_conn); -int connection_ap_make_bridge(char *address, uint16_t port, - const char *digest, int command); +edge_connection_t *connection_ap_make_bridge(char *address, uint16_t port, + const char *digest, int command); void connection_ap_handshake_socks_reply(edge_connection_t *conn, char *reply, size_t replylen, int endreason); @@ -2580,6 +2591,9 @@ int connection_is_writing(connection_t *conn); void connection_stop_writing(connection_t *conn); void connection_start_writing(connection_t *conn); +void connection_stop_reading_from_linked_conn(connection_t *conn); +void connection_start_reading_from_linked_conn(connection_t *conn); + void directory_all_unreachable(time_t now); void directory_info_has_arrived(time_t now, int from_cache); diff --git a/src/or/test.c b/src/or/test.c index a03f17134e..21916c8a0e 100644 --- a/src/or/test.c +++ b/src/or/test.c @@ -111,9 +111,10 @@ test_buffers(void) char str[256]; char str2[256]; - buf_t *buf; + buf_t *buf, *buf2; int j; + size_t r; /**** * buf_new @@ -218,6 +219,37 @@ test_buffers(void) test_memeq(str2, str, 255); } + /* Move from buf to buf. */ + buf_free(buf); + buf = buf_new_with_capacity(4096); + buf2 = buf_new_with_capacity(4096); + for (j=0;j<100;++j) + write_to_buf(str, 255, buf); + test_eq(buf_datalen(buf), 25500); + for (j=0;j<100;++j) { + r = 10; + move_buf_to_buf(buf2, buf, &r); + test_eq(r, 0); + } + test_eq(buf_datalen(buf), 24500); + test_eq(buf_datalen(buf2), 1000); + for (j=0;j<3;++j) { + fetch_from_buf(str2, 255, buf2); + test_memeq(str2, str, 255); + } + r = 8192; /*big move*/ + move_buf_to_buf(buf2, buf, &r); + test_eq(r, 0); + r = 30000; /* incomplete move */ + move_buf_to_buf(buf2, buf, &r); + test_eq(r, 13692); + for (j=0;j<97;++j) { + fetch_from_buf(str2, 255, buf2); + test_memeq(str2, str, 255); + } + buf_free(buf); + buf_free(buf2); + #if 0 { int s; @@ -285,8 +317,6 @@ test_buffers(void) test_eq(eof, 1); } #endif - - buf_free(buf); } static void |