aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--changes/decouple_control_events8
-rw-r--r--src/or/control.c272
-rw-r--r--src/or/control.h10
-rw-r--r--src/or/main.c2
-rw-r--r--src/test/test_hs.c12
-rw-r--r--src/test/test_pt.c12
6 files changed, 252 insertions, 64 deletions
diff --git a/changes/decouple_control_events b/changes/decouple_control_events
new file mode 100644
index 0000000000..67c9c11f87
--- /dev/null
+++ b/changes/decouple_control_events
@@ -0,0 +1,8 @@
+ o Code simplification and refactoring:
+ - When generating an event to send to the controller, we no longer
+ put the event over the network immediately. Instead, we queue
+ these events, and use a Libevent callback to deliver them.
+ This change simplifies Tor's callgraph by reducing the number
+ of functions from which all other Tor functions are reachable.
+ Closes ticket 16695.
+
diff --git a/src/or/control.c b/src/or/control.c
index 7a113f2c1c..a17f6977f1 100644
--- a/src/or/control.c
+++ b/src/or/control.c
@@ -20,6 +20,7 @@
#include "circuitstats.h"
#include "circuituse.h"
#include "command.h"
+#include "compat_libevent.h"
#include "config.h"
#include "confparse.h"
#include "connection.h"
@@ -50,6 +51,12 @@
#include <sys/resource.h>
#endif
+#ifdef HAVE_EVENT2_EVENT_H
+#include <event2/event.h>
+#else
+#include <event.h>
+#endif
+
#include "crypto_s2k.h"
#include "procmon.h"
@@ -181,6 +188,8 @@ static void orconn_target_get_name(char *buf, size_t len,
static int get_cached_network_liveness(void);
static void set_cached_network_liveness(int liveness);
+static void flush_queued_events_cb(evutil_socket_t fd, short what, void *arg);
+
/** Given a control event code for a message event, return the corresponding
* log severity. */
static INLINE int
@@ -578,6 +587,156 @@ send_control_done(control_connection_t *conn)
connection_write_str_to_buf("250 OK\r\n", conn);
}
+/** Represents an event that's queued to be sent to one or more
+ * controllers. */
+typedef struct queued_event_s {
+ uint16_t event;
+ char *msg;
+} queued_event_t;
+
+/** If this is greater than 0, we don't allow new events to be queued. */
+static int block_event_queue = 0;
+
+/** Holds a smartlist of queued_event_t objects that may need to be sent
+ * to one or more controllers */
+static smartlist_t *queued_control_events = NULL;
+
+/** An event that should fire in order to flush the contents of
+ * queued_control_events. */
+static struct event *flush_queued_events_event = NULL;
+
+/** Helper: inserts an event on the list of events queued to be sent to
+ * one or more controllers, and schedules the events to be flushed if needed.
+ *
+ * This function takes ownership of <b>msg</b>, and may free it.
+ *
+ * We queue these events rather than send them immediately in order to break
+ * the dependency in our callgraph from code that generates events for the
+ * controller, and the network layer at large. Otherwise, nearly every
+ * interesting part of Tor would potentially call every other interesting part
+ * of Tor.
+ */
+MOCK_IMPL(STATIC void,
+queue_control_event_string,(uint16_t event, char *msg))
+{
+ if (PREDICT_UNLIKELY(queued_control_events == NULL)) {
+ queued_control_events = smartlist_new();
+ }
+
+ /* This is redundant with checks done elsewhere, but it's a last-ditch
+ * attempt to avoid queueing something we shouldn't have to queue. */
+ if (PREDICT_UNLIKELY( ! EVENT_IS_INTERESTING(event) )) {
+ tor_free(msg);
+ return;
+ }
+ if (block_event_queue) {
+ tor_free(msg);
+ return;
+ }
+
+ /* No queueing an event while queueing an event */
+ ++block_event_queue;
+
+ queued_event_t *ev = tor_malloc(sizeof(*ev));
+ ev->event = event;
+ ev->msg = msg;
+
+ smartlist_add(queued_control_events, ev);
+
+ /* We just put the first event on the queue; mark the queue to be
+ * flushed.
+ */
+ if (smartlist_len(queued_control_events) == 1) {
+ if (PREDICT_UNLIKELY(flush_queued_events_event == NULL)) {
+ struct event_base *b = tor_libevent_get_base();
+ tor_assert(b);
+ flush_queued_events_event = tor_event_new(b,
+ -1, 0, flush_queued_events_cb,
+ NULL);
+ tor_assert(flush_queued_events_event);
+ }
+ event_active(flush_queued_events_event, EV_READ, 1);
+ }
+
+ --block_event_queue;
+}
+
+/** Release all storage held by <b>ev</b>. */
+static void
+queued_event_free(queued_event_t *ev)
+{
+ if (ev == NULL)
+ return;
+
+ tor_free(ev->msg);
+ tor_free(ev);
+}
+
+/** Send every queued event to every controller that's interested in it,
+ * and remove the events from the queue. If <b>force</b> is true,
+ * then make all controllers send their data out immediately, since we
+ * may be about to shut down. */
+static void
+queued_events_flush_all(int force)
+{
+ smartlist_t *all_conns = get_connection_array();
+ smartlist_t *controllers = smartlist_new();
+
+ if (PREDICT_UNLIKELY(queued_control_events == NULL)) {
+ return;
+ }
+
+ /* No queueing an event while flushing events. */
+ ++block_event_queue;
+
+ /* Gather all the controllers that will care... */
+ SMARTLIST_FOREACH_BEGIN(all_conns, connection_t *, conn) {
+ if (conn->type == CONN_TYPE_CONTROL &&
+ !conn->marked_for_close &&
+ conn->state == CONTROL_CONN_STATE_OPEN) {
+ control_connection_t *control_conn = TO_CONTROL_CONN(conn);
+
+ smartlist_add(controllers, control_conn);
+ }
+ } SMARTLIST_FOREACH_END(conn);
+
+ SMARTLIST_FOREACH_BEGIN(queued_control_events, queued_event_t *, ev) {
+ const event_mask_t bit = ((event_mask_t)1) << ev->event;
+ const size_t msg_len = strlen(ev->msg);
+ SMARTLIST_FOREACH_BEGIN(controllers, control_connection_t *,
+ control_conn) {
+ if (control_conn->event_mask & bit) {
+ connection_write_to_buf(ev->msg, msg_len, TO_CONN(control_conn));
+ }
+ } SMARTLIST_FOREACH_END(control_conn);
+
+ queued_event_free(ev);
+ } SMARTLIST_FOREACH_END(ev);
+
+ if (force) {
+ SMARTLIST_FOREACH_BEGIN(controllers, control_connection_t *,
+ control_conn) {
+ connection_flush(TO_CONN(control_conn));
+ } SMARTLIST_FOREACH_END(control_conn);
+ }
+
+ smartlist_clear(queued_control_events);
+ smartlist_free(controllers);
+
+ --block_event_queue;
+}
+
+/** Libevent callback: Flushes pending events to controllers that are
+ * interested in them */
+static void
+flush_queued_events_cb(evutil_socket_t fd, short what, void *arg)
+{
+ (void) fd;
+ (void) what;
+ (void) arg;
+ queued_events_flush_all(0);
+}
+
/** Send an event to all v1 controllers that are listening for code
* <b>event</b>. The event's body is given by <b>msg</b>.
*
@@ -592,32 +751,9 @@ MOCK_IMPL(STATIC void,
send_control_event_string,(uint16_t event, event_format_t which,
const char *msg))
{
- smartlist_t *conns = get_connection_array();
- (void)which;
+ (void) which;
tor_assert(event >= EVENT_MIN_ && event <= EVENT_MAX_);
-
- SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) {
- if (conn->type == CONN_TYPE_CONTROL &&
- !conn->marked_for_close &&
- conn->state == CONTROL_CONN_STATE_OPEN) {
- control_connection_t *control_conn = TO_CONTROL_CONN(conn);
-
- if (control_conn->event_mask & (((event_mask_t)1)<<event)) {
- int is_err = 0;
- connection_write_to_buf(msg, strlen(msg), TO_CONN(control_conn));
- if (event == EVENT_ERR_MSG)
- is_err = 1;
- else if (event == EVENT_STATUS_GENERAL)
- is_err = !strcmpstart(msg, "STATUS_GENERAL ERR ");
- else if (event == EVENT_STATUS_CLIENT)
- is_err = !strcmpstart(msg, "STATUS_CLIENT ERR ");
- else if (event == EVENT_STATUS_SERVER)
- is_err = !strcmpstart(msg, "STATUS_SERVER ERR ");
- if (is_err)
- connection_flush(TO_CONN(control_conn));
- }
- }
- } SMARTLIST_FOREACH_END(conn);
+ queue_control_event_string(event, tor_strdup(msg));
}
/** Helper for send_control_event and control_event_status:
@@ -628,6 +764,8 @@ static void
send_control_event_impl(uint16_t event, event_format_t which,
const char *format, va_list ap)
{
+ (void) which;
+
char *buf = NULL;
int len;
@@ -637,9 +775,7 @@ send_control_event_impl(uint16_t event, event_format_t which,
return;
}
- send_control_event_string(event, which|ALL_FORMATS, buf);
-
- tor_free(buf);
+ queue_control_event_string(event, buf);
}
/** Send an event to all v1 controllers that are listening for code
@@ -5032,6 +5168,10 @@ control_event_logmsg(int severity, uint32_t domain, const char *msg)
}
++disable_log_messages;
send_control_event(event, ALL_FORMATS, "650 %s %s\r\n", s, b?b:msg);
+ if (severity == LOG_ERR) {
+ /* Force a flush, since we may be about to die horribly */
+ queued_events_flush_all(1);
+ }
--disable_log_messages;
tor_free(b);
}
@@ -5401,19 +5541,35 @@ control_event_status(int type, int severity, const char *format, va_list args)
return 0;
}
+#define CONTROL_EVENT_STATUS_BODY(event, sev) \
+ int r; \
+ do { \
+ va_list ap; \
+ if (!EVENT_IS_INTERESTING(event)) \
+ return 0; \
+ \
+ va_start(ap, format); \
+ r = control_event_status((event), (sev), format, ap); \
+ va_end(ap); \
+ } while (0)
+
/** Format and send an EVENT_STATUS_GENERAL event whose main text is obtained
* by formatting the arguments using the printf-style <b>format</b>. */
int
control_event_general_status(int severity, const char *format, ...)
{
- va_list ap;
- int r;
- if (!EVENT_IS_INTERESTING(EVENT_STATUS_GENERAL))
- return 0;
+ CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_GENERAL, severity);
+ return r;
+}
- va_start(ap, format);
- r = control_event_status(EVENT_STATUS_GENERAL, severity, format, ap);
- va_end(ap);
+/** Format and send an EVENT_STATUS_GENERAL LOG_ERR event, and flush it to the
+ * controller(s) immediately. */
+int
+control_event_general_error(const char *format, ...)
+{
+ CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_GENERAL, LOG_ERR);
+ /* Force a flush, since we may be about to die horribly */
+ queued_events_flush_all(1);
return r;
}
@@ -5422,14 +5578,18 @@ control_event_general_status(int severity, const char *format, ...)
int
control_event_client_status(int severity, const char *format, ...)
{
- va_list ap;
- int r;
- if (!EVENT_IS_INTERESTING(EVENT_STATUS_CLIENT))
- return 0;
+ CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_CLIENT, severity);
+ return r;
+}
- va_start(ap, format);
- r = control_event_status(EVENT_STATUS_CLIENT, severity, format, ap);
- va_end(ap);
+/** Format and send an EVENT_STATUS_CLIENT LOG_ERR event, and flush it to the
+ * controller(s) immediately. */
+int
+control_event_client_error(const char *format, ...)
+{
+ CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_CLIENT, LOG_ERR);
+ /* Force a flush, since we may be about to die horribly */
+ queued_events_flush_all(1);
return r;
}
@@ -5438,14 +5598,18 @@ control_event_client_status(int severity, const char *format, ...)
int
control_event_server_status(int severity, const char *format, ...)
{
- va_list ap;
- int r;
- if (!EVENT_IS_INTERESTING(EVENT_STATUS_SERVER))
- return 0;
+ CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_SERVER, severity);
+ return r;
+}
- va_start(ap, format);
- r = control_event_status(EVENT_STATUS_SERVER, severity, format, ap);
- va_end(ap);
+/** Format and send an EVENT_STATUS_SERVER LOG_ERR event, and flush it to the
+ * controller(s) immediately. */
+int
+control_event_server_error(const char *format, ...)
+{
+ CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_SERVER, LOG_ERR);
+ /* Force a flush, since we may be about to die horribly */
+ queued_events_flush_all(1);
return r;
}
@@ -6262,6 +6426,16 @@ control_free_all(void)
SMARTLIST_FOREACH(detached_onion_services, char *, cp, tor_free(cp));
smartlist_free(detached_onion_services);
}
+ if (queued_control_events) {
+ SMARTLIST_FOREACH(queued_control_events, queued_event_t *, ev,
+ queued_event_free(ev));
+ smartlist_free(queued_control_events);
+ queued_control_events = NULL;
+ }
+ if (flush_queued_events_event) {
+ tor_event_free(flush_queued_events_event);
+ flush_queued_events_event = NULL;
+ }
}
#ifdef TOR_UNIT_TESTS
diff --git a/src/or/control.h b/src/or/control.h
index 2d02443834..084030562a 100644
--- a/src/or/control.h
+++ b/src/or/control.h
@@ -78,6 +78,14 @@ int control_event_client_status(int severity, const char *format, ...)
CHECK_PRINTF(2,3);
int control_event_server_status(int severity, const char *format, ...)
CHECK_PRINTF(2,3);
+
+int control_event_general_error(const char *format, ...)
+ CHECK_PRINTF(1,2);
+int control_event_client_error(const char *format, ...)
+ CHECK_PRINTF(1,2);
+int control_event_server_error(const char *format, ...)
+ CHECK_PRINTF(1,2);
+
int control_event_guard(const char *nickname, const char *digest,
const char *status);
int control_event_conf_changed(const smartlist_t *elements);
@@ -215,6 +223,8 @@ typedef int event_format_t;
MOCK_DECL(STATIC void,
send_control_event_string,(uint16_t event, event_format_t which,
const char *msg));
+MOCK_DECL(STATIC void,
+ queue_control_event_string,(uint16_t event, char *msg));
void control_testing_set_global_event_mask(uint64_t mask);
#endif
diff --git a/src/or/main.c b/src/or/main.c
index 5bff82b3cf..ee56e10019 100644
--- a/src/or/main.c
+++ b/src/or/main.c
@@ -1007,7 +1007,7 @@ directory_all_unreachable(time_t now)
connection_mark_unattached_ap(entry_conn,
END_STREAM_REASON_NET_UNREACHABLE);
}
- control_event_general_status(LOG_ERR, "DIR_ALL_UNREACHABLE");
+ control_event_general_error("DIR_ALL_UNREACHABLE");
}
/** This function is called whenever we successfully pull down some new
diff --git a/src/test/test_hs.c b/src/test/test_hs.c
index 6d01798a63..126e211858 100644
--- a/src/test/test_hs.c
+++ b/src/test/test_hs.c
@@ -102,13 +102,11 @@ static char *received_msg = NULL;
/** Mock function for send_control_event_string
*/
static void
-send_control_event_string_replacement(uint16_t event, event_format_t which,
- const char *msg)
+queue_control_event_string_replacement(uint16_t event, char *msg)
{
(void) event;
- (void) which;
tor_free(received_msg);
- received_msg = tor_strdup(msg);
+ received_msg = msg;
}
/** Mock function for node_describe_longname_by_id, it returns either
@@ -141,8 +139,8 @@ test_hs_desc_event(void *arg)
char desc_id_base32[REND_DESC_ID_V2_LEN_BASE32 + 1];
(void) arg;
- MOCK(send_control_event_string,
- send_control_event_string_replacement);
+ MOCK(queue_control_event_string,
+ queue_control_event_string_replacement);
MOCK(node_describe_longname_by_id,
node_describe_longname_by_id_replacement);
@@ -225,7 +223,7 @@ test_hs_desc_event(void *arg)
smartlist_free(rend_query.hsdirs_fp);
done:
- UNMOCK(send_control_event_string);
+ UNMOCK(queue_control_event_string);
UNMOCK(node_describe_longname_by_id);
tor_free(received_msg);
}
diff --git a/src/test/test_pt.c b/src/test/test_pt.c
index 996ef8666b..6c9aefc487 100644
--- a/src/test/test_pt.c
+++ b/src/test/test_pt.c
@@ -333,15 +333,13 @@ static uint16_t controlevent_event = 0;
static smartlist_t *controlevent_msgs = NULL;
static void
-send_control_event_string_replacement(uint16_t event, event_format_t which,
- const char *msg)
+queue_control_event_string_replacement(uint16_t event, char *msg)
{
- (void) which;
++controlevent_n;
controlevent_event = event;
if (!controlevent_msgs)
controlevent_msgs = smartlist_new();
- smartlist_add(controlevent_msgs, tor_strdup(msg));
+ smartlist_add(controlevent_msgs, msg);
}
/* Test the configure_proxy() function. */
@@ -360,8 +358,8 @@ test_pt_configure_proxy(void *arg)
tor_process_handle_destroy_replacement);
MOCK(get_or_state,
get_or_state_replacement);
- MOCK(send_control_event_string,
- send_control_event_string_replacement);
+ MOCK(queue_control_event_string,
+ queue_control_event_string_replacement);
control_testing_set_global_event_mask(EVENT_TRANSPORT_LAUNCHED);
@@ -435,7 +433,7 @@ test_pt_configure_proxy(void *arg)
UNMOCK(tor_get_lines_from_handle);
UNMOCK(tor_process_handle_destroy);
UNMOCK(get_or_state);
- UNMOCK(send_control_event_string);
+ UNMOCK(queue_control_event_string);
if (controlevent_msgs) {
SMARTLIST_FOREACH(controlevent_msgs, char *, cp, tor_free(cp));
smartlist_free(controlevent_msgs);