aboutsummaryrefslogtreecommitdiff
path: root/src/or
diff options
context:
space:
mode:
authorNick Mathewson <nickm@torproject.org>2015-08-12 10:10:11 -0400
committerNick Mathewson <nickm@torproject.org>2015-08-18 08:56:23 -0400
commite2a6a7ec6178834c3de7a3be614679120e2c00c8 (patch)
tree96c4a4941e10c9557838e1022f36038eebda2d66 /src/or
parent81f3572467583c54eb2a22c3af93f7c819796563 (diff)
downloadtor-e2a6a7ec6178834c3de7a3be614679120e2c00c8.tar.gz
tor-e2a6a7ec6178834c3de7a3be614679120e2c00c8.zip
Multithreading support for event-queue code.
Diffstat (limited to 'src/or')
-rw-r--r--src/or/control.c61
1 files changed, 46 insertions, 15 deletions
diff --git a/src/or/control.c b/src/or/control.c
index 4b8e9c1573..2be99d5d7a 100644
--- a/src/or/control.c
+++ b/src/or/control.c
@@ -594,13 +594,20 @@ typedef struct queued_event_s {
char *msg;
} queued_event_t;
-/** If this is greater than 0, we don't allow new events to be queued. */
+/** If this is greater than 0, we don't allow new events to be queued.
+ * XXXX This should be thread-local. */
static int block_event_queue = 0;
/** Holds a smartlist of queued_event_t objects that may need to be sent
* to one or more controllers */
static smartlist_t *queued_control_events = NULL;
+/** True if the flush_queued_events_event is pending. */
+static int flush_queued_event_pending = 0;
+
+/** Lock to protect the above fields. */
+static tor_mutex_t *queued_control_events_lock = NULL;
+
/** An event that should fire in order to flush the contents of
* queued_control_events. */
static struct event *flush_queued_events_event = NULL;
@@ -621,6 +628,10 @@ control_initialize_event_queue(void)
tor_assert(flush_queued_events_event);
}
}
+
+ if (queued_control_events_lock == NULL) {
+ queued_control_events_lock = tor_mutex_new();
+ }
}
/** Helper: inserts an event on the list of events queued to be sent to
@@ -643,30 +654,43 @@ queue_control_event_string,(uint16_t event, char *msg))
tor_free(msg);
return;
}
- if (block_event_queue) {
- tor_free(msg);
- return;
- }
queued_event_t *ev = tor_malloc(sizeof(*ev));
ev->event = event;
ev->msg = msg;
+ tor_mutex_acquire(queued_control_events_lock);
+ if (block_event_queue) { /* XXXX This should be thread-specific. */
+ tor_mutex_release(queued_control_events_lock);
+ tor_free(msg);
+ tor_free(ev);
+ return;
+ }
+
/* No queueing an event while queueing an event */
++block_event_queue;
tor_assert(queued_control_events);
smartlist_add(queued_control_events, ev);
- /* We just put the first event on the queue; mark the queue to be
- * flushed.
+ int activate_event = 0;
+ if (! flush_queued_event_pending && in_main_thread()) {
+ activate_event = 1;
+ flush_queued_event_pending = 1;
+ }
+
+ --block_event_queue;
+
+ tor_mutex_release(queued_control_events_lock);
+
+ /* We just put an event on the queue; mark the queue to be
+ * flushed. We only do this from the main thread for now; otherwise,
+ * we'd need to incur locking overhead in Libevent or use a socket.
*/
- if (smartlist_len(queued_control_events) == 1) {
+ if (activate_event) {
tor_assert(flush_queued_events_event);
event_active(flush_queued_events_event, EV_READ, 1);
}
-
- --block_event_queue;
}
/** Release all storage held by <b>ev</b>. */
@@ -687,15 +711,20 @@ queued_event_free(queued_event_t *ev)
static void
queued_events_flush_all(int force)
{
- smartlist_t *all_conns = get_connection_array();
- smartlist_t *controllers = smartlist_new();
-
if (PREDICT_UNLIKELY(queued_control_events == NULL)) {
return;
}
+ smartlist_t *all_conns = get_connection_array();
+ smartlist_t *controllers = smartlist_new();
+ smartlist_t *queued_events;
+ tor_mutex_acquire(queued_control_events_lock);
/* No queueing an event while flushing events. */
++block_event_queue;
+ flush_queued_event_pending = 0;
+ queued_events = queued_control_events;
+ queued_control_events = smartlist_new();
+ tor_mutex_release(queued_control_events_lock);
/* Gather all the controllers that will care... */
SMARTLIST_FOREACH_BEGIN(all_conns, connection_t *, conn) {
@@ -708,7 +737,7 @@ queued_events_flush_all(int force)
}
} SMARTLIST_FOREACH_END(conn);
- SMARTLIST_FOREACH_BEGIN(queued_control_events, queued_event_t *, ev) {
+ SMARTLIST_FOREACH_BEGIN(queued_events, queued_event_t *, ev) {
const event_mask_t bit = ((event_mask_t)1) << ev->event;
const size_t msg_len = strlen(ev->msg);
SMARTLIST_FOREACH_BEGIN(controllers, control_connection_t *,
@@ -728,10 +757,12 @@ queued_events_flush_all(int force)
} SMARTLIST_FOREACH_END(control_conn);
}
- smartlist_clear(queued_control_events);
+ smartlist_free(queued_events);
smartlist_free(controllers);
+ tor_mutex_acquire(queued_control_events_lock);
--block_event_queue;
+ tor_mutex_release(queued_control_events_lock);
}
/** Libevent callback: Flushes pending events to controllers that are