diff options
author | Nick Mathewson <nickm@torproject.org> | 2015-08-12 10:10:11 -0400 |
---|---|---|
committer | Nick Mathewson <nickm@torproject.org> | 2015-08-18 08:56:23 -0400 |
commit | e2a6a7ec6178834c3de7a3be614679120e2c00c8 (patch) | |
tree | 96c4a4941e10c9557838e1022f36038eebda2d66 /src | |
parent | 81f3572467583c54eb2a22c3af93f7c819796563 (diff) | |
download | tor-e2a6a7ec6178834c3de7a3be614679120e2c00c8.tar.gz tor-e2a6a7ec6178834c3de7a3be614679120e2c00c8.zip |
Multithreading support for event-queue code.
Diffstat (limited to 'src')
-rw-r--r-- | src/or/control.c | 61 |
1 files changed, 46 insertions, 15 deletions
diff --git a/src/or/control.c b/src/or/control.c index 4b8e9c1573..2be99d5d7a 100644 --- a/src/or/control.c +++ b/src/or/control.c @@ -594,13 +594,20 @@ typedef struct queued_event_s { char *msg; } queued_event_t; -/** If this is greater than 0, we don't allow new events to be queued. */ +/** If this is greater than 0, we don't allow new events to be queued. + * XXXX This should be thread-local. */ static int block_event_queue = 0; /** Holds a smartlist of queued_event_t objects that may need to be sent * to one or more controllers */ static smartlist_t *queued_control_events = NULL; +/** True if the flush_queued_events_event is pending. */ +static int flush_queued_event_pending = 0; + +/** Lock to protect the above fields. */ +static tor_mutex_t *queued_control_events_lock = NULL; + /** An event that should fire in order to flush the contents of * queued_control_events. */ static struct event *flush_queued_events_event = NULL; @@ -621,6 +628,10 @@ control_initialize_event_queue(void) tor_assert(flush_queued_events_event); } } + + if (queued_control_events_lock == NULL) { + queued_control_events_lock = tor_mutex_new(); + } } /** Helper: inserts an event on the list of events queued to be sent to @@ -643,30 +654,43 @@ queue_control_event_string,(uint16_t event, char *msg)) tor_free(msg); return; } - if (block_event_queue) { - tor_free(msg); - return; - } queued_event_t *ev = tor_malloc(sizeof(*ev)); ev->event = event; ev->msg = msg; + tor_mutex_acquire(queued_control_events_lock); + if (block_event_queue) { /* XXXX This should be thread-specific. */ + tor_mutex_release(queued_control_events_lock); + tor_free(msg); + tor_free(ev); + return; + } + /* No queueing an event while queueing an event */ ++block_event_queue; tor_assert(queued_control_events); smartlist_add(queued_control_events, ev); - /* We just put the first event on the queue; mark the queue to be - * flushed. + int activate_event = 0; + if (! flush_queued_event_pending && in_main_thread()) { + activate_event = 1; + flush_queued_event_pending = 1; + } + + --block_event_queue; + + tor_mutex_release(queued_control_events_lock); + + /* We just put an event on the queue; mark the queue to be + * flushed. We only do this from the main thread for now; otherwise, + * we'd need to incur locking overhead in Libevent or use a socket. */ - if (smartlist_len(queued_control_events) == 1) { + if (activate_event) { tor_assert(flush_queued_events_event); event_active(flush_queued_events_event, EV_READ, 1); } - - --block_event_queue; } /** Release all storage held by <b>ev</b>. */ @@ -687,15 +711,20 @@ queued_event_free(queued_event_t *ev) static void queued_events_flush_all(int force) { - smartlist_t *all_conns = get_connection_array(); - smartlist_t *controllers = smartlist_new(); - if (PREDICT_UNLIKELY(queued_control_events == NULL)) { return; } + smartlist_t *all_conns = get_connection_array(); + smartlist_t *controllers = smartlist_new(); + smartlist_t *queued_events; + tor_mutex_acquire(queued_control_events_lock); /* No queueing an event while flushing events. */ ++block_event_queue; + flush_queued_event_pending = 0; + queued_events = queued_control_events; + queued_control_events = smartlist_new(); + tor_mutex_release(queued_control_events_lock); /* Gather all the controllers that will care... */ SMARTLIST_FOREACH_BEGIN(all_conns, connection_t *, conn) { @@ -708,7 +737,7 @@ queued_events_flush_all(int force) } } SMARTLIST_FOREACH_END(conn); - SMARTLIST_FOREACH_BEGIN(queued_control_events, queued_event_t *, ev) { + SMARTLIST_FOREACH_BEGIN(queued_events, queued_event_t *, ev) { const event_mask_t bit = ((event_mask_t)1) << ev->event; const size_t msg_len = strlen(ev->msg); SMARTLIST_FOREACH_BEGIN(controllers, control_connection_t *, @@ -728,10 +757,12 @@ queued_events_flush_all(int force) } SMARTLIST_FOREACH_END(control_conn); } - smartlist_clear(queued_control_events); + smartlist_free(queued_events); smartlist_free(controllers); + tor_mutex_acquire(queued_control_events_lock); --block_event_queue; + tor_mutex_release(queued_control_events_lock); } /** Libevent callback: Flushes pending events to controllers that are |