summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/common/compat_pthreads.c27
-rw-r--r--src/common/compat_threads.h13
-rw-r--r--src/common/compat_winthreads.c27
-rw-r--r--src/or/control.c47
-rw-r--r--src/test/test_threads.c14
5 files changed, 104 insertions, 24 deletions
diff --git a/src/common/compat_pthreads.c b/src/common/compat_pthreads.c
index b78ab3d871..4b32fc93d2 100644
--- a/src/common/compat_pthreads.c
+++ b/src/common/compat_pthreads.c
@@ -275,6 +275,33 @@ tor_cond_signal_all(tor_cond_t *cond)
pthread_cond_broadcast(&cond->cond);
}
+int
+tor_threadlocal_init(tor_threadlocal_t *threadlocal)
+{
+ int err = pthread_key_create(&threadlocal->key, NULL);
+ return err ? -1 : 0;
+}
+
+void
+tor_threadlocal_destroy(tor_threadlocal_t *threadlocal)
+{
+ pthread_key_delete(threadlocal->key);
+ memset(threadlocal, 0, sizeof(tor_threadlocal_t));
+}
+
+void *
+tor_threadlocal_get(tor_threadlocal_t *threadlocal)
+{
+ return pthread_getspecific(threadlocal->key);
+}
+
+void
+tor_threadlocal_set(tor_threadlocal_t *threadlocal, void *value)
+{
+ int err = pthread_setspecific(threadlocal->key, value);
+ tor_assert(err == 0);
+}
+
/** Set up common structures for use by threading. */
void
tor_threads_init(void)
diff --git a/src/common/compat_threads.h b/src/common/compat_threads.h
index acf3083f37..a1b5056a40 100644
--- a/src/common/compat_threads.h
+++ b/src/common/compat_threads.h
@@ -111,5 +111,18 @@ typedef struct alert_sockets_s {
int alert_sockets_create(alert_sockets_t *socks_out, uint32_t flags);
void alert_sockets_close(alert_sockets_t *socks);
+typedef struct tor_threadlocal_s {
+#ifdef _WIN32
+ DWORD index;
+#else
+ pthread_key_t key;
+#endif
+} tor_threadlocal_t;
+
+int tor_threadlocal_init(tor_threadlocal_t *threadlocal);
+void tor_threadlocal_destroy(tor_threadlocal_t *threadlocal);
+void *tor_threadlocal_get(tor_threadlocal_t *threadlocal);
+void tor_threadlocal_set(tor_threadlocal_t *threadlocal, void *value);
+
#endif
diff --git a/src/common/compat_winthreads.c b/src/common/compat_winthreads.c
index 71b994c4e4..3d9e236d8b 100644
--- a/src/common/compat_winthreads.c
+++ b/src/common/compat_winthreads.c
@@ -123,6 +123,33 @@ tor_cond_signal_all(tor_cond_t *cond)
}
int
+tor_threadlocal_init(tor_threadlocal_t *threadlocal)
+{
+ threadlocal->index = TlsAlloc();
+ return (threadlocal->index == TLS_OUT_OF_INDEXES) ? -1 : 0;
+}
+
+void
+tor_threadlocal_destroy(tor_threadlocal_t *threadlocal)
+{
+ TlsFree(threadlocal->index);
+ memset(threadlocal, 0, sizeof(tor_threadlocal_t));
+}
+
+void *
+tor_threadlocal_get(tor_threadlocal_t *threadlocal)
+{
+ return TlsGetValue(threadlocal->index);
+}
+
+void
+tor_threadlocal_set(tor_threadlocal_t *threadlocal, void *value)
+{
+ BOOL ok = TlsSetValue(threadlocal->index, value);
+ tor_assert(ok);
+}
+
+int
tor_cond_wait(tor_cond_t *cond, tor_mutex_t *lock_, const struct timeval *tv)
{
CRITICAL_SECTION *lock = &lock_->mutex;
diff --git a/src/or/control.c b/src/or/control.c
index 2be99d5d7a..c80546529e 100644
--- a/src/or/control.c
+++ b/src/or/control.c
@@ -594,9 +594,9 @@ typedef struct queued_event_s {
char *msg;
} queued_event_t;
-/** If this is greater than 0, we don't allow new events to be queued.
- * XXXX This should be thread-local. */
-static int block_event_queue = 0;
+/** Pointer to int. If this is greater than 0, we don't allow new events to be
+ * queued. */
+static tor_threadlocal_t block_event_queue;
/** Holds a smartlist of queued_event_t objects that may need to be sent
* to one or more controllers */
@@ -631,9 +631,21 @@ control_initialize_event_queue(void)
if (queued_control_events_lock == NULL) {
queued_control_events_lock = tor_mutex_new();
+ tor_threadlocal_init(&block_event_queue);
}
}
+static int *
+get_block_event_queue(void)
+{
+ int *val = tor_threadlocal_get(&block_event_queue);
+ if (PREDICT_UNLIKELY(val == NULL)) {
+ val = tor_malloc_zero(sizeof(int));
+ tor_threadlocal_set(&block_event_queue, val);
+ }
+ return val;
+}
+
/** Helper: inserts an event on the list of events queued to be sent to
* one or more controllers, and schedules the events to be flushed if needed.
*
@@ -655,21 +667,20 @@ queue_control_event_string,(uint16_t event, char *msg))
return;
}
- queued_event_t *ev = tor_malloc(sizeof(*ev));
- ev->event = event;
- ev->msg = msg;
-
- tor_mutex_acquire(queued_control_events_lock);
- if (block_event_queue) { /* XXXX This should be thread-specific. */
- tor_mutex_release(queued_control_events_lock);
+ int *block_event_queue = get_block_event_queue();
+ if (*block_event_queue) {
tor_free(msg);
- tor_free(ev);
return;
}
+ queued_event_t *ev = tor_malloc(sizeof(*ev));
+ ev->event = event;
+ ev->msg = msg;
+
/* No queueing an event while queueing an event */
- ++block_event_queue;
+ ++*block_event_queue;
+ tor_mutex_acquire(queued_control_events_lock);
tor_assert(queued_control_events);
smartlist_add(queued_control_events, ev);
@@ -679,10 +690,10 @@ queue_control_event_string,(uint16_t event, char *msg))
flush_queued_event_pending = 1;
}
- --block_event_queue;
-
tor_mutex_release(queued_control_events_lock);
+ --*block_event_queue;
+
/* We just put an event on the queue; mark the queue to be
* flushed. We only do this from the main thread for now; otherwise,
* we'd need to incur locking overhead in Libevent or use a socket.
@@ -718,9 +729,11 @@ queued_events_flush_all(int force)
smartlist_t *controllers = smartlist_new();
smartlist_t *queued_events;
+ int *block_event_queue = get_block_event_queue();
+ ++*block_event_queue;
+
tor_mutex_acquire(queued_control_events_lock);
/* No queueing an event while flushing events. */
- ++block_event_queue;
flush_queued_event_pending = 0;
queued_events = queued_control_events;
queued_control_events = smartlist_new();
@@ -760,9 +773,7 @@ queued_events_flush_all(int force)
smartlist_free(queued_events);
smartlist_free(controllers);
- tor_mutex_acquire(queued_control_events_lock);
- --block_event_queue;
- tor_mutex_release(queued_control_events_lock);
+ --*block_event_queue;
}
/** Libevent callback: Flushes pending events to controllers that are
diff --git a/src/test/test_threads.c b/src/test/test_threads.c
index 2ac08d4d28..35f5dc8ea3 100644
--- a/src/test/test_threads.c
+++ b/src/test/test_threads.c
@@ -28,7 +28,7 @@ static unsigned long thread_fn_tid1, thread_fn_tid2;
static void thread_test_func_(void* _s) ATTR_NORETURN;
/** How many iterations have the threads in the unit test run? */
-static int t1_count = 0, t2_count = 0;
+static tor_threadlocal_t count;
/** Helper function for threading unit tests: This function runs in a
* subthread. It grabs its own mutex (start1 or start2) to make sure that it
@@ -38,19 +38,19 @@ static void
thread_test_func_(void* _s)
{
char *s = _s;
- int i, *count;
+ int i;
tor_mutex_t *m;
char buf[64];
char **cp;
+ int *mycount = tor_malloc_zero(sizeof(int));
+ tor_threadlocal_set(&count, mycount);
if (!strcmp(s, "thread 1")) {
m = thread_test_start1_;
cp = &thread1_name_;
- count = &t1_count;
thread_fn_tid1 = tor_get_thread_id();
} else {
m = thread_test_start2_;
cp = &thread2_name_;
- count = &t2_count;
thread_fn_tid2 = tor_get_thread_id();
}
@@ -62,8 +62,10 @@ thread_test_func_(void* _s)
for (i=0; i<10000; ++i) {
tor_mutex_acquire(thread_test_mutex_);
strmap_set(thread_test_strmap_, "last to run", *cp);
- ++*count;
tor_mutex_release(thread_test_mutex_);
+ int *tls_count = tor_threadlocal_get(&count);
+ tor_assert(tls_count == mycount);
+ ++*tls_count;
}
tor_mutex_acquire(thread_test_mutex_);
strmap_set(thread_test_strmap_, s, *cp);
@@ -89,6 +91,7 @@ test_threads_basic(void *arg)
tv.tv_usec=100*1000;
#endif
(void) arg;
+ tt_int_op(tor_threadlocal_init(&count), OP_EQ, 0);
set_main_thread();
@@ -128,7 +131,6 @@ test_threads_basic(void *arg)
tor_mutex_free(thread_test_mutex_);
if (timedout) {
- printf("\nTimed out: %d %d", t1_count, t2_count);
tt_assert(strmap_get(thread_test_strmap_, "thread 1"));
tt_assert(strmap_get(thread_test_strmap_, "thread 2"));
tt_assert(!timedout);