diff options
Diffstat (limited to 'src/lib/dispatch')
-rw-r--r-- | src/lib/dispatch/.may_include | 11 | ||||
-rw-r--r-- | src/lib/dispatch/dispatch.h | 114 | ||||
-rw-r--r-- | src/lib/dispatch/dispatch_cfg.c | 141 | ||||
-rw-r--r-- | src/lib/dispatch/dispatch_cfg.h | 50 | ||||
-rw-r--r-- | src/lib/dispatch/dispatch_cfg_st.h | 33 | ||||
-rw-r--r-- | src/lib/dispatch/dispatch_core.c | 260 | ||||
-rw-r--r-- | src/lib/dispatch/dispatch_naming.c | 70 | ||||
-rw-r--r-- | src/lib/dispatch/dispatch_naming.h | 51 | ||||
-rw-r--r-- | src/lib/dispatch/dispatch_new.c | 176 | ||||
-rw-r--r-- | src/lib/dispatch/dispatch_st.h | 108 | ||||
-rw-r--r-- | src/lib/dispatch/include.am | 27 | ||||
-rw-r--r-- | src/lib/dispatch/lib_dispatch.md | 14 | ||||
-rw-r--r-- | src/lib/dispatch/msgtypes.h | 80 |
13 files changed, 1135 insertions, 0 deletions
diff --git a/src/lib/dispatch/.may_include b/src/lib/dispatch/.may_include new file mode 100644 index 0000000000..884f4c0dbc --- /dev/null +++ b/src/lib/dispatch/.may_include @@ -0,0 +1,11 @@ +orconfig.h + +ext/tor_queue.h + +lib/cc/*.h +lib/container/*.h +lib/dispatch/*.h +lib/intmath/*.h +lib/log/*.h +lib/malloc/*.h +lib/testsupport/*.h
\ No newline at end of file diff --git a/src/lib/dispatch/dispatch.h b/src/lib/dispatch/dispatch.h new file mode 100644 index 0000000000..9c7c4833c2 --- /dev/null +++ b/src/lib/dispatch/dispatch.h @@ -0,0 +1,114 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2020, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#ifndef TOR_DISPATCH_H +#define TOR_DISPATCH_H + +#include "lib/dispatch/msgtypes.h" + +/** + * \file dispatch.h + * \brief Low-level APIs for message-passing system. + * + * This module implements message dispatch based on a set of short integer + * identifiers. For a higher-level interface, see pubsub.h. + * + * Each message is represented as a generic msg_t object, and is discriminated + * by its message_id_t. Messages are delivered by a dispatch_t object, which + * delivers each message to its recipients by a configured "channel". + * + * A "channel" is a means of delivering messages. Every message_id_t must + * be associated with exactly one channel, identified by channel_id_t. + * When a channel receives messages, a callback is invoked to either process + * the messages immediately, or to cause them to be processed later. + * + * Every message_id_t has zero or more associated receiver functions set up in + * the dispatch_t object. Once the dispatch_t object is created, receivers + * can be enabled or disabled [TODO], but not added or removed. + * + * Every message_id_t has an associated datatype, identified by a + * msg_type_id_t. These datatypes can be associated with functions to + * (for example) free them, or format them for debugging. + * + * To setup a dispatch_t object, first create a dispatch_cfg_t object, and + * configure messages with their types, channels, and receivers. Then, use + * dispatch_new() with that dispatch_cfg_t to create the dispatch_t object. + * + * (We use a two-phase contruction procedure here to enable better static + * reasoning about publish/subscribe relationships.) + * + * Once you have a dispatch_t, you can queue messages on it with + * dispatch_send*(), and cause those messages to be delivered with + * dispatch_flush(). + **/ + +/** + * A "dispatcher" is the highest-level object; it handles making sure that + * messages are received and delivered properly. Only the mainloop + * should handle this type directly. + */ +typedef struct dispatch_t dispatch_t; + +struct dispatch_cfg_t; + +dispatch_t *dispatch_new(const struct dispatch_cfg_t *cfg); + +/** + * Free a dispatcher. Tor does this at exit. + */ +#define dispatch_free(d) \ + FREE_AND_NULL(dispatch_t, dispatch_free_, (d)) + +void dispatch_free_(dispatch_t *); + +int dispatch_send(dispatch_t *d, + subsys_id_t sender, + channel_id_t channel, + message_id_t msg, + msg_type_id_t type, + msg_aux_data_t auxdata); + +int dispatch_send_msg(dispatch_t *d, msg_t *m); + +int dispatch_send_msg_unchecked(dispatch_t *d, msg_t *m); + +/* Flush up to <b>max_msgs</b> currently pending messages from the + * dispatcher. Messages that are not pending when this function are + * called, are not flushed by this call. Return 0 on success, -1 on + * unrecoverable error. + */ +int dispatch_flush(dispatch_t *, channel_id_t chan, int max_msgs); + +/** + * Function callback type used to alert some other module when a channel's + * queue changes from empty to nonempty. + * + * Ex 1: To cause messages to be processed immediately on-stack, this callback + * should invoke dispatch_flush() directly. + * + * Ex 2: To cause messages to be processed very soon, from the event queue, + * this callback should schedule an event callback to run dispatch_flush(). + * + * Ex 3: To cause messages to be processed periodically, this function should + * do nothing, and a periodic event should invoke dispatch_flush(). + **/ +typedef void (*dispatch_alertfn_t)(struct dispatch_t *, + channel_id_t, void *); + +int dispatch_set_alert_fn(dispatch_t *d, channel_id_t chan, + dispatch_alertfn_t fn, void *userdata); + +#define dispatch_free_msg(d,msg) \ + STMT_BEGIN { \ + msg_t **msg_tmp_ptr__ = &(msg); \ + dispatch_free_msg_((d), *msg_tmp_ptr__); \ + *msg_tmp_ptr__= NULL; \ + } STMT_END +void dispatch_free_msg_(const dispatch_t *d, msg_t *msg); + +char *dispatch_fmt_msg_data(const dispatch_t *d, const msg_t *msg); + +#endif /* !defined(TOR_DISPATCH_H) */ diff --git a/src/lib/dispatch/dispatch_cfg.c b/src/lib/dispatch/dispatch_cfg.c new file mode 100644 index 0000000000..a54188dcaa --- /dev/null +++ b/src/lib/dispatch/dispatch_cfg.c @@ -0,0 +1,141 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2020, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file dispatch_cfg.c + * \brief Create and configure a dispatch_cfg_t. + * + * A dispatch_cfg_t object is used to configure a set of messages and + * associated information before creating a dispatch_t. + */ + +#define DISPATCH_PRIVATE + +#include "orconfig.h" +#include "lib/dispatch/dispatch_cfg.h" +#include "lib/dispatch/dispatch_cfg_st.h" +#include "lib/dispatch/dispatch.h" +#include "lib/dispatch/dispatch_st.h" + +#include "lib/container/smartlist.h" +#include "lib/malloc/malloc.h" + +/** + * Create and return a new dispatch_cfg_t. + **/ +dispatch_cfg_t * +dcfg_new(void) +{ + dispatch_cfg_t *cfg = tor_malloc(sizeof(dispatch_cfg_t)); + cfg->type_by_msg = smartlist_new(); + cfg->chan_by_msg = smartlist_new(); + cfg->fns_by_type = smartlist_new(); + cfg->recv_by_msg = smartlist_new(); + return cfg; +} + +/** + * Associate a message with a datatype. Return 0 on success, -1 if a + * different type was previously associated with the message ID. + **/ +int +dcfg_msg_set_type(dispatch_cfg_t *cfg, message_id_t msg, + msg_type_id_t type) +{ + smartlist_grow(cfg->type_by_msg, msg+1); + msg_type_id_t *oldval = smartlist_get(cfg->type_by_msg, msg); + if (oldval != NULL && *oldval != type) { + return -1; + } + if (!oldval) + smartlist_set(cfg->type_by_msg, msg, tor_memdup(&type, sizeof(type))); + return 0; +} + +/** + * Associate a message with a channel. Return 0 on success, -1 if a + * different channel was previously associated with the message ID. + **/ +int +dcfg_msg_set_chan(dispatch_cfg_t *cfg, message_id_t msg, + channel_id_t chan) +{ + smartlist_grow(cfg->chan_by_msg, msg+1); + channel_id_t *oldval = smartlist_get(cfg->chan_by_msg, msg); + if (oldval != NULL && *oldval != chan) { + return -1; + } + if (!oldval) + smartlist_set(cfg->chan_by_msg, msg, tor_memdup(&chan, sizeof(chan))); + return 0; +} + +/** + * Associate a set of functions with a datatype. Return 0 on success, -1 if + * different functions were previously associated with the type. + **/ +int +dcfg_type_set_fns(dispatch_cfg_t *cfg, msg_type_id_t type, + const dispatch_typefns_t *fns) +{ + smartlist_grow(cfg->fns_by_type, type+1); + dispatch_typefns_t *oldfns = smartlist_get(cfg->fns_by_type, type); + if (oldfns && (oldfns->free_fn != fns->free_fn || + oldfns->fmt_fn != fns->fmt_fn)) + return -1; + if (!oldfns) + smartlist_set(cfg->fns_by_type, type, tor_memdup(fns, sizeof(*fns))); + return 0; +} + +/** + * Associate a receiver with a message ID. Multiple receivers may be + * associated with a single messasge ID. + * + * Return 0 on success, on failure. + **/ +int +dcfg_add_recv(dispatch_cfg_t *cfg, message_id_t msg, + subsys_id_t sys, recv_fn_t fn) +{ + smartlist_grow(cfg->recv_by_msg, msg+1); + smartlist_t *receivers = smartlist_get(cfg->recv_by_msg, msg); + if (!receivers) { + receivers = smartlist_new(); + smartlist_set(cfg->recv_by_msg, msg, receivers); + } + + dispatch_rcv_t *rcv = tor_malloc(sizeof(dispatch_rcv_t)); + rcv->sys = sys; + rcv->enabled = true; + rcv->fn = fn; + smartlist_add(receivers, (void*)rcv); + return 0; +} + +/** Helper: release all storage held by <b>cfg</b>. */ +void +dcfg_free_(dispatch_cfg_t *cfg) +{ + if (!cfg) + return; + + SMARTLIST_FOREACH(cfg->type_by_msg, msg_type_id_t *, id, tor_free(id)); + SMARTLIST_FOREACH(cfg->chan_by_msg, channel_id_t *, id, tor_free(id)); + SMARTLIST_FOREACH(cfg->fns_by_type, dispatch_typefns_t *, f, tor_free(f)); + smartlist_free(cfg->type_by_msg); + smartlist_free(cfg->chan_by_msg); + smartlist_free(cfg->fns_by_type); + SMARTLIST_FOREACH_BEGIN(cfg->recv_by_msg, smartlist_t *, receivers) { + if (!receivers) + continue; + SMARTLIST_FOREACH(receivers, dispatch_rcv_t *, rcv, tor_free(rcv)); + smartlist_free(receivers); + } SMARTLIST_FOREACH_END(receivers); + smartlist_free(cfg->recv_by_msg); + + tor_free(cfg); +} diff --git a/src/lib/dispatch/dispatch_cfg.h b/src/lib/dispatch/dispatch_cfg.h new file mode 100644 index 0000000000..a4f1948eac --- /dev/null +++ b/src/lib/dispatch/dispatch_cfg.h @@ -0,0 +1,50 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2020, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#ifndef TOR_DISPATCH_CFG_H +#define TOR_DISPATCH_CFG_H + +/** + * @file dispatch_cfg.h + * @brief Header for distpach_cfg.c + **/ + +#include "lib/dispatch/msgtypes.h" +#include "lib/testsupport/testsupport.h" + +/** + * A "dispatch_cfg" is the configuration used to set up a dispatcher. + * It is created and accessed with a set of dcfg_* functions, and then + * used with dispatcher_new() to make the dispatcher. + */ +typedef struct dispatch_cfg_t dispatch_cfg_t; + +dispatch_cfg_t *dcfg_new(void); + +int dcfg_msg_set_type(dispatch_cfg_t *cfg, message_id_t msg, + msg_type_id_t type); + +int dcfg_msg_set_chan(dispatch_cfg_t *cfg, message_id_t msg, + channel_id_t chan); + +int dcfg_type_set_fns(dispatch_cfg_t *cfg, msg_type_id_t type, + const dispatch_typefns_t *fns); + +int dcfg_add_recv(dispatch_cfg_t *cfg, message_id_t msg, + subsys_id_t sys, recv_fn_t fn); + +/** Free a dispatch_cfg_t. */ +#define dcfg_free(cfg) \ + FREE_AND_NULL(dispatch_cfg_t, dcfg_free_, (cfg)) + +void dcfg_free_(dispatch_cfg_t *cfg); + +#ifdef DISPATCH_NEW_PRIVATE +struct smartlist_t; +STATIC int max_in_u16_sl(const struct smartlist_t *sl, int dflt); +#endif + +#endif /* !defined(TOR_DISPATCH_CFG_H) */ diff --git a/src/lib/dispatch/dispatch_cfg_st.h b/src/lib/dispatch/dispatch_cfg_st.h new file mode 100644 index 0000000000..3c99adf2f7 --- /dev/null +++ b/src/lib/dispatch/dispatch_cfg_st.h @@ -0,0 +1,33 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2020, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * @file dispatch_cfg_st.h + * @brief Declarations for dispatch-configuration types. + **/ + +#ifndef TOR_DISPATCH_CFG_ST_H +#define TOR_DISPATCH_CFG_ST_H + +struct smartlist_t; + +/** Information needed to create a dispatcher, but in a less efficient, more + * mutable format. + * + * Nearly everybody should use the \refdir{lib/pubsub} module to configure + * dispatchers, instead of using this. */ +struct dispatch_cfg_t { + /** A list of msg_type_id_t (cast to void*), indexed by msg_t. */ + struct smartlist_t *type_by_msg; + /** A list of channel_id_t (cast to void*), indexed by msg_t. */ + struct smartlist_t *chan_by_msg; + /** A list of dispatch_rcv_t, indexed by msg_type_id_t. */ + struct smartlist_t *fns_by_type; + /** A list of dispatch_typefns_t, indexed by msg_t. */ + struct smartlist_t *recv_by_msg; +}; + +#endif /* !defined(TOR_DISPATCH_CFG_ST_H) */ diff --git a/src/lib/dispatch/dispatch_core.c b/src/lib/dispatch/dispatch_core.c new file mode 100644 index 0000000000..3d51c876a7 --- /dev/null +++ b/src/lib/dispatch/dispatch_core.c @@ -0,0 +1,260 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2020, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file dispatch_core.c + * \brief Core module for sending and receiving messages. + */ + +#define DISPATCH_PRIVATE +#include "orconfig.h" + +#include "lib/dispatch/dispatch.h" +#include "lib/dispatch/dispatch_st.h" +#include "lib/dispatch/dispatch_naming.h" + +#include "lib/malloc/malloc.h" +#include "lib/log/util_bug.h" + +#include <string.h> + +/** + * Use <b>d</b> to drop all storage held for <b>msg</b>. + * + * (We need the dispatcher so we know how to free the auxiliary data.) + **/ +void +dispatch_free_msg_(const dispatch_t *d, msg_t *msg) +{ + if (!msg) + return; + + d->typefns[msg->type].free_fn(msg->aux_data__); + tor_free(msg); +} + +/** + * Format the auxiliary data held by msg. + **/ +char * +dispatch_fmt_msg_data(const dispatch_t *d, const msg_t *msg) +{ + if (!msg) + return NULL; + + return d->typefns[msg->type].fmt_fn(msg->aux_data__); +} + +/** + * Release all storage held by <b>d</b>. + **/ +void +dispatch_free_(dispatch_t *d) +{ + if (d == NULL) + return; + + size_t n_queues = d->n_queues; + for (size_t i = 0; i < n_queues; ++i) { + msg_t *m, *mtmp; + TOR_SIMPLEQ_FOREACH_SAFE(m, &d->queues[i].queue, next, mtmp) { + dispatch_free_msg(d, m); + } + } + + size_t n_msgs = d->n_msgs; + + for (size_t i = 0; i < n_msgs; ++i) { + tor_free(d->table[i]); + } + tor_free(d->table); + tor_free(d->typefns); + tor_free(d->queues); + + // This is the only time we will treat d->cfg as non-const. + //dispatch_cfg_free_((dispatch_items_t *) d->cfg); + + tor_free(d); +} + +/** + * Tell the dispatcher to call <b>fn</b> with <b>userdata</b> whenever + * <b>chan</b> becomes nonempty. Return 0 on success, -1 on error. + **/ +int +dispatch_set_alert_fn(dispatch_t *d, channel_id_t chan, + dispatch_alertfn_t fn, void *userdata) +{ + if (BUG(chan >= d->n_queues)) + return -1; + + dqueue_t *q = &d->queues[chan]; + q->alert_fn = fn; + q->alert_fn_arg = userdata; + return 0; +} + +/** + * Send a message on the appropriate channel notifying that channel if + * necessary. + * + * This function takes ownership of the auxiliary data; it can't be static or + * stack-allocated, and the caller is not allowed to use it afterwards. + * + * This function does not check the various vields of the message object for + * consistency. + **/ +int +dispatch_send(dispatch_t *d, + subsys_id_t sender, + channel_id_t channel, + message_id_t msg, + msg_type_id_t type, + msg_aux_data_t auxdata) +{ + if (!d->table[msg]) { + /* Fast path: nobody wants this data. */ + + d->typefns[type].free_fn(auxdata); + return 0; + } + + msg_t *m = tor_malloc(sizeof(msg_t)); + + m->sender = sender; + m->channel = channel; + m->msg = msg; + m->type = type; + memcpy(&m->aux_data__, &auxdata, sizeof(msg_aux_data_t)); + + return dispatch_send_msg(d, m); +} + +int +dispatch_send_msg(dispatch_t *d, msg_t *m) +{ + if (BUG(!d)) + goto err; + if (BUG(!m)) + goto err; + if (BUG(m->channel >= d->n_queues)) + goto err; + if (BUG(m->msg >= d->n_msgs)) + goto err; + + dtbl_entry_t *ent = d->table[m->msg]; + if (ent) { + if (BUG(m->type != ent->type)) + goto err; + if (BUG(m->channel != ent->channel)) + goto err; + } + + return dispatch_send_msg_unchecked(d, m); + err: + /* Probably it isn't safe to free m, since type could be wrong. */ + return -1; +} + +/** + * Send a message on the appropriate queue, notifying that queue if necessary. + * + * This function takes ownership of the message object and its auxiliary data; + * it can't be static or stack-allocated, and the caller isn't allowed to use + * it afterwards. + * + * This function does not check the various fields of the message object for + * consistency, and can crash if they are out of range. Only functions that + * have already constructed the message in a safe way, or checked it for + * correctness themselves, should call this function. + **/ +int +dispatch_send_msg_unchecked(dispatch_t *d, msg_t *m) +{ + /* Find the right queue. */ + dqueue_t *q = &d->queues[m->channel]; + bool was_empty = TOR_SIMPLEQ_EMPTY(&q->queue); + + /* Append the message. */ + TOR_SIMPLEQ_INSERT_TAIL(&q->queue, m, next); + + if (debug_logging_enabled()) { + char *arg = dispatch_fmt_msg_data(d, m); + log_debug(LD_MESG, + "Queued: %s (%s) from %s, on %s.", + get_message_id_name(m->msg), + arg, + get_subsys_id_name(m->sender), + get_channel_id_name(m->channel)); + tor_free(arg); + } + + /* If we just made the queue nonempty for the first time, call the alert + * function. */ + if (was_empty) { + q->alert_fn(d, m->channel, q->alert_fn_arg); + } + + return 0; +} + +/** + * Run all of the callbacks on <b>d</b> associated with <b>m</b>. + **/ +static void +dispatcher_run_msg_cbs(const dispatch_t *d, msg_t *m) +{ + tor_assert(m->msg <= d->n_msgs); + dtbl_entry_t *ent = d->table[m->msg]; + int n_fns = ent->n_fns; + + if (debug_logging_enabled()) { + char *arg = dispatch_fmt_msg_data(d, m); + log_debug(LD_MESG, + "Delivering: %s (%s) from %s, on %s:", + get_message_id_name(m->msg), + arg, + get_subsys_id_name(m->sender), + get_channel_id_name(m->channel)); + tor_free(arg); + } + + int i; + for (i=0; i < n_fns; ++i) { + if (ent->rcv[i].enabled) { + log_debug(LD_MESG, " Delivering to %s.", + get_subsys_id_name(ent->rcv[i].sys)); + ent->rcv[i].fn(m); + } + } +} + +/** + * Run up to <b>max_msgs</b> callbacks for messages on the channel <b>ch</b> + * on the given dispatcher. Return 0 on success or recoverable failure, + * -1 on unrecoverable error. + **/ +int +dispatch_flush(dispatch_t *d, channel_id_t ch, int max_msgs) +{ + if (BUG(ch >= d->n_queues)) + return 0; + + int n_flushed = 0; + dqueue_t *q = &d->queues[ch]; + + while (n_flushed < max_msgs) { + msg_t *m = TOR_SIMPLEQ_FIRST(&q->queue); + if (!m) + break; + TOR_SIMPLEQ_REMOVE_HEAD(&q->queue, next); + dispatcher_run_msg_cbs(d, m); + dispatch_free_msg(d, m); + ++n_flushed; + } + + return 0; +} diff --git a/src/lib/dispatch/dispatch_naming.c b/src/lib/dispatch/dispatch_naming.c new file mode 100644 index 0000000000..bb49343712 --- /dev/null +++ b/src/lib/dispatch/dispatch_naming.c @@ -0,0 +1,70 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2020, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * @file dispatch_naming.c + * @brief Name-to-ID maps for our message dispatch system. + **/ + +#include "orconfig.h" + +#include "lib/cc/compat_compiler.h" + +#include "lib/dispatch/dispatch_naming.h" +#include "lib/dispatch/msgtypes.h" + +#include "lib/container/namemap.h" +#include "lib/container/namemap_st.h" + +#include "lib/log/util_bug.h" +#include "lib/log/log.h" + +#include <stdlib.h> + +/** Global namemap for message IDs. */ +static namemap_t message_id_map = NAMEMAP_INIT(); +/** Global namemap for subsystem IDs. */ +static namemap_t subsys_id_map = NAMEMAP_INIT(); +/** Global namemap for channel IDs. */ +static namemap_t channel_id_map = NAMEMAP_INIT(); +/** Global namemap for message type IDs. */ +static namemap_t msg_type_id_map = NAMEMAP_INIT(); + +void +dispatch_naming_init(void) +{ +} + +#ifndef COCCI +/* Helper macro: declare functions to map IDs to and from names for a given + * type in a namemap_t. + */ +#define DECLARE_ID_MAP_FNS(type) \ + type##_id_t \ + get_##type##_id(const char *name) \ + { \ + unsigned u = namemap_get_or_create_id(&type##_id_map, name); \ + tor_assert(u != NAMEMAP_ERR); \ + tor_assert(u != ERROR_ID); \ + return (type##_id_t) u; \ + } \ + const char * \ + get_##type##_id_name(type##_id_t id) \ + { \ + return namemap_fmt_name(&type##_id_map, id); \ + } \ + size_t \ + get_num_##type##_ids(void) \ + { \ + return namemap_get_size(&type##_id_map); \ + } \ + EAT_SEMICOLON +#endif /* !defined(COCCI) */ + +DECLARE_ID_MAP_FNS(message); +DECLARE_ID_MAP_FNS(channel); +DECLARE_ID_MAP_FNS(subsys); +DECLARE_ID_MAP_FNS(msg_type); diff --git a/src/lib/dispatch/dispatch_naming.h b/src/lib/dispatch/dispatch_naming.h new file mode 100644 index 0000000000..72206d3ed5 --- /dev/null +++ b/src/lib/dispatch/dispatch_naming.h @@ -0,0 +1,51 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2020, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * @file dispatch_naming.h + * @brief Header for dispatch_naming.c + **/ + +#ifndef TOR_DISPATCH_NAMING_H +#define TOR_DISPATCH_NAMING_H + +#include "lib/dispatch/msgtypes.h" +#include <stddef.h> + +/** + * Return an existing channel ID by name, allocating the channel ID if + * if necessary. Returns ERROR_ID if we have run out of + * channels + */ +channel_id_t get_channel_id(const char *); +/** + * Return the name corresponding to a given channel ID. + **/ +const char *get_channel_id_name(channel_id_t); +/** + * Return the total number of _named_ channel IDs. + **/ +size_t get_num_channel_ids(void); + +/* As above, but for messages. */ +message_id_t get_message_id(const char *); +const char *get_message_id_name(message_id_t); +size_t get_num_message_ids(void); + +/* As above, but for subsystems */ +subsys_id_t get_subsys_id(const char *); +const char *get_subsys_id_name(subsys_id_t); +size_t get_num_subsys_ids(void); + +/* As above, but for types. Note that types additionally must be + * "defined", if any message is to use them. */ +msg_type_id_t get_msg_type_id(const char *); +const char *get_msg_type_id_name(msg_type_id_t); +size_t get_num_msg_type_ids(void); + +void dispatch_naming_init(void); + +#endif /* !defined(TOR_DISPATCH_NAMING_H) */ diff --git a/src/lib/dispatch/dispatch_new.c b/src/lib/dispatch/dispatch_new.c new file mode 100644 index 0000000000..e1dbb1c4b8 --- /dev/null +++ b/src/lib/dispatch/dispatch_new.c @@ -0,0 +1,176 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2020, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file dispatch_new.c + * \brief Code to construct a dispatch_t from a dispatch_cfg_t. + **/ + +#define DISPATCH_NEW_PRIVATE +#define DISPATCH_PRIVATE +#include "orconfig.h" + +#include "lib/dispatch/dispatch.h" +#include "lib/dispatch/dispatch_st.h" +#include "lib/dispatch/dispatch_cfg.h" +#include "lib/dispatch/dispatch_cfg_st.h" + +#include "lib/cc/ctassert.h" +#include "lib/intmath/cmp.h" +#include "lib/malloc/malloc.h" +#include "lib/log/util_bug.h" + +#include <string.h> + +/** Given a smartlist full of (possibly NULL) pointers to uint16_t values, + * return the largest value, or dflt if the list is empty. */ +STATIC int +max_in_u16_sl(const smartlist_t *sl, int dflt) +{ + uint16_t *maxptr = NULL; + SMARTLIST_FOREACH_BEGIN(sl, uint16_t *, u) { + if (!maxptr) + maxptr = u; + else if (u && *u > *maxptr) + maxptr = u; + } SMARTLIST_FOREACH_END(u); + + return maxptr ? *maxptr : dflt; +} + +/* The above function is only safe to call if we are sure that channel_id_t + * and msg_type_id_t are really uint16_t. They should be so defined in + * msgtypes.h, but let's be extra cautious. + */ +CTASSERT(sizeof(uint16_t) == sizeof(msg_type_id_t)); +CTASSERT(sizeof(uint16_t) == sizeof(channel_id_t)); + +/** Helper: Format an unformattable message auxiliary data item: just return a +* copy of the string <>. */ +static char * +type_fmt_nop(msg_aux_data_t arg) +{ + (void)arg; + return tor_strdup("<>"); +} + +/** Helper: Free an unfreeable message auxiliary data item: do nothing. */ +static void +type_free_nop(msg_aux_data_t arg) +{ + (void)arg; +} + +/** Type functions to use when no type functions are provided. */ +static dispatch_typefns_t nop_typefns = { + .free_fn = type_free_nop, + .fmt_fn = type_fmt_nop +}; + +/** + * Alert function to use when none is configured: do nothing. + **/ +static void +alert_fn_nop(dispatch_t *d, channel_id_t ch, void *arg) +{ + (void)d; + (void)ch; + (void)arg; +} + +/** + * Given a list of recvfn_t, create and return a new dtbl_entry_t mapping + * to each of those functions. + **/ +static dtbl_entry_t * +dtbl_entry_from_lst(smartlist_t *receivers) +{ + if (!receivers) + return NULL; + + size_t n_recv = smartlist_len(receivers); + dtbl_entry_t *ent; + ent = tor_malloc_zero(offsetof(dtbl_entry_t, rcv) + + sizeof(dispatch_rcv_t) * n_recv); + + ent->n_fns = n_recv; + + SMARTLIST_FOREACH_BEGIN(receivers, const dispatch_rcv_t *, rcv) { + memcpy(&ent->rcv[rcv_sl_idx], rcv, sizeof(*rcv)); + if (rcv->enabled) { + ++ent->n_enabled; + } + } SMARTLIST_FOREACH_END(rcv); + + return ent; +} + +/** Create and return a new dispatcher from a given dispatch_cfg_t. */ +dispatch_t * +dispatch_new(const dispatch_cfg_t *cfg) +{ + dispatch_t *d = tor_malloc_zero(sizeof(dispatch_t)); + + /* Any message that has a type or a receiver counts towards our messages */ + const size_t n_msgs = MAX(smartlist_len(cfg->type_by_msg), + smartlist_len(cfg->recv_by_msg)) + 1; + + /* Any channel that any message has counts towards the number of channels. */ + const size_t n_chans = (size_t) + MAX(1, max_in_u16_sl(cfg->chan_by_msg,0)) + 1; + + /* Any type that a message has, or that has functions, counts towards + * the number of types. */ + const size_t n_types = (size_t) MAX(max_in_u16_sl(cfg->type_by_msg,0), + smartlist_len(cfg->fns_by_type)) + 1; + + d->n_msgs = n_msgs; + d->n_queues = n_chans; + d->n_types = n_types; + + /* Initialize the array of type-functions. */ + d->typefns = tor_calloc(n_types, sizeof(dispatch_typefns_t)); + for (size_t i = 0; i < n_types; ++i) { + /* Default to no-op for everything... */ + memcpy(&d->typefns[i], &nop_typefns, sizeof(dispatch_typefns_t)); + } + SMARTLIST_FOREACH_BEGIN(cfg->fns_by_type, dispatch_typefns_t *, fns) { + /* Set the functions if they are provided. */ + if (fns) { + if (fns->free_fn) + d->typefns[fns_sl_idx].free_fn = fns->free_fn; + if (fns->fmt_fn) + d->typefns[fns_sl_idx].fmt_fn = fns->fmt_fn; + } + } SMARTLIST_FOREACH_END(fns); + + /* Initialize the message queues: one for each channel. */ + d->queues = tor_calloc(d->n_queues, sizeof(dqueue_t)); + for (size_t i = 0; i < d->n_queues; ++i) { + TOR_SIMPLEQ_INIT(&d->queues[i].queue); + d->queues[i].alert_fn = alert_fn_nop; + } + + /* Build the dispatch tables mapping message IDs to receivers. */ + d->table = tor_calloc(d->n_msgs, sizeof(dtbl_entry_t *)); + SMARTLIST_FOREACH_BEGIN(cfg->recv_by_msg, smartlist_t *, rcv) { + d->table[rcv_sl_idx] = dtbl_entry_from_lst(rcv); + } SMARTLIST_FOREACH_END(rcv); + + /* Fill in the empty entries in the dispatch tables: + * types and channels for each message. */ + SMARTLIST_FOREACH_BEGIN(cfg->type_by_msg, msg_type_id_t *, type) { + if (d->table[type_sl_idx]) + d->table[type_sl_idx]->type = *type; + } SMARTLIST_FOREACH_END(type); + + SMARTLIST_FOREACH_BEGIN(cfg->chan_by_msg, channel_id_t *, chan) { + if (d->table[chan_sl_idx]) + d->table[chan_sl_idx]->channel = *chan; + } SMARTLIST_FOREACH_END(chan); + + return d; +} diff --git a/src/lib/dispatch/dispatch_st.h b/src/lib/dispatch/dispatch_st.h new file mode 100644 index 0000000000..ad5b4efc40 --- /dev/null +++ b/src/lib/dispatch/dispatch_st.h @@ -0,0 +1,108 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2020, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file dispatch_st.h + * + * \brief private structures used for the dispatcher module + */ + +#ifndef TOR_DISPATCH_ST_H +#define TOR_DISPATCH_ST_H + +#ifdef DISPATCH_PRIVATE + +#include "lib/container/smartlist.h" + +/** + * Information about the recipient of a message. + **/ +typedef struct dispatch_rcv_t { + /** The subsystem receiving a message. */ + subsys_id_t sys; + /** True iff this recipient is enabled. */ + bool enabled; + /** The function that will handle the message. */ + recv_fn_t fn; +} dispatch_rcv_t; + +/** + * Information used by a dispatcher to handle and dispatch a single message + * ID. It maps that message ID to its type, channel, and list of receiver + * functions. + * + * This structure is used when the dispatcher is running. + **/ +typedef struct dtbl_entry_t { + /** The number of enabled non-stub subscribers for this message. + * + * Note that for now, this will be the same as <b>n_fns</b>, since there is + * no way to turn these subscribers on an off yet. */ + uint16_t n_enabled; + /** The channel that handles this message. */ + channel_id_t channel; + /** The associated C type for this message. */ + msg_type_id_t type; + /** + * The number of functions pointers for subscribers that receive this + * message, in rcv. */ + uint16_t n_fns; + /** + * The recipients for this message. + */ + dispatch_rcv_t rcv[FLEXIBLE_ARRAY_MEMBER]; +} dtbl_entry_t; + +/** + * A queue of messages for a given channel, used by a live dispatcher. + */ +typedef struct dqueue_t { + /** The queue of messages itself. */ + TOR_SIMPLEQ_HEAD( , msg_t) queue; + /** A function to be called when the queue becomes nonempty. */ + dispatch_alertfn_t alert_fn; + /** An argument for the alert_fn. */ + void *alert_fn_arg; +} dqueue_t ; + +/** + * A single dispatcher for cross-module messages. + */ +struct dispatch_t { + /** + * The length of <b>table</b>: the number of message IDs that this + * dispatcher can handle. + */ + size_t n_msgs; + /** + * The length of <b>queues</b>: the number of channels that this dispatcher + * has configured. + */ + size_t n_queues; + /** + * The length of <b>typefns</b>: the number of C type IDs that this + * dispatcher has configured. + */ + size_t n_types; + /** + * An array of message queues, indexed by channel ID. + */ + dqueue_t *queues; + /** + * An array of entries about how to handle particular message types, indexed + * by message ID. + */ + dtbl_entry_t **table; + /** + * An array of function tables for manipulating types, index by message + * type ID. + **/ + dispatch_typefns_t *typefns; +}; + +#endif /* defined(DISPATCH_PRIVATE) */ + +#endif /* !defined(TOR_DISPATCH_ST_H) */ diff --git a/src/lib/dispatch/include.am b/src/lib/dispatch/include.am new file mode 100644 index 0000000000..4a0e0dfd90 --- /dev/null +++ b/src/lib/dispatch/include.am @@ -0,0 +1,27 @@ + +noinst_LIBRARIES += src/lib/libtor-dispatch.a + +if UNITTESTS_ENABLED +noinst_LIBRARIES += src/lib/libtor-dispatch-testing.a +endif + +# ADD_C_FILE: INSERT SOURCES HERE. +src_lib_libtor_dispatch_a_SOURCES = \ + src/lib/dispatch/dispatch_cfg.c \ + src/lib/dispatch/dispatch_core.c \ + src/lib/dispatch/dispatch_naming.c \ + src/lib/dispatch/dispatch_new.c + +src_lib_libtor_dispatch_testing_a_SOURCES = \ + $(src_lib_libtor_dispatch_a_SOURCES) +src_lib_libtor_dispatch_testing_a_CPPFLAGS = $(AM_CPPFLAGS) $(TEST_CPPFLAGS) +src_lib_libtor_dispatch_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS) + +# ADD_C_FILE: INSERT HEADERS HERE. +noinst_HEADERS += \ + src/lib/dispatch/dispatch.h \ + src/lib/dispatch/dispatch_cfg.h \ + src/lib/dispatch/dispatch_cfg_st.h \ + src/lib/dispatch/dispatch_naming.h \ + src/lib/dispatch/dispatch_st.h \ + src/lib/dispatch/msgtypes.h diff --git a/src/lib/dispatch/lib_dispatch.md b/src/lib/dispatch/lib_dispatch.md new file mode 100644 index 0000000000..153ca50080 --- /dev/null +++ b/src/lib/dispatch/lib_dispatch.md @@ -0,0 +1,14 @@ +@dir /lib/dispatch +@brief lib/dispatch: In-process message delivery. + +This module provides a general in-process "message dispatch" system in which +typed messages are sent on channels. The dispatch.h header has far more +information. + +It is used by by \refdir{lib/pubsub} to implement our general +inter-module publish/subscribe system. + +This is not a fancy multi-threaded many-to-many dispatcher as you may be used +to from more sophisticated architectures: this dispatcher is intended only +for use in improving Tor's architecture. + diff --git a/src/lib/dispatch/msgtypes.h b/src/lib/dispatch/msgtypes.h new file mode 100644 index 0000000000..01d969dcb5 --- /dev/null +++ b/src/lib/dispatch/msgtypes.h @@ -0,0 +1,80 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2020, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file msgtypes.h + * \brief Types used for messages in the dispatcher code. + **/ + +#ifndef TOR_DISPATCH_MSGTYPES_H +#define TOR_DISPATCH_MSGTYPES_H + +#include <stdint.h> + +#include "ext/tor_queue.h" + +/** + * These types are aliases for subsystems, channels, and message IDs. + **/ +typedef uint16_t subsys_id_t; +typedef uint16_t channel_id_t; +typedef uint16_t message_id_t; + +/** + * This identifies a C type that can be sent along with a message. + **/ +typedef uint16_t msg_type_id_t; + +/** + * An ID value returned for *_type_t when none exists. + */ +#define ERROR_ID 65535 + +/** + * Auxiliary (untyped) data sent along with a message. + * + * We define this as a union of a pointer and a u64, so that the integer + * types will have the same range across platforms. + **/ +typedef union { + void *ptr; + uint64_t u64; +} msg_aux_data_t; + +/** + * Structure of a received message. + **/ +typedef struct msg_t { + TOR_SIMPLEQ_ENTRY(msg_t) next; + subsys_id_t sender; + channel_id_t channel; + message_id_t msg; + /** We could omit this field, since it is implicit in the message type, but + * IMO let's leave it in for safety. */ + msg_type_id_t type; + /** Untyped auxiliary data. You shouldn't have to mess with this + * directly. */ + msg_aux_data_t aux_data__; +} msg_t; + +/** + * A function that a subscriber uses to receive a message. + **/ +typedef void (*recv_fn_t)(const msg_t *m); + +/** + * Table of functions to use for a given C type. Any omitted (NULL) functions + * will be treated as no-ops. + **/ +typedef struct dispatch_typefns_t { + /** Release storage held for the auxiliary data of this type. */ + void (*free_fn)(msg_aux_data_t); + /** Format and return a newly allocated string describing the contents + * of this data element. */ + char *(*fmt_fn)(msg_aux_data_t); +} dispatch_typefns_t; + +#endif /* !defined(TOR_DISPATCH_MSGTYPES_H) */ |