aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorNick Mathewson <nickm@torproject.org>2019-01-15 10:27:39 -0500
committerNick Mathewson <nickm@torproject.org>2019-03-25 16:35:33 -0400
commitbdeaf7d4b2929609c4d3f2ce9adfd973361ef578 (patch)
tree2742b9f39f5ab9331a54f3dd6cb932679af122cb /src/core
parent24df14eb096e73438d6045ff3b2840499a9af9b5 (diff)
downloadtor-bdeaf7d4b2929609c4d3f2ce9adfd973361ef578.tar.gz
tor-bdeaf7d4b2929609c4d3f2ce9adfd973361ef578.zip
Code to manage publish/subscribe setup via subsystem interface.
This commit has the necessary logic to run the publish/subscribe system from the mainloop, and to initialize it on startup and tear it down later.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/include.am2
-rw-r--r--src/core/mainloop/mainloop_pubsub.c149
-rw-r--r--src/core/mainloop/mainloop_pubsub.h23
3 files changed, 174 insertions, 0 deletions
diff --git a/src/core/include.am b/src/core/include.am
index ae47c75e09..3a0e907edb 100644
--- a/src/core/include.am
+++ b/src/core/include.am
@@ -22,6 +22,7 @@ LIBTOR_APP_A_SOURCES = \
src/core/mainloop/connection.c \
src/core/mainloop/cpuworker.c \
src/core/mainloop/mainloop.c \
+ src/core/mainloop/mainloop_pubsub.c \
src/core/mainloop/netstatus.c \
src/core/mainloop/periodic.c \
src/core/or/address_set.c \
@@ -213,6 +214,7 @@ noinst_HEADERS += \
src/core/mainloop/connection.h \
src/core/mainloop/cpuworker.h \
src/core/mainloop/mainloop.h \
+ src/core/mainloop/mainloop_pubsub.h \
src/core/mainloop/netstatus.h \
src/core/mainloop/periodic.h \
src/core/or/addr_policy_st.h \
diff --git a/src/core/mainloop/mainloop_pubsub.c b/src/core/mainloop/mainloop_pubsub.c
new file mode 100644
index 0000000000..ab3614ae00
--- /dev/null
+++ b/src/core/mainloop/mainloop_pubsub.c
@@ -0,0 +1,149 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#include "orconfig.h"
+
+#include "src/core/or/or.h"
+#include "src/core/mainloop/mainloop.h"
+#include "src/core/mainloop/mainloop_pubsub.h"
+
+#include "lib/container/smartlist.h"
+#include "lib/dispatch/dispatch.h"
+#include "lib/dispatch/dispatch_naming.h"
+#include "lib/evloop/compat_libevent.h"
+#include "lib/pubsub/pubsub.h"
+#include "lib/pubsub/pubsub_build.h"
+
+/**
+ * Dispatcher to use for delivering messages.
+ **/
+static dispatch_t *the_dispatcher = NULL;
+static pubsub_items_t *the_pubsub_items = NULL;
+/**
+ * A list of mainloop_event_t, indexed by channel ID, to flush the messages
+ * on a channel.
+ **/
+static smartlist_t *alert_events = NULL;
+
+/**
+ * Mainloop event callback: flush all the messages in a channel.
+ *
+ * The channel is encoded as a pointer, and passed via arg.
+ **/
+static void
+flush_channel_event(mainloop_event_t *ev, void *arg)
+{
+ (void)ev;
+ if (!the_dispatcher)
+ return;
+
+ channel_id_t chan = (channel_id_t)(uintptr_t)(arg);
+ dispatch_flush(the_dispatcher, chan, INT_MAX);
+}
+
+int
+tor_mainloop_connect_pubsub(struct pubsub_builder_t *builder)
+{
+ int rv = -1;
+ tor_mainloop_disconnect_pubsub();
+
+ the_dispatcher = pubsub_builder_finalize(builder, &the_pubsub_items);
+ if (! the_dispatcher)
+ goto err;
+
+ const size_t num_channels = get_num_channel_ids();
+ alert_events = smartlist_new();
+ for (size_t i = 0; i < num_channels; ++i) {
+ smartlist_add(alert_events,
+ mainloop_event_postloop_new(flush_channel_event,
+ (void*)(uintptr_t)(i)));
+ }
+
+ rv = 0;
+ err:
+ tor_mainloop_disconnect_pubsub();
+ return rv;
+}
+
+/**
+ * Dispatch alertfn callback: do nothing. Implements DELIV_NEVER.
+ **/
+static void
+alertfn_never(dispatch_t *d, channel_id_t chan, void *arg)
+{
+ (void)d;
+ (void)chan;
+ (void)arg;
+}
+
+/**
+ * Dispatch alertfn callback: activate a mainloop event. Implements
+ * DELIV_PROMPT.
+ **/
+static void
+alertfn_prompt(dispatch_t *d, channel_id_t chan, void *arg)
+{
+ (void)d;
+ (void)chan;
+ mainloop_event_t *event = arg;
+ mainloop_event_activate(event);
+}
+
+/**
+ * Dispatch alertfn callback: flush all messages right now. Implements
+ * DELIV_IMMEDIATE.
+ **/
+static void
+alertfn_immediate(dispatch_t *d, channel_id_t chan, void *arg)
+{
+ (void) arg;
+ dispatch_flush(d, chan, INT_MAX);
+}
+
+/**
+ * Set the strategy to be used for delivering messages on the named channel.
+ **/
+int
+tor_mainloop_set_delivery_strategy(const char *msg_channel_name,
+ deliv_strategy_t strategy)
+{
+ channel_id_t chan = get_channel_id(msg_channel_name);
+ if (BUG(chan == ERROR_ID) ||
+ BUG(chan >= smartlist_len(alert_events)))
+ return -1;
+
+ switch (strategy) {
+ case DELIV_NEVER:
+ dispatch_set_alert_fn(the_dispatcher, chan, alertfn_never, NULL);
+ break;
+ case DELIV_PROMPT:
+ dispatch_set_alert_fn(the_dispatcher, chan, alertfn_prompt,
+ smartlist_get(alert_events, chan));
+ break;
+ case DELIV_IMMEDIATE:
+ dispatch_set_alert_fn(the_dispatcher, chan, alertfn_immediate, NULL);
+ break;
+ }
+ return 0;
+}
+
+/**
+ * Remove all pubsub dispatchers and events from the mainloop.
+ **/
+void
+tor_mainloop_disconnect_pubsub(void)
+{
+ if (the_pubsub_items) {
+ pubsub_items_clear_bindings(the_pubsub_items);
+ pubsub_items_free(the_pubsub_items);
+ }
+ if (alert_events) {
+ SMARTLIST_FOREACH(alert_events, mainloop_event_t *, ev,
+ mainloop_event_free(ev));
+ smartlist_free(alert_events);
+ }
+ dispatch_free(the_dispatcher);
+}
diff --git a/src/core/mainloop/mainloop_pubsub.h b/src/core/mainloop/mainloop_pubsub.h
new file mode 100644
index 0000000000..6eff778420
--- /dev/null
+++ b/src/core/mainloop/mainloop_pubsub.h
@@ -0,0 +1,23 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#ifndef TOR_MAINLOOP_PUBSUB_H
+#define TOR_MAINLOOP_PUBSUB_H
+
+struct pubsub_builder_t;
+
+typedef enum {
+ DELIV_NEVER=0,
+ DELIV_PROMPT,
+ DELIV_IMMEDIATE,
+} deliv_strategy_t;
+
+int tor_mainloop_connect_pubsub(struct pubsub_builder_t *builder);
+int tor_mainloop_set_delivery_strategy(const char *msg_channel_name,
+ deliv_strategy_t strategy);
+void tor_mainloop_disconnect_pubsub(void);
+
+#endif