summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Mathewson <nickm@torproject.org>2016-03-24 11:15:19 -0400
committerNick Mathewson <nickm@torproject.org>2016-05-11 13:25:11 -0400
commit80a6c8caa3910b18ce50ef870ef0af546c64faa2 (patch)
tree2f48426164336a059e8f3ff02ae631463017d78c
parent4e76b206b5fac18805a5cb2c3440dd367bd3b92d (diff)
downloadtor-80a6c8caa3910b18ce50ef870ef0af546c64faa2.tar.gz
tor-80a6c8caa3910b18ce50ef870ef0af546c64faa2.zip
Basic work on a publish/subscribe abstraction
The goal here is to provide a way to decouple pieces of the code that want to learn "when something happens" from those that realize that it has happened. The implementation here consists of a generic backend, plus a set of macros to define and implement a set of type-safe frontends.
-rw-r--r--src/common/include.am2
-rw-r--r--src/common/pubsub.c129
-rw-r--r--src/common/pubsub.h177
-rw-r--r--src/test/include.am1
-rw-r--r--src/test/test.c2
-rw-r--r--src/test/test_pubsub.c85
6 files changed, 396 insertions, 0 deletions
diff --git a/src/common/include.am b/src/common/include.am
index 5afb30da6a..05342e9e1c 100644
--- a/src/common/include.am
+++ b/src/common/include.am
@@ -67,6 +67,7 @@ LIBOR_A_SOURCES = \
src/common/di_ops.c \
src/common/log.c \
src/common/memarea.c \
+ src/common/pubsub.c \
src/common/util.c \
src/common/util_format.c \
src/common/util_process.c \
@@ -132,6 +133,7 @@ COMMONHEADERS = \
src/common/memarea.h \
src/common/linux_syscalls.inc \
src/common/procmon.h \
+ src/common/pubsub.h \
src/common/sandbox.h \
src/common/testsupport.h \
src/common/torgzip.h \
diff --git a/src/common/pubsub.c b/src/common/pubsub.c
new file mode 100644
index 0000000000..98ec3f81cc
--- /dev/null
+++ b/src/common/pubsub.c
@@ -0,0 +1,129 @@
+/* Copyright (c) 2016, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file pubsub.c
+ *
+ * \brief DOCDOC
+ */
+
+#include "orconfig.h"
+#include "pubsub.h"
+#include "container.h"
+
+/** Helper: insert <b>s</b> into <b>topic's</b> list of subscribers, keeping
+ * them sorted in priority order. */
+static void
+subscriber_insert(pubsub_topic_t *topic, pubsub_subscriber_t *s)
+{
+ int i;
+ smartlist_t *sl = topic->subscribers;
+ for (i = 0; i < smartlist_len(sl); ++i) {
+ pubsub_subscriber_t *other = smartlist_get(sl, i);
+ if (s->priority < other->priority) {
+ break;
+ }
+ }
+ smartlist_insert(sl, i, s);
+}
+
+/**
+ * Add a new subscriber to <b>topic</b>, where (when an event is triggered),
+ * we'll notify the function <b>fn</b> by passing it <b>subscriber_data</b>.
+ * Return a handle to the subscribe which can later be passed to
+ * pubsub_unsubscribe_().
+ *
+ * Functions are called in priority order, from lowest to highest.
+ *
+ * See pubsub.h for <b>subscribe_flags</b>.
+ */
+const pubsub_subscriber_t *
+pubsub_subscribe_(pubsub_topic_t *topic,
+ pubsub_subscriber_fn_t fn,
+ void *subscriber_data,
+ unsigned subscribe_flags,
+ unsigned priority)
+{
+ tor_assert(! topic->locked);
+ if (subscribe_flags & SUBSCRIBE_ATSTART) {
+ tor_assert(topic->n_events_fired == 0);
+ }
+ pubsub_subscriber_t *r = tor_malloc_zero(sizeof(r));
+ r->priority = priority;
+ r->subscriber_flags = subscribe_flags;
+ r->fn = fn;
+ r->subscriber_data = subscriber_data;
+ if (topic->subscribers == NULL) {
+ topic->subscribers = smartlist_new();
+ }
+ subscriber_insert(topic, r);
+ return r;
+}
+
+/**
+ * Remove the subscriber <b>s</b> from <b>topic</b>. After calling this
+ * function, <b>s</b> may no longer be used.
+ */
+int
+pubsub_unsubscribe_(pubsub_topic_t *topic,
+ const pubsub_subscriber_t *s)
+{
+ tor_assert(! topic->locked);
+ smartlist_t *sl = topic->subscribers;
+ if (sl == NULL)
+ return -1;
+ int i = smartlist_pos(sl, s);
+ if (i == -1)
+ return -1;
+ pubsub_subscriber_t *tmp = smartlist_get(sl, i);
+ tor_assert(tmp == s);
+ smartlist_del_keeporder(sl, i);
+ tor_free(tmp);
+ return 0;
+}
+
+/**
+ * For every subscriber s in <b>topic</b>, invoke notify_fn on s and
+ * event_data. Return 0 if there were no nonzero return values, and -1 if
+ * there were any.
+ */
+int
+pubsub_notify_(pubsub_topic_t *topic, pubsub_notify_fn_t notify_fn,
+ void *event_data, unsigned notify_flags)
+{
+ tor_assert(! topic->locked);
+ (void) notify_flags;
+ smartlist_t *sl = topic->subscribers;
+ int n_bad = 0;
+ ++topic->n_events_fired;
+ if (sl == NULL)
+ return -1;
+ topic->locked = 1;
+ SMARTLIST_FOREACH_BEGIN(sl, pubsub_subscriber_t *, s) {
+ int r = notify_fn(s, event_data);
+ if (r != 0)
+ ++n_bad;
+ } SMARTLIST_FOREACH_END(s);
+ topic->locked = 0;
+ return (n_bad == 0) ? 0 : -1;
+}
+
+/**
+ * Release all storage held by <b>topic</b>.
+ */
+void
+pubsub_clear_(pubsub_topic_t *topic)
+{
+ tor_assert(! topic->locked);
+
+ smartlist_t *sl = topic->subscribers;
+ if (sl == NULL)
+ return;
+ SMARTLIST_FOREACH_BEGIN(sl, pubsub_subscriber_t *, s) {
+ tor_free(s);
+ } SMARTLIST_FOREACH_END(s);
+ smartlist_free(sl);
+ topic->subscribers = NULL;
+ topic->n_events_fired = 0;
+}
+
diff --git a/src/common/pubsub.h b/src/common/pubsub.h
new file mode 100644
index 0000000000..09e492ec4f
--- /dev/null
+++ b/src/common/pubsub.h
@@ -0,0 +1,177 @@
+/* Copyright (c) 2016, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file pubsub.h
+ * \brief Macros to implement publish/subscribe abstractions.
+ *
+ * To use these macros, call DECLARE_PUBSUB_TOPIC() with an identifier to use
+ * as your topic. Below, I'm going to assume you say DECLARE_PUBSUB_TOPIC(T).
+ *
+ * Doing this will declare the following types:
+ * typedef struct T_event_data_t T_event_data_t; // you define this struct
+ * typedef struct T_subscriber_data_t T_subscriber_data_t; // this one too.
+ * typedef struct T_subscriber_t T_subscriber_t; // opaque
+ * typedef int (*T_subscriber_fn_t)(T_event_data_t*, T_subscriber_data_t*);
+ *
+ * and it will declare the following functions:
+ * const T_subscriber_t *T_subscribe(T_subscriber_fn_t,
+ * T_subscriber_data_t *,
+ * unsigned flags,
+ * unsigned priority);
+ * int T_unsubscribe(const T_subscriber_t *)
+ *
+ * Elsewhere you can say DECLARE_NOTIFY_PUBSUB_TOPIC(static, T), which declares:
+ * static int T_notify(T_event_data_t *, unsigned notify_flags);
+ * static void T_clear(void);
+ *
+ * And in some C file, you would define these functions with:
+ * IMPLEMENT_PUBSUB_TOPIC(static, T).
+ *
+ * The implementations will be small typesafe wrappers over generic versions
+ * of the above functions.
+ *
+ * To use the typesafe functions, you add any number of subscribers with
+ * T_subscribe(). Each has an associated function pointer, data pointer,
+ * and priority. Later, you can invoke T_notify() to declare that the
+ * event has occurred. Each of the subscribers will be invoked once.
+ **/
+
+#ifndef TOR_PUBSUB_H
+#define TOR_PUBSUB_H
+
+#include "torint.h"
+
+/**
+ * Flag for T_subscribe: die with an assertion failure if the event
+ * have ever been published before. Used when a subscriber must absolutely
+ * never have missed an event.
+ */
+#define SUBSCRIBE_ATSTART (1u<<0)
+
+#define DECLARE_PUBSUB_STRUCT_TYPES(name) \
+ /* You define this type. */ \
+ typedef struct name ## _event_data_t name ## _event_data_t; \
+ /* You define this type. */ \
+ typedef struct name ## _subscriber_data_t name ## _subscriber_data_t;
+
+#define DECLARE_PUBSUB_TOPIC(name) \
+ /* This type is opaque. */ \
+ typedef struct name ## _subscriber_t name ## _subscriber_t; \
+ /* You declare functions matching this type. */ \
+ typedef int (*name ## _subscriber_fn_t)( \
+ name ## _event_data_t *data, \
+ name ## _subscriber_data_t *extra); \
+ /* Call this function to subscribe to a topic. */ \
+ const name ## _subscriber_t *name ## _subscribe( \
+ name##_subscriber_fn_t subscriber, \
+ name##_subscriber_data_t *extra_data, \
+ unsigned flags, \
+ unsigned priority); \
+ /* Call this function to unsubscribe from a topic. */ \
+ int name ## _unsubscribe(const name##_subscriber_t *s);
+
+#define DECLARE_NOTIFY_PUBSUB_TOPIC(linkage, name) \
+ /* Call this function to notify all subscribers. Flags not yet used. */ \
+ linkage int name ## _notify(name ## _event_data_t *data, unsigned flags); \
+ /* Call this function to release storage held by the topic. */ \
+ linkage void name ## _clear(void);
+
+/**
+ * Type used to hold a generic function for a subscriber.
+ *
+ * [Yes, it is safe to cast to this, so long as we cast back to the original
+ * type before calling. From C99: "A pointer to a function of one type may be
+ * converted to a pointer to a function of another type and back again; the
+ * result shall compare equal to the original pointer."]
+*/
+typedef int (*pubsub_subscriber_fn_t)(void *, void *);
+
+/**
+ * Helper type to implement pubsub abstraction. Don't use this directly.
+ * It represents a subscriber.
+ */
+typedef struct pubsub_subscriber_t {
+ /** Function to invoke when the event triggers. */
+ pubsub_subscriber_fn_t fn;
+ /** Data associated with this subscriber. */
+ void *subscriber_data;
+ /** Priority for this subscriber. Low priorities happen first. */
+ unsigned priority;
+ /** Flags set on this subscriber. Not yet used.*/
+ unsigned subscriber_flags;
+} pubsub_subscriber_t;
+
+/**
+ * Helper type to implement pubsub abstraction. Don't use this directly.
+ * It represents a topic, and keeps a record of subscribers.
+ */
+typedef struct pubsub_topic_t {
+ /** List of subscribers to this topic. May be NULL. */
+ struct smartlist_t *subscribers;
+ /** Total number of times that pubsub_notify_() has ever been called on this
+ * topic. */
+ uint64_t n_events_fired;
+ /** True iff we're running 'notify' on this topic, and shouldn't allow
+ * any concurrent modifications or events. */
+ unsigned locked;
+} pubsub_topic_t;
+
+const pubsub_subscriber_t *pubsub_subscribe_(pubsub_topic_t *topic,
+ pubsub_subscriber_fn_t fn,
+ void *subscriber_data,
+ unsigned subscribe_flags,
+ unsigned priority);
+int pubsub_unsubscribe_(pubsub_topic_t *topic, const pubsub_subscriber_t *sub);
+void pubsub_clear_(pubsub_topic_t *topic);
+typedef int (*pubsub_notify_fn_t)(pubsub_subscriber_t *subscriber,
+ void *notify_data);
+int pubsub_notify_(pubsub_topic_t *topic, pubsub_notify_fn_t notify_fn,
+ void *notify_data, unsigned notify_flags);
+
+#define IMPLEMENT_PUBSUB_TOPIC(notify_linkage, name) \
+ static pubsub_topic_t name ## _topic_ = { NULL, 0, 0 }; \
+ const name ## _subscriber_t * \
+ name ## _subscribe(name##_subscriber_fn_t subscriber, \
+ name##_subscriber_data_t *extra_data, \
+ unsigned flags, \
+ unsigned priority) \
+ { \
+ const pubsub_subscriber_t *s; \
+ s = pubsub_subscribe_(&name##_topic_, \
+ (pubsub_subscriber_fn_t)subscriber, \
+ extra_data, \
+ flags, \
+ priority); \
+ return (const name##_subscriber_t *)s; \
+ } \
+ int \
+ name ## _unsubscribe(const name##_subscriber_t *subscriber) \
+ { \
+ return pubsub_unsubscribe_(&name##_topic_, \
+ (const pubsub_subscriber_t *)subscriber); \
+ } \
+ static int \
+ name##_call_the_notify_fn_(pubsub_subscriber_t *subscriber, \
+ void *notify_data) \
+ { \
+ name ## _subscriber_fn_t fn; \
+ fn = (name ## _subscriber_fn_t) subscriber->fn; \
+ return fn(notify_data, subscriber->subscriber_data); \
+ } \
+ notify_linkage int \
+ name ## _notify(name ## _event_data_t *event_data, unsigned flags) \
+ { \
+ return pubsub_notify_(&name##_topic_, \
+ name##_call_the_notify_fn_, \
+ event_data, \
+ flags); \
+ } \
+ notify_linkage void \
+ name ## _clear(void) \
+ { \
+ pubsub_clear_(&name##_topic_); \
+ }
+
+#endif /* TOR_PUBSUB_H */
+
diff --git a/src/test/include.am b/src/test/include.am
index 7d80fdf152..f1f9047e84 100644
--- a/src/test/include.am
+++ b/src/test/include.am
@@ -97,6 +97,7 @@ src_test_test_SOURCES = \
src/test/test_policy.c \
src/test/test_procmon.c \
src/test/test_pt.c \
+ src/test/test_pubsub.c \
src/test/test_relay.c \
src/test/test_relaycell.c \
src/test/test_rendcache.c \
diff --git a/src/test/test.c b/src/test/test.c
index ed167a3e67..dcbaf5bbca 100644
--- a/src/test/test.c
+++ b/src/test/test.c
@@ -1159,6 +1159,7 @@ extern struct testcase_t oom_tests[];
extern struct testcase_t options_tests[];
extern struct testcase_t policy_tests[];
extern struct testcase_t procmon_tests[];
+extern struct testcase_t pubsub_tests[];
extern struct testcase_t pt_tests[];
extern struct testcase_t relay_tests[];
extern struct testcase_t relaycell_tests[];
@@ -1230,6 +1231,7 @@ struct testgroup_t testgroups[] = {
{ "util/format/", util_format_tests },
{ "util/logging/", logging_tests },
{ "util/process/", util_process_tests },
+ { "util/pubsub/", pubsub_tests },
{ "util/thread/", thread_tests },
{ "dns/", dns_tests },
END_OF_GROUPS
diff --git a/src/test/test_pubsub.c b/src/test/test_pubsub.c
new file mode 100644
index 0000000000..547d6c6b32
--- /dev/null
+++ b/src/test/test_pubsub.c
@@ -0,0 +1,85 @@
+/* Copyright (c) 2016, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file test_pubsub.c
+ * \brief Unit tests for publish-subscribe abstraction.
+ **/
+
+#include "or.h"
+#include "test.h"
+#include "pubsub.h"
+
+DECLARE_PUBSUB_STRUCT_TYPES(foobar)
+DECLARE_PUBSUB_TOPIC(foobar)
+DECLARE_NOTIFY_PUBSUB_TOPIC(static, foobar)
+IMPLEMENT_PUBSUB_TOPIC(static, foobar)
+
+struct foobar_event_data_t {
+ unsigned u;
+ const char *s;
+};
+
+struct foobar_subscriber_data_t {
+ const char *name;
+ long l;
+};
+
+static int
+foobar_sub1(foobar_event_data_t *ev, foobar_subscriber_data_t *mine)
+{
+ ev->u += 10;
+ mine->l += 100;
+ return 0;
+}
+
+static int
+foobar_sub2(foobar_event_data_t *ev, foobar_subscriber_data_t *mine)
+{
+ ev->u += 5;
+ mine->l += 50;
+ return 0;
+}
+
+static void
+test_pubsub_basic(void *arg)
+{
+ (void)arg;
+ foobar_subscriber_data_t subdata1 = { "hi", 0 };
+ foobar_subscriber_data_t subdata2 = { "wow", 0 };
+ const foobar_subscriber_t *sub1;
+ const foobar_subscriber_t *sub2;
+ foobar_event_data_t ed = { 0, "x" };
+ foobar_event_data_t ed2 = { 0, "y" };
+ sub1 = foobar_subscribe(foobar_sub1, &subdata1, SUBSCRIBE_ATSTART, 100);
+ tt_assert(sub1);
+
+ foobar_notify(&ed, 0);
+ tt_int_op(subdata1.l, OP_EQ, 100);
+ tt_int_op(subdata2.l, OP_EQ, 0);
+ tt_int_op(ed.u, OP_EQ, 10);
+
+ sub2 = foobar_subscribe(foobar_sub2, &subdata2, 0, 5);
+ tt_assert(sub2);
+
+ foobar_notify(&ed2, 0);
+ tt_int_op(subdata1.l, OP_EQ, 200);
+ tt_int_op(subdata2.l, OP_EQ, 50);
+ tt_int_op(ed2.u, OP_EQ, 15);
+
+ foobar_unsubscribe(sub1);
+
+ foobar_notify(&ed, 0);
+ tt_int_op(subdata1.l, OP_EQ, 200);
+ tt_int_op(subdata2.l, OP_EQ, 100);
+ tt_int_op(ed.u, OP_EQ, 15);
+
+ done:
+ foobar_clear();
+}
+
+struct testcase_t pubsub_tests[] = {
+ { "pubsub_basic", test_pubsub_basic, TT_FORK, NULL, NULL },
+ END_OF_TESTCASES
+};
+