diff options
-rw-r--r-- | changes/decouple_control_events | 8 | ||||
-rw-r--r-- | src/or/control.c | 272 | ||||
-rw-r--r-- | src/or/control.h | 10 | ||||
-rw-r--r-- | src/or/main.c | 2 | ||||
-rw-r--r-- | src/test/test_hs.c | 12 | ||||
-rw-r--r-- | src/test/test_pt.c | 12 |
6 files changed, 252 insertions, 64 deletions
diff --git a/changes/decouple_control_events b/changes/decouple_control_events new file mode 100644 index 0000000000..67c9c11f87 --- /dev/null +++ b/changes/decouple_control_events @@ -0,0 +1,8 @@ + o Code simplification and refactoring: + - When generating an event to send to the controller, we no longer + put the event over the network immediately. Instead, we queue + these events, and use a Libevent callback to deliver them. + This change simplifies Tor's callgraph by reducing the number + of functions from which all other Tor functions are reachable. + Closes ticket 16695. + diff --git a/src/or/control.c b/src/or/control.c index 7a113f2c1c..a17f6977f1 100644 --- a/src/or/control.c +++ b/src/or/control.c @@ -20,6 +20,7 @@ #include "circuitstats.h" #include "circuituse.h" #include "command.h" +#include "compat_libevent.h" #include "config.h" #include "confparse.h" #include "connection.h" @@ -50,6 +51,12 @@ #include <sys/resource.h> #endif +#ifdef HAVE_EVENT2_EVENT_H +#include <event2/event.h> +#else +#include <event.h> +#endif + #include "crypto_s2k.h" #include "procmon.h" @@ -181,6 +188,8 @@ static void orconn_target_get_name(char *buf, size_t len, static int get_cached_network_liveness(void); static void set_cached_network_liveness(int liveness); +static void flush_queued_events_cb(evutil_socket_t fd, short what, void *arg); + /** Given a control event code for a message event, return the corresponding * log severity. */ static INLINE int @@ -578,6 +587,156 @@ send_control_done(control_connection_t *conn) connection_write_str_to_buf("250 OK\r\n", conn); } +/** Represents an event that's queued to be sent to one or more + * controllers. */ +typedef struct queued_event_s { + uint16_t event; + char *msg; +} queued_event_t; + +/** If this is greater than 0, we don't allow new events to be queued. */ +static int block_event_queue = 0; + +/** Holds a smartlist of queued_event_t objects that may need to be sent + * to one or more controllers */ +static smartlist_t *queued_control_events = NULL; + +/** An event that should fire in order to flush the contents of + * queued_control_events. */ +static struct event *flush_queued_events_event = NULL; + +/** Helper: inserts an event on the list of events queued to be sent to + * one or more controllers, and schedules the events to be flushed if needed. + * + * This function takes ownership of <b>msg</b>, and may free it. + * + * We queue these events rather than send them immediately in order to break + * the dependency in our callgraph from code that generates events for the + * controller, and the network layer at large. Otherwise, nearly every + * interesting part of Tor would potentially call every other interesting part + * of Tor. + */ +MOCK_IMPL(STATIC void, +queue_control_event_string,(uint16_t event, char *msg)) +{ + if (PREDICT_UNLIKELY(queued_control_events == NULL)) { + queued_control_events = smartlist_new(); + } + + /* This is redundant with checks done elsewhere, but it's a last-ditch + * attempt to avoid queueing something we shouldn't have to queue. */ + if (PREDICT_UNLIKELY( ! EVENT_IS_INTERESTING(event) )) { + tor_free(msg); + return; + } + if (block_event_queue) { + tor_free(msg); + return; + } + + /* No queueing an event while queueing an event */ + ++block_event_queue; + + queued_event_t *ev = tor_malloc(sizeof(*ev)); + ev->event = event; + ev->msg = msg; + + smartlist_add(queued_control_events, ev); + + /* We just put the first event on the queue; mark the queue to be + * flushed. + */ + if (smartlist_len(queued_control_events) == 1) { + if (PREDICT_UNLIKELY(flush_queued_events_event == NULL)) { + struct event_base *b = tor_libevent_get_base(); + tor_assert(b); + flush_queued_events_event = tor_event_new(b, + -1, 0, flush_queued_events_cb, + NULL); + tor_assert(flush_queued_events_event); + } + event_active(flush_queued_events_event, EV_READ, 1); + } + + --block_event_queue; +} + +/** Release all storage held by <b>ev</b>. */ +static void +queued_event_free(queued_event_t *ev) +{ + if (ev == NULL) + return; + + tor_free(ev->msg); + tor_free(ev); +} + +/** Send every queued event to every controller that's interested in it, + * and remove the events from the queue. If <b>force</b> is true, + * then make all controllers send their data out immediately, since we + * may be about to shut down. */ +static void +queued_events_flush_all(int force) +{ + smartlist_t *all_conns = get_connection_array(); + smartlist_t *controllers = smartlist_new(); + + if (PREDICT_UNLIKELY(queued_control_events == NULL)) { + return; + } + + /* No queueing an event while flushing events. */ + ++block_event_queue; + + /* Gather all the controllers that will care... */ + SMARTLIST_FOREACH_BEGIN(all_conns, connection_t *, conn) { + if (conn->type == CONN_TYPE_CONTROL && + !conn->marked_for_close && + conn->state == CONTROL_CONN_STATE_OPEN) { + control_connection_t *control_conn = TO_CONTROL_CONN(conn); + + smartlist_add(controllers, control_conn); + } + } SMARTLIST_FOREACH_END(conn); + + SMARTLIST_FOREACH_BEGIN(queued_control_events, queued_event_t *, ev) { + const event_mask_t bit = ((event_mask_t)1) << ev->event; + const size_t msg_len = strlen(ev->msg); + SMARTLIST_FOREACH_BEGIN(controllers, control_connection_t *, + control_conn) { + if (control_conn->event_mask & bit) { + connection_write_to_buf(ev->msg, msg_len, TO_CONN(control_conn)); + } + } SMARTLIST_FOREACH_END(control_conn); + + queued_event_free(ev); + } SMARTLIST_FOREACH_END(ev); + + if (force) { + SMARTLIST_FOREACH_BEGIN(controllers, control_connection_t *, + control_conn) { + connection_flush(TO_CONN(control_conn)); + } SMARTLIST_FOREACH_END(control_conn); + } + + smartlist_clear(queued_control_events); + smartlist_free(controllers); + + --block_event_queue; +} + +/** Libevent callback: Flushes pending events to controllers that are + * interested in them */ +static void +flush_queued_events_cb(evutil_socket_t fd, short what, void *arg) +{ + (void) fd; + (void) what; + (void) arg; + queued_events_flush_all(0); +} + /** Send an event to all v1 controllers that are listening for code * <b>event</b>. The event's body is given by <b>msg</b>. * @@ -592,32 +751,9 @@ MOCK_IMPL(STATIC void, send_control_event_string,(uint16_t event, event_format_t which, const char *msg)) { - smartlist_t *conns = get_connection_array(); - (void)which; + (void) which; tor_assert(event >= EVENT_MIN_ && event <= EVENT_MAX_); - - SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) { - if (conn->type == CONN_TYPE_CONTROL && - !conn->marked_for_close && - conn->state == CONTROL_CONN_STATE_OPEN) { - control_connection_t *control_conn = TO_CONTROL_CONN(conn); - - if (control_conn->event_mask & (((event_mask_t)1)<<event)) { - int is_err = 0; - connection_write_to_buf(msg, strlen(msg), TO_CONN(control_conn)); - if (event == EVENT_ERR_MSG) - is_err = 1; - else if (event == EVENT_STATUS_GENERAL) - is_err = !strcmpstart(msg, "STATUS_GENERAL ERR "); - else if (event == EVENT_STATUS_CLIENT) - is_err = !strcmpstart(msg, "STATUS_CLIENT ERR "); - else if (event == EVENT_STATUS_SERVER) - is_err = !strcmpstart(msg, "STATUS_SERVER ERR "); - if (is_err) - connection_flush(TO_CONN(control_conn)); - } - } - } SMARTLIST_FOREACH_END(conn); + queue_control_event_string(event, tor_strdup(msg)); } /** Helper for send_control_event and control_event_status: @@ -628,6 +764,8 @@ static void send_control_event_impl(uint16_t event, event_format_t which, const char *format, va_list ap) { + (void) which; + char *buf = NULL; int len; @@ -637,9 +775,7 @@ send_control_event_impl(uint16_t event, event_format_t which, return; } - send_control_event_string(event, which|ALL_FORMATS, buf); - - tor_free(buf); + queue_control_event_string(event, buf); } /** Send an event to all v1 controllers that are listening for code @@ -5032,6 +5168,10 @@ control_event_logmsg(int severity, uint32_t domain, const char *msg) } ++disable_log_messages; send_control_event(event, ALL_FORMATS, "650 %s %s\r\n", s, b?b:msg); + if (severity == LOG_ERR) { + /* Force a flush, since we may be about to die horribly */ + queued_events_flush_all(1); + } --disable_log_messages; tor_free(b); } @@ -5401,19 +5541,35 @@ control_event_status(int type, int severity, const char *format, va_list args) return 0; } +#define CONTROL_EVENT_STATUS_BODY(event, sev) \ + int r; \ + do { \ + va_list ap; \ + if (!EVENT_IS_INTERESTING(event)) \ + return 0; \ + \ + va_start(ap, format); \ + r = control_event_status((event), (sev), format, ap); \ + va_end(ap); \ + } while (0) + /** Format and send an EVENT_STATUS_GENERAL event whose main text is obtained * by formatting the arguments using the printf-style <b>format</b>. */ int control_event_general_status(int severity, const char *format, ...) { - va_list ap; - int r; - if (!EVENT_IS_INTERESTING(EVENT_STATUS_GENERAL)) - return 0; + CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_GENERAL, severity); + return r; +} - va_start(ap, format); - r = control_event_status(EVENT_STATUS_GENERAL, severity, format, ap); - va_end(ap); +/** Format and send an EVENT_STATUS_GENERAL LOG_ERR event, and flush it to the + * controller(s) immediately. */ +int +control_event_general_error(const char *format, ...) +{ + CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_GENERAL, LOG_ERR); + /* Force a flush, since we may be about to die horribly */ + queued_events_flush_all(1); return r; } @@ -5422,14 +5578,18 @@ control_event_general_status(int severity, const char *format, ...) int control_event_client_status(int severity, const char *format, ...) { - va_list ap; - int r; - if (!EVENT_IS_INTERESTING(EVENT_STATUS_CLIENT)) - return 0; + CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_CLIENT, severity); + return r; +} - va_start(ap, format); - r = control_event_status(EVENT_STATUS_CLIENT, severity, format, ap); - va_end(ap); +/** Format and send an EVENT_STATUS_CLIENT LOG_ERR event, and flush it to the + * controller(s) immediately. */ +int +control_event_client_error(const char *format, ...) +{ + CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_CLIENT, LOG_ERR); + /* Force a flush, since we may be about to die horribly */ + queued_events_flush_all(1); return r; } @@ -5438,14 +5598,18 @@ control_event_client_status(int severity, const char *format, ...) int control_event_server_status(int severity, const char *format, ...) { - va_list ap; - int r; - if (!EVENT_IS_INTERESTING(EVENT_STATUS_SERVER)) - return 0; + CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_SERVER, severity); + return r; +} - va_start(ap, format); - r = control_event_status(EVENT_STATUS_SERVER, severity, format, ap); - va_end(ap); +/** Format and send an EVENT_STATUS_SERVER LOG_ERR event, and flush it to the + * controller(s) immediately. */ +int +control_event_server_error(const char *format, ...) +{ + CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_SERVER, LOG_ERR); + /* Force a flush, since we may be about to die horribly */ + queued_events_flush_all(1); return r; } @@ -6262,6 +6426,16 @@ control_free_all(void) SMARTLIST_FOREACH(detached_onion_services, char *, cp, tor_free(cp)); smartlist_free(detached_onion_services); } + if (queued_control_events) { + SMARTLIST_FOREACH(queued_control_events, queued_event_t *, ev, + queued_event_free(ev)); + smartlist_free(queued_control_events); + queued_control_events = NULL; + } + if (flush_queued_events_event) { + tor_event_free(flush_queued_events_event); + flush_queued_events_event = NULL; + } } #ifdef TOR_UNIT_TESTS diff --git a/src/or/control.h b/src/or/control.h index 2d02443834..084030562a 100644 --- a/src/or/control.h +++ b/src/or/control.h @@ -78,6 +78,14 @@ int control_event_client_status(int severity, const char *format, ...) CHECK_PRINTF(2,3); int control_event_server_status(int severity, const char *format, ...) CHECK_PRINTF(2,3); + +int control_event_general_error(const char *format, ...) + CHECK_PRINTF(1,2); +int control_event_client_error(const char *format, ...) + CHECK_PRINTF(1,2); +int control_event_server_error(const char *format, ...) + CHECK_PRINTF(1,2); + int control_event_guard(const char *nickname, const char *digest, const char *status); int control_event_conf_changed(const smartlist_t *elements); @@ -215,6 +223,8 @@ typedef int event_format_t; MOCK_DECL(STATIC void, send_control_event_string,(uint16_t event, event_format_t which, const char *msg)); +MOCK_DECL(STATIC void, + queue_control_event_string,(uint16_t event, char *msg)); void control_testing_set_global_event_mask(uint64_t mask); #endif diff --git a/src/or/main.c b/src/or/main.c index 5bff82b3cf..ee56e10019 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -1007,7 +1007,7 @@ directory_all_unreachable(time_t now) connection_mark_unattached_ap(entry_conn, END_STREAM_REASON_NET_UNREACHABLE); } - control_event_general_status(LOG_ERR, "DIR_ALL_UNREACHABLE"); + control_event_general_error("DIR_ALL_UNREACHABLE"); } /** This function is called whenever we successfully pull down some new diff --git a/src/test/test_hs.c b/src/test/test_hs.c index 6d01798a63..126e211858 100644 --- a/src/test/test_hs.c +++ b/src/test/test_hs.c @@ -102,13 +102,11 @@ static char *received_msg = NULL; /** Mock function for send_control_event_string */ static void -send_control_event_string_replacement(uint16_t event, event_format_t which, - const char *msg) +queue_control_event_string_replacement(uint16_t event, char *msg) { (void) event; - (void) which; tor_free(received_msg); - received_msg = tor_strdup(msg); + received_msg = msg; } /** Mock function for node_describe_longname_by_id, it returns either @@ -141,8 +139,8 @@ test_hs_desc_event(void *arg) char desc_id_base32[REND_DESC_ID_V2_LEN_BASE32 + 1]; (void) arg; - MOCK(send_control_event_string, - send_control_event_string_replacement); + MOCK(queue_control_event_string, + queue_control_event_string_replacement); MOCK(node_describe_longname_by_id, node_describe_longname_by_id_replacement); @@ -225,7 +223,7 @@ test_hs_desc_event(void *arg) smartlist_free(rend_query.hsdirs_fp); done: - UNMOCK(send_control_event_string); + UNMOCK(queue_control_event_string); UNMOCK(node_describe_longname_by_id); tor_free(received_msg); } diff --git a/src/test/test_pt.c b/src/test/test_pt.c index 996ef8666b..6c9aefc487 100644 --- a/src/test/test_pt.c +++ b/src/test/test_pt.c @@ -333,15 +333,13 @@ static uint16_t controlevent_event = 0; static smartlist_t *controlevent_msgs = NULL; static void -send_control_event_string_replacement(uint16_t event, event_format_t which, - const char *msg) +queue_control_event_string_replacement(uint16_t event, char *msg) { - (void) which; ++controlevent_n; controlevent_event = event; if (!controlevent_msgs) controlevent_msgs = smartlist_new(); - smartlist_add(controlevent_msgs, tor_strdup(msg)); + smartlist_add(controlevent_msgs, msg); } /* Test the configure_proxy() function. */ @@ -360,8 +358,8 @@ test_pt_configure_proxy(void *arg) tor_process_handle_destroy_replacement); MOCK(get_or_state, get_or_state_replacement); - MOCK(send_control_event_string, - send_control_event_string_replacement); + MOCK(queue_control_event_string, + queue_control_event_string_replacement); control_testing_set_global_event_mask(EVENT_TRANSPORT_LAUNCHED); @@ -435,7 +433,7 @@ test_pt_configure_proxy(void *arg) UNMOCK(tor_get_lines_from_handle); UNMOCK(tor_process_handle_destroy); UNMOCK(get_or_state); - UNMOCK(send_control_event_string); + UNMOCK(queue_control_event_string); if (controlevent_msgs) { SMARTLIST_FOREACH(controlevent_msgs, char *, cp, tor_free(cp)); smartlist_free(controlevent_msgs); |