diff options
author | Nick Mathewson <nickm@torproject.org> | 2009-08-11 15:16:16 -0400 |
---|---|---|
committer | Nick Mathewson <nickm@torproject.org> | 2010-09-27 12:31:13 -0400 |
commit | 4af6887d201d978a46072ead0036e0d16fa5908a (patch) | |
tree | 6d4c6d53f20ac02ef713fd01a211a57ff26c6510 | |
parent | b63f6518cbdc4c80b09399bc17d3bec3cac76ad9 (diff) | |
download | tor-4af6887d201d978a46072ead0036e0d16fa5908a.tar.gz tor-4af6887d201d978a46072ead0036e0d16fa5908a.zip |
Add support for linked connections with bufferevent_pair.
Also, set directory connections (linked and otherwise) to use bufferevents.
Also, stop using outbuf_flushlen anywhere except for OR connections.
-rw-r--r-- | src/or/connection.c | 2 | ||||
-rw-r--r-- | src/or/connection.h | 4 | ||||
-rw-r--r-- | src/or/connection_edge.c | 22 | ||||
-rw-r--r-- | src/or/connection_edge.h | 3 | ||||
-rw-r--r-- | src/or/directory.c | 11 | ||||
-rw-r--r-- | src/or/main.c | 90 |
6 files changed, 102 insertions, 30 deletions
diff --git a/src/or/connection.c b/src/or/connection.c index 394aa3b836..8b9d47d5d1 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -191,8 +191,8 @@ connection_type_uses_bufferevent(connection_t *conn) { switch (conn->type) { case CONN_TYPE_AP: - return 1; case CONN_TYPE_EXIT: + case CONN_TYPE_DIR: return 1; default: return 0; diff --git a/src/or/connection.h b/src/or/connection.h index 4d269d649a..adf79f1391 100644 --- a/src/or/connection.h +++ b/src/or/connection.h @@ -100,7 +100,7 @@ connection_get_inbuf_len(connection_t *conn) IF_HAS_BUFFEREVENT(conn, { return evbuffer_get_length(bufferevent_get_input(conn->bufev)); }) ELSE_IF_NO_BUFFEREVENT { - return buf_datalen(conn->inbuf); + return conn->inbuf ? buf_datalen(conn->inbuf) : 0; } } @@ -110,7 +110,7 @@ connection_get_outbuf_len(connection_t *conn) IF_HAS_BUFFEREVENT(conn, { return evbuffer_get_length(bufferevent_get_output(conn->bufev)); }) ELSE_IF_NO_BUFFEREVENT { - return buf_datalen(conn->outbuf); + return conn->outbuf ? buf_datalen(conn->outbuf) : 0; } } diff --git a/src/or/connection_edge.c b/src/or/connection_edge.c index f90c44f58d..39bc8e7c05 100644 --- a/src/or/connection_edge.c +++ b/src/or/connection_edge.c @@ -357,8 +357,9 @@ connection_edge_finished_connecting(edge_connection_t *edge_conn) rep_hist_note_exit_stream_opened(conn->port); conn->state = EXIT_CONN_STATE_OPEN; - connection_watch_events(conn, READ_EVENT); /* stop writing, keep reading */ - if (connection_wants_to_flush(conn)) /* in case there are any queued relay + IF_HAS_NO_BUFFEREVENT(conn) + connection_watch_events(conn, READ_EVENT); /* stop writing, keep reading */ + if (connection_get_outbuf_len(conn)) /* in case there are any queued relay * cells */ connection_start_writing(conn); /* deliver a 'connected' relay cell back through the circuit. */ @@ -2109,8 +2110,10 @@ connection_ap_handshake_send_begin(edge_connection_t *ap_conn) ap_conn->socks_request->port); payload_len = (int)strlen(payload)+1; - log_debug(LD_APP, - "Sending relay cell to begin stream %d.", ap_conn->stream_id); + log_info(LD_APP, + "Sending relay cell %d to begin stream %d.", + (int)ap_conn->use_begindir, + ap_conn->stream_id); begin_type = ap_conn->use_begindir ? RELAY_COMMAND_BEGIN_DIR : RELAY_COMMAND_BEGIN; @@ -2218,9 +2221,11 @@ connection_ap_handshake_send_resolve(edge_connection_t *ap_conn) * and call connection_ap_handshake_attach_circuit(conn) on it. * * Return the other end of the linked connection pair, or -1 if error. + * DOCDOC partner. */ edge_connection_t * -connection_ap_make_link(char *address, uint16_t port, +connection_ap_make_link(connection_t *partner, + char *address, uint16_t port, const char *digest, int use_begindir, int want_onehop) { edge_connection_t *conn; @@ -2255,6 +2260,8 @@ connection_ap_make_link(char *address, uint16_t port, tor_addr_make_unspec(&conn->_base.addr); conn->_base.port = 0; + connection_link_connections(partner, TO_CONN(conn)); + if (connection_add(TO_CONN(conn)) < 0) { /* no space, forget it */ connection_free(TO_CONN(conn)); return NULL; @@ -2772,12 +2779,13 @@ connection_exit_connect(edge_connection_t *edge_conn) } conn->state = EXIT_CONN_STATE_OPEN; - if (connection_wants_to_flush(conn)) { + if (connection_get_outbuf_len(conn)) { /* in case there are any queued data cells */ log_warn(LD_BUG,"newly connected conn had data waiting!"); // connection_start_writing(conn); } - connection_watch_events(conn, READ_EVENT); + IF_HAS_NO_BUFFEREVENT(conn) + connection_watch_events(conn, READ_EVENT); /* also, deliver a 'connected' cell back through the circuit. */ if (connection_edge_is_rendezvous_stream(edge_conn)) { diff --git a/src/or/connection_edge.h b/src/or/connection_edge.h index 762af5172e..0f7bf07809 100644 --- a/src/or/connection_edge.h +++ b/src/or/connection_edge.h @@ -29,7 +29,8 @@ int connection_edge_finished_connecting(edge_connection_t *conn); int connection_ap_handshake_send_begin(edge_connection_t *ap_conn); int connection_ap_handshake_send_resolve(edge_connection_t *ap_conn); -edge_connection_t *connection_ap_make_link(char *address, uint16_t port, +edge_connection_t *connection_ap_make_link(connection_t *partner, + char *address, uint16_t port, const char *digest, int use_begindir, int want_onehop); void connection_ap_handshake_socks_reply(edge_connection_t *conn, char *reply, diff --git a/src/or/directory.c b/src/or/directory.c index 284a1ad12f..ac6f205fbe 100644 --- a/src/or/directory.c +++ b/src/or/directory.c @@ -892,14 +892,14 @@ directory_initiate_command_rend(const char *address, const tor_addr_t *_addr, * hook up both sides */ linked_conn = - connection_ap_make_link(conn->_base.address, conn->_base.port, + connection_ap_make_link(TO_CONN(conn), + conn->_base.address, conn->_base.port, digest, use_begindir, conn->dirconn_direct); if (!linked_conn) { log_warn(LD_NET,"Making tunnel to dirserver failed."); connection_mark_for_close(TO_CONN(conn)); return; } - connection_link_connections(TO_CONN(conn), TO_CONN(linked_conn)); if (connection_add(TO_CONN(conn)) < 0) { log_warn(LD_NET,"Unable to add connection for link to dirserver."); @@ -912,8 +912,12 @@ directory_initiate_command_rend(const char *address, const tor_addr_t *_addr, payload, payload_len, supports_conditional_consensus, if_modified_since); + connection_watch_events(TO_CONN(conn), READ_EVENT|WRITE_EVENT); - connection_start_reading(TO_CONN(linked_conn)); + IF_HAS_BUFFEREVENT(TO_CONN(linked_conn), { + connection_watch_events(TO_CONN(linked_conn), READ_EVENT|WRITE_EVENT); + }) ELSE_IF_NO_BUFFEREVENT + connection_start_reading(TO_CONN(linked_conn)); } } @@ -3352,6 +3356,7 @@ connection_dir_finished_flushing(dir_connection_t *conn) DIRREQ_DIRECT, DIRREQ_FLUSHING_DIR_CONN_FINISHED); switch (conn->_base.state) { + case DIR_CONN_STATE_CONNECTING: case DIR_CONN_STATE_CLIENT_SENDING: log_debug(LD_DIR,"client finished sending command."); conn->_base.state = DIR_CONN_STATE_CLIENT_READING; diff --git a/src/or/main.c b/src/or/main.c index 976d805e13..cba98a8849 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -155,6 +155,32 @@ int can_complete_circuit=0; * ****************************************************************************/ +#ifdef USE_BUFFEREVENTS +static void +free_old_inbuf(connection_t *conn) +{ + if (! conn->inbuf) + return; + + tor_assert(conn->outbuf); + tor_assert(buf_datalen(conn->inbuf) == 0); + tor_assert(buf_datalen(conn->outbuf) == 0); + buf_free(conn->inbuf); + buf_free(conn->outbuf); + conn->inbuf = conn->outbuf = NULL; + + if (conn->read_event) { + event_del(conn->read_event); + tor_event_free(conn->read_event); + } + if (conn->write_event) { + event_del(conn->read_event); + tor_event_free(conn->write_event); + } + conn->read_event = conn->write_event = NULL; +} +#endif + /** Add <b>conn</b> to the array of connections that we can poll on. The * connection's socket must be set; the connection starts out * non-reading and non-writing. @@ -173,28 +199,47 @@ connection_add_impl(connection_t *conn, int is_connecting) smartlist_add(connection_array, conn); #ifdef USE_BUFFEREVENTS - if (connection_type_uses_bufferevent(conn) && - conn->s >= 0 && !conn->linked) { - conn->bufev = bufferevent_socket_new( + if (connection_type_uses_bufferevent(conn)) { + if (conn->s >= 0 && !conn->linked) { + conn->bufev = bufferevent_socket_new( tor_libevent_get_base(), conn->s, BEV_OPT_DEFER_CALLBACKS); - if (conn->inbuf) { + /* XXXX CHECK FOR NULL RETURN! */ + if (is_connecting) { + /* Put the bufferevent into a "connecting" state so that we'll get + * a "connected" event callback on successful write. */ + bufferevent_socket_connect(conn->bufev, NULL, 0); + } + connection_configure_bufferevent_callbacks(conn); + } else if (conn->linked && conn->linked_conn && + connection_type_uses_bufferevent(conn->linked_conn)) { + tor_assert(conn->s < 0); + if (!conn->bufev) { + struct bufferevent *pair[2] = { NULL, NULL }; + /* XXXX CHECK FOR ERROR RETURN! */ + bufferevent_pair_new(tor_libevent_get_base(), + BEV_OPT_DEFER_CALLBACKS, + pair); + tor_assert(pair[0]); + conn->bufev = pair[0]; + conn->linked_conn->bufev = pair[1]; + } /* else the other side already was added, and got a bufferevent_pair */ + connection_configure_bufferevent_callbacks(conn); + } + + if (conn->bufev && conn->inbuf) { /* XXX Instead we should assert that there is no inbuf, once we * have linked connections using bufferevents. */ - tor_assert(conn->outbuf); - tor_assert(buf_datalen(conn->inbuf) == 0); - tor_assert(buf_datalen(conn->outbuf) == 0); - buf_free(conn->inbuf); - buf_free(conn->outbuf); - conn->inbuf = conn->outbuf = NULL; + free_old_inbuf(conn); } - if (is_connecting) { - /* Put the bufferevent into a "connecting" state so that we'll get - * a "connected" event callback on successful write. */ - bufferevent_socket_connect(conn->bufev, NULL, 0); + + if (conn->linked_conn && conn->linked_conn->bufev && + conn->linked_conn->inbuf) { + /* XXX Instead we should assert that there is no inbuf, once we + * have linked connections using bufferevents. */ + free_old_inbuf(conn->linked_conn); } - connection_configure_bufferevent_callbacks(conn); } #else (void) is_connecting; @@ -205,6 +250,7 @@ connection_add_impl(connection_t *conn, int is_connecting) conn->s, EV_READ|EV_PERSIST, conn_read_callback, conn); conn->write_event = tor_event_new(tor_libevent_get_base(), conn->s, EV_WRITE|EV_PERSIST, conn_write_callback, conn); + /* XXXX CHECK FOR NULL RETURN! */ } log_debug(LD_NET,"new conn type %s, socket %d, address %s, n_conns %d.", @@ -671,11 +717,19 @@ conn_close_if_marked(int i) /* assert_all_pending_dns_resolves_ok(); */ #ifdef USE_BUFFEREVENTS - if (conn->bufev && conn->hold_open_until_flushed) + if (conn->bufev && conn->hold_open_until_flushed) { + if (conn->linked) { + /* We need to do this explicitly so that the linked connection + * notices that there was an EOF. */ + bufferevent_flush(conn->bufev, EV_WRITE, BEV_FINISHED); + /* XXXX Now can we free it? */ + } return 0; + } #endif log_debug(LD_NET,"Cleaning up connection (fd %d).",conn->s); + IF_HAS_BUFFEREVENT(conn, goto unlink); if ((conn->s >= 0 || conn->linked_conn) && connection_wants_to_flush(conn)) { /* s == -1 means it's an incomplete edge connection, or that the socket * has already been closed as unflushable. */ @@ -743,6 +797,10 @@ conn_close_if_marked(int i) conn->marked_for_close); } } + +#ifdef USE_BUFFEREVENTS + unlink: +#endif connection_unlink(conn); /* unlink, remove, free */ return 1; } |