diff options
author | George Kadianakis <desnacked@riseup.net> | 2019-03-27 14:30:53 +0200 |
---|---|---|
committer | George Kadianakis <desnacked@riseup.net> | 2019-03-27 14:30:53 +0200 |
commit | 00ca3d04cfa0625349a47d1a2f7c16ac66e7822d (patch) | |
tree | d9e9327f80c62cdf038711682edaaebf7778f0da | |
parent | 8991280f89be0cb157efcaec82880040fba4031b (diff) | |
parent | 7b9732063cf32c319d36246b314ff2cb1fbebb08 (diff) | |
download | tor-00ca3d04cfa0625349a47d1a2f7c16ac66e7822d.tar.gz tor-00ca3d04cfa0625349a47d1a2f7c16ac66e7822d.zip |
Merge branch 'tor-github/pr/859'
59 files changed, 5035 insertions, 162 deletions
diff --git a/.gitignore b/.gitignore index a40cda02dd..0865f981b1 100644 --- a/.gitignore +++ b/.gitignore @@ -168,6 +168,8 @@ uptime-*.json /src/lib/libtor-crypt-ops-testing.a /src/lib/libtor-ctime.a /src/lib/libtor-ctime-testing.a +/src/lib/libtor-dispatch.a +/src/lib/libtor-dispatch-testing.a /src/lib/libtor-encoding.a /src/lib/libtor-encoding-testing.a /src/lib/libtor-evloop.a @@ -200,6 +202,8 @@ uptime-*.json /src/lib/libtor-osinfo-testing.a /src/lib/libtor-process.a /src/lib/libtor-process-testing.a +/src/lib/libtor-pubsub.a +/src/lib/libtor-pubsub-testing.a /src/lib/libtor-sandbox.a /src/lib/libtor-sandbox-testing.a /src/lib/libtor-string.a diff --git a/Makefile.am b/Makefile.am index eea7c3a12f..673643cd31 100644 --- a/Makefile.am +++ b/Makefile.am @@ -41,6 +41,8 @@ TOR_UTIL_LIBS = \ src/lib/libtor-geoip.a \ src/lib/libtor-process.a \ src/lib/libtor-buf.a \ + src/lib/libtor-pubsub.a \ + src/lib/libtor-dispatch.a \ src/lib/libtor-time.a \ src/lib/libtor-fs.a \ src/lib/libtor-encoding.a \ @@ -72,6 +74,8 @@ TOR_UTIL_TESTING_LIBS = \ src/lib/libtor-geoip-testing.a \ src/lib/libtor-process-testing.a \ src/lib/libtor-buf-testing.a \ + src/lib/libtor-pubsub-testing.a \ + src/lib/libtor-dispatch-testing.a \ src/lib/libtor-time-testing.a \ src/lib/libtor-fs-testing.a \ src/lib/libtor-encoding-testing.a \ diff --git a/changes/pubsub b/changes/pubsub new file mode 100644 index 0000000000..f67b36b988 --- /dev/null +++ b/changes/pubsub @@ -0,0 +1,5 @@ + o Major features (code organization): + - Tor now includes a generic publish-subscribe message-passing subsystem + that we can use to organize intermodule dependencies. We hope to use + this to reduce dependencies between modules that don't need to be + related, and to generally simplify our codebase. Closes ticket 28226. diff --git a/doc/tor.1.txt b/doc/tor.1.txt index ac79ed231b..889b397e51 100644 --- a/doc/tor.1.txt +++ b/doc/tor.1.txt @@ -679,7 +679,7 @@ GENERAL OPTIONS The currently recognized domains are: general, crypto, net, config, fs, protocol, mm, http, app, control, circ, rend, bug, dir, dirserv, or, edge, acct, hist, handshake, heartbeat, channel, sched, guard, consdiff, dos, - process, pt, and btrack. + process, pt, btrack, and mesg. Domain names are case-insensitive. + + For example, "`Log [handshake]debug [~net,~mm]info notice stdout`" sends diff --git a/scripts/maint/practracker/exceptions.txt b/scripts/maint/practracker/exceptions.txt index 93d27e7642..cd56eba0e0 100644 --- a/scripts/maint/practracker/exceptions.txt +++ b/scripts/maint/practracker/exceptions.txt @@ -47,12 +47,13 @@ problem function-size /src/app/config/config.c:parse_ports() 170 problem function-size /src/app/config/config.c:getinfo_helper_config() 116 problem function-size /src/app/config/confparse.c:config_assign_value() 205 problem function-size /src/app/config/confparse.c:config_get_assigned_option() 129 -problem include-count /src/app/main/main.c 85 +problem include-count /src/app/main/main.c 67 problem function-size /src/app/main/main.c:dumpstats() 102 problem function-size /src/app/main/main.c:tor_init() 136 problem function-size /src/app/main/main.c:sandbox_init_filter() 291 problem function-size /src/app/main/main.c:run_tor_main_loop() 105 problem function-size /src/app/main/ntmain.c:nt_service_install() 125 +problem include-count /src/app/main/shutdown.c 52 problem file-size /src/core/mainloop/connection.c 5554 problem include-count /src/core/mainloop/connection.c 61 problem function-size /src/core/mainloop/connection.c:connection_free_minimal() 184 diff --git a/src/app/main/main.c b/src/app/main/main.c index 83eef0dc70..184b9e91da 100644 --- a/src/app/main/main.c +++ b/src/app/main/main.c @@ -15,68 +15,51 @@ #include "app/config/statefile.h" #include "app/main/main.h" #include "app/main/ntmain.h" +#include "app/main/shutdown.h" #include "app/main/subsysmgr.h" #include "core/mainloop/connection.h" #include "core/mainloop/cpuworker.h" #include "core/mainloop/mainloop.h" +#include "core/mainloop/mainloop_pubsub.h" #include "core/mainloop/netstatus.h" #include "core/or/channel.h" #include "core/or/channelpadding.h" #include "core/or/circuitpadding.h" -#include "core/or/channeltls.h" #include "core/or/circuitlist.h" -#include "core/or/circuitmux_ewma.h" #include "core/or/command.h" -#include "core/or/connection_edge.h" #include "core/or/connection_or.h" -#include "core/or/dos.h" -#include "core/or/policies.h" -#include "core/or/protover.h" #include "core/or/relay.h" -#include "core/or/scheduler.h" #include "core/or/status.h" -#include "core/or/versions.h" #include "feature/api/tor_api.h" #include "feature/api/tor_api_internal.h" #include "feature/client/addressmap.h" -#include "feature/client/bridges.h" -#include "feature/client/entrynodes.h" -#include "feature/client/transports.h" #include "feature/control/control.h" #include "feature/control/control_auth.h" #include "feature/control/control_events.h" -#include "feature/dirauth/bwauth.h" #include "feature/dirauth/keypin.h" #include "feature/dirauth/process_descs.h" #include "feature/dircache/consdiffmgr.h" -#include "feature/dircache/dirserv.h" #include "feature/dirparse/routerparse.h" #include "feature/hibernate/hibernate.h" -#include "feature/hs/hs_cache.h" #include "feature/nodelist/authcert.h" -#include "feature/nodelist/microdesc.h" #include "feature/nodelist/networkstatus.h" -#include "feature/nodelist/nodelist.h" #include "feature/nodelist/routerlist.h" #include "feature/relay/dns.h" #include "feature/relay/ext_orport.h" -#include "feature/relay/onion_queue.h" #include "feature/relay/routerkeys.h" #include "feature/relay/routermode.h" #include "feature/rend/rendcache.h" -#include "feature/rend/rendclient.h" #include "feature/rend/rendservice.h" -#include "feature/stats/geoip_stats.h" #include "feature/stats/predict_ports.h" #include "feature/stats/rephist.h" #include "lib/compress/compress.h" #include "lib/buf/buffers.h" #include "lib/crypt_ops/crypto_rand.h" #include "lib/crypt_ops/crypto_s2k.h" -#include "lib/geoip/geoip.h" #include "lib/net/resolve.h" #include "lib/process/waitpid.h" +#include "lib/pubsub/pubsub_build.h" #include "lib/meminfo/meminfo.h" #include "lib/osinfo/uname.h" @@ -92,7 +75,6 @@ #include <event2/event.h> -#include "feature/dirauth/dirvote.h" #include "feature/dirauth/authmode.h" #include "feature/dirauth/shared_random.h" @@ -113,8 +95,6 @@ #include <systemd/sd-daemon.h> #endif /* defined(HAVE_SYSTEMD) */ -void evdns_shutdown(int); - #ifdef HAVE_RUST // helper function defined in Rust to output a log message indicating if tor is // running with Rust enabled. See src/rust/tor_util @@ -744,86 +724,6 @@ release_lockfile(void) } } -/** Free all memory that we might have allocated somewhere. - * If <b>postfork</b>, we are a worker process and we want to free - * only the parts of memory that we won't touch. If !<b>postfork</b>, - * Tor is shutting down and we should free everything. - * - * Helps us find the real leaks with sanitizers and the like. Also valgrind - * should then report 0 reachable in its leak report (in an ideal world -- - * in practice libevent, SSL, libc etc never quite free everything). */ -void -tor_free_all(int postfork) -{ - if (!postfork) { - evdns_shutdown(1); - } - geoip_free_all(); - geoip_stats_free_all(); - dirvote_free_all(); - routerlist_free_all(); - networkstatus_free_all(); - addressmap_free_all(); - dirserv_free_fingerprint_list(); - dirserv_free_all(); - dirserv_clear_measured_bw_cache(); - rend_cache_free_all(); - rend_service_authorization_free_all(); - rep_hist_free_all(); - dns_free_all(); - clear_pending_onions(); - circuit_free_all(); - circpad_machines_free(); - entry_guards_free_all(); - pt_free_all(); - channel_tls_free_all(); - channel_free_all(); - connection_free_all(); - connection_edge_free_all(); - scheduler_free_all(); - nodelist_free_all(); - microdesc_free_all(); - routerparse_free_all(); - ext_orport_free_all(); - control_free_all(); - protover_free_all(); - bridges_free_all(); - consdiffmgr_free_all(); - hs_free_all(); - dos_free_all(); - circuitmux_ewma_free_all(); - accounting_free_all(); - protover_summary_cache_free_all(); - - if (!postfork) { - config_free_all(); - or_state_free_all(); - router_free_all(); - routerkeys_free_all(); - policies_free_all(); - } - if (!postfork) { -#ifndef _WIN32 - tor_getpwnam(NULL); -#endif - } - /* stuff in main.c */ - - tor_mainloop_free_all(); - - if (!postfork) { - release_lockfile(); - } - tor_libevent_free_all(); - - subsystems_shutdown(); - - /* Stuff in util.c and address.c*/ - if (!postfork) { - esc_router_info(NULL); - } -} - /** * Remove the specified file, and log a warning if the operation fails for * any reason other than the file not existing. Ignores NULL filenames. @@ -837,50 +737,6 @@ tor_remove_file(const char *filename) } } -/** Do whatever cleanup is necessary before shutting Tor down. */ -void -tor_cleanup(void) -{ - const or_options_t *options = get_options(); - if (options->command == CMD_RUN_TOR) { - time_t now = time(NULL); - /* Remove our pid file. We don't care if there was an error when we - * unlink, nothing we could do about it anyways. */ - tor_remove_file(options->PidFile); - /* Remove control port file */ - tor_remove_file(options->ControlPortWriteToFile); - /* Remove cookie authentication file */ - { - char *cookie_fname = get_controller_cookie_file_name(); - tor_remove_file(cookie_fname); - tor_free(cookie_fname); - } - /* Remove Extended ORPort cookie authentication file */ - { - char *cookie_fname = get_ext_or_auth_cookie_file_name(); - tor_remove_file(cookie_fname); - tor_free(cookie_fname); - } - if (accounting_is_enabled(options)) - accounting_record_bandwidth_usage(now, get_or_state()); - or_state_mark_dirty(get_or_state(), 0); /* force an immediate save. */ - or_state_save(now); - if (authdir_mode(options)) { - sr_save_and_cleanup(); - } - if (authdir_mode_tests_reachability(options)) - rep_hist_record_mtbf_data(now, 0); - keypin_close_journal(); - } - - timers_shutdown(); - - tor_free_all(0); /* We could move tor_free_all back into the ifdef below - later, if it makes shutdown unacceptably slow. But for - now, leave it here: it's helped us catch bugs in the - past. */ -} - /** Read/create keys as needed, and echo our fingerprint to stdout. */ static int do_list_fingerprint(void) @@ -1378,6 +1234,30 @@ run_tor_main_loop(void) return do_main_loop(); } +/** Install the publish/subscribe relationships for all the subsystems. */ +static void +pubsub_install(void) +{ + pubsub_builder_t *builder = pubsub_builder_new(); + int r = subsystems_add_pubsub(builder); + tor_assert(r == 0); + r = tor_mainloop_connect_pubsub(builder); // consumes builder + tor_assert(r == 0); +} + +/** Connect the mainloop to its publish/subscribe message delivery events if + * appropriate, and configure the global channels appropriately. */ +static void +pubsub_connect(void) +{ + if (get_options()->command == CMD_RUN_TOR) { + tor_mainloop_connect_pubsub_events(); + /* XXXX For each pubsub channel, its delivery strategy should be set at + * this XXXX point, using tor_mainloop_set_delivery_strategy(). + */ + } +} + /* Main entry point for the Tor process. Called from tor_main(), and by * anybody embedding Tor. */ int @@ -1409,6 +1289,9 @@ tor_run_main(const tor_main_configuration_t *tor_cfg) } } #endif /* defined(NT_SERVICE) */ + + pubsub_install(); + { int init_rv = tor_init(argc, argv); if (init_rv) { @@ -1418,6 +1301,8 @@ tor_run_main(const tor_main_configuration_t *tor_cfg) } } + pubsub_connect(); + if (get_options()->Sandbox && get_options()->command == CMD_RUN_TOR) { sandbox_cfg_t* cfg = sandbox_init_filter(); diff --git a/src/app/main/main.h b/src/app/main/main.h index bbbbf984fb..9dfaf4b8ef 100644 --- a/src/app/main/main.h +++ b/src/app/main/main.h @@ -21,9 +21,6 @@ void release_lockfile(void); void tor_remove_file(const char *filename); -void tor_cleanup(void); -void tor_free_all(int postfork); - int tor_init(int argc, char **argv); int run_tor_main_loop(void); diff --git a/src/app/main/ntmain.c b/src/app/main/ntmain.c index 05d203b0be..f00b712702 100644 --- a/src/app/main/ntmain.c +++ b/src/app/main/ntmain.c @@ -24,6 +24,7 @@ #include "app/config/config.h" #include "app/main/main.h" #include "app/main/ntmain.h" +#include "app/main/shutdown.h" #include "core/mainloop/mainloop.h" #include "lib/evloop/compat_libevent.h" #include "lib/fs/winlib.h" diff --git a/src/app/main/shutdown.c b/src/app/main/shutdown.c new file mode 100644 index 0000000000..314e33f228 --- /dev/null +++ b/src/app/main/shutdown.c @@ -0,0 +1,192 @@ +/* Copyright (c) 2001 Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * @file shutdown.c + * @brief Code to free global resources used by Tor. + * + * In the future, this should all be handled by the subsystem manager. */ + +#include "core/or/or.h" + +#include "app/config/config.h" +#include "app/config/statefile.h" +#include "app/main/main.h" +#include "app/main/shutdown.h" +#include "app/main/subsysmgr.h" +#include "core/mainloop/connection.h" +#include "core/mainloop/mainloop.h" +#include "core/mainloop/mainloop_pubsub.h" +#include "core/or/channeltls.h" +#include "core/or/circuitlist.h" +#include "core/or/circuitmux_ewma.h" +#include "core/or/circuitpadding.h" +#include "core/or/connection_edge.h" +#include "core/or/dos.h" +#include "core/or/policies.h" +#include "core/or/protover.h" +#include "core/or/scheduler.h" +#include "core/or/versions.h" +#include "feature/client/addressmap.h" +#include "feature/client/bridges.h" +#include "feature/client/entrynodes.h" +#include "feature/client/transports.h" +#include "feature/control/control.h" +#include "feature/control/control_auth.h" +#include "feature/dirauth/authmode.h" +#include "feature/dirauth/bwauth.h" +#include "feature/dirauth/dirvote.h" +#include "feature/dirauth/keypin.h" +#include "feature/dirauth/process_descs.h" +#include "feature/dirauth/shared_random.h" +#include "feature/dircache/consdiffmgr.h" +#include "feature/dircache/dirserv.h" +#include "feature/dirparse/routerparse.h" +#include "feature/hibernate/hibernate.h" +#include "feature/hs/hs_common.h" +#include "feature/nodelist/microdesc.h" +#include "feature/nodelist/networkstatus.h" +#include "feature/nodelist/nodelist.h" +#include "feature/nodelist/routerlist.h" +#include "feature/nodelist/routerlist.h" +#include "feature/relay/dns.h" +#include "feature/relay/ext_orport.h" +#include "feature/relay/onion_queue.h" +#include "feature/relay/routerkeys.h" +#include "feature/rend/rendcache.h" +#include "feature/rend/rendclient.h" +#include "feature/stats/geoip_stats.h" +#include "feature/stats/rephist.h" +#include "lib/evloop/compat_libevent.h" +#include "lib/geoip/geoip.h" +#include "src/feature/relay/router.h" + +void evdns_shutdown(int); + +/** Do whatever cleanup is necessary before shutting Tor down. */ +void +tor_cleanup(void) +{ + const or_options_t *options = get_options(); + if (options->command == CMD_RUN_TOR) { + time_t now = time(NULL); + /* Remove our pid file. We don't care if there was an error when we + * unlink, nothing we could do about it anyways. */ + tor_remove_file(options->PidFile); + /* Remove control port file */ + tor_remove_file(options->ControlPortWriteToFile); + /* Remove cookie authentication file */ + { + char *cookie_fname = get_controller_cookie_file_name(); + tor_remove_file(cookie_fname); + tor_free(cookie_fname); + } + /* Remove Extended ORPort cookie authentication file */ + { + char *cookie_fname = get_ext_or_auth_cookie_file_name(); + tor_remove_file(cookie_fname); + tor_free(cookie_fname); + } + if (accounting_is_enabled(options)) + accounting_record_bandwidth_usage(now, get_or_state()); + or_state_mark_dirty(get_or_state(), 0); /* force an immediate save. */ + or_state_save(now); + if (authdir_mode(options)) { + sr_save_and_cleanup(); + } + if (authdir_mode_tests_reachability(options)) + rep_hist_record_mtbf_data(now, 0); + keypin_close_journal(); + } + + timers_shutdown(); + + tor_free_all(0); /* We could move tor_free_all back into the ifdef below + later, if it makes shutdown unacceptably slow. But for + now, leave it here: it's helped us catch bugs in the + past. */ +} + +/** Free all memory that we might have allocated somewhere. + * If <b>postfork</b>, we are a worker process and we want to free + * only the parts of memory that we won't touch. If !<b>postfork</b>, + * Tor is shutting down and we should free everything. + * + * Helps us find the real leaks with sanitizers and the like. Also valgrind + * should then report 0 reachable in its leak report (in an ideal world -- + * in practice libevent, SSL, libc etc never quite free everything). */ +void +tor_free_all(int postfork) +{ + if (!postfork) { + evdns_shutdown(1); + } + geoip_free_all(); + geoip_stats_free_all(); + dirvote_free_all(); + routerlist_free_all(); + networkstatus_free_all(); + addressmap_free_all(); + dirserv_free_fingerprint_list(); + dirserv_free_all(); + dirserv_clear_measured_bw_cache(); + rend_cache_free_all(); + rend_service_authorization_free_all(); + rep_hist_free_all(); + dns_free_all(); + clear_pending_onions(); + circuit_free_all(); + circpad_machines_free(); + entry_guards_free_all(); + pt_free_all(); + channel_tls_free_all(); + channel_free_all(); + connection_free_all(); + connection_edge_free_all(); + scheduler_free_all(); + nodelist_free_all(); + microdesc_free_all(); + routerparse_free_all(); + ext_orport_free_all(); + control_free_all(); + protover_free_all(); + bridges_free_all(); + consdiffmgr_free_all(); + hs_free_all(); + dos_free_all(); + circuitmux_ewma_free_all(); + accounting_free_all(); + protover_summary_cache_free_all(); + + if (!postfork) { + config_free_all(); + or_state_free_all(); + router_free_all(); + routerkeys_free_all(); + policies_free_all(); + } + if (!postfork) { +#ifndef _WIN32 + tor_getpwnam(NULL); +#endif + } + /* stuff in main.c */ + + tor_mainloop_disconnect_pubsub(); + tor_mainloop_free_all(); + + if (!postfork) { + release_lockfile(); + } + tor_libevent_free_all(); + + subsystems_shutdown(); + + /* Stuff in util.c and address.c*/ + if (!postfork) { + esc_router_info(NULL); + } +} diff --git a/src/app/main/shutdown.h b/src/app/main/shutdown.h new file mode 100644 index 0000000000..1bca96a0aa --- /dev/null +++ b/src/app/main/shutdown.h @@ -0,0 +1,18 @@ +/* Copyright (c) 2001 Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file shutdown.h + * \brief Header file for shutdown.c. + **/ + +#ifndef TOR_SHUTDOWN_H +#define TOR_SHUTDOWN_H + +void tor_cleanup(void); +void tor_free_all(int postfork); + +#endif /* !defined(TOR_SHUTDOWN_H) */ diff --git a/src/app/main/subsysmgr.c b/src/app/main/subsysmgr.c index e0ca3ce4df..5aa4fd76c9 100644 --- a/src/app/main/subsysmgr.c +++ b/src/app/main/subsysmgr.c @@ -5,9 +5,14 @@ #include "orconfig.h" #include "app/main/subsysmgr.h" -#include "lib/err/torerr.h" +#include "lib/dispatch/dispatch_naming.h" +#include "lib/dispatch/msgtypes.h" +#include "lib/err/torerr.h" #include "lib/log/log.h" +#include "lib/malloc/malloc.h" +#include "lib/pubsub/pubsub_build.h" +#include "lib/pubsub/pubsub_connect.h" #include <stdio.h> #include <stdlib.h> @@ -106,6 +111,51 @@ subsystems_init_upto(int target_level) } /** + * Add publish/subscribe relationships to <b>builder</b> for all + * initialized subsystems of level no more than <b>target_level</b>. + **/ +int +subsystems_add_pubsub_upto(pubsub_builder_t *builder, + int target_level) +{ + for (unsigned i = 0; i < n_tor_subsystems; ++i) { + const subsys_fns_t *sys = tor_subsystems[i]; + if (!sys->supported) + continue; + if (sys->level > target_level) + break; + if (! sys_initialized[i]) + continue; + int r = 0; + if (sys->add_pubsub) { + subsys_id_t sysid = get_subsys_id(sys->name); + raw_assert(sysid != ERROR_ID); + pubsub_connector_t *connector; + connector = pubsub_connector_for_subsystem(builder, sysid); + r = sys->add_pubsub(connector); + pubsub_connector_free(connector); + } + if (r < 0) { + fprintf(stderr, "BUG: subsystem %s (at %u) could not connect to " + "publish/subscribe system.", sys->name, sys->level); + raw_assert_unreached_msg("A subsystem couldn't be connected."); + } + } + + return 0; +} + +/** + * Add publish/subscribe relationships to <b>builder</b> for all + * initialized subsystems. + **/ +int +subsystems_add_pubsub(pubsub_builder_t *builder) +{ + return subsystems_add_pubsub_upto(builder, MAX_SUBSYS_LEVEL); +} + +/** * Shut down all the subsystems. **/ void diff --git a/src/app/main/subsysmgr.h b/src/app/main/subsysmgr.h index a5e62f71d9..4ac44afca7 100644 --- a/src/app/main/subsysmgr.h +++ b/src/app/main/subsysmgr.h @@ -14,6 +14,11 @@ extern const unsigned n_tor_subsystems; int subsystems_init(void); int subsystems_init_upto(int level); +struct pubsub_builder_t; +int subsystems_add_pubsub_upto(struct pubsub_builder_t *builder, + int target_level); +int subsystems_add_pubsub(struct pubsub_builder_t *builder); + void subsystems_shutdown(void); void subsystems_shutdown_downto(int level); diff --git a/src/core/include.am b/src/core/include.am index 27954af5f5..9824601725 100644 --- a/src/core/include.am +++ b/src/core/include.am @@ -11,6 +11,7 @@ LIBTOR_APP_A_SOURCES = \ src/app/config/confparse.c \ src/app/config/statefile.c \ src/app/main/main.c \ + src/app/main/shutdown.c \ src/app/main/subsystem_list.c \ src/app/main/subsysmgr.c \ src/core/crypto/hs_ntor.c \ @@ -22,6 +23,7 @@ LIBTOR_APP_A_SOURCES = \ src/core/mainloop/connection.c \ src/core/mainloop/cpuworker.c \ src/core/mainloop/mainloop.c \ + src/core/mainloop/mainloop_pubsub.c \ src/core/mainloop/netstatus.c \ src/core/mainloop/periodic.c \ src/core/or/address_set.c \ @@ -208,6 +210,7 @@ noinst_HEADERS += \ src/app/config/statefile.h \ src/app/main/main.h \ src/app/main/ntmain.h \ + src/app/main/shutdown.h \ src/app/main/subsysmgr.h \ src/core/crypto/hs_ntor.h \ src/core/crypto/onion_crypto.h \ @@ -218,6 +221,7 @@ noinst_HEADERS += \ src/core/mainloop/connection.h \ src/core/mainloop/cpuworker.h \ src/core/mainloop/mainloop.h \ + src/core/mainloop/mainloop_pubsub.h \ src/core/mainloop/netstatus.h \ src/core/mainloop/periodic.h \ src/core/or/addr_policy_st.h \ diff --git a/src/core/mainloop/mainloop_pubsub.c b/src/core/mainloop/mainloop_pubsub.c new file mode 100644 index 0000000000..724a3115c8 --- /dev/null +++ b/src/core/mainloop/mainloop_pubsub.c @@ -0,0 +1,170 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#include "orconfig.h" + +#include "src/core/or/or.h" +#include "src/core/mainloop/mainloop.h" +#include "src/core/mainloop/mainloop_pubsub.h" + +#include "lib/container/smartlist.h" +#include "lib/dispatch/dispatch.h" +#include "lib/dispatch/dispatch_naming.h" +#include "lib/evloop/compat_libevent.h" +#include "lib/pubsub/pubsub.h" +#include "lib/pubsub/pubsub_build.h" + +/** + * Dispatcher to use for delivering messages. + **/ +static dispatch_t *the_dispatcher = NULL; +static pubsub_items_t *the_pubsub_items = NULL; +/** + * A list of mainloop_event_t, indexed by channel ID, to flush the messages + * on a channel. + **/ +static smartlist_t *alert_events = NULL; + +/** + * Mainloop event callback: flush all the messages in a channel. + * + * The channel is encoded as a pointer, and passed via arg. + **/ +static void +flush_channel_event(mainloop_event_t *ev, void *arg) +{ + (void)ev; + if (!the_dispatcher) + return; + + channel_id_t chan = (channel_id_t)(uintptr_t)(arg); + dispatch_flush(the_dispatcher, chan, INT_MAX); +} + +/** + * Construct our global pubsub object from <b>builder</b>. Return 0 on + * success, -1 on failure. */ +int +tor_mainloop_connect_pubsub(struct pubsub_builder_t *builder) +{ + int rv = -1; + tor_mainloop_disconnect_pubsub(); + + the_dispatcher = pubsub_builder_finalize(builder, &the_pubsub_items); + if (! the_dispatcher) + goto err; + + rv = 0; + goto done; + err: + tor_mainloop_disconnect_pubsub(); + done: + return rv; +} + +/** + * Install libevent events for all of the pubsub channels. + * + * Invoke this after tor_mainloop_connect_pubsub, and after libevent has been + * initialized. + */ +void +tor_mainloop_connect_pubsub_events(void) +{ + tor_assert(the_dispatcher); + tor_assert(! alert_events); + + const size_t num_channels = get_num_channel_ids(); + alert_events = smartlist_new(); + for (size_t i = 0; i < num_channels; ++i) { + smartlist_add(alert_events, + mainloop_event_postloop_new(flush_channel_event, + (void*)(uintptr_t)(i))); + } +} + +/** + * Dispatch alertfn callback: do nothing. Implements DELIV_NEVER. + **/ +static void +alertfn_never(dispatch_t *d, channel_id_t chan, void *arg) +{ + (void)d; + (void)chan; + (void)arg; +} + +/** + * Dispatch alertfn callback: activate a mainloop event. Implements + * DELIV_PROMPT. + **/ +static void +alertfn_prompt(dispatch_t *d, channel_id_t chan, void *arg) +{ + (void)d; + (void)chan; + mainloop_event_t *event = arg; + mainloop_event_activate(event); +} + +/** + * Dispatch alertfn callback: flush all messages right now. Implements + * DELIV_IMMEDIATE. + **/ +static void +alertfn_immediate(dispatch_t *d, channel_id_t chan, void *arg) +{ + (void) arg; + dispatch_flush(d, chan, INT_MAX); +} + +/** + * Set the strategy to be used for delivering messages on the named channel. + * + * This function needs to be called once globally for each channel, to + * set up how messages are delivered. + **/ +int +tor_mainloop_set_delivery_strategy(const char *msg_channel_name, + deliv_strategy_t strategy) +{ + channel_id_t chan = get_channel_id(msg_channel_name); + if (BUG(chan == ERROR_ID) || + BUG(chan >= smartlist_len(alert_events))) + return -1; + + switch (strategy) { + case DELIV_NEVER: + dispatch_set_alert_fn(the_dispatcher, chan, alertfn_never, NULL); + break; + case DELIV_PROMPT: + dispatch_set_alert_fn(the_dispatcher, chan, alertfn_prompt, + smartlist_get(alert_events, chan)); + break; + case DELIV_IMMEDIATE: + dispatch_set_alert_fn(the_dispatcher, chan, alertfn_immediate, NULL); + break; + } + return 0; +} + +/** + * Remove all pubsub dispatchers and events from the mainloop. + **/ +void +tor_mainloop_disconnect_pubsub(void) +{ + if (the_pubsub_items) { + pubsub_items_clear_bindings(the_pubsub_items); + pubsub_items_free(the_pubsub_items); + } + if (alert_events) { + SMARTLIST_FOREACH(alert_events, mainloop_event_t *, ev, + mainloop_event_free(ev)); + smartlist_free(alert_events); + } + dispatch_free(the_dispatcher); +} diff --git a/src/core/mainloop/mainloop_pubsub.h b/src/core/mainloop/mainloop_pubsub.h new file mode 100644 index 0000000000..a31b2b4ba7 --- /dev/null +++ b/src/core/mainloop/mainloop_pubsub.h @@ -0,0 +1,24 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#ifndef TOR_MAINLOOP_PUBSUB_H +#define TOR_MAINLOOP_PUBSUB_H + +struct pubsub_builder_t; + +typedef enum { + DELIV_NEVER=0, + DELIV_PROMPT, + DELIV_IMMEDIATE, +} deliv_strategy_t; + +int tor_mainloop_connect_pubsub(struct pubsub_builder_t *builder); +void tor_mainloop_connect_pubsub_events(void); +int tor_mainloop_set_delivery_strategy(const char *msg_channel_name, + deliv_strategy_t strategy); +void tor_mainloop_disconnect_pubsub(void); + +#endif diff --git a/src/include.am b/src/include.am index 9070a69a03..77c126ba45 100644 --- a/src/include.am +++ b/src/include.am @@ -8,6 +8,7 @@ include src/lib/compress/include.am include src/lib/container/include.am include src/lib/crypt_ops/include.am include src/lib/defs/include.am +include src/lib/dispatch/include.am include src/lib/encoding/include.am include src/lib/evloop/include.am include src/lib/fdio/include.am @@ -24,6 +25,7 @@ include src/lib/malloc/include.am include src/lib/net/include.am include src/lib/osinfo/include.am include src/lib/process/include.am +include src/lib/pubsub/include.am include src/lib/sandbox/include.am include src/lib/string/include.am include src/lib/subsys/include.am diff --git a/src/lib/cc/compat_compiler.h b/src/lib/cc/compat_compiler.h index 3a0f307186..18b76cc1a1 100644 --- a/src/lib/cc/compat_compiler.h +++ b/src/lib/cc/compat_compiler.h @@ -217,4 +217,16 @@ /** Macro: Yields the number of elements in array x. */ #define ARRAY_LENGTH(x) ((sizeof(x)) / sizeof(x[0])) +/** + * "Eat" a semicolon that somebody puts at the end of a top-level macro. + * + * Frequently, we want to declare a macro that people will use at file scope, + * and we want to allow people to put a semicolon after the macro. + * + * This declaration of a struct can be repeated any number of times, and takes + * a trailing semicolon afterwards. + **/ +#define EAT_SEMICOLON \ + struct dummy_semicolon_eater__ + #endif /* !defined(TOR_COMPAT_H) */ diff --git a/src/lib/container/include.am b/src/lib/container/include.am index 032e4033da..50d35e749b 100644 --- a/src/lib/container/include.am +++ b/src/lib/container/include.am @@ -8,6 +8,7 @@ endif src_lib_libtor_container_a_SOURCES = \ src/lib/container/bloomfilt.c \ src/lib/container/map.c \ + src/lib/container/namemap.c \ src/lib/container/order.c \ src/lib/container/smartlist.c @@ -21,5 +22,7 @@ noinst_HEADERS += \ src/lib/container/bloomfilt.h \ src/lib/container/handles.h \ src/lib/container/map.h \ + src/lib/container/namemap.h \ + src/lib/container/namemap_st.h \ src/lib/container/order.h \ src/lib/container/smartlist.h diff --git a/src/lib/container/namemap.c b/src/lib/container/namemap.c new file mode 100644 index 0000000000..a90057b32c --- /dev/null +++ b/src/lib/container/namemap.c @@ -0,0 +1,184 @@ +/* Copyright (c) 2003-2004, Roger Dingledine + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#include "orconfig.h" +#include "lib/container/smartlist.h" +#include "lib/container/namemap.h" +#include "lib/container/namemap_st.h" +#include "lib/log/util_bug.h" +#include "lib/malloc/malloc.h" +#include "lib/string/printf.h" + +#include "ext/siphash.h" + +#include <string.h> + +/** Helper for namemap hashtable implementation: compare two entries. */ +static inline int +mapped_name_eq(const mapped_name_t *a, const mapped_name_t *b) +{ + return !strcmp(a->name, b->name); +} + +/** Helper for namemap hashtable implementation: hash an entry. */ +static inline unsigned +mapped_name_hash(const mapped_name_t *a) +{ + return (unsigned) siphash24g(a->name, strlen(a->name)); +} + +HT_PROTOTYPE(namemap_ht, mapped_name_t, node, mapped_name_hash, + mapped_name_eq) +HT_GENERATE2(namemap_ht, mapped_name_t, node, mapped_name_hash, + mapped_name_eq, 0.6, tor_reallocarray_, tor_free_) + +/** Set up an uninitialized <b>map</b>. */ +void +namemap_init(namemap_t *map) +{ + memset(map, 0, sizeof(*map)); + HT_INIT(namemap_ht, &map->ht); + map->names = smartlist_new(); +} + +/** Return the name that <b>map</b> associates with a given <b>id</b>, or + * NULL if there is no such name. */ +const char * +namemap_get_name(const namemap_t *map, unsigned id) +{ + if (map->names && id < (unsigned)smartlist_len(map->names)) { + mapped_name_t *name = smartlist_get(map->names, (int)id); + return name->name; + } else { + return NULL; + } +} + +/** + * Return the name that <b>map</b> associates with a given <b>id</b>, or a + * pointer to a statically allocated string describing the value of <b>id</b> + * if no such name exists. + **/ +const char * +namemap_fmt_name(const namemap_t *map, unsigned id) +{ + static char buf[32]; + + const char *name = namemap_get_name(map, id); + if (name) + return name; + + tor_snprintf(buf, sizeof(buf), "{%u}", id); + + return buf; +} + +/** + * Helper: As namemap_get_id(), but requires that <b>name</b> is + * <b>namelen</b> charaters long, and that <b>namelen</b> is no more than + * MAX_NAMEMAP_NAME_LEN. + */ +static unsigned +namemap_get_id_unchecked(const namemap_t *map, + const char *name, + size_t namelen) +{ + union { + mapped_name_t n; + char storage[MAX_NAMEMAP_NAME_LEN + sizeof(mapped_name_t) + 1]; + } u; + memcpy(u.n.name, name, namelen); + u.n.name[namelen] = 0; + const mapped_name_t *found = HT_FIND(namemap_ht, &map->ht, &u.n); + if (found) { + tor_assert(map->names); + tor_assert(smartlist_get(map->names, found->intval) == found); + return found->intval; + } + + return NAMEMAP_ERR; +} + +/** + * Return the identifier currently associated by <b>map</b> with the name + * <b>name</b>, or NAMEMAP_ERR if no such identifier exists. + **/ +unsigned +namemap_get_id(const namemap_t *map, + const char *name) +{ + size_t namelen = strlen(name); + if (namelen > MAX_NAMEMAP_NAME_LEN) { + return NAMEMAP_ERR; + } + + return namemap_get_id_unchecked(map, name, namelen); +} + +/** + * Return the identifier associated by <b>map</b> with the name + * <b>name</b>, allocating a new identifier in <b>map</b> if none exists. + * + * Return NAMEMAP_ERR if <b>name</b> is too long, or if there are no more + * identifiers we can allocate. + **/ +unsigned +namemap_get_or_create_id(namemap_t *map, + const char *name) +{ + size_t namelen = strlen(name); + if (namelen > MAX_NAMEMAP_NAME_LEN) { + return NAMEMAP_ERR; + } + + if (PREDICT_UNLIKELY(map->names == NULL)) + map->names = smartlist_new(); + + unsigned found = namemap_get_id_unchecked(map, name, namelen); + if (found != NAMEMAP_ERR) + return found; + + unsigned new_id = (unsigned)smartlist_len(map->names); + if (new_id == NAMEMAP_ERR) + return NAMEMAP_ERR; /* Can't allocate any more. */ + + mapped_name_t *insert = tor_malloc_zero( + offsetof(mapped_name_t, name) + namelen + 1); + memcpy(insert->name, name, namelen+1); + insert->intval = new_id; + + HT_INSERT(namemap_ht, &map->ht, insert); + smartlist_add(map->names, insert); + + return new_id; +} + +/** Return the number of entries in 'names' */ +size_t +namemap_get_size(const namemap_t *map) +{ + if (PREDICT_UNLIKELY(map->names == NULL)) + return 0; + + return smartlist_len(map->names); +} + +/** + * Release all storage held in <b>map</b>. + */ +void +namemap_clear(namemap_t *map) +{ + if (!map) + return; + + HT_CLEAR(namemap_ht, &map->ht); + if (map->names) { + SMARTLIST_FOREACH(map->names, mapped_name_t *, n, + tor_free(n)); + smartlist_free(map->names); + } + memset(map, 0, sizeof(*map)); +} diff --git a/src/lib/container/namemap.h b/src/lib/container/namemap.h new file mode 100644 index 0000000000..97792e13ba --- /dev/null +++ b/src/lib/container/namemap.h @@ -0,0 +1,35 @@ +/* Copyright (c) 2003-2004, Roger Dingledine + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#ifndef TOR_NAMEMAP_H +#define TOR_NAMEMAP_H + +/** + * \file namemap.h + * + * \brief Header for namemap.c + **/ + +#include "lib/cc/compat_compiler.h" +#include "ext/ht.h" + +#include <stddef.h> + +typedef struct namemap_t namemap_t; + +/** Returned in place of an identifier when an error occurs. */ +#define NAMEMAP_ERR UINT_MAX + +void namemap_init(namemap_t *map); +const char *namemap_get_name(const namemap_t *map, unsigned id); +const char *namemap_fmt_name(const namemap_t *map, unsigned id); +unsigned namemap_get_id(const namemap_t *map, + const char *name); +unsigned namemap_get_or_create_id(namemap_t *map, + const char *name); +size_t namemap_get_size(const namemap_t *map); +void namemap_clear(namemap_t *map); + +#endif diff --git a/src/lib/container/namemap_st.h b/src/lib/container/namemap_st.h new file mode 100644 index 0000000000..5717352fa2 --- /dev/null +++ b/src/lib/container/namemap_st.h @@ -0,0 +1,34 @@ +/* Copyright (c) 2003-2004, Roger Dingledine + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#ifndef NAMEMAP_ST_H +#define NAMEMAP_ST_H + +#include "lib/cc/compat_compiler.h" +#include "ext/ht.h" + +struct smartlist_t; + +/** Longest allowed name that's allowed in a namemap_t. */ +#define MAX_NAMEMAP_NAME_LEN 128 + +/** An entry inside a namemap_t. Maps a string to a numeric identifier. */ +typedef struct mapped_name_t { + HT_ENTRY(mapped_name_t) node; + unsigned intval; + char name[FLEXIBLE_ARRAY_MEMBER]; +} mapped_name_t; + +/** A structure that allocates small numeric identifiers for names and maps + * back and forth between them. */ +struct namemap_t { + HT_HEAD(namemap_ht, mapped_name_t) ht; + struct smartlist_t *names; +}; + +/** Macro to initialize a namemap. */ +#define NAMEMAP_INIT() { HT_INITIALIZER(), NULL } + +#endif diff --git a/src/lib/dispatch/.may_include b/src/lib/dispatch/.may_include new file mode 100644 index 0000000000..7f2df5859f --- /dev/null +++ b/src/lib/dispatch/.may_include @@ -0,0 +1,10 @@ +orconfig.h + +ext/tor_queue.h + +lib/cc/*.h +lib/container/*.h +lib/dispatch/*.h +lib/intmath/*.h +lib/log/*.h +lib/malloc/*.h diff --git a/src/lib/dispatch/dispatch.h b/src/lib/dispatch/dispatch.h new file mode 100644 index 0000000000..8e62e8f168 --- /dev/null +++ b/src/lib/dispatch/dispatch.h @@ -0,0 +1,114 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#ifndef TOR_DISPATCH_H +#define TOR_DISPATCH_H + +#include "lib/dispatch/msgtypes.h" + +/** + * \file dispatch.h + * \brief Low-level APIs for message-passing system. + * + * This module implements message dispatch based on a set of short integer + * identifiers. For a higher-level interface, see pubsub.h. + * + * Each message is represented as a generic msg_t object, and is discriminated + * by its message_id_t. Messages are delivered by a dispatch_t object, which + * delivers each message to its recipients by a configured "channel". + * + * A "channel" is a means of delivering messages. Every message_id_t must + * be associated with exactly one channel, identified by channel_id_t. + * When a channel receives messages, a callback is invoked to either process + * the messages immediately, or to cause them to be processed later. + * + * Every message_id_t has zero or more associated receiver functions set up in + * the dispatch_t object. Once the dispatch_t object is created, receivers + * can be enabled or disabled [TODO], but not added or removed. + * + * Every message_id_t has an associated datatype, identified by a + * msg_type_id_t. These datatypes can be associated with functions to + * (for example) free them, or format them for debugging. + * + * To setup a dispatch_t object, first create a dispatch_cfg_t object, and + * configure messages with their types, channels, and receivers. Then, use + * dispatch_new() with that dispatch_cfg_t to create the dispatch_t object. + * + * (We use a two-phase contruction procedure here to enable better static + * reasoning about publish/subscribe relationships.) + * + * Once you have a dispatch_t, you can queue messages on it with + * dispatch_send*(), and cause those messages to be delivered with + * dispatch_flush(). + **/ + +/** + * A "dispatcher" is the highest-level object; it handles making sure that + * messages are received and delivered properly. Only the mainloop + * should handle this type directly. + */ +typedef struct dispatch_t dispatch_t; + +struct dispatch_cfg_t; + +dispatch_t *dispatch_new(const struct dispatch_cfg_t *cfg); + +/** + * Free a dispatcher. Tor does this at exit. + */ +#define dispatch_free(d) \ + FREE_AND_NULL(dispatch_t, dispatch_free_, (d)) + +void dispatch_free_(dispatch_t *); + +int dispatch_send(dispatch_t *d, + subsys_id_t sender, + channel_id_t channel, + message_id_t msg, + msg_type_id_t type, + msg_aux_data_t auxdata); + +int dispatch_send_msg(dispatch_t *d, msg_t *m); + +int dispatch_send_msg_unchecked(dispatch_t *d, msg_t *m); + +/* Flush up to <b>max_msgs</b> currently pending messages from the + * dispatcher. Messages that are not pending when this function are + * called, are not flushed by this call. Return 0 on success, -1 on + * unrecoverable error. + */ +int dispatch_flush(dispatch_t *, channel_id_t chan, int max_msgs); + +/** + * Function callback type used to alert some other module when a channel's + * queue changes from empty to nonempty. + * + * Ex 1: To cause messages to be processed immediately on-stack, this callback + * should invoke dispatch_flush() directly. + * + * Ex 2: To cause messages to be processed very soon, from the event queue, + * this callback should schedule an event callback to run dispatch_flush(). + * + * Ex 3: To cause messages to be processed periodically, this function should + * do nothing, and a periodic event should invoke dispatch_flush(). + **/ +typedef void (*dispatch_alertfn_t)(struct dispatch_t *, + channel_id_t, void *); + +int dispatch_set_alert_fn(dispatch_t *d, channel_id_t chan, + dispatch_alertfn_t fn, void *userdata); + +#define dispatch_free_msg(d,msg) \ + STMT_BEGIN { \ + msg_t **msg_tmp_ptr__ = &(msg); \ + dispatch_free_msg_((d), *msg_tmp_ptr__); \ + *msg_tmp_ptr__= NULL; \ + } STMT_END +void dispatch_free_msg_(const dispatch_t *d, msg_t *msg); + +char *dispatch_fmt_msg_data(const dispatch_t *d, const msg_t *msg); + +#endif diff --git a/src/lib/dispatch/dispatch_cfg.c b/src/lib/dispatch/dispatch_cfg.c new file mode 100644 index 0000000000..b3a72ec22f --- /dev/null +++ b/src/lib/dispatch/dispatch_cfg.c @@ -0,0 +1,141 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file dispatch_cfg.c + * \brief Create and configure a dispatch_cfg_t. + * + * A dispatch_cfg_t object is used to configure a set of messages and + * associated information before creating a dispatch_t. + */ + +#define DISPATCH_PRIVATE + +#include "orconfig.h" +#include "lib/dispatch/dispatch_cfg.h" +#include "lib/dispatch/dispatch_cfg_st.h" +#include "lib/dispatch/dispatch.h" +#include "lib/dispatch/dispatch_st.h" + +#include "lib/container/smartlist.h" +#include "lib/malloc/malloc.h" + +/** + * Create and return a new dispatch_cfg_t. + **/ +dispatch_cfg_t * +dcfg_new(void) +{ + dispatch_cfg_t *cfg = tor_malloc(sizeof(dispatch_cfg_t)); + cfg->type_by_msg = smartlist_new(); + cfg->chan_by_msg = smartlist_new(); + cfg->fns_by_type = smartlist_new(); + cfg->recv_by_msg = smartlist_new(); + return cfg; +} + +/** + * Associate a message with a datatype. Return 0 on success, -1 if a + * different type was previously associated with the message ID. + **/ +int +dcfg_msg_set_type(dispatch_cfg_t *cfg, message_id_t msg, + msg_type_id_t type) +{ + smartlist_grow(cfg->type_by_msg, msg+1); + msg_type_id_t *oldval = smartlist_get(cfg->type_by_msg, msg); + if (oldval != NULL && *oldval != type) { + return -1; + } + if (!oldval) + smartlist_set(cfg->type_by_msg, msg, tor_memdup(&type, sizeof(type))); + return 0; +} + +/** + * Associate a message with a channel. Return 0 on success, -1 if a + * different channel was previously associated with the message ID. + **/ +int +dcfg_msg_set_chan(dispatch_cfg_t *cfg, message_id_t msg, + channel_id_t chan) +{ + smartlist_grow(cfg->chan_by_msg, msg+1); + channel_id_t *oldval = smartlist_get(cfg->chan_by_msg, msg); + if (oldval != NULL && *oldval != chan) { + return -1; + } + if (!oldval) + smartlist_set(cfg->chan_by_msg, msg, tor_memdup(&chan, sizeof(chan))); + return 0; +} + +/** + * Associate a set of functions with a datatype. Return 0 on success, -1 if + * different functions were previously associated with the type. + **/ +int +dcfg_type_set_fns(dispatch_cfg_t *cfg, msg_type_id_t type, + const dispatch_typefns_t *fns) +{ + smartlist_grow(cfg->fns_by_type, type+1); + dispatch_typefns_t *oldfns = smartlist_get(cfg->fns_by_type, type); + if (oldfns && (oldfns->free_fn != fns->free_fn || + oldfns->fmt_fn != fns->fmt_fn)) + return -1; + if (!oldfns) + smartlist_set(cfg->fns_by_type, type, tor_memdup(fns, sizeof(*fns))); + return 0; +} + +/** + * Associate a receiver with a message ID. Multiple receivers may be + * associated with a single messasge ID. + * + * Return 0 on success, on failure. + **/ +int +dcfg_add_recv(dispatch_cfg_t *cfg, message_id_t msg, + subsys_id_t sys, recv_fn_t fn) +{ + smartlist_grow(cfg->recv_by_msg, msg+1); + smartlist_t *receivers = smartlist_get(cfg->recv_by_msg, msg); + if (!receivers) { + receivers = smartlist_new(); + smartlist_set(cfg->recv_by_msg, msg, receivers); + } + + dispatch_rcv_t *rcv = tor_malloc(sizeof(dispatch_rcv_t)); + rcv->sys = sys; + rcv->enabled = true; + rcv->fn = fn; + smartlist_add(receivers, (void*)rcv); + return 0; +} + +/** Helper: release all storage held by <b>cfg</b>. */ +void +dcfg_free_(dispatch_cfg_t *cfg) +{ + if (!cfg) + return; + + SMARTLIST_FOREACH(cfg->type_by_msg, msg_type_id_t *, id, tor_free(id)); + SMARTLIST_FOREACH(cfg->chan_by_msg, channel_id_t *, id, tor_free(id)); + SMARTLIST_FOREACH(cfg->fns_by_type, dispatch_typefns_t *, f, tor_free(f)); + smartlist_free(cfg->type_by_msg); + smartlist_free(cfg->chan_by_msg); + smartlist_free(cfg->fns_by_type); + SMARTLIST_FOREACH_BEGIN(cfg->recv_by_msg, smartlist_t *, receivers) { + if (!receivers) + continue; + SMARTLIST_FOREACH(receivers, dispatch_rcv_t *, rcv, tor_free(rcv)); + smartlist_free(receivers); + } SMARTLIST_FOREACH_END(receivers); + smartlist_free(cfg->recv_by_msg); + + tor_free(cfg); +} diff --git a/src/lib/dispatch/dispatch_cfg.h b/src/lib/dispatch/dispatch_cfg.h new file mode 100644 index 0000000000..2c755e39bc --- /dev/null +++ b/src/lib/dispatch/dispatch_cfg.h @@ -0,0 +1,39 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#ifndef TOR_DISPATCH_CFG_H +#define TOR_DISPATCH_CFG_H + +#include "lib/dispatch/msgtypes.h" + +/** + * A "dispatch_cfg" is the configuration used to set up a dispatcher. + * It is created and accessed with a set of dcfg_* functions, and then + * used with dispatcher_new() to make the dispatcher. + */ +typedef struct dispatch_cfg_t dispatch_cfg_t; + +dispatch_cfg_t *dcfg_new(void); + +int dcfg_msg_set_type(dispatch_cfg_t *cfg, message_id_t msg, + msg_type_id_t type); + +int dcfg_msg_set_chan(dispatch_cfg_t *cfg, message_id_t msg, + channel_id_t chan); + +int dcfg_type_set_fns(dispatch_cfg_t *cfg, msg_type_id_t type, + const dispatch_typefns_t *fns); + +int dcfg_add_recv(dispatch_cfg_t *cfg, message_id_t msg, + subsys_id_t sys, recv_fn_t fn); + +/** Free a dispatch_cfg_t. */ +#define dcfg_free(cfg) \ + FREE_AND_NULL(dispatch_cfg_t, dcfg_free_, (cfg)) + +void dcfg_free_(dispatch_cfg_t *cfg); + +#endif diff --git a/src/lib/dispatch/dispatch_cfg_st.h b/src/lib/dispatch/dispatch_cfg_st.h new file mode 100644 index 0000000000..d004fe5934 --- /dev/null +++ b/src/lib/dispatch/dispatch_cfg_st.h @@ -0,0 +1,25 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#ifndef TOR_DISPATCH_CFG_ST_H +#define TOR_DISPATCH_CFG_ST_H + +struct smartlist_t; + +/* Information needed to create a dispatcher, but in a less efficient, more + * mutable format. */ +struct dispatch_cfg_t { + /** A list of msg_type_id_t (cast to void*), indexed by msg_t. */ + struct smartlist_t *type_by_msg; + /** A list of channel_id_t (cast to void*), indexed by msg_t. */ + struct smartlist_t *chan_by_msg; + /** A list of dispatch_rcv_t, indexed by msg_type_id_t. */ + struct smartlist_t *fns_by_type; + /** A list of dispatch_typefns_t, indexed by msg_t. */ + struct smartlist_t *recv_by_msg; +}; + +#endif diff --git a/src/lib/dispatch/dispatch_core.c b/src/lib/dispatch/dispatch_core.c new file mode 100644 index 0000000000..da54f9b437 --- /dev/null +++ b/src/lib/dispatch/dispatch_core.c @@ -0,0 +1,260 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file dispatch_core.c + * \brief Core module for sending and receiving messages. + */ + +#define DISPATCH_PRIVATE +#include "orconfig.h" + +#include "lib/dispatch/dispatch.h" +#include "lib/dispatch/dispatch_st.h" +#include "lib/dispatch/dispatch_naming.h" + +#include "lib/malloc/malloc.h" +#include "lib/log/util_bug.h" + +#include <string.h> + +/** + * Use <b>d</b> to drop all storage held for <b>msg</b>. + * + * (We need the dispatcher so we know how to free the auxiliary data.) + **/ +void +dispatch_free_msg_(const dispatch_t *d, msg_t *msg) +{ + if (!msg) + return; + + d->typefns[msg->type].free_fn(msg->aux_data__); + tor_free(msg); +} + +/** + * Format the auxiliary data held by msg. + **/ +char * +dispatch_fmt_msg_data(const dispatch_t *d, const msg_t *msg) +{ + if (!msg) + return NULL; + + return d->typefns[msg->type].fmt_fn(msg->aux_data__); +} + +/** + * Release all storage held by <b>d</b>. + **/ +void +dispatch_free_(dispatch_t *d) +{ + if (d == NULL) + return; + + size_t n_queues = d->n_queues; + for (size_t i = 0; i < n_queues; ++i) { + msg_t *m, *mtmp; + TOR_SIMPLEQ_FOREACH_SAFE(m, &d->queues[i].queue, next, mtmp) { + dispatch_free_msg(d, m); + } + } + + size_t n_msgs = d->n_msgs; + + for (size_t i = 0; i < n_msgs; ++i) { + tor_free(d->table[i]); + } + tor_free(d->table); + tor_free(d->typefns); + tor_free(d->queues); + + // This is the only time we will treat d->cfg as non-const. + //dispatch_cfg_free_((dispatch_items_t *) d->cfg); + + tor_free(d); +} + +/** + * Tell the dispatcher to call <b>fn</b> with <b>userdata</b> whenever + * <b>chan</b> becomes nonempty. Return 0 on success, -1 on error. + **/ +int +dispatch_set_alert_fn(dispatch_t *d, channel_id_t chan, + dispatch_alertfn_t fn, void *userdata) +{ + if (BUG(chan >= d->n_queues)) + return -1; + + dqueue_t *q = &d->queues[chan]; + q->alert_fn = fn; + q->alert_fn_arg = userdata; + return 0; +} + +/** + * Send a message on the appropriate channel notifying that channel if + * necessary. + * + * This function takes ownership of the auxiliary data; it can't be static or + * stack-allocated, and the caller is not allowed to use it afterwards. + * + * This function does not check the various vields of the message object for + * consistency. + **/ +int +dispatch_send(dispatch_t *d, + subsys_id_t sender, + channel_id_t channel, + message_id_t msg, + msg_type_id_t type, + msg_aux_data_t auxdata) +{ + if (!d->table[msg]) { + /* Fast path: nobody wants this data. */ + + d->typefns[type].free_fn(auxdata); + return 0; + } + + msg_t *m = tor_malloc(sizeof(msg_t)); + + m->sender = sender; + m->channel = channel; + m->msg = msg; + m->type = type; + memcpy(&m->aux_data__, &auxdata, sizeof(msg_aux_data_t)); + + return dispatch_send_msg(d, m); +} + +int +dispatch_send_msg(dispatch_t *d, msg_t *m) +{ + if (BUG(!d)) + goto err; + if (BUG(!m)) + goto err; + if (BUG(m->channel >= d->n_queues)) + goto err; + if (BUG(m->msg >= d->n_msgs)) + goto err; + + dtbl_entry_t *ent = d->table[m->msg]; + if (ent) { + if (BUG(m->type != ent->type)) + goto err; + if (BUG(m->channel != ent->channel)) + goto err; + } + + return dispatch_send_msg_unchecked(d, m); + err: + /* Probably it isn't safe to free m, since type could be wrong. */ + return -1; +} + +/** + * Send a message on the appropriate queue, notifying that queue if necessary. + * + * This function takes ownership of the message object and its auxiliary data; + * it can't be static or stack-allocated, and the caller isn't allowed to use + * it afterwards. + * + * This function does not check the various fields of the message object for + * consistency, and can crash if they are out of range. Only functions that + * have already constructed the message in a safe way, or checked it for + * correctness themselves, should call this function. + **/ +int +dispatch_send_msg_unchecked(dispatch_t *d, msg_t *m) +{ + /* Find the right queue. */ + dqueue_t *q = &d->queues[m->channel]; + bool was_empty = TOR_SIMPLEQ_EMPTY(&q->queue); + + /* Append the message. */ + TOR_SIMPLEQ_INSERT_TAIL(&q->queue, m, next); + + if (debug_logging_enabled()) { + char *arg = dispatch_fmt_msg_data(d, m); + log_debug(LD_MESG, + "Queued: %s (%s) from %s, on %s.", + get_message_id_name(m->msg), + arg, + get_subsys_id_name(m->sender), + get_channel_id_name(m->channel)); + tor_free(arg); + } + + /* If we just made the queue nonempty for the first time, call the alert + * function. */ + if (was_empty) { + q->alert_fn(d, m->channel, q->alert_fn_arg); + } + + return 0; +} + +/** + * Run all of the callbacks on <b>d</b> associated with <b>m</b>. + **/ +static void +dispatcher_run_msg_cbs(const dispatch_t *d, msg_t *m) +{ + tor_assert(m->msg <= d->n_msgs); + dtbl_entry_t *ent = d->table[m->msg]; + int n_fns = ent->n_fns; + + if (debug_logging_enabled()) { + char *arg = dispatch_fmt_msg_data(d, m); + log_debug(LD_MESG, + "Delivering: %s (%s) from %s, on %s:", + get_message_id_name(m->msg), + arg, + get_subsys_id_name(m->sender), + get_channel_id_name(m->channel)); + tor_free(arg); + } + + int i; + for (i=0; i < n_fns; ++i) { + if (ent->rcv[i].enabled) { + log_debug(LD_MESG, " Delivering to %s.", + get_subsys_id_name(ent->rcv[i].sys)); + ent->rcv[i].fn(m); + } + } +} + +/** + * Run up to <b>max_msgs</b> callbacks for messages on the channel <b>ch</b> + * on the given dispatcher. Return 0 on success or recoverable failure, + * -1 on unrecoverable error. + **/ +int +dispatch_flush(dispatch_t *d, channel_id_t ch, int max_msgs) +{ + if (BUG(ch >= d->n_queues)) + return 0; + + int n_flushed = 0; + dqueue_t *q = &d->queues[ch]; + + while (n_flushed < max_msgs) { + msg_t *m = TOR_SIMPLEQ_FIRST(&q->queue); + if (!m) + break; + TOR_SIMPLEQ_REMOVE_HEAD(&q->queue, next); + dispatcher_run_msg_cbs(d, m); + dispatch_free_msg(d, m); + ++n_flushed; + } + + return 0; +} diff --git a/src/lib/dispatch/dispatch_naming.c b/src/lib/dispatch/dispatch_naming.c new file mode 100644 index 0000000000..83d9a2d604 --- /dev/null +++ b/src/lib/dispatch/dispatch_naming.c @@ -0,0 +1,63 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#include "orconfig.h" + +#include "lib/cc/compat_compiler.h" + +#include "lib/dispatch/dispatch_naming.h" +#include "lib/dispatch/msgtypes.h" + +#include "lib/container/namemap.h" +#include "lib/container/namemap_st.h" + +#include "lib/log/util_bug.h" +#include "lib/log/log.h" + +#include <stdlib.h> + +/** Global namemap for message IDs. */ +static namemap_t message_id_map = NAMEMAP_INIT(); +/** Global namemap for subsystem IDs. */ +static namemap_t subsys_id_map = NAMEMAP_INIT(); +/** Global namemap for channel IDs. */ +static namemap_t channel_id_map = NAMEMAP_INIT(); +/** Global namemap for message type IDs. */ +static namemap_t msg_type_id_map = NAMEMAP_INIT(); + +void +dispatch_naming_init(void) +{ +} + +/* Helper macro: declare functions to map IDs to and from names for a given + * type in a namemap_t. + */ +#define DECLARE_ID_MAP_FNS(type) \ + type##_id_t \ + get_##type##_id(const char *name) \ + { \ + unsigned u = namemap_get_or_create_id(&type##_id_map, name); \ + tor_assert(u != NAMEMAP_ERR); \ + tor_assert(u != ERROR_ID); \ + return (type##_id_t) u; \ + } \ + const char * \ + get_##type##_id_name(type##_id_t id) \ + { \ + return namemap_fmt_name(&type##_id_map, id); \ + } \ + size_t \ + get_num_##type##_ids(void) \ + { \ + return namemap_get_size(&type##_id_map); \ + } \ + EAT_SEMICOLON + +DECLARE_ID_MAP_FNS(message); +DECLARE_ID_MAP_FNS(channel); +DECLARE_ID_MAP_FNS(subsys); +DECLARE_ID_MAP_FNS(msg_type); diff --git a/src/lib/dispatch/dispatch_naming.h b/src/lib/dispatch/dispatch_naming.h new file mode 100644 index 0000000000..c116d2184d --- /dev/null +++ b/src/lib/dispatch/dispatch_naming.h @@ -0,0 +1,46 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#ifndef TOR_DISPATCH_NAMING_H +#define TOR_DISPATCH_NAMING_H + +#include "lib/dispatch/msgtypes.h" +#include <stddef.h> + +/** + * Return an existing channel ID by name, allocating the channel ID if + * if necessary. Returns ERROR_ID if we have run out of + * channels + */ +channel_id_t get_channel_id(const char *); +/** + * Return the name corresponding to a given channel ID. + **/ +const char *get_channel_id_name(channel_id_t); +/** + * Return the total number of _named_ channel IDs. + **/ +size_t get_num_channel_ids(void); + +/* As above, but for messages. */ +message_id_t get_message_id(const char *); +const char *get_message_id_name(message_id_t); +size_t get_num_message_ids(void); + +/* As above, but for subsystems */ +subsys_id_t get_subsys_id(const char *); +const char *get_subsys_id_name(subsys_id_t); +size_t get_num_subsys_ids(void); + +/* As above, but for types. Note that types additionally must be + * "defined", if any message is to use them. */ +msg_type_id_t get_msg_type_id(const char *); +const char *get_msg_type_id_name(msg_type_id_t); +size_t get_num_msg_type_ids(void); + +void dispatch_naming_init(void); + +#endif diff --git a/src/lib/dispatch/dispatch_new.c b/src/lib/dispatch/dispatch_new.c new file mode 100644 index 0000000000..b89ef43ea7 --- /dev/null +++ b/src/lib/dispatch/dispatch_new.c @@ -0,0 +1,174 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file dispatch_new.c + * \brief Code to construct a dispatch_t from a dispatch_cfg_t. + **/ + +#define DISPATCH_PRIVATE +#include "orconfig.h" + +#include "lib/dispatch/dispatch.h" +#include "lib/dispatch/dispatch_st.h" +#include "lib/dispatch/dispatch_cfg.h" +#include "lib/dispatch/dispatch_cfg_st.h" + +#include "lib/cc/ctassert.h" +#include "lib/intmath/cmp.h" +#include "lib/malloc/malloc.h" +#include "lib/log/util_bug.h" + +#include <string.h> + +/** Given a smartlist full of (possibly NULL) pointers to uint16_t values, + * return the largest value, or dflt if the list is empty. */ +static int +max_in_sl(const smartlist_t *sl, int dflt) +{ + uint16_t *maxptr = NULL; + SMARTLIST_FOREACH_BEGIN(sl, uint16_t *, u) { + if (!maxptr) + maxptr = u; + else if (*u > *maxptr) + maxptr = u; + } SMARTLIST_FOREACH_END(u); + + return maxptr ? *maxptr : dflt; +} + +/* The above function is only safe to call if we are sure that channel_id_t + * and msg_type_id_t are really uint16_t. They should be so defined in + * msgtypes.h, but let's be extra cautious. + */ +CTASSERT(sizeof(uint16_t) == sizeof(msg_type_id_t)); +CTASSERT(sizeof(uint16_t) == sizeof(channel_id_t)); + +/** Helper: Format an unformattable message auxiliary data item: just return a +* copy of the string <>. */ +static char * +type_fmt_nop(msg_aux_data_t arg) +{ + (void)arg; + return tor_strdup("<>"); +} + +/** Helper: Free an unfreeable message auxiliary data item: do nothing. */ +static void +type_free_nop(msg_aux_data_t arg) +{ + (void)arg; +} + +/** Type functions to use when no type functions are provided. */ +static dispatch_typefns_t nop_typefns = { + .free_fn = type_free_nop, + .fmt_fn = type_fmt_nop +}; + +/** + * Alert function to use when none is configured: do nothing. + **/ +static void +alert_fn_nop(dispatch_t *d, channel_id_t ch, void *arg) +{ + (void)d; + (void)ch; + (void)arg; +} + +/** + * Given a list of recvfn_t, create and return a new dtbl_entry_t mapping + * to each of those functions. + **/ +static dtbl_entry_t * +dtbl_entry_from_lst(smartlist_t *receivers) +{ + if (!receivers) + return NULL; + + size_t n_recv = smartlist_len(receivers); + dtbl_entry_t *ent; + ent = tor_malloc_zero(offsetof(dtbl_entry_t, rcv) + + sizeof(dispatch_rcv_t) * n_recv); + + ent->n_fns = n_recv; + + SMARTLIST_FOREACH_BEGIN(receivers, const dispatch_rcv_t *, rcv) { + memcpy(&ent->rcv[rcv_sl_idx], rcv, sizeof(*rcv)); + if (rcv->enabled) { + ++ent->n_enabled; + } + } SMARTLIST_FOREACH_END(rcv); + + return ent; +} + +/** Create and return a new dispatcher from a given dispatch_cfg_t. */ +dispatch_t * +dispatch_new(const dispatch_cfg_t *cfg) +{ + dispatch_t *d = tor_malloc_zero(sizeof(dispatch_t)); + + /* Any message that has a type or a receiver counts towards our messages */ + const size_t n_msgs = MAX(smartlist_len(cfg->type_by_msg), + smartlist_len(cfg->recv_by_msg)) + 1; + + /* Any channel that any message has counts towards the number of channels. */ + const size_t n_chans = (size_t) MAX(1, max_in_sl(cfg->chan_by_msg,0)) + 1; + + /* Any type that a message has, or that has functions, counts towards + * the number of types. */ + const size_t n_types = (size_t) MAX(max_in_sl(cfg->type_by_msg,0), + smartlist_len(cfg->fns_by_type)) + 1; + + d->n_msgs = n_msgs; + d->n_queues = n_chans; + d->n_types = n_types; + + /* Initialize the array of type-functions. */ + d->typefns = tor_calloc(n_types, sizeof(dispatch_typefns_t)); + for (size_t i = 0; i < n_types; ++i) { + /* Default to no-op for everything... */ + memcpy(&d->typefns[i], &nop_typefns, sizeof(dispatch_typefns_t)); + } + SMARTLIST_FOREACH_BEGIN(cfg->fns_by_type, dispatch_typefns_t *, fns) { + /* Set the functions if they are provided. */ + if (fns) { + if (fns->free_fn) + d->typefns[fns_sl_idx].free_fn = fns->free_fn; + if (fns->fmt_fn) + d->typefns[fns_sl_idx].fmt_fn = fns->fmt_fn; + } + } SMARTLIST_FOREACH_END(fns); + + /* Initialize the message queues: one for each channel. */ + d->queues = tor_calloc(d->n_queues, sizeof(dqueue_t)); + for (size_t i = 0; i < d->n_queues; ++i) { + TOR_SIMPLEQ_INIT(&d->queues[i].queue); + d->queues[i].alert_fn = alert_fn_nop; + } + + /* Build the dispatch tables mapping message IDs to receivers. */ + d->table = tor_calloc(d->n_msgs, sizeof(dtbl_entry_t *)); + SMARTLIST_FOREACH_BEGIN(cfg->recv_by_msg, smartlist_t *, rcv) { + d->table[rcv_sl_idx] = dtbl_entry_from_lst(rcv); + } SMARTLIST_FOREACH_END(rcv); + + /* Fill in the empty entries in the dispatch tables: + * types and channels for each message. */ + SMARTLIST_FOREACH_BEGIN(cfg->type_by_msg, msg_type_id_t *, type) { + if (d->table[type_sl_idx]) + d->table[type_sl_idx]->type = *type; + } SMARTLIST_FOREACH_END(type); + + SMARTLIST_FOREACH_BEGIN(cfg->chan_by_msg, channel_id_t *, chan) { + if (d->table[chan_sl_idx]) + d->table[chan_sl_idx]->channel = *chan; + } SMARTLIST_FOREACH_END(chan); + + return d; +} diff --git a/src/lib/dispatch/dispatch_st.h b/src/lib/dispatch/dispatch_st.h new file mode 100644 index 0000000000..568107b700 --- /dev/null +++ b/src/lib/dispatch/dispatch_st.h @@ -0,0 +1,108 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file dispatch_st.h + * + * \brief private structures used for the dispatcher module + */ + +#ifndef TOR_DISPATCH_ST_H +#define TOR_DISPATCH_ST_H + +#ifdef DISPATCH_PRIVATE + +#include "lib/container/smartlist.h" + +/** + * Information about the recipient of a message. + **/ +typedef struct dispatch_rcv_t { + /** The subsystem receiving a message. */ + subsys_id_t sys; + /** True iff this recipient is enabled. */ + bool enabled; + /** The function that will handle the message. */ + recv_fn_t fn; +} dispatch_rcv_t; + +/** + * Information used by a dispatcher to handle and dispatch a single message + * ID. It maps that message ID to its type, channel, and list of receiver + * functions. + * + * This structure is used when the dispatcher is running. + **/ +typedef struct dtbl_entry_t { + /** The number of enabled non-stub subscribers for this message. + * + * Note that for now, this will be the same as <b>n_fns</b>, since there is + * no way to turn these subscribers on an off yet. */ + uint16_t n_enabled; + /** The channel that handles this message. */ + channel_id_t channel; + /** The associated C type for this message. */ + msg_type_id_t type; + /** + * The number of functions pointers for subscribers that receive this + * message, in rcv. */ + uint16_t n_fns; + /** + * The recipients for this message. + */ + dispatch_rcv_t rcv[FLEXIBLE_ARRAY_MEMBER]; +} dtbl_entry_t; + +/** + * A queue of messages for a given channel, used by a live dispatcher. + */ +typedef struct dqueue_t { + /** The queue of messages itself. */ + TOR_SIMPLEQ_HEAD( , msg_t) queue; + /** A function to be called when the queue becomes nonempty. */ + dispatch_alertfn_t alert_fn; + /** An argument for the alert_fn. */ + void *alert_fn_arg; +} dqueue_t ; + +/** + * A single dispatcher for cross-module messages. + */ +struct dispatch_t { + /** + * The length of <b>table</b>: the number of message IDs that this + * dispatcher can handle. + */ + size_t n_msgs; + /** + * The length of <b>queues</b>: the number of channels that this dispatcher + * has configured. + */ + size_t n_queues; + /** + * The length of <b>typefns</b>: the number of C type IDs that this + * dispatcher has configured. + */ + size_t n_types; + /** + * An array of message queues, indexed by channel ID. + */ + dqueue_t *queues; + /** + * An array of entries about how to handle particular message types, indexed + * by message ID. + */ + dtbl_entry_t **table; + /** + * An array of function tables for manipulating types, index by message + * type ID. + **/ + dispatch_typefns_t *typefns; +}; + +#endif + +#endif diff --git a/src/lib/dispatch/include.am b/src/lib/dispatch/include.am new file mode 100644 index 0000000000..4ec5b75cd1 --- /dev/null +++ b/src/lib/dispatch/include.am @@ -0,0 +1,25 @@ + +noinst_LIBRARIES += src/lib/libtor-dispatch.a + +if UNITTESTS_ENABLED +noinst_LIBRARIES += src/lib/libtor-dispatch-testing.a +endif + +src_lib_libtor_dispatch_a_SOURCES = \ + src/lib/dispatch/dispatch_cfg.c \ + src/lib/dispatch/dispatch_core.c \ + src/lib/dispatch/dispatch_naming.c \ + src/lib/dispatch/dispatch_new.c + +src_lib_libtor_dispatch_testing_a_SOURCES = \ + $(src_lib_libtor_dispatch_a_SOURCES) +src_lib_libtor_dispatch_testing_a_CPPFLAGS = $(AM_CPPFLAGS) $(TEST_CPPFLAGS) +src_lib_libtor_dispatch_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS) + +noinst_HEADERS += \ + src/lib/dispatch/dispatch.h \ + src/lib/dispatch/dispatch_cfg.h \ + src/lib/dispatch/dispatch_cfg_st.h \ + src/lib/dispatch/dispatch_naming.h \ + src/lib/dispatch/dispatch_st.h \ + src/lib/dispatch/msgtypes.h diff --git a/src/lib/dispatch/msgtypes.h b/src/lib/dispatch/msgtypes.h new file mode 100644 index 0000000000..4e79e592a6 --- /dev/null +++ b/src/lib/dispatch/msgtypes.h @@ -0,0 +1,80 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file msgtypes.h + * \brief Types used for messages in the dispatcher code. + **/ + +#ifndef TOR_DISPATCH_MSGTYPES_H +#define TOR_DISPATCH_MSGTYPES_H + +#include <stdint.h> + +#include "ext/tor_queue.h" + +/** + * These types are aliases for subsystems, channels, and message IDs. + **/ +typedef uint16_t subsys_id_t; +typedef uint16_t channel_id_t; +typedef uint16_t message_id_t; + +/** + * This identifies a C type that can be sent along with a message. + **/ +typedef uint16_t msg_type_id_t; + +/** + * An ID value returned for *_type_t when none exists. + */ +#define ERROR_ID 65535 + +/** + * Auxiliary (untyped) data sent along with a message. + * + * We define this as a union of a pointer and a u64, so that the integer + * types will have the same range across platforms. + **/ +typedef union { + void *ptr; + uint64_t u64; +} msg_aux_data_t; + +/** + * Structure of a received message. + **/ +typedef struct msg_t { + TOR_SIMPLEQ_ENTRY(msg_t) next; + subsys_id_t sender; + channel_id_t channel; + message_id_t msg; + /** We could omit this field, since it is implicit in the message type, but + * IMO let's leave it in for safety. */ + msg_type_id_t type; + /** Untyped auxiliary data. You shouldn't have to mess with this + * directly. */ + msg_aux_data_t aux_data__; +} msg_t; + +/** + * A function that a subscriber uses to receive a message. + **/ +typedef void (*recv_fn_t)(const msg_t *m); + +/** + * Table of functions to use for a given C type. Any omitted (NULL) functions + * will be treated as no-ops. + **/ +typedef struct dispatch_typefns_t { + /** Release storage held for the auxiliary data of this type. */ + void (*free_fn)(msg_aux_data_t); + /** Format and return a newly allocated string describing the contents + * of this data element. */ + char *(*fmt_fn)(msg_aux_data_t); +} dispatch_typefns_t; + +#endif diff --git a/src/lib/log/log.c b/src/lib/log/log.c index d21d8d1d41..84e3eafc27 100644 --- a/src/lib/log/log.c +++ b/src/lib/log/log.c @@ -49,6 +49,7 @@ #include "lib/wallclock/approx_time.h" #include "lib/wallclock/time_to_tm.h" #include "lib/fdio/fdio.h" +#include "lib/cc/ctassert.h" #ifdef HAVE_ANDROID_LOG_H #include <android/log.h> @@ -1268,9 +1269,12 @@ static const char *domain_list[] = { "GENERAL", "CRYPTO", "NET", "CONFIG", "FS", "PROTOCOL", "MM", "HTTP", "APP", "CONTROL", "CIRC", "REND", "BUG", "DIR", "DIRSERV", "OR", "EDGE", "ACCT", "HIST", "HANDSHAKE", "HEARTBEAT", "CHANNEL", - "SCHED", "GUARD", "CONSDIFF", "DOS", "PROCESS", "PT", "BTRACK", NULL + "SCHED", "GUARD", "CONSDIFF", "DOS", "PROCESS", "PT", "BTRACK", "MESG", + NULL }; +CTASSERT(ARRAY_LENGTH(domain_list) == N_LOGGING_DOMAINS + 1); + /** Return a bitmask for the log domain for which <b>domain</b> is the name, * or 0 if there is no such name. */ static log_domain_mask_t diff --git a/src/lib/log/log.h b/src/lib/log/log.h index 0420f35ee0..a381220af0 100644 --- a/src/lib/log/log.h +++ b/src/lib/log/log.h @@ -114,8 +114,9 @@ #define LD_PT (1u<<27) /** Bootstrap tracker. */ #define LD_BTRACK (1u<<28) -/** Number of logging domains in the code. */ -#define N_LOGGING_DOMAINS 29 +/** Message-passing backend. */ +#define LD_MESG (1u<<29) +#define N_LOGGING_DOMAINS 30 /** This log message is not safe to send to a callback-based logger * immediately. Used as a flag, not a log domain. */ @@ -193,6 +194,15 @@ void tor_log_get_logfile_names(struct smartlist_t *out); extern int log_global_min_severity_; +static inline bool debug_logging_enabled(void); +/** + * Return true iff debug logging is enabled for at least one domain. + */ +static inline bool debug_logging_enabled(void) +{ + return PREDICT_UNLIKELY(log_global_min_severity_ == LOG_DEBUG); +} + void log_fn_(int severity, log_domain_mask_t domain, const char *funcname, const char *format, ...) CHECK_PRINTF(4,5); @@ -222,8 +232,8 @@ void tor_log_string(int severity, log_domain_mask_t domain, log_fn_ratelim_(ratelim, severity, domain, __FUNCTION__, args) #define log_debug(domain, args...) \ STMT_BEGIN \ - if (PREDICT_UNLIKELY(log_global_min_severity_ == LOG_DEBUG)) \ - log_fn_(LOG_DEBUG, domain, __FUNCTION__, args); \ + if (debug_logging_enabled()) \ + log_fn_(LOG_DEBUG, domain, __FUNCTION__, args); \ STMT_END #define log_info(domain, args...) \ log_fn_(LOG_INFO, domain, __FUNCTION__, args) @@ -240,8 +250,8 @@ void tor_log_string(int severity, log_domain_mask_t domain, #define log_debug(domain, args, ...) \ STMT_BEGIN \ - if (PREDICT_UNLIKELY(log_global_min_severity_ == LOG_DEBUG)) \ - log_fn_(LOG_DEBUG, domain, __FUNCTION__, args, ##__VA_ARGS__); \ + if (debug_logging_enabled()) \ + log_fn_(LOG_DEBUG, domain, __FUNCTION__, args, ##__VA_ARGS__); \ STMT_END #define log_info(domain, args,...) \ log_fn_(LOG_INFO, domain, __FUNCTION__, args, ##__VA_ARGS__) diff --git a/src/lib/pubsub/.may_include b/src/lib/pubsub/.may_include new file mode 100644 index 0000000000..5623492f00 --- /dev/null +++ b/src/lib/pubsub/.may_include @@ -0,0 +1,10 @@ +orconfig.h + +lib/cc/*.h +lib/container/*.h +lib/dispatch/*.h +lib/intmath/*.h +lib/log/*.h +lib/malloc/*.h +lib/pubsub/*.h +lib/string/*.h diff --git a/src/lib/pubsub/include.am b/src/lib/pubsub/include.am new file mode 100644 index 0000000000..c0ec13d039 --- /dev/null +++ b/src/lib/pubsub/include.am @@ -0,0 +1,26 @@ + +noinst_LIBRARIES += src/lib/libtor-pubsub.a + +if UNITTESTS_ENABLED +noinst_LIBRARIES += src/lib/libtor-pubsub-testing.a +endif + +src_lib_libtor_pubsub_a_SOURCES = \ + src/lib/pubsub/pubsub_build.c \ + src/lib/pubsub/pubsub_check.c \ + src/lib/pubsub/pubsub_publish.c + +src_lib_libtor_pubsub_testing_a_SOURCES = \ + $(src_lib_libtor_pubsub_a_SOURCES) +src_lib_libtor_pubsub_testing_a_CPPFLAGS = $(AM_CPPFLAGS) $(TEST_CPPFLAGS) +src_lib_libtor_pubsub_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS) + +noinst_HEADERS += \ + src/lib/pubsub/pub_binding_st.h \ + src/lib/pubsub/pubsub.h \ + src/lib/pubsub/pubsub_build.h \ + src/lib/pubsub/pubsub_builder_st.h \ + src/lib/pubsub/pubsub_connect.h \ + src/lib/pubsub/pubsub_flags.h \ + src/lib/pubsub/pubsub_macros.h \ + src/lib/pubsub/pubsub_publish.h diff --git a/src/lib/pubsub/pub_binding_st.h b/src/lib/pubsub/pub_binding_st.h new file mode 100644 index 0000000000..4f5df8ff38 --- /dev/null +++ b/src/lib/pubsub/pub_binding_st.h @@ -0,0 +1,38 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * @file pub_binding_st.h + * @brief Declaration of pub_binding_t. + * + * This is an internal type for the pubsub implementation. + */ + +#ifndef TOR_PUB_BINDING_ST_H +#define TOR_PUB_BINDING_ST_H + +#include "lib/dispatch/msgtypes.h" +struct dispatch_t; + +/** + * A pub_binding_t is an opaque object that subsystems use to publish + * messages. The DISPATCH_ADD_PUB*() macros set it up. + **/ +typedef struct pub_binding_t { + /** + * A pointer to a configured dispatch_t object. This is filled in + * when the dispatch_t is finally constructed. + **/ + struct dispatch_t *dispatch_ptr; + /** + * A template for the msg_t fields that are filled in for this message. + * This is copied into outgoing messages, ensuring that their fields are set + * corretly. + **/ + msg_t msg_template; +} pub_binding_t; + +#endif diff --git a/src/lib/pubsub/pubsub.h b/src/lib/pubsub/pubsub.h new file mode 100644 index 0000000000..08e3f42f7c --- /dev/null +++ b/src/lib/pubsub/pubsub.h @@ -0,0 +1,89 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * @file pubsub.h + * @brief Header for OO publish-subscribe functionality. + * + * This module provides a wrapper around the "dispatch" module, + * ensuring type-safety and allowing us to do static analysis on + * publication and subscriptions. + * + * With this module, we enforce: + * <ul> + * <li>that every message has (potential) publishers and subscribers; + * <li>that every message is published and subscribed from the correct + * channels, with the correct type ID, every time it is published. + * <li>that type IDs correspond to a single C type, and that the C types are + * used correctly. + * <li>that when a message is published or subscribed, it is done with + * a correct subsystem identifier + * </ul> + * + * We do this by making "publication requests" and "subscription requests" + * into objects, and doing some computation on them before we create + * a dispatch_t with them. + * + * Rather than using the dispatch module directly, a publishing module + * receives a "binding" object that it uses to send messages with the right + * settings. + * + * Most users of this module will want to use this header, and the + * pubsub_macros.h header for convenience. + */ + +/* + * + * Overview: Messages are sent over channels. Before sending a message on a + * channel, or receiving a message on a channel, a subsystem needs to register + * that it publishes, or subscribes, to that message, on that channel. + * + * Messages, channels, and subsystems are represented internally as short + * integers, though they are associated with human-readable strings for + * initialization and debugging. + * + * When registering for a message, a subsystem must say whether it is an + * exclusive publisher/subscriber to that message type, or whether other + * subsystems may also publish/subscribe to it. + * + * All messages and their publishers/subscribers must be registered early in + * the initialization process. + * + * By default, it is an error for a message type to have publishers and no + * subscribers on a channel, or subscribers and no publishers on a channel. + * + * A subsystem may register for a message with a note that delivery or + * production is disabled -- for example, because the subsystem is + * disabled at compile-time. It is not an error for a message type to + * have all of its publishers or subscribers disabled. + * + * After a message is sent, it is delivered to every recipient. This + * delivery happens from the top level of the event loop; it may be + * interleaved with network events, timers, etc. + * + * Messages may have associated data. This data is typed, and is owned + * by the message. Strings, byte-arrays, and integers have built-in + * support. Other types may be added. If objects are to be sent, + * they should be identified by handle. If an object requires cleanup, + * it should be declared with an associated free function. + * + * Semantically, if two subsystems communicate only by this kind of + * message passing, neither is considered to depend on the other, though + * both are considered to have a dependency on the message and on any + * types it contains. + * + * (Or generational index?) + */ +#ifndef TOR_PUBSUB_PUBSUB_H +#define TOR_PUBSUB_PUBSUB_H + +#include "lib/pubsub/pub_binding_st.h" +#include "lib/pubsub/pubsub_connect.h" +#include "lib/pubsub/pubsub_flags.h" +#include "lib/pubsub/pubsub_macros.h" +#include "lib/pubsub/pubsub_publish.h" + +#endif diff --git a/src/lib/pubsub/pubsub_build.c b/src/lib/pubsub/pubsub_build.c new file mode 100644 index 0000000000..e44b7d76ec --- /dev/null +++ b/src/lib/pubsub/pubsub_build.c @@ -0,0 +1,307 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * @file pubsub_build.c + * @brief Construct a dispatch_t in safer, more OO way. + **/ + +#define PUBSUB_PRIVATE + +#include "lib/dispatch/dispatch.h" +#include "lib/dispatch/dispatch_cfg.h" +#include "lib/dispatch/dispatch_naming.h" +#include "lib/dispatch/msgtypes.h" +#include "lib/pubsub/pubsub_flags.h" +#include "lib/pubsub/pub_binding_st.h" +#include "lib/pubsub/pubsub_build.h" +#include "lib/pubsub/pubsub_builder_st.h" +#include "lib/pubsub/pubsub_connect.h" + +#include "lib/container/smartlist.h" +#include "lib/log/util_bug.h" +#include "lib/malloc/malloc.h" + + #include <string.h> + +/** Construct and return a new empty pubsub_items_t. */ +static pubsub_items_t * +pubsub_items_new(void) +{ + pubsub_items_t *cfg = tor_malloc_zero(sizeof(*cfg)); + cfg->items = smartlist_new(); + cfg->type_items = smartlist_new(); + return cfg; +} + +/** Release all storage held in a pubsub_items_t. */ +void +pubsub_items_free_(pubsub_items_t *cfg) +{ + if (! cfg) + return; + SMARTLIST_FOREACH(cfg->items, pubsub_cfg_t *, item, tor_free(item)); + SMARTLIST_FOREACH(cfg->type_items, + pubsub_type_cfg_t *, item, tor_free(item)); + smartlist_free(cfg->items); + smartlist_free(cfg->type_items); + tor_free(cfg); +} + +/** Construct and return a new pubsub_builder_t. */ +pubsub_builder_t * +pubsub_builder_new(void) +{ + dispatch_naming_init(); + + pubsub_builder_t *pb = tor_malloc_zero(sizeof(*pb)); + pb->cfg = dcfg_new(); + pb->items = pubsub_items_new(); + return pb; +} + +/** + * Release all storage held by a pubsub_builder_t. + * + * You'll (mostly) only want to call this function on an error case: if you're + * constructing a dispatch_t instead, you should call + * pubsub_builder_finalize() to consume the pubsub_builder_t. + */ +void +pubsub_builder_free_(pubsub_builder_t *pb) +{ + if (pb == NULL) + return; + pubsub_items_free(pb->items); + dcfg_free(pb->cfg); + tor_free(pb); +} + +/** + * Create and return a pubsub_connector_t for the subsystem with ID + * <b>subsys</b> to use in adding publications, subscriptions, and types to + * <b>builder</b>. + **/ +pubsub_connector_t * +pubsub_connector_for_subsystem(pubsub_builder_t *builder, + subsys_id_t subsys) +{ + tor_assert(builder); + ++builder->n_connectors; + + pubsub_connector_t *con = tor_malloc_zero(sizeof(*con)); + + con->builder = builder; + con->subsys_id = subsys; + + return con; +} + +/** + * Release all storage held by a pubsub_connector_t. + **/ +void +pubsub_connector_free_(pubsub_connector_t *con) +{ + if (!con) + return; + + if (con->builder) { + --con->builder->n_connectors; + tor_assert(con->builder->n_connectors >= 0); + } + tor_free(con); +} + +/** + * Use <b>con</b> to add a request for being able to publish messages of type + * <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>. + **/ +int +pubsub_add_pub_(pubsub_connector_t *con, + pub_binding_t *out, + channel_id_t channel, + message_id_t msg, + msg_type_id_t type, + unsigned flags, + const char *file, + unsigned line) +{ + pubsub_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg)); + + memset(out, 0, sizeof(*out)); + cfg->is_publish = true; + + out->msg_template.sender = cfg->subsys = con->subsys_id; + out->msg_template.channel = cfg->channel = channel; + out->msg_template.msg = cfg->msg = msg; + out->msg_template.type = cfg->type = type; + + cfg->flags = flags; + cfg->added_by_file = file; + cfg->added_by_line = line; + + /* We're grabbing a pointer to the pub_binding_t so we can tell it about + * the dispatcher later on. + */ + cfg->pub_binding = out; + + smartlist_add(con->builder->items->items, cfg); + + if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0) + goto err; + if (dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0) + goto err; + + return 0; + err: + ++con->builder->n_errors; + return -1; +} + +/** + * Use <b>con</b> to add a request for being able to publish messages of type + * <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>, + * passing them to the callback in <b>recv_fn</b>. + **/ +int +pubsub_add_sub_(pubsub_connector_t *con, + recv_fn_t recv_fn, + channel_id_t channel, + message_id_t msg, + msg_type_id_t type, + unsigned flags, + const char *file, + unsigned line) +{ + pubsub_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg)); + + cfg->is_publish = false; + cfg->subsys = con->subsys_id; + cfg->channel = channel; + cfg->msg = msg; + cfg->type = type; + cfg->flags = flags; + cfg->added_by_file = file; + cfg->added_by_line = line; + + cfg->recv_fn = recv_fn; + + smartlist_add(con->builder->items->items, cfg); + + if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0) + goto err; + if (dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0) + goto err; + if (! (flags & DISP_FLAG_STUB)) { + if (dcfg_add_recv(con->builder->cfg, msg, cfg->subsys, recv_fn) < 0) + goto err; + } + + return 0; + err: + ++con->builder->n_errors; + return -1; +} + +/** + * Use <b>con</b> to define the functions to use for manipulating the type + * <b>type</b>. Any function pointers left as NULL will be implemented as + * no-ops. + **/ +int +pubsub_connector_register_type_(pubsub_connector_t *con, + msg_type_id_t type, + dispatch_typefns_t *fns, + const char *file, + unsigned line) +{ + pubsub_type_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg)); + cfg->type = type; + memcpy(&cfg->fns, fns, sizeof(*fns)); + cfg->subsys = con->subsys_id; + cfg->added_by_file = file; + cfg->added_by_line = line; + + smartlist_add(con->builder->items->type_items, cfg); + + if (dcfg_type_set_fns(con->builder->cfg, type, fns) < 0) + goto err; + + return 0; + err: + ++con->builder->n_errors; + return -1; +} + +/** + * Initialize the dispatch_ptr field in every relevant publish binding + * for <b>d</b>. + */ +static void +pubsub_items_install_bindings(pubsub_items_t *items, + dispatch_t *d) +{ + SMARTLIST_FOREACH_BEGIN(items->items, pubsub_cfg_t *, cfg) { + if (cfg->pub_binding) { + // XXXX we could skip this for STUB publishers, and for any publishers + // XXXX where all subscribers are STUB. + cfg->pub_binding->dispatch_ptr = d; + } + } SMARTLIST_FOREACH_END(cfg); +} + +/** + * Remove the dispatch_ptr fields for all the relevant publish bindings + * in <b>items</b>. The prevents subsequent dispatch_pub_() calls from + * sending messages to a dispatcher that has been freed. + **/ +void +pubsub_items_clear_bindings(pubsub_items_t *items) +{ + SMARTLIST_FOREACH_BEGIN(items->items, pubsub_cfg_t *, cfg) { + if (cfg->pub_binding) { + cfg->pub_binding->dispatch_ptr = NULL; + } + } SMARTLIST_FOREACH_END(cfg); +} + +/** + * Create a new dispatcher as configured in a pubsub_builder_t. + * + * Consumes and frees its input. + **/ +dispatch_t * +pubsub_builder_finalize(pubsub_builder_t *builder, + pubsub_items_t **items_out) +{ + dispatch_t *dispatcher = NULL; + tor_assert_nonfatal(builder->n_connectors == 0); + + if (pubsub_builder_check(builder) < 0) + goto err; + + if (builder->n_errors) { + log_warn(LD_GENERAL, "At least one error occurred previously when " + "configuring the dispatcher."); + goto err; + } + + dispatcher = dispatch_new(builder->cfg); + + if (!dispatcher) + goto err; + + pubsub_items_install_bindings(builder->items, dispatcher); + if (items_out) { + *items_out = builder->items; + builder->items = NULL; /* Prevent free */ + } + + err: + pubsub_builder_free(builder); + return dispatcher; +} diff --git a/src/lib/pubsub/pubsub_build.h b/src/lib/pubsub/pubsub_build.h new file mode 100644 index 0000000000..93aad50b28 --- /dev/null +++ b/src/lib/pubsub/pubsub_build.h @@ -0,0 +1,92 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * @file pubsub_build.h + * @brief Header used for constructing the OO publish-subscribe facility. + * + * (See pubsub.h for more general information on this API.) + **/ + +#ifndef TOR_PUBSUB_BUILD_H +#define TOR_PUBSUB_BUILD_H + +#include "lib/dispatch/msgtypes.h" + +struct dispatch_t; +struct pubsub_connector_t; + +/** + * A "dispatch builder" is an incomplete dispatcher, used when + * registering messages. It does not have the same integrity guarantees + * as a dispatcher. It cannot actually handle messages itself: once all + * subsystems have registered, it is converted into a dispatch_t. + **/ +typedef struct pubsub_builder_t pubsub_builder_t; + +/** + * A "pubsub items" holds the configuration items used to configure a + * pubsub_builder. After the builder is finalized, this field is extracted, + * and used later to tear down pointers that enable publishing. + **/ +typedef struct pubsub_items_t pubsub_items_t; + +/** + * Create a new pubsub_builder. This should only happen in the + * main-init code. + */ +pubsub_builder_t *pubsub_builder_new(void); + +/** DOCDOC */ +int pubsub_builder_check(pubsub_builder_t *); + +/** + * Free a pubsub builder. This should only happen on error paths, where + * we have decided not to construct a dispatcher for some reason. + */ +#define pubsub_builder_free(db) \ + FREE_AND_NULL(pubsub_builder_t, pubsub_builder_free_, (db)) + +/** Internal implementation of pubsub_builder_free(). */ +void pubsub_builder_free_(pubsub_builder_t *); + +/** + * Create a pubsub connector that a single subsystem will use to + * register its messages. The main-init code does this during susbsystem + * initialization. + */ +struct pubsub_connector_t *pubsub_connector_for_subsystem(pubsub_builder_t *, + subsys_id_t); + +/** + * The main-init code does this after subsystem initialization. + */ +#define pubsub_connector_free(c) \ + FREE_AND_NULL(struct pubsub_connector_t, pubsub_connector_free_, (c)) + +void pubsub_connector_free_(struct pubsub_connector_t *); + +/** + * Constructs a dispatcher from a dispatch_builder, after checking that the + * invariances on the messages, channels, and connections have been + * respected. + * + * This should happen after every subsystem has initialized, and before + * entering the mainloop. + */ +struct dispatch_t *pubsub_builder_finalize(pubsub_builder_t *, + pubsub_items_t **items_out); + +/** + * Clear all pub_binding_t backpointers in <b>items</b>. + **/ +void pubsub_items_clear_bindings(pubsub_items_t *items); + +#define pubsub_items_free(cfg) \ + FREE_AND_NULL(pubsub_items_t, pubsub_items_free_, (cfg)) +void pubsub_items_free_(pubsub_items_t *cfg); + +#endif diff --git a/src/lib/pubsub/pubsub_builder_st.h b/src/lib/pubsub/pubsub_builder_st.h new file mode 100644 index 0000000000..cedeb02b16 --- /dev/null +++ b/src/lib/pubsub/pubsub_builder_st.h @@ -0,0 +1,161 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * @file pubsub_builder_st.h + * + * @brief private structures used for configuring dispatchers and messages. + */ + +#ifndef TOR_PUBSUB_BUILDER_ST_H +#define TOR_PUBSUB_BUILDER_ST_H + +#ifdef PUBSUB_PRIVATE + +#include <stdbool.h> +#include <stddef.h> + +struct dispatch_cfg_t; +struct smartlist_t; +struct pub_binding_t; + +/** + * Configuration for a single publication or subscription request. + * + * These can be stored while the dispatcher is in use, but are only used for + * setup, teardown, and debugging. + * + * There are various fields in this request describing the message; all of + * them must match other descriptions of the message, or a bug has occurred. + **/ +typedef struct pubsub_cfg_t { + /** True if this is a publishing request; false for a subscribing request. */ + bool is_publish; + /** The system making this request. */ + subsys_id_t subsys; + /** The channel on which the message is to be sent. */ + channel_id_t channel; + /** The message ID to be sent or received. */ + message_id_t msg; + /** The C type associated with the message. */ + msg_type_id_t type; + /** One or more DISP_FLAGS_* items, combined with bitwise OR. */ + unsigned flags; + + /** + * Publishing only: a pub_binding object that will receive the binding for + * this request. We will finish filling this in when the dispatcher is + * constructed, so that the subsystem can publish then and not before. + */ + struct pub_binding_t *pub_binding; + + /** + * Subscribing only: a function to receive message objects for this request. + */ + recv_fn_t recv_fn; + + /** The file from which this message was configured */ + const char *added_by_file; + /** The line at which this message was configured */ + unsigned added_by_line; +} pubsub_cfg_t; + +/** + * Configuration request for a single C type. + * + * These are stored while the dispatcher is in use, but are only used for + * setup, teardown, and debugging. + **/ +typedef struct pubsub_type_cfg_t { + /** + * The identifier for this type. + */ + msg_type_id_t type; + /** + * Functions to use when manipulating the type. + */ + dispatch_typefns_t fns; + + /** The subsystem that configured this type. */ + subsys_id_t subsys; + /** The file from which this type was configured */ + const char *added_by_file; + /** The line at which this type was configured */ + unsigned added_by_line; +} pubsub_type_cfg_t; + +/** + * The set of configuration requests for a dispatcher, as made by various + * subsystems. + **/ +struct pubsub_items_t { + /** List of pubsub_cfg_t. */ + struct smartlist_t *items; + /** List of pubsub_type_cfg_t. */ + struct smartlist_t *type_items; +}; + +/** + * Type used to construct a dispatcher. We use this type to build up the + * configuration for a dispatcher, and then pass ownership of that + * configuration to the newly constructed dispatcher. + **/ +struct pubsub_builder_t { + /** Number of outstanding pubsub_connector_t objects pointing to this + * pubsub_builder_t. */ + int n_connectors; + /** Number of errors encountered while constructing this object so far. */ + int n_errors; + /** In-progress configuration that we're constructing, as a list of the + * requests that have been made. */ + struct pubsub_items_t *items; + /** In-progress configuration that we're constructing, in a form that can + * be converted to a dispatch_t. */ + struct dispatch_cfg_t *cfg; +}; + +/** + * Type given to a subsystem when adding connections to a pubsub_builder_t. + * We use this type to force each subsystem to get blamed for the + * publications, subscriptions, and types that it adds. + **/ +struct pubsub_connector_t { + /** The pubsub_builder that this connector refers to. */ + struct pubsub_builder_t *builder; + /** The subsystem that has been given this connector. */ + subsys_id_t subsys_id; +}; + +/** + * Helper structure used when constructing a dispatcher that sorts the + * pubsub_cfg_t objects in various ways. + **/ +typedef struct pubsub_adjmap_t { + /* XXXX The next three fields are currently constructed but not yet + * XXXX used. I believe we'll want them in the future, though. -nickm + */ + /** Number of subsystems; length of the *_by_subsys arrays. */ + size_t n_subsystems; + /** Array of lists of publisher pubsub_cfg_t objects, indexed by + * subsystem. */ + struct smartlist_t **pub_by_subsys; + /** Array of lists of subscriber pubsub_cfg_t objects, indexed by + * subsystem. */ + struct smartlist_t **sub_by_subsys; + + /** Number of message IDs; length of the *_by_msg arrays. */ + size_t n_msgs; + /** Array of lists of publisher pubsub_cfg_t objects, indexed by + * message ID. */ + struct smartlist_t **pub_by_msg; + /** Array of lists of subscriber pubsub_cfg_t objects, indexed by + * message ID. */ + struct smartlist_t **sub_by_msg; +} pubsub_adjmap_t; + +#endif + +#endif diff --git a/src/lib/pubsub/pubsub_check.c b/src/lib/pubsub/pubsub_check.c new file mode 100644 index 0000000000..d308dc58aa --- /dev/null +++ b/src/lib/pubsub/pubsub_check.c @@ -0,0 +1,428 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * @file pubsub_check.c + * @brief Enforce various requirements on a pubsub_builder. + **/ + +#define PUBSUB_PRIVATE + +#include "lib/dispatch/dispatch_naming.h" +#include "lib/dispatch/msgtypes.h" +#include "lib/pubsub/pubsub_flags.h" +#include "lib/pubsub/pubsub_builder_st.h" +#include "lib/pubsub/pubsub_build.h" + +#include "lib/container/bitarray.h" +#include "lib/container/smartlist.h" +#include "lib/log/util_bug.h" +#include "lib/malloc/malloc.h" +#include "lib/string/compat_string.h" + +#include <string.h> + +static void pubsub_adjmap_add(pubsub_adjmap_t *map, + const pubsub_cfg_t *item); + +/** + * Helper: contruct and return a new pubsub_adjacency_map from <b>cfg</b>. + * Return NULL on error. + **/ +static pubsub_adjmap_t * +pubsub_build_adjacency_map(const pubsub_items_t *cfg) +{ + pubsub_adjmap_t *map = tor_malloc_zero(sizeof(*map)); + const size_t n_subsystems = get_num_subsys_ids(); + const size_t n_msgs = get_num_message_ids(); + + map->n_subsystems = n_subsystems; + map->n_msgs = n_msgs; + + map->pub_by_subsys = tor_calloc(n_subsystems, sizeof(smartlist_t*)); + map->sub_by_subsys = tor_calloc(n_subsystems, sizeof(smartlist_t*)); + map->pub_by_msg = tor_calloc(n_msgs, sizeof(smartlist_t*)); + map->sub_by_msg = tor_calloc(n_msgs, sizeof(smartlist_t*)); + + SMARTLIST_FOREACH_BEGIN(cfg->items, const pubsub_cfg_t *, item) { + pubsub_adjmap_add(map, item); + } SMARTLIST_FOREACH_END(item); + + return map; +} + +/** + * Helper: add a single pubsub_cfg_t to an adjacency map. + **/ +static void +pubsub_adjmap_add(pubsub_adjmap_t *map, + const pubsub_cfg_t *item) +{ + smartlist_t **by_subsys; + smartlist_t **by_msg; + + tor_assert(item->subsys < map->n_subsystems); + tor_assert(item->msg < map->n_msgs); + + if (item->is_publish) { + by_subsys = &map->pub_by_subsys[item->subsys]; + by_msg = &map->pub_by_msg[item->msg]; + } else { + by_subsys = &map->sub_by_subsys[item->subsys]; + by_msg = &map->sub_by_msg[item->msg]; + } + + if (! *by_subsys) + *by_subsys = smartlist_new(); + if (! *by_msg) + *by_msg = smartlist_new(); + smartlist_add(*by_subsys, (void*) item); + smartlist_add(*by_msg, (void *) item); +} + +/** + * Release all storage held by m and set m to NULL. + **/ +#define pubsub_adjmap_free(m) \ + FREE_AND_NULL(pubsub_adjmap_t, pubsub_adjmap_free_, m) + +/** + * Free every element of an <b>n</b>-element array of smartlists, then + * free the array itself. + **/ +static void +pubsub_adjmap_free_helper(smartlist_t **lsts, size_t n) +{ + if (!lsts) + return; + + for (unsigned i = 0; i < n; ++i) { + smartlist_free(lsts[i]); + } + tor_free(lsts); +} + +/** + * Release all storage held by <b>map</b>. + **/ +static void +pubsub_adjmap_free_(pubsub_adjmap_t *map) +{ + if (!map) + return; + pubsub_adjmap_free_helper(map->pub_by_subsys, map->n_subsystems); + pubsub_adjmap_free_helper(map->sub_by_subsys, map->n_subsystems); + pubsub_adjmap_free_helper(map->pub_by_msg, map->n_msgs); + pubsub_adjmap_free_helper(map->sub_by_msg, map->n_msgs); + tor_free(map); +} + +/** + * Helper: return the length of <b>sl</b>, or 0 if sl is NULL. + **/ +static int +smartlist_len_opt(const smartlist_t *sl) +{ + if (sl) + return smartlist_len(sl); + else + return 0; +} + +/** Return a pointer to a statically allocated string encoding the + * dispatcher flags in <b>flags</b>. */ +static const char * +format_flags(unsigned flags) +{ + static char buf[32]; + buf[0] = 0; + if (flags & DISP_FLAG_EXCL) { + strlcat(buf, " EXCL", sizeof(buf)); + } + if (flags & DISP_FLAG_STUB) { + strlcat(buf, " STUB", sizeof(buf)); + } + return buf[0] ? buf+1 : buf; +} + +/** + * Log a message containing a description of <b>cfg</b> at severity, prefixed + * by the string <b>prefix</b>. + */ +static void +pubsub_cfg_dump(const pubsub_cfg_t *cfg, int severity, const char *prefix) +{ + tor_assert(prefix); + + tor_log(severity, LD_MESG, + "%s%s %s: %s{%s} on %s (%s) <%u %u %u %u %x> [%s:%d]", + prefix, + get_subsys_id_name(cfg->subsys), + cfg->is_publish ? "PUB" : "SUB", + get_message_id_name(cfg->msg), + get_msg_type_id_name(cfg->type), + get_channel_id_name(cfg->channel), + format_flags(cfg->flags), + cfg->subsys, cfg->msg, cfg->type, cfg->channel, cfg->flags, + cfg->added_by_file, cfg->added_by_line); +} + +/** + * Helper: fill a bitarray <b>out</b> with entries corresponding to the + * subsystems listed in <b>items</b>. If any subsystem is listed more than + * once, log a warning. Return 0 on success, -1 on failure. + **/ +static int +get_message_bitarray(const pubsub_adjmap_t *map, + message_id_t msg, + const smartlist_t *items, + const char *operation, + bitarray_t **out) +{ + bool ok = true; + *out = bitarray_init_zero((unsigned)map->n_subsystems); + if (! items) + return 0; + + SMARTLIST_FOREACH_BEGIN(items, const pubsub_cfg_t *, cfg) { + if (bitarray_is_set(*out, cfg->subsys)) { + log_warn(LD_MESG|LD_BUG, + "Message \"%s\" is configured to be %s by subsystem " + "\"%s\" more than once.", + get_message_id_name(msg), operation, + get_subsys_id_name(cfg->subsys)); + ok = false; + } + bitarray_set(*out, cfg->subsys); + } SMARTLIST_FOREACH_END(cfg); + + return ok ? 0 : -1; +} + +/** + * Helper for lint_message: check that all the pubsub_cfg_t items in the two + * respective smartlists obey our local graph topology rules. + * + * (Right now this is just a matter of "each subsystem only + * publishes/subscribes once; no subsystem is a publisher and subscriber for + * the same message.") + * + * Return 0 on success, -1 on failure. + **/ +static int +lint_message_graph(const pubsub_adjmap_t *map, + message_id_t msg, + const smartlist_t *pub, + const smartlist_t *sub) +{ + bitarray_t *published_by = NULL; + bitarray_t *subscribed_by = NULL; + bool ok = true; + + if (get_message_bitarray(map, msg, pub, "published", &published_by) < 0) + ok = false; + if (get_message_bitarray(map, msg, sub, "subscribed", &subscribed_by) < 0) + ok = false; + + /* Check whether any subsystem is publishing and subscribing the same + * message. [??] + */ + for (unsigned i = 0; i < map->n_subsystems; ++i) { + if (bitarray_is_set(published_by, i) && + bitarray_is_set(subscribed_by, i)) { + log_warn(LD_MESG|LD_BUG, + "Message \"%s\" is published and subscribed by the same " + "subsystem \"%s\".", + get_message_id_name(msg), + get_subsys_id_name(i)); + ok = false; + } + } + + bitarray_free(published_by); + bitarray_free(subscribed_by); + + return ok ? 0 : -1; +} + +/** + * Helper for lint_message: check that all the pubsub_cfg_t items in the two + * respective smartlists have compatible flags, channels, and types. + **/ +static int +lint_message_consistency(message_id_t msg, + const smartlist_t *pub, + const smartlist_t *sub) +{ + if (!smartlist_len_opt(pub) && !smartlist_len_opt(sub)) + return 0; // LCOV_EXCL_LINE -- this was already checked. + + /* The 'all' list has the publishers and the subscribers. */ + smartlist_t *all = smartlist_new(); + if (pub) + smartlist_add_all(all, pub); + if (sub) + smartlist_add_all(all, sub); + + const pubsub_cfg_t *item0 = smartlist_get(all, 0); + + /* Indicates which subsystems we've found publishing/subscribing here. */ + bool pub_excl = false, sub_excl = false, chan_same = true, type_same = true; + + /* Simple message consistency properties across messages. + */ + SMARTLIST_FOREACH_BEGIN(all, const pubsub_cfg_t *, cfg) { + chan_same &= (cfg->channel == item0->channel); + type_same &= (cfg->type == item0->type); + if (cfg->is_publish) + pub_excl |= (cfg->flags & DISP_FLAG_EXCL) != 0; + else + sub_excl |= (cfg->flags & DISP_FLAG_EXCL) != 0; + } SMARTLIST_FOREACH_END(cfg); + + bool ok = true; + + if (! chan_same) { + log_warn(LD_MESG|LD_BUG, + "Message \"%s\" is associated with multiple inconsistent " + "channels.", + get_message_id_name(msg)); + ok = false; + } + if (! type_same) { + log_warn(LD_MESG|LD_BUG, + "Message \"%s\" is associated with multiple inconsistent " + "message types.", + get_message_id_name(msg)); + ok = false; + } + + /* Enforce exclusive-ness for publishers and subscribers that have asked for + * it. + */ + if (pub_excl && smartlist_len(pub) > 1) { + log_warn(LD_MESG|LD_BUG, + "Message \"%s\" has multiple publishers, but at least one is " + "marked as exclusive.", + get_message_id_name(msg)); + ok = false; + } + if (sub_excl && smartlist_len(sub) > 1) { + log_warn(LD_MESG|LD_BUG, + "Message \"%s\" has multiple subscribers, but at least one is " + "marked as exclusive.", + get_message_id_name(msg)); + ok = false; + } + + smartlist_free(all); + + return ok ? 0 : -1; +} + +/** + * Check whether there are any errors or inconsistencies for the message + * described by <b>msg</b> in <b>map</b>. If there are problems, log about + * them, and return -1. Otherwise return 0. + **/ +static int +lint_message(const pubsub_adjmap_t *map, message_id_t msg) +{ + /* NOTE: Some of the checks in this function are maybe over-zealous, and we + * might not want to have them forever. I've marked them with [?] below. + */ + if (BUG(msg >= map->n_msgs)) + return 0; // LCOV_EXCL_LINE + + const smartlist_t *pub = map->pub_by_msg[msg]; + const smartlist_t *sub = map->sub_by_msg[msg]; + + const size_t n_pub = smartlist_len_opt(pub); + const size_t n_sub = smartlist_len_opt(sub); + + if (n_pub == 0 && n_sub == 0) { + log_info(LD_MESG, "Nobody is publishing or subscribing to message " + "\"%s\".", + get_message_id_name(msg)); + return 0; // No publishers or subscribers: nothing to do. + } + /* We'll set this to false if there are any problems. */ + bool ok = true; + + /* First make sure that if there are publishers, there are subscribers. */ + if (n_pub == 0) { + log_warn(LD_MESG|LD_BUG, + "Message \"%s\" has subscribers, but no publishers.", + get_message_id_name(msg)); + ok = false; + } else if (n_sub == 0) { + log_warn(LD_MESG|LD_BUG, + "Message \"%s\" has publishers, but no subscribers.", + get_message_id_name(msg)); + ok = false; + } + + /* Check the message graph topology. */ + if (lint_message_graph(map, msg, pub, sub) < 0) + ok = false; + + /* Check whether the messages have the same fields set on them. */ + if (lint_message_consistency(msg, pub, sub) < 0) + ok = false; + + if (!ok) { + /* There was a problem -- let's log all the publishers and subscribers on + * this message */ + if (pub) { + SMARTLIST_FOREACH(pub, pubsub_cfg_t *, cfg, + pubsub_cfg_dump(cfg, LOG_WARN, " ")); + } + if (sub) { + SMARTLIST_FOREACH(sub, pubsub_cfg_t *, cfg, + pubsub_cfg_dump(cfg, LOG_WARN, " ")); + } + } + + return ok ? 0 : -1; +} + +/** + * Check all the messages in <b>map</b> for consistency. Return 0 on success, + * -1 on problems. + **/ +static int +pubsub_adjmap_check(const pubsub_adjmap_t *map) +{ + bool all_ok = true; + for (unsigned i = 0; i < map->n_msgs; ++i) { + if (lint_message(map, i) < 0) { + all_ok = false; + } + } + return all_ok ? 0 : -1; +} + +/** + * Check builder for consistency and various constraints. Return 0 on success, + * -1 on failure. + **/ +int +pubsub_builder_check(pubsub_builder_t *builder) +{ + pubsub_adjmap_t *map = pubsub_build_adjacency_map(builder->items); + int rv = -1; + + if (!map) + goto err; // should be impossible + + if (pubsub_adjmap_check(map) < 0) + goto err; + + rv = 0; + err: + pubsub_adjmap_free(map); + return rv; +} diff --git a/src/lib/pubsub/pubsub_connect.h b/src/lib/pubsub/pubsub_connect.h new file mode 100644 index 0000000000..bdcb33d2f5 --- /dev/null +++ b/src/lib/pubsub/pubsub_connect.h @@ -0,0 +1,54 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * @file pubsub_connect.h + * @brief Header for functions that add relationships to a pubsub builder. + * + * These functions are used by modules that need to add publication and + * subscription requests. Most users will want to call these functions + * indirectly, via the macros in pubsub_macros.h. + **/ + +#ifndef TOR_PUBSUB_CONNECT_H +#define TOR_PUBSUB_CONNECT_H + +#include "lib/dispatch/msgtypes.h" + +struct pub_binding_t; +/** + * A "dispatch connector" is a view of the dispatcher that a subsystem + * uses while initializing itself. It is specific to the subsystem, and + * ensures that each subsystem doesn't need to identify itself + * repeatedly while registering its messages. + **/ +typedef struct pubsub_connector_t pubsub_connector_t; + +int pubsub_add_pub_(struct pubsub_connector_t *con, + struct pub_binding_t *out, + channel_id_t channel, + message_id_t msg, + msg_type_id_t type, + unsigned flags, + const char *file, + unsigned line); + +int pubsub_add_sub_(struct pubsub_connector_t *con, + recv_fn_t recv_fn, + channel_id_t channel, + message_id_t msg, + msg_type_id_t type, + unsigned flags, + const char *file, + unsigned line); + +int pubsub_connector_register_type_(struct pubsub_connector_t *, + msg_type_id_t, + dispatch_typefns_t *, + const char *file, + unsigned line); + +#endif diff --git a/src/lib/pubsub/pubsub_flags.h b/src/lib/pubsub/pubsub_flags.h new file mode 100644 index 0000000000..d07a06be7b --- /dev/null +++ b/src/lib/pubsub/pubsub_flags.h @@ -0,0 +1,32 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * @file pubsub_flags.h + * @brief Flags that can be set on publish/subscribe messages. + **/ + +#ifndef TOR_PUBSUB_FLAGS_H +#define TOR_PUBSUB_FLAGS_H + +/** + * Flag for registering a message: declare that no other module is allowed to + * publish this message if we are publishing it, or subscribe to it if we are + * subscribing to it. + */ +#define DISP_FLAG_EXCL (1u<<0) + +/** + * Flag for registering a message: declare that this message is a stub, and we + * will not actually publish/subscribe it, but that the dispatcher should + * treat us as if we did when typechecking. + * + * We use this so that messages aren't treated as "dangling" if they are + * potentially used by some other build of Tor. + */ +#define DISP_FLAG_STUB (1u<<1) + +#endif diff --git a/src/lib/pubsub/pubsub_macros.h b/src/lib/pubsub/pubsub_macros.h new file mode 100644 index 0000000000..d091e40dfa --- /dev/null +++ b/src/lib/pubsub/pubsub_macros.h @@ -0,0 +1,373 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file pubsub_macros.h + * \brief Macros to help with the publish/subscribe dispatch API. + * + * The dispatch API allows different subsystems of Tor to communicate with + * another asynchronously via a shared "message" system. Some subsystems + * declare that they publish a given message, and others declare that they + * subscribe to it. Both subsystems depend on the message, but not upon one + * another. + * + * To declare a message, use DECLARE_MESSAGE() (for messages that take their + * data as a pointer) or DECLARE_MESSAGE_INT() (for messages that take their + * data as an integer. For example, you might say + * + * DECLARE_MESSAGE(new_circuit, circ, circuit_handle_t *); + * or + * DECLARE_MESSAGE_INT(shutdown_requested, boolean, bool); + * + * Every message has a unique name, a "type name" that the dispatch system + * uses to manage associated data, and a C type name. You'll want to put + * these declarations in a header, to be included by all publishers and all + * subscribers. + * + * When a subsystem wants to publish a message, it uses DECLARE_PUBLISH() at + * file scope to create necessary static functions. Then, in its subsystem + * initialization (in the "bind to dispatcher" callback) (TODO: name this + * properly!), it calls DISPATCH_ADD_PUB() to tell the dispatcher about its + * intent to publish. When it actually wants to publish, it uses the + * PUBLISH() macro. For example: + * + * // At file scope + * DECLARE_PUBLISH(shutdown_requested); + * + * static void bind_to_dispatcher(pubsub_connector_t *con) + * { + * DISPATCH_ADD_PUB(con, mainchannel, shutdown_requested); + * } + * + * // somewhere in a function + * { + * PUBLISH(shutdown_requested, true); + * } + * + * When a subsystem wants to subscribe to a message, it uses + * DECLARE_SUBSCRIBE() at file scope to declare static functions. It must + * declare a hook function that receives the message type. Then, in its "bind + * to dispatcher" function, it calls DISPATCHER_ADD_SUB() to tell the + * dispatcher about its intent to subscribe. When another module publishes + * the message, the dispatcher will call the provided hook function. + * + * // At file scope. The first argument is the message that you're + * // subscribing to; the second argument is the hook function to declare. + * DECLARE_SUBSCRIBE(shutdown_requested, on_shutdown_req_cb); + * + * // You need to declare this function. + * static void on_shutdown_req_cb(const msg_t *msg, + * bool value) + * { + * // (do something here.) + * } + * + * static void bind_to_dispatcher(pubsub_connector_t *con) + * { + * DISPATCH_ADD_SUB(con, mainchannel, shutdown_requested); + * } + * + * Where did these types come from? Somewhere in the code, you need to call + * DISPATCH_REGISTER_TYPE() to make sure that the dispatcher can manage the + * message auxiliary data. It associates a vtbl-like structure with the + * type name, so that the dispatcher knows how to manipulate the type you're + * giving it. + * + * For example, the "boolean" type we're using above could be defined as: + * + * static char *boolean_fmt(msg_aux_data_t d) + * { + * // This is used for debugging and dumping messages. + * if (d.u64) + * return tor_strdup("true"); + * else + * return tor_strdup("false"); + * } + * + * static void boolean_free(msg_aux_data_t d) + * { + * // We don't actually need to do anything to free a boolean. + * // We could use "NULL" instead of this function, but I'm including + * // it as an example. + * } + * + * static void bind_to_dispatcher(pubsub_connector_t *con) + * { + * dispatch_typefns_t boolean_fns = { + * .fmt_fn = boolean_fmt, + * .free_fn = boolean_free, + * }; + * DISPATCH_REGISTER_TYPE(con, boolean, &boolean_fns); + * } + * + * + * + * So, how does this all work? (You can stop reading here, unless you're + * debugging something.) + * + * When you declare a message in a header with DECLARE_MESSAGE() or + * DECLARE_MESSAGE_INT(), it creates five things: + * + * * two typedefs for the message argument (constant and non-constant + * variants). + * * a constant string to hold the declared message type name + * * two inline functions, to coerce the message argument type to and from + * a "msg_aux_data_t" union. + * + * All of these declarations have names based on the message name. + * + * Later, when you say DECLARE_PUBLISH() or DECLARE_SUBSCRIBE(), we use the + * elements defined by DECLARE_MESSAGE() to make sure that the publish + * function takes the correct argument type, and that the subscription hook is + * declared with the right argument type. + **/ + +#ifndef TOR_DISPATCH_MSG_H +#define TOR_DISPATCH_MSG_H + +#include "lib/cc/compat_compiler.h" +#include "lib/dispatch/dispatch_naming.h" +#include "lib/pubsub/pub_binding_st.h" +#include "lib/pubsub/pubsub_connect.h" +#include "lib/pubsub/pubsub_flags.h" +#include "lib/pubsub/pubsub_publish.h" + +/* Implemenation notes: + * + * For a messagename "foo", the DECLARE_MESSAGE*() macros must declare: + * + * msg_arg_type__foo -- a typedef for the argument type of the foo message. + * msg_arg_consttype__foo -- a typedef for the const argument type of the + * foo message. + * msg_arg_name__foo[] -- a static string constant holding the unique + * identifier for the type of the foo message. + * msg_arg_get__foo() -- an inline function taking a msg_aux_data_t and + * returning the C data type. + * msg_arg_set__foo() -- an inline function taking a msg_aux_data_t and + * the C type, setting the msg_aux_data_t to hold the C type. + * + * For a messagename "foo", the DECLARE_PUBLISH() macro must declare: + * + * pub_binding__foo -- A static pub_binding_t object used to send messages + * from this module. + * publish_fn__foo -- A function taking an argument of the appropriate + * C type, to be invoked by PUBLISH(). + * + * For a messagename "foo", the DECLARE_SUBSCRIBE() macro must declare: + * + * hookfn -- A user-provided function name, with the correct signature. + * recv_fn__foo -- A wrapper callback that takes a msg_t *, and calls + * hookfn with the appropriate arguments. + */ + +/* Macro to declare common elements shared by DECLARE_MESSAGE and + * DECLARE_MESSAGE_INT. Don't call this directly. + * + * Note that the "msg_arg_name" string constant is defined in each + * translation unit. This might be undesirable; we can tweak it in the + * future if need be. + */ +#define DECLARE_MESSAGE_COMMON__(messagename, typename, c_type) \ + typedef c_type msg_arg_type__ ##messagename; \ + typedef const c_type msg_arg_consttype__ ##messagename; \ + ATTR_UNUSED static const char msg_arg_name__ ##messagename[] = # typename; + +/** + * Use this macro in a header to declare the existence of a given message, + * taking a pointer as auxiliary data. + * + * "messagename" is a unique identifier for the message; it must be a valid + * C identifier. + * + * "typename" is a unique identifier for the type of the auxiliary data. + * It needs to be defined somewhere in Tor, using + * "DISPATCH_REGISTER_TYPE." + * + * "c_ptr_type" is a C pointer type (like "char *" or "struct foo *"). + * The "*" needs to be included. + */ +#define DECLARE_MESSAGE(messagename, typename, c_ptr_type) \ + DECLARE_MESSAGE_COMMON__(messagename, typename, c_ptr_type) \ + ATTR_UNUSED static inline c_ptr_type \ + msg_arg_get__ ##messagename(msg_aux_data_t m) \ + { \ + return m.ptr; \ + } \ + ATTR_UNUSED static inline void \ + msg_arg_set__ ##messagename(msg_aux_data_t *m, c_ptr_type v) \ + { \ + m->ptr = v; \ + } \ + EAT_SEMICOLON + +/** + * Use this macro in a header to declare the existence of a given message, + * taking an integer as auxiliary data. + * + * "messagename" is a unique identifier for the message; it must be a valid + * C identifier. + * + * "typename" is a unique identifier for the type of the auxiliary data. It + * needs to be defined somewhere in Tor, using "DISPATCH_REGISTER_TYPE." + * + * "c_type" is a C integer type, like "int" or "bool". It needs to fit inside + * a uint64_t. + */ +#define DECLARE_MESSAGE_INT(messagename, typename, c_type) \ + DECLARE_MESSAGE_COMMON__(messagename, typename, c_type) \ + ATTR_UNUSED static inline c_type \ + msg_arg_get__ ##messagename(msg_aux_data_t m) \ + { \ + return (c_type)m.u64; \ + } \ + ATTR_UNUSED static inline void \ + msg_arg_set__ ##messagename(msg_aux_data_t *m, c_type v) \ + { \ + m->u64 = (uint64_t)v; \ + } \ + EAT_SEMICOLON + +/** + * Use this macro inside a C module declare that we'll be publishing a given + * message type from within this module. + * + * It creates necessary functions and wrappers to publish a message whose + * unique identifier is "messagename". + * + * Before you use this, you need to include the header where DECLARE_MESSAGE*() + * was used for this message. + * + * You can only use this once per message in each subsystem. + */ +#define DECLARE_PUBLISH(messagename) \ + static pub_binding_t pub_binding__ ##messagename; \ + static void \ + publish_fn__ ##messagename(msg_arg_type__ ##messagename arg) \ + { \ + msg_aux_data_t data; \ + msg_arg_set__ ##messagename(&data, arg); \ + pubsub_pub_(&pub_binding__ ##messagename, data); \ + } \ + EAT_SEMICOLON + +/** + * Use this macro inside a C file to declare that we're subscribing to a + * given message and associating it with a given "hook function". It + * declares the hook function static, and helps with strong typing. + * + * Before you use this, you need to include the header where + * DECLARE_MESSAGE*() was used for the message whose unique identifier is + * "messagename". + * + * You will need to define a function with the name that you provide for + * "hookfn". The type of this function will be: + * static void hookfn(const msg_t *, const c_type) + * where c_type is the c type that you declared in the header. + * + * You can only use this once per message in each subsystem. + */ +#define DECLARE_SUBSCRIBE(messagename, hookfn) \ + static void hookfn(const msg_t *, \ + const msg_arg_consttype__ ##messagename); \ + static void recv_fn__ ## messagename(const msg_t *m) \ + { \ + msg_arg_type__ ## messagename arg; \ + arg = msg_arg_get__ ##messagename(m->aux_data__); \ + hookfn(m, arg); \ + } \ + EAT_SEMICOLON + +/** + * Add a fake use of the publish function for 'messagename', so that + * the compiler does not call it unused. + */ +#define DISPATCH__FAKE_USE_OF_PUBFN_(messagename) \ + ( 0 ? (publish_fn__ ##messagename((msg_arg_type__##messagename)0), 1) \ + : 1) + +/* + * This macro is for internal use. It backs DISPATCH_ADD_PUB*() + */ +#define DISPATCH_ADD_PUB_(connector, channel, messagename, flags) \ + ( \ + DISPATCH__FAKE_USE_OF_PUBFN_(messagename), \ + pubsub_add_pub_((connector), \ + &pub_binding__ ##messagename, \ + get_channel_id(# channel), \ + get_message_id(# messagename), \ + get_msg_type_id(msg_arg_name__ ## messagename), \ + (flags), \ + __FILE__, \ + __LINE__) \ + ) + +/** + * Use a given connector and channel name to declare that this subsystem will + * publish a given message type. + * + * Call this macro from within the add_subscriptions() function of a module. + */ +#define DISPATCH_ADD_PUB(connector, channel, messagename) \ + DISPATCH_ADD_PUB_(connector, channel, messagename, 0) + +/** + * Use a given connector and channel name to declare that this subsystem will + * publish a given message type, and that no other subsystem is allowed to. + * + * Call this macro from within the add_subscriptions() function of a module. + */ +#define DISPATCH_ADD_PUB_EXCL(connector, channel, messagename) \ + DISPATCH_ADD_PUB_(connector, channel, messagename, DISP_FLAG_EXCL) + +/* + * This macro is for internal use. It backs DISPATCH_ADD_SUB*() + */ +#define DISPATCH_ADD_SUB_(connector, channel, messagename, flags) \ + pubsub_add_sub_((connector), \ + recv_fn__ ##messagename, \ + get_channel_id(#channel), \ + get_message_id(# messagename), \ + get_msg_type_id(msg_arg_name__ ##messagename), \ + (flags), \ + __FILE__, \ + __LINE__) +/* + * Use a given connector and channel name to declare that this subsystem will + * receive a given message type. + * + * Call this macro from within the add_subscriptions() function of a module. + */ +#define DISPATCH_ADD_SUB(connector, channel, messagename) \ + DISPATCH_ADD_SUB_(connector, channel, messagename, 0) +/** + * Use a given connector and channel name to declare that this subsystem will + * receive a given message type, and that no other subsystem is allowed to do + * so. + * + * Call this macro from within the add_subscriptions() function of a module. + */ +#define DISPATCH_ADD_SUB_EXCL(connector, channel, messagename) \ + DISPATCH_ADD_SUB_(connector, channel, messagename, DISP_FLAG_EXCL) + +/** + * Publish a given message with a given argument. (Takes ownership of the + * argument if it is a pointer.) + */ +#define PUBLISH(messagename, arg) \ + publish_fn__ ##messagename(arg) + +/** + * Use a given connector to declare that the functions to be used to manipuate + * a certain C type. + **/ +#define DISPATCH_REGISTER_TYPE(con, type, fns) \ + pubsub_connector_register_type_((con), \ + get_msg_type_id(#type), \ + (fns), \ + __FILE__, \ + __LINE__) + +#endif diff --git a/src/lib/pubsub/pubsub_publish.c b/src/lib/pubsub/pubsub_publish.c new file mode 100644 index 0000000000..454a335a78 --- /dev/null +++ b/src/lib/pubsub/pubsub_publish.c @@ -0,0 +1,72 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * @file pubsub_publish.c + * @brief Header for functions to publish using a pub_binding_t. + **/ + +#define PUBSUB_PRIVATE +#define DISPATCH_PRIVATE +#include "orconfig.h" + +#include "lib/dispatch/dispatch.h" +#include "lib/dispatch/dispatch_st.h" + +#include "lib/pubsub/pub_binding_st.h" +#include "lib/pubsub/pubsub_publish.h" + +#include "lib/malloc/malloc.h" +#include "lib/log/util_bug.h" + +#include <string.h> + +/** + * Publish a message from the publication binding <b>pub</b> using the + * auxiliary data <b>auxdata</b>. + * + * Return 0 on success, -1 on failure. + **/ +int +pubsub_pub_(const pub_binding_t *pub, msg_aux_data_t auxdata) +{ + dispatch_t *d = pub->dispatch_ptr; + if (BUG(! d)) { + /* Tried to publish a message before the dispatcher was configured. */ + /* (Without a dispatcher, we don't know how to free auxdata.) */ + return -1; + } + + if (BUG(pub->msg_template.type >= d->n_types)) { + /* The type associated with this message is not known to the dispatcher. */ + /* (Without a correct type, we don't know how to free auxdata.) */ + return -1; + } + + if (BUG(pub->msg_template.msg >= d->n_msgs) || + BUG(pub->msg_template.channel >= d->n_queues)) { + /* The message ID or channel ID was out of bounds. */ + // LCOV_EXCL_START + d->typefns[pub->msg_template.type].free_fn(auxdata); + return -1; + // LCOV_EXCL_STOP + } + + if (! d->table[pub->msg_template.msg]) { + /* Fast path: nobody wants this data. */ + + // XXXX Faster path: we could store this in the pub_binding_t. + d->typefns[pub->msg_template.type].free_fn(auxdata); + return 0; + } + + /* Construct the message object */ + msg_t *m = tor_malloc(sizeof(msg_t)); + memcpy(m, &pub->msg_template, sizeof(msg_t)); + m->aux_data__ = auxdata; + + return dispatch_send_msg_unchecked(d, m); +} diff --git a/src/lib/pubsub/pubsub_publish.h b/src/lib/pubsub/pubsub_publish.h new file mode 100644 index 0000000000..0250fd0760 --- /dev/null +++ b/src/lib/pubsub/pubsub_publish.h @@ -0,0 +1,15 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#ifndef TOR_PUBSUB_PUBLISH_H +#define TOR_PUBSUB_PUBLISH_H + +#include "lib/dispatch/msgtypes.h" +struct pub_binding_t; + +int pubsub_pub_(const struct pub_binding_t *pub, msg_aux_data_t auxdata); + +#endif diff --git a/src/lib/smartlist_core/smartlist_core.c b/src/lib/smartlist_core/smartlist_core.c index ac85a6cc84..5947e76271 100644 --- a/src/lib/smartlist_core/smartlist_core.c +++ b/src/lib/smartlist_core/smartlist_core.c @@ -88,6 +88,30 @@ smartlist_ensure_capacity(smartlist_t *sl, size_t size) #undef MAX_CAPACITY } +/** Expand <b>sl</b> so that its length is at least <b>new_size</b>, + * filling in previously unused entries with NULL> + * + * Do nothing if <b>sl</b> already had at least <b>new_size</b> elements. + */ +void +smartlist_grow(smartlist_t *sl, size_t new_size) +{ + smartlist_ensure_capacity(sl, new_size); + + if (new_size > (size_t)sl->num_used) { + /* This memset() should be a no-op: everything else in the smartlist code + * tries to make sure that unused entries are always NULL. Still, that is + * meant as a safety mechanism, so let's clear the memory here. + */ + memset(sl->list + sl->num_used, 0, + sizeof(void *) * (new_size - sl->num_used)); + + /* This cast is safe, since we already asserted that we were below + * MAX_CAPACITY in smartlist_ensure_capacity(). */ + sl->num_used = (int)new_size; + } +} + /** Append element to the end of the list. */ void smartlist_add(smartlist_t *sl, void *element) diff --git a/src/lib/smartlist_core/smartlist_core.h b/src/lib/smartlist_core/smartlist_core.h index a7fbaa099b..a1a195f312 100644 --- a/src/lib/smartlist_core/smartlist_core.h +++ b/src/lib/smartlist_core/smartlist_core.h @@ -43,6 +43,7 @@ void smartlist_clear(smartlist_t *sl); void smartlist_add(smartlist_t *sl, void *element); void smartlist_add_all(smartlist_t *sl, const smartlist_t *s2); void smartlist_add_strdup(struct smartlist_t *sl, const char *string); +void smartlist_grow(smartlist_t *sl, size_t new_size); void smartlist_remove(smartlist_t *sl, const void *element); void smartlist_remove_keeporder(smartlist_t *sl, const void *element); diff --git a/src/lib/subsys/subsys.h b/src/lib/subsys/subsys.h index 241ad7829c..d78bb4a602 100644 --- a/src/lib/subsys/subsys.h +++ b/src/lib/subsys/subsys.h @@ -8,7 +8,7 @@ #include <stdbool.h> -struct dispatch_connector_t; +struct pubsub_connector_t; /** * A subsystem is a part of Tor that is initialized, shut down, configured, @@ -58,7 +58,7 @@ typedef struct subsys_fns_t { /** * Connect a subsystem to the message dispatch system. **/ - int (*add_pubsub)(struct dispatch_connector_t *); + int (*add_pubsub)(struct pubsub_connector_t *); /** * Perform any necessary pre-fork cleanup. This function may not fail. diff --git a/src/test/include.am b/src/test/include.am index 700107d6ce..497aa320a4 100644 --- a/src/test/include.am +++ b/src/test/include.am @@ -126,6 +126,7 @@ src_test_test_SOURCES += \ src/test/test_dir.c \ src/test/test_dir_common.c \ src/test/test_dir_handle_get.c \ + src/test/test_dispatch.c \ src/test/test_dos.c \ src/test/test_entryconn.c \ src/test/test_entrynodes.c \ @@ -150,6 +151,7 @@ src_test_test_SOURCES += \ src/test/test_logging.c \ src/test/test_mainloop.c \ src/test/test_microdesc.c \ + src/test/test_namemap.c \ src/test/test_netinfo.c \ src/test/test_nodelist.c \ src/test/test_oom.c \ @@ -165,6 +167,8 @@ src_test_test_SOURCES += \ src/test/test_proto_misc.c \ src/test/test_protover.c \ src/test/test_pt.c \ + src/test/test_pubsub_build.c \ + src/test/test_pubsub_msg.c \ src/test/test_relay.c \ src/test/test_relaycell.c \ src/test/test_relaycrypt.c \ diff --git a/src/test/test.c b/src/test/test.c index 25e9da5591..fbc30fb64e 100644 --- a/src/test/test.c +++ b/src/test/test.c @@ -857,6 +857,7 @@ struct testgroup_t testgroups[] = { { "consdiff/", consdiff_tests }, { "consdiffmgr/", consdiffmgr_tests }, { "container/", container_tests }, + { "container/namemap/", namemap_tests }, { "control/", controller_tests }, { "control/btrack/", btrack_tests }, { "control/event/", controller_event_tests }, @@ -872,6 +873,7 @@ struct testgroup_t testgroups[] = { { "dir/voting/flags/", voting_flags_tests }, { "dir/voting/schedule/", voting_schedule_tests }, { "dir_handle_get/", dir_handle_get_tests }, + { "dispatch/", dispatch_tests, }, { "dns/", dns_tests }, { "dos/", dos_tests }, { "entryconn/", entryconn_tests }, @@ -909,6 +911,8 @@ struct testgroup_t testgroups[] = { { "proto/misc/", proto_misc_tests }, { "protover/", protover_tests }, { "pt/", pt_tests }, + { "pubsub/build/", pubsub_build_tests }, + { "pubsub/msg/", pubsub_msg_tests }, { "relay/" , relay_tests }, { "relaycell/", relaycell_tests }, { "relaycrypt/", relaycrypt_tests }, diff --git a/src/test/test.h b/src/test/test.h index 9d9184e2aa..7d19af9b20 100644 --- a/src/test/test.h +++ b/src/test/test.h @@ -210,6 +210,7 @@ extern struct testcase_t crypto_rng_tests[]; extern struct testcase_t crypto_tests[]; extern struct testcase_t dir_handle_get_tests[]; extern struct testcase_t dir_tests[]; +extern struct testcase_t dispatch_tests[]; extern struct testcase_t dns_tests[]; extern struct testcase_t dos_tests[]; extern struct testcase_t entryconn_tests[]; @@ -235,6 +236,7 @@ extern struct testcase_t link_handshake_tests[]; extern struct testcase_t logging_tests[]; extern struct testcase_t mainloop_tests[]; extern struct testcase_t microdesc_tests[]; +extern struct testcase_t namemap_tests[]; extern struct testcase_t netinfo_tests[]; extern struct testcase_t nodelist_tests[]; extern struct testcase_t oom_tests[]; @@ -252,6 +254,8 @@ extern struct testcase_t proto_http_tests[]; extern struct testcase_t proto_misc_tests[]; extern struct testcase_t protover_tests[]; extern struct testcase_t pt_tests[]; +extern struct testcase_t pubsub_build_tests[]; +extern struct testcase_t pubsub_msg_tests[]; extern struct testcase_t relay_tests[]; extern struct testcase_t relaycell_tests[]; extern struct testcase_t relaycrypt_tests[]; diff --git a/src/test/test_containers.c b/src/test/test_containers.c index a0832f868e..7892a08853 100644 --- a/src/test/test_containers.c +++ b/src/test/test_containers.c @@ -606,6 +606,66 @@ test_container_smartlist_ints_eq(void *arg) smartlist_free(sl2); } +static void +test_container_smartlist_grow(void *arg) +{ + (void)arg; + smartlist_t *sl = smartlist_new(); + int i; + const char *s[] = { "first", "2nd", "3rd" }; + + /* case 1: starting from empty. */ + smartlist_grow(sl, 10); + tt_int_op(10, OP_EQ, smartlist_len(sl)); + for (i = 0; i < 10; ++i) { + tt_ptr_op(smartlist_get(sl, i), OP_EQ, NULL); + } + + /* case 2: starting with a few elements, probably not reallocating. */ + smartlist_free(sl); + sl = smartlist_new(); + smartlist_add(sl, (char*)s[0]); + smartlist_add(sl, (char*)s[1]); + smartlist_add(sl, (char*)s[2]); + smartlist_grow(sl, 5); + tt_int_op(5, OP_EQ, smartlist_len(sl)); + for (i = 0; i < 3; ++i) { + tt_ptr_op(smartlist_get(sl, i), OP_EQ, s[i]); + } + tt_ptr_op(smartlist_get(sl, 3), OP_EQ, NULL); + tt_ptr_op(smartlist_get(sl, 4), OP_EQ, NULL); + + /* case 3: starting with a few elements, but reallocating. */ + smartlist_free(sl); + sl = smartlist_new(); + smartlist_add(sl, (char*)s[0]); + smartlist_add(sl, (char*)s[1]); + smartlist_add(sl, (char*)s[2]); + smartlist_grow(sl, 100); + tt_int_op(100, OP_EQ, smartlist_len(sl)); + for (i = 0; i < 3; ++i) { + tt_ptr_op(smartlist_get(sl, i), OP_EQ, s[i]); + } + for (i = 3; i < 100; ++i) { + tt_ptr_op(smartlist_get(sl, i), OP_EQ, NULL); + } + + /* case 4: shrinking doesn't happen. */ + smartlist_free(sl); + sl = smartlist_new(); + smartlist_add(sl, (char*)s[0]); + smartlist_add(sl, (char*)s[1]); + smartlist_add(sl, (char*)s[2]); + smartlist_grow(sl, 1); + tt_int_op(3, OP_EQ, smartlist_len(sl)); + for (i = 0; i < 3; ++i) { + tt_ptr_op(smartlist_get(sl, i), OP_EQ, s[i]); + } + + done: + smartlist_free(sl); +} + /** Run unit tests for bitarray code */ static void test_container_bitarray(void *arg) @@ -1312,6 +1372,7 @@ struct testcase_t container_tests[] = { CONTAINER_LEGACY(smartlist_pos), CONTAINER(smartlist_remove, 0), CONTAINER(smartlist_ints_eq, 0), + CONTAINER(smartlist_grow, 0), CONTAINER_LEGACY(bitarray), CONTAINER_LEGACY(digestset), CONTAINER_LEGACY(strmap), diff --git a/src/test/test_dispatch.c b/src/test/test_dispatch.c new file mode 100644 index 0000000000..d6fe7e781a --- /dev/null +++ b/src/test/test_dispatch.c @@ -0,0 +1,249 @@ +/* Copyright (c) 2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#define DISPATCH_PRIVATE + +#include "test/test.h" + +#include "lib/dispatch/dispatch.h" +#include "lib/dispatch/dispatch_cfg.h" +#include "lib/dispatch/dispatch_st.h" +#include "lib/dispatch/msgtypes.h" + +#include "lib/log/escape.h" +#include "lib/malloc/malloc.h" +#include "lib/string/printf.h" + +#include <stdio.h> +#include <string.h> + +static dispatch_t *dispatcher_in_use=NULL; + +/* Construct an empty dispatch_t. */ +static void +test_dispatch_empty(void *arg) +{ + (void)arg; + + dispatch_t *d=NULL; + dispatch_cfg_t *cfg=NULL; + + cfg = dcfg_new(); + d = dispatch_new(cfg); + tt_assert(d); + + done: + dispatch_free(d); + dcfg_free(cfg); +} + +static int total_recv1_simple = 0; +static int total_recv2_simple = 0; + +static void +simple_recv1(const msg_t *m) +{ + total_recv1_simple += m->aux_data__.u64; +} + +static char *recv2_received = NULL; + +static void +simple_recv2(const msg_t *m) +{ + tor_free(recv2_received); + recv2_received = dispatch_fmt_msg_data(dispatcher_in_use, m); + + total_recv2_simple += m->aux_data__.u64*10; +} + +/* Construct a dispatch_t with two messages, make sure that they both get + * delivered. */ +static void +test_dispatch_simple(void *arg) +{ + (void)arg; + + dispatch_t *d=NULL; + dispatch_cfg_t *cfg=NULL; + int r; + + cfg = dcfg_new(); + r = dcfg_msg_set_type(cfg,0,0); + r += dcfg_msg_set_chan(cfg,0,0); + r += dcfg_add_recv(cfg,0,1,simple_recv1); + r += dcfg_msg_set_type(cfg,1,0); + r += dcfg_msg_set_chan(cfg,1,0); + r += dcfg_add_recv(cfg,1,1,simple_recv2); + r += dcfg_add_recv(cfg,1,1,simple_recv2); /* second copy */ + tt_int_op(r, OP_EQ, 0); + + d = dispatch_new(cfg); + tt_assert(d); + dispatcher_in_use = d; + + msg_aux_data_t data = {.u64 = 7}; + r = dispatch_send(d, 99, 0, 0, 0, data); + tt_int_op(r, OP_EQ, 0); + tt_int_op(total_recv1_simple, OP_EQ, 0); + + r = dispatch_flush(d, 0, INT_MAX); + tt_int_op(r, OP_EQ, 0); + tt_int_op(total_recv1_simple, OP_EQ, 7); + tt_int_op(total_recv2_simple, OP_EQ, 0); + + total_recv1_simple = 0; + r = dispatch_send(d, 99, 0, 1, 0, data); + tt_int_op(r, OP_EQ, 0); + r = dispatch_flush(d, 0, INT_MAX); + tt_int_op(total_recv1_simple, OP_EQ, 0); + tt_int_op(total_recv2_simple, OP_EQ, 140); + + tt_str_op(recv2_received, OP_EQ, "<>"); // no format function was set. + + done: + dispatch_free(d); + dcfg_free(cfg); + tor_free(recv2_received); +} + +/* Construct a dispatch_t with a message and no reciever; make sure that it + * gets dropped properly. */ +static void +test_dispatch_no_recipient(void *arg) +{ + (void)arg; + + dispatch_t *d=NULL; + dispatch_cfg_t *cfg=NULL; + int r; + + cfg = dcfg_new(); + r = dcfg_msg_set_type(cfg,0,0); + r += dcfg_msg_set_chan(cfg,0,0); + tt_int_op(r, OP_EQ, 0); + + d = dispatch_new(cfg); + tt_assert(d); + dispatcher_in_use = d; + + msg_aux_data_t data = { .u64 = 7}; + r = dispatch_send(d, 99, 0, 0, 0, data); + tt_int_op(r, OP_EQ, 0); + + r = dispatch_flush(d, 0, INT_MAX); + tt_int_op(r, OP_EQ, 0); + + done: + dispatch_free(d); + dcfg_free(cfg); +} + +struct coord { int x; int y; }; +static void +free_coord(msg_aux_data_t d) +{ + tor_free(d.ptr); +} +static char * +fmt_coord(msg_aux_data_t d) +{ + char *v; + struct coord *c = d.ptr; + tor_asprintf(&v, "[%d, %d]", c->x, c->y); + return v; +} +static dispatch_typefns_t coord_fns = { + .fmt_fn = fmt_coord, + .free_fn = free_coord, +}; +static void +alert_run_immediate(dispatch_t *d, channel_id_t ch, void *arg) +{ + (void)arg; + dispatch_flush(d, ch, INT_MAX); +} + +static char *received_data=NULL; + +static void +recv_typed_data(const msg_t *m) +{ + tor_free(received_data); + received_data = dispatch_fmt_msg_data(dispatcher_in_use, m); +} + +static void +test_dispatch_with_types(void *arg) +{ + (void)arg; + + dispatch_t *d=NULL; + dispatch_cfg_t *cfg=NULL; + int r; + + cfg = dcfg_new(); + r = dcfg_msg_set_type(cfg,5,3); + r += dcfg_msg_set_chan(cfg,5,2); + r += dcfg_add_recv(cfg,5,0,recv_typed_data); + r += dcfg_type_set_fns(cfg,3,&coord_fns); + tt_int_op(r, OP_EQ, 0); + + d = dispatch_new(cfg); + tt_assert(d); + dispatcher_in_use = d; + + /* Make this message get run immediately. */ + r = dispatch_set_alert_fn(d, 2, alert_run_immediate, NULL); + tt_int_op(r, OP_EQ, 0); + + struct coord *xy = tor_malloc(sizeof(*xy)); + xy->x = 13; + xy->y = 37; + msg_aux_data_t data = {.ptr = xy}; + r = dispatch_send(d, 99/*sender*/, 2/*channel*/, 5/*msg*/, 3/*type*/, data); + tt_int_op(r, OP_EQ, 0); + tt_str_op(received_data, OP_EQ, "[13, 37]"); + + done: + dispatch_free(d); + dcfg_free(cfg); + tor_free(received_data); + dispatcher_in_use = NULL; +} + +static void +test_dispatch_bad_type_setup(void *arg) +{ + (void)arg; + static dispatch_typefns_t fns; + dispatch_cfg_t *cfg = dcfg_new(); + + tt_int_op(0, OP_EQ, dcfg_type_set_fns(cfg, 7, &coord_fns)); + + fns = coord_fns; + fns.fmt_fn = NULL; + tt_int_op(-1, OP_EQ, dcfg_type_set_fns(cfg, 7, &fns)); + + fns = coord_fns; + fns.free_fn = NULL; + tt_int_op(-1, OP_EQ, dcfg_type_set_fns(cfg, 7, &fns)); + + fns = coord_fns; + tt_int_op(0, OP_EQ, dcfg_type_set_fns(cfg, 7, &fns)); + + done: + dcfg_free(cfg); +} + +#define T(name) \ + { #name, test_dispatch_ ## name, TT_FORK, NULL, NULL } + +struct testcase_t dispatch_tests[] = { + T(empty), + T(simple), + T(no_recipient), + T(with_types), + T(bad_type_setup), + END_OF_TESTCASES +}; diff --git a/src/test/test_namemap.c b/src/test/test_namemap.c new file mode 100644 index 0000000000..df77d4e2de --- /dev/null +++ b/src/test/test_namemap.c @@ -0,0 +1,174 @@ +/* Copyright (c) 2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#include "test/test.h" + +#include "lib/cc/torint.h" +#include "lib/container/namemap.h" +#include "lib/container/namemap_st.h" +#include "lib/malloc/malloc.h" + +#include <stdio.h> +#include <string.h> + +static void +test_namemap_empty(void *arg) +{ + (void)arg; + + namemap_t m; + namemap_init(&m); + namemap_t m2 = NAMEMAP_INIT(); + + tt_uint_op(0, OP_EQ, namemap_get_size(&m)); + tt_uint_op(NAMEMAP_ERR, OP_EQ, namemap_get_id(&m, "hello")); + tt_uint_op(NAMEMAP_ERR, OP_EQ, namemap_get_id(&m, "hello")); + tt_uint_op(NAMEMAP_ERR, OP_EQ, namemap_get_id(&m, "hello128")); + tt_uint_op(NAMEMAP_ERR, OP_EQ, namemap_get_id(&m, "")); + tt_uint_op(0, OP_EQ, namemap_get_size(&m)); + + tt_uint_op(0, OP_EQ, namemap_get_size(&m2)); + tt_uint_op(NAMEMAP_ERR, OP_EQ, namemap_get_id(&m2, "hello")); + tt_uint_op(NAMEMAP_ERR, OP_EQ, namemap_get_id(&m2, "hello")); + tt_uint_op(NAMEMAP_ERR, OP_EQ, namemap_get_id(&m2, "hello128")); + tt_uint_op(NAMEMAP_ERR, OP_EQ, namemap_get_id(&m2, "")); + tt_uint_op(0, OP_EQ, namemap_get_size(&m)); + + done: + namemap_clear(&m); + namemap_clear(&m2); +} + +static void +test_namemap_toolong(void *arg) +{ + (void)arg; + namemap_t m; + char *ok = NULL; + char *toolong = NULL; + namemap_init(&m); + + ok = tor_malloc_zero(MAX_NAMEMAP_NAME_LEN+1); + memset(ok, 'x', MAX_NAMEMAP_NAME_LEN); + + toolong = tor_malloc_zero(MAX_NAMEMAP_NAME_LEN+2); + memset(toolong, 'x', MAX_NAMEMAP_NAME_LEN+1); + + tt_uint_op(NAMEMAP_ERR, OP_EQ, namemap_get_id(&m, ok)); + tt_uint_op(NAMEMAP_ERR, OP_EQ, namemap_get_id(&m, toolong)); + unsigned u1 = namemap_get_or_create_id(&m, toolong); + unsigned u2 = namemap_get_or_create_id(&m, ok); + tt_uint_op(u1, OP_EQ, NAMEMAP_ERR); + tt_uint_op(u2, OP_NE, NAMEMAP_ERR); + tt_uint_op(u2, OP_EQ, namemap_get_id(&m, ok)); + tt_uint_op(NAMEMAP_ERR, OP_EQ, namemap_get_id(&m, toolong)); + + tt_str_op(ok, OP_EQ, namemap_get_name(&m, u2)); + tt_ptr_op(NULL, OP_EQ, namemap_get_name(&m, u1)); + + done: + tor_free(ok); + tor_free(toolong); + namemap_clear(&m); +} + +static void +test_namemap_blackbox(void *arg) +{ + (void)arg; + + namemap_t m1, m2; + namemap_init(&m1); + namemap_init(&m2); + + unsigned u1 = namemap_get_or_create_id(&m1, "hello"); + unsigned u2 = namemap_get_or_create_id(&m1, "world"); + tt_uint_op(u1, OP_NE, NAMEMAP_ERR); + tt_uint_op(u2, OP_NE, NAMEMAP_ERR); + tt_uint_op(u1, OP_NE, u2); + + tt_uint_op(u1, OP_EQ, namemap_get_id(&m1, "hello")); + tt_uint_op(u1, OP_EQ, namemap_get_or_create_id(&m1, "hello")); + tt_uint_op(u2, OP_EQ, namemap_get_id(&m1, "world")); + tt_uint_op(u2, OP_EQ, namemap_get_or_create_id(&m1, "world")); + + tt_uint_op(NAMEMAP_ERR, OP_EQ, namemap_get_id(&m1, "HELLO")); + tt_uint_op(NAMEMAP_ERR, OP_EQ, namemap_get_id(&m2, "hello")); + + unsigned u3 = namemap_get_or_create_id(&m2, "hola"); + tt_uint_op(u3, OP_NE, NAMEMAP_ERR); + tt_uint_op(NAMEMAP_ERR, OP_EQ, namemap_get_id(&m1, "hola")); + tt_uint_op(u3, OP_EQ, namemap_get_or_create_id(&m2, "hola")); + tt_uint_op(u3, OP_EQ, namemap_get_id(&m2, "hola")); + + unsigned int u4 = namemap_get_or_create_id(&m1, "hola"); + tt_uint_op(u4, OP_NE, NAMEMAP_ERR); + tt_uint_op(u4, OP_EQ, namemap_get_id(&m1, "hola")); + tt_uint_op(u3, OP_EQ, namemap_get_id(&m2, "hola")); + + tt_str_op("hello", OP_EQ, namemap_get_name(&m1, u1)); + tt_str_op("world", OP_EQ, namemap_get_name(&m1, u2)); + tt_str_op("hola", OP_EQ, namemap_get_name(&m2, u3)); + tt_str_op("hola", OP_EQ, namemap_get_name(&m1, u4)); + + tt_ptr_op(NULL, OP_EQ, namemap_get_name(&m2, u3 + 10)); + + done: + namemap_clear(&m1); + namemap_clear(&m2); +} + +static void +test_namemap_internals(void *arg) +{ + (void)arg; + // This test actually assumes know something about the identity layout. + namemap_t m; + namemap_init(&m); + + tt_uint_op(0, OP_EQ, namemap_get_or_create_id(&m, "that")); + tt_uint_op(0, OP_EQ, namemap_get_or_create_id(&m, "that")); + tt_uint_op(1, OP_EQ, namemap_get_or_create_id(&m, "is")); + tt_uint_op(1, OP_EQ, namemap_get_or_create_id(&m, "is")); + + tt_uint_op(0, OP_EQ, namemap_get_id(&m, "that")); + tt_uint_op(0, OP_EQ, namemap_get_id(&m, "that")); + tt_uint_op(1, OP_EQ, namemap_get_id(&m, "is")); + tt_uint_op(2, OP_EQ, namemap_get_or_create_id(&m, "not")); + tt_uint_op(1, OP_EQ, namemap_get_or_create_id(&m, "is")); + tt_uint_op(2, OP_EQ, namemap_get_or_create_id(&m, "not")); + + done: + namemap_clear(&m); +} + +static void +test_namemap_fmt(void *arg) +{ + (void)arg; + namemap_t m = NAMEMAP_INIT(); + + unsigned a = namemap_get_or_create_id(&m, "greetings"); + unsigned b = namemap_get_or_create_id(&m, "earthlings"); + + tt_str_op(namemap_fmt_name(&m, a), OP_EQ, "greetings"); + tt_str_op(namemap_fmt_name(&m, b), OP_EQ, "earthlings"); + tt_int_op(a, OP_NE, 100); + tt_int_op(b, OP_NE, 100); + tt_str_op(namemap_fmt_name(&m, 100), OP_EQ, "{100}"); + + done: + namemap_clear(&m); +} + +#define T(name) \ + { #name, test_namemap_ ## name , 0, NULL, NULL } + +struct testcase_t namemap_tests[] = { + T(empty), + T(toolong), + T(blackbox), + T(internals), + T(fmt), + END_OF_TESTCASES +}; diff --git a/src/test/test_pubsub_build.c b/src/test/test_pubsub_build.c new file mode 100644 index 0000000000..ce5bf60080 --- /dev/null +++ b/src/test/test_pubsub_build.c @@ -0,0 +1,621 @@ +/* Copyright (c) 2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#define DISPATCH_PRIVATE +#define PUBSUB_PRIVATE + +#include "test/test.h" + +#include "lib/cc/torint.h" +#include "lib/dispatch/dispatch.h" +#include "lib/dispatch/dispatch_naming.h" +#include "lib/dispatch/dispatch_st.h" +#include "lib/dispatch/msgtypes.h" +#include "lib/pubsub/pubsub_macros.h" +#include "lib/pubsub/pubsub_build.h" +#include "lib/pubsub/pubsub_builder_st.h" + +#include "lib/log/escape.h" +#include "lib/malloc/malloc.h" +#include "lib/string/printf.h" + +#include "test/log_test_helpers.h" + +#include <stdio.h> +#include <string.h> + +static char * +ex_int_fmt(msg_aux_data_t aux) +{ + int val = (int) aux.u64; + char *r=NULL; + tor_asprintf(&r, "%d", val); + return r; +} + +static char * +ex_str_fmt(msg_aux_data_t aux) +{ + return esc_for_log(aux.ptr); +} + +static void +ex_str_free(msg_aux_data_t aux) +{ + tor_free_(aux.ptr); +} + +static dispatch_typefns_t intfns = { + .fmt_fn = ex_int_fmt +}; + +static dispatch_typefns_t stringfns = { + .free_fn = ex_str_free, + .fmt_fn = ex_str_fmt +}; + +DECLARE_MESSAGE_INT(bunch_of_coconuts, int, int); +DECLARE_PUBLISH(bunch_of_coconuts); +DECLARE_SUBSCRIBE(bunch_of_coconuts, coconut_recipient_cb); + +DECLARE_MESSAGE(yes_we_have_no, string, char *); +DECLARE_PUBLISH(yes_we_have_no); +DECLARE_SUBSCRIBE(yes_we_have_no, absent_item_cb); + +static void +coconut_recipient_cb(const msg_t *m, int n_coconuts) +{ + (void)m; + (void)n_coconuts; +} + +static void +absent_item_cb(const msg_t *m, const char *fruitname) +{ + (void)m; + (void)fruitname; +} + +#define FLAG_SKIP 99999 + +static void +seed_dispatch_builder(pubsub_builder_t *b, + unsigned fl1, unsigned fl2, unsigned fl3, unsigned fl4) +{ + pubsub_connector_t *c = NULL; + + { + c = pubsub_connector_for_subsystem(b, get_subsys_id("sys1")); + DISPATCH_REGISTER_TYPE(c, int, &intfns); + if (fl1 != FLAG_SKIP) + DISPATCH_ADD_PUB_(c, main, bunch_of_coconuts, fl1); + if (fl2 != FLAG_SKIP) + DISPATCH_ADD_SUB_(c, main, yes_we_have_no, fl2); + pubsub_connector_free(c); + } + + { + c = pubsub_connector_for_subsystem(b, get_subsys_id("sys2")); + DISPATCH_REGISTER_TYPE(c, string, &stringfns); + if (fl3 != FLAG_SKIP) + DISPATCH_ADD_PUB_(c, main, yes_we_have_no, fl3); + if (fl4 != FLAG_SKIP) + DISPATCH_ADD_SUB_(c, main, bunch_of_coconuts, fl4); + pubsub_connector_free(c); + } +} + +static void +seed_pubsub_builder_basic(pubsub_builder_t *b) +{ + seed_dispatch_builder(b, 0, 0, 0, 0); +} + +/* Regular builder with valid types and messages. + */ +static void +test_pubsub_build_types_ok(void *arg) +{ + (void)arg; + pubsub_builder_t *b = NULL; + dispatch_t *dispatcher = NULL; + pubsub_connector_t *c = NULL; + pubsub_items_t *items = NULL; + + b = pubsub_builder_new(); + seed_pubsub_builder_basic(b); + + dispatcher = pubsub_builder_finalize(b, &items); + b = NULL; + tt_assert(dispatcher); + tt_assert(items); + tt_int_op(smartlist_len(items->items), OP_EQ, 4); + + // Make sure that the bindings got build correctly. + SMARTLIST_FOREACH_BEGIN(items->items, pubsub_cfg_t *, item) { + if (item->is_publish) { + tt_assert(item->pub_binding); + tt_ptr_op(item->pub_binding->dispatch_ptr, OP_EQ, dispatcher); + } + } SMARTLIST_FOREACH_END(item); + + tt_int_op(dispatcher->n_types, OP_GE, 2); + tt_assert(dispatcher->typefns); + + tt_assert(dispatcher->typefns[get_msg_type_id("int")].fmt_fn == ex_int_fmt); + tt_assert(dispatcher->typefns[get_msg_type_id("string")].fmt_fn == + ex_str_fmt); + + // Now clear the bindings, like we would do before freeing the + // the dispatcher. + pubsub_items_clear_bindings(items); + SMARTLIST_FOREACH_BEGIN(items->items, pubsub_cfg_t *, item) { + if (item->is_publish) { + tt_assert(item->pub_binding); + tt_ptr_op(item->pub_binding->dispatch_ptr, OP_EQ, NULL); + } + } SMARTLIST_FOREACH_END(item); + + done: + pubsub_connector_free(c); + pubsub_builder_free(b); + dispatch_free(dispatcher); + pubsub_items_free(items); +} + +/* We fail if the same type is defined in two places with different functions. + */ +static void +test_pubsub_build_types_decls_conflict(void *arg) +{ + (void)arg; + pubsub_builder_t *b = NULL; + dispatch_t *dispatcher = NULL; + pubsub_connector_t *c = NULL; + + b = pubsub_builder_new(); + seed_pubsub_builder_basic(b); + { + c = pubsub_connector_for_subsystem(b, get_subsys_id("sys3")); + // Extra declaration of int: we don't allow this. + DISPATCH_REGISTER_TYPE(c, int, &stringfns); + pubsub_connector_free(c); + } + + setup_full_capture_of_logs(LOG_WARN); + dispatcher = pubsub_builder_finalize(b, NULL); + b = NULL; + tt_assert(dispatcher == NULL); + // expect_log_msg_containing("(int) declared twice"); // XXXX + + done: + pubsub_connector_free(c); + pubsub_builder_free(b); + dispatch_free(dispatcher); + teardown_capture_of_logs(); +} + +/* If a message ID exists but nobody is publishing or subscribing to it, + * that's okay. */ +static void +test_pubsub_build_unused_message(void *arg) +{ + (void)arg; + pubsub_builder_t *b = NULL; + dispatch_t *dispatcher = NULL; + + b = pubsub_builder_new(); + seed_pubsub_builder_basic(b); + + // This message isn't actually generated by anyone, but that will be fine: + // we just log it at info. + get_message_id("unused"); + setup_capture_of_logs(LOG_INFO); + + dispatcher = pubsub_builder_finalize(b, NULL); + b = NULL; + tt_assert(dispatcher); + expect_log_msg_containing( + "Nobody is publishing or subscribing to message"); + + done: + pubsub_builder_free(b); + dispatch_free(dispatcher); + teardown_capture_of_logs(); +} + +/* Publishing or subscribing to a message with no subscribers / publishers + * should fail and warn. */ +static void +test_pubsub_build_missing_pubsub(void *arg) +{ + (void)arg; + pubsub_builder_t *b = NULL; + dispatch_t *dispatcher = NULL; + + b = pubsub_builder_new(); + seed_dispatch_builder(b, 0, 0, FLAG_SKIP, FLAG_SKIP); + + setup_full_capture_of_logs(LOG_WARN); + dispatcher = pubsub_builder_finalize(b, NULL); + b = NULL; + tt_assert(dispatcher == NULL); + + expect_log_msg_containing( + "Message \"bunch_of_coconuts\" has publishers, but no subscribers."); + expect_log_msg_containing( + "Message \"yes_we_have_no\" has subscribers, but no publishers."); + + done: + pubsub_builder_free(b); + dispatch_free(dispatcher); + teardown_capture_of_logs(); +} + +/* Make sure that a stub publisher or subscriber prevents an error from + * happening even if there are no other publishers/subscribers for a message + */ +static void +test_pubsub_build_stub_pubsub(void *arg) +{ + (void)arg; + pubsub_builder_t *b = NULL; + dispatch_t *dispatcher = NULL; + + b = pubsub_builder_new(); + seed_dispatch_builder(b, 0, 0, DISP_FLAG_STUB, DISP_FLAG_STUB); + + dispatcher = pubsub_builder_finalize(b, NULL); + b = NULL; + tt_assert(dispatcher); + + // 1 subscriber. + tt_int_op(1, OP_EQ, + dispatcher->table[get_message_id("yes_we_have_no")]->n_enabled); + // no subscribers + tt_ptr_op(NULL, OP_EQ, + dispatcher->table[get_message_id("bunch_of_coconuts")]); + + done: + pubsub_builder_free(b); + dispatch_free(dispatcher); +} + +/* Only one channel per msg id. */ +static void +test_pubsub_build_channels_conflict(void *arg) +{ + (void)arg; + pubsub_builder_t *b = NULL; + dispatch_t *dispatcher = NULL; + pubsub_connector_t *c = NULL; + + b = pubsub_builder_new(); + seed_pubsub_builder_basic(b); + pub_binding_t btmp; + + { + c = pubsub_connector_for_subsystem(b, get_subsys_id("problems")); + /* Usually the DISPATCH_ADD_PUB macro would keep us from using + * the wrong channel */ + pubsub_add_pub_(c, &btmp, get_channel_id("hithere"), + get_message_id("bunch_of_coconuts"), + get_msg_type_id("int"), + 0 /* flags */, + "somewhere.c", 22); + pubsub_connector_free(c); + }; + + setup_full_capture_of_logs(LOG_WARN); + dispatcher = pubsub_builder_finalize(b, NULL); + b = NULL; + tt_assert(dispatcher == NULL); + + expect_log_msg_containing("Message \"bunch_of_coconuts\" is associated " + "with multiple inconsistent channels."); + + done: + pubsub_builder_free(b); + dispatch_free(dispatcher); + teardown_capture_of_logs(); +} + +/* Only one type per msg id. */ +static void +test_pubsub_build_types_conflict(void *arg) +{ + (void)arg; + pubsub_builder_t *b = NULL; + dispatch_t *dispatcher = NULL; + pubsub_connector_t *c = NULL; + + b = pubsub_builder_new(); + seed_pubsub_builder_basic(b); + pub_binding_t btmp; + + { + c = pubsub_connector_for_subsystem(b, get_subsys_id("problems")); + /* Usually the DISPATCH_ADD_PUB macro would keep us from using + * the wrong channel */ + pubsub_add_pub_(c, &btmp, get_channel_id("hithere"), + get_message_id("bunch_of_coconuts"), + get_msg_type_id("string"), + 0 /* flags */, + "somewhere.c", 22); + pubsub_connector_free(c); + }; + + setup_full_capture_of_logs(LOG_WARN); + dispatcher = pubsub_builder_finalize(b, NULL); + b = NULL; + tt_assert(dispatcher == NULL); + + expect_log_msg_containing("Message \"bunch_of_coconuts\" is associated " + "with multiple inconsistent message types."); + + done: + pubsub_builder_free(b); + dispatch_free(dispatcher); + teardown_capture_of_logs(); +} + +/* The same module can't publish and subscribe the same message */ +static void +test_pubsub_build_pubsub_same(void *arg) +{ + (void)arg; + pubsub_builder_t *b = NULL; + dispatch_t *dispatcher = NULL; + pubsub_connector_t *c = NULL; + + b = pubsub_builder_new(); + seed_pubsub_builder_basic(b); + + { + c = pubsub_connector_for_subsystem(b, get_subsys_id("sys1")); + // already publishing this. + DISPATCH_ADD_SUB(c, main, bunch_of_coconuts); + pubsub_connector_free(c); + }; + + setup_full_capture_of_logs(LOG_WARN); + dispatcher = pubsub_builder_finalize(b, NULL); + b = NULL; + tt_assert(dispatcher == NULL); + + expect_log_msg_containing("Message \"bunch_of_coconuts\" is published " + "and subscribed by the same subsystem \"sys1\"."); + + done: + pubsub_builder_free(b); + dispatch_free(dispatcher); + teardown_capture_of_logs(); +} + +/* More than one subsystem may publish or subscribe, and that's okay. */ +static void +test_pubsub_build_pubsub_multi(void *arg) +{ + (void)arg; + pubsub_builder_t *b = NULL; + dispatch_t *dispatcher = NULL; + pubsub_connector_t *c = NULL; + + b = pubsub_builder_new(); + seed_pubsub_builder_basic(b); + pub_binding_t btmp; + + { + c = pubsub_connector_for_subsystem(b, get_subsys_id("sys3")); + DISPATCH_ADD_SUB(c, main, bunch_of_coconuts); + pubsub_add_pub_(c, &btmp, get_channel_id("main"), + get_message_id("yes_we_have_no"), + get_msg_type_id("string"), + 0 /* flags */, + "somewhere.c", 22); + pubsub_connector_free(c); + }; + + dispatcher = pubsub_builder_finalize(b, NULL); + b = NULL; + tt_assert(dispatcher); + + // 1 subscribers + tt_int_op(1, OP_EQ, + dispatcher->table[get_message_id("yes_we_have_no")]->n_enabled); + // 2 subscribers. + dtbl_entry_t *ent = + dispatcher->table[get_message_id("bunch_of_coconuts")]; + tt_int_op(2, OP_EQ, ent->n_enabled); + tt_int_op(2, OP_EQ, ent->n_fns); + tt_ptr_op(ent->rcv[0].fn, OP_EQ, recv_fn__bunch_of_coconuts); + tt_ptr_op(ent->rcv[1].fn, OP_EQ, recv_fn__bunch_of_coconuts); + + done: + pubsub_builder_free(b); + dispatch_free(dispatcher); +} + +static void +some_other_coconut_hook(const msg_t *m) +{ + (void)m; +} + +/* Subscribe hooks should be build correctly when there are a bunch of + * them. */ +static void +test_pubsub_build_sub_many(void *arg) +{ + (void)arg; + pubsub_builder_t *b = NULL; + dispatch_t *dispatcher = NULL; + pubsub_connector_t *c = NULL; + char *sysname = NULL; + b = pubsub_builder_new(); + seed_pubsub_builder_basic(b); + + int i; + for (i = 1; i < 100; ++i) { + tor_asprintf(&sysname, "system%d",i); + c = pubsub_connector_for_subsystem(b, get_subsys_id(sysname)); + if (i % 7) { + DISPATCH_ADD_SUB(c, main, bunch_of_coconuts); + } else { + pubsub_add_sub_(c, some_other_coconut_hook, + get_channel_id("main"), + get_message_id("bunch_of_coconuts"), + get_msg_type_id("int"), + 0 /* flags */, + "somewhere.c", 22); + } + pubsub_connector_free(c); + tor_free(sysname); + }; + + dispatcher = pubsub_builder_finalize(b, NULL); + b = NULL; + tt_assert(dispatcher); + + dtbl_entry_t *ent = + dispatcher->table[get_message_id("bunch_of_coconuts")]; + tt_int_op(100, OP_EQ, ent->n_enabled); + tt_int_op(100, OP_EQ, ent->n_fns); + tt_ptr_op(ent->rcv[0].fn, OP_EQ, recv_fn__bunch_of_coconuts); + tt_ptr_op(ent->rcv[1].fn, OP_EQ, recv_fn__bunch_of_coconuts); + tt_ptr_op(ent->rcv[76].fn, OP_EQ, recv_fn__bunch_of_coconuts); + tt_ptr_op(ent->rcv[77].fn, OP_EQ, some_other_coconut_hook); + tt_ptr_op(ent->rcv[78].fn, OP_EQ, recv_fn__bunch_of_coconuts); + + done: + pubsub_builder_free(b); + dispatch_free(dispatcher); + tor_free(sysname); +} + +/* The same subsystem can only declare one publish or subscribe. */ +static void +test_pubsub_build_pubsub_redundant(void *arg) +{ + (void)arg; + pubsub_builder_t *b = NULL; + dispatch_t *dispatcher = NULL; + pubsub_connector_t *c = NULL; + + b = pubsub_builder_new(); + seed_pubsub_builder_basic(b); + pub_binding_t btmp; + + { + c = pubsub_connector_for_subsystem(b, get_subsys_id("sys2")); + DISPATCH_ADD_SUB(c, main, bunch_of_coconuts); + pubsub_add_pub_(c, &btmp, get_channel_id("main"), + get_message_id("yes_we_have_no"), + get_msg_type_id("string"), + 0 /* flags */, + "somewhere.c", 22); + pubsub_connector_free(c); + }; + + setup_full_capture_of_logs(LOG_WARN); + dispatcher = pubsub_builder_finalize(b, NULL); + b = NULL; + tt_assert(dispatcher == NULL); + + expect_log_msg_containing( + "Message \"yes_we_have_no\" is configured to be published by " + "subsystem \"sys2\" more than once."); + expect_log_msg_containing( + "Message \"bunch_of_coconuts\" is configured to be subscribed by " + "subsystem \"sys2\" more than once."); + + done: + pubsub_builder_free(b); + dispatch_free(dispatcher); + teardown_capture_of_logs(); +} + +/* It's fine to declare the excl flag. */ +static void +test_pubsub_build_excl_ok(void *arg) +{ + (void)arg; + pubsub_builder_t *b = NULL; + dispatch_t *dispatcher = NULL; + + b = pubsub_builder_new(); + // Try one excl/excl pair and one excl/non pair. + seed_dispatch_builder(b, DISP_FLAG_EXCL, 0, + DISP_FLAG_EXCL, DISP_FLAG_EXCL); + + dispatcher = pubsub_builder_finalize(b, NULL); + b = NULL; + tt_assert(dispatcher); + + // 1 subscribers + tt_int_op(1, OP_EQ, + dispatcher->table[get_message_id("yes_we_have_no")]->n_enabled); + // 1 subscriber. + tt_int_op(1, OP_EQ, + dispatcher->table[get_message_id("bunch_of_coconuts")]->n_enabled); + + done: + pubsub_builder_free(b); + dispatch_free(dispatcher); +} + +/* but if you declare the excl flag, you need to mean it. */ +static void +test_pubsub_build_excl_bad(void *arg) +{ + (void)arg; + pubsub_builder_t *b = NULL; + dispatch_t *dispatcher = NULL; + pubsub_connector_t *c = NULL; + + b = pubsub_builder_new(); + seed_dispatch_builder(b, DISP_FLAG_EXCL, DISP_FLAG_EXCL, + 0, 0); + + { + c = pubsub_connector_for_subsystem(b, get_subsys_id("sys3")); + DISPATCH_ADD_PUB_(c, main, bunch_of_coconuts, 0); + DISPATCH_ADD_SUB_(c, main, yes_we_have_no, 0); + pubsub_connector_free(c); + }; + + setup_full_capture_of_logs(LOG_WARN); + dispatcher = pubsub_builder_finalize(b, NULL); + b = NULL; + tt_assert(dispatcher == NULL); + + expect_log_msg_containing("has multiple publishers, but at least one is " + "marked as exclusive."); + expect_log_msg_containing("has multiple subscribers, but at least one is " + "marked as exclusive."); + + done: + pubsub_builder_free(b); + dispatch_free(dispatcher); + teardown_capture_of_logs(); +} + +#define T(name, flags) \ + { #name, test_pubsub_build_ ## name , (flags), NULL, NULL } + +struct testcase_t pubsub_build_tests[] = { + T(types_ok, TT_FORK), + T(types_decls_conflict, TT_FORK), + T(unused_message, TT_FORK), + T(missing_pubsub, TT_FORK), + T(stub_pubsub, TT_FORK), + T(channels_conflict, TT_FORK), + T(types_conflict, TT_FORK), + T(pubsub_same, TT_FORK), + T(pubsub_multi, TT_FORK), + T(sub_many, TT_FORK), + T(pubsub_redundant, TT_FORK), + T(excl_ok, TT_FORK), + T(excl_bad, TT_FORK), + END_OF_TESTCASES +}; diff --git a/src/test/test_pubsub_msg.c b/src/test/test_pubsub_msg.c new file mode 100644 index 0000000000..73c7c9f540 --- /dev/null +++ b/src/test/test_pubsub_msg.c @@ -0,0 +1,305 @@ +/* Copyright (c) 2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#define DISPATCH_PRIVATE + +#include "test/test.h" + +#include "lib/dispatch/dispatch.h" +#include "lib/dispatch/dispatch_naming.h" +#include "lib/dispatch/dispatch_st.h" +#include "lib/dispatch/msgtypes.h" +#include "lib/pubsub/pubsub_flags.h" +#include "lib/pubsub/pub_binding_st.h" +#include "lib/pubsub/pubsub_build.h" +#include "lib/pubsub/pubsub_builder_st.h" +#include "lib/pubsub/pubsub_connect.h" +#include "lib/pubsub/pubsub_publish.h" + +#include "lib/log/escape.h" +#include "lib/malloc/malloc.h" +#include "lib/string/printf.h" + +#include <stdio.h> +#include <string.h> + +static char * +ex_str_fmt(msg_aux_data_t aux) +{ + return esc_for_log(aux.ptr); +} +static void +ex_str_free(msg_aux_data_t aux) +{ + tor_free_(aux.ptr); +} +static dispatch_typefns_t stringfns = { + .free_fn = ex_str_free, + .fmt_fn = ex_str_fmt +}; + +// We're using the lowest-level publish/subscribe logic here, to avoid the +// pubsub_macros.h macros and just test the dispatch core. We'll use a string +// type for everything. + +#define DECLARE_MESSAGE(suffix) \ + static pub_binding_t pub_binding_##suffix; \ + static int msg_received_##suffix = 0; \ + static void recv_msg_##suffix(const msg_t *m) { \ + (void)m; \ + ++msg_received_##suffix; \ + } \ + EAT_SEMICOLON + +#define ADD_PUBLISH(binding_suffix, subsys, channel, msg, flags) \ + STMT_BEGIN { \ + con = pubsub_connector_for_subsystem(builder, \ + get_subsys_id(#subsys)); \ + pubsub_add_pub_(con, &pub_binding_##binding_suffix, \ + get_channel_id(#channel), \ + get_message_id(#msg), get_msg_type_id("string"), \ + (flags), __FILE__, __LINE__); \ + pubsub_connector_free(con); \ + } STMT_END + +#define ADD_SUBSCRIBE(hook_suffix, subsys, channel, msg, flags) \ + STMT_BEGIN { \ + con = pubsub_connector_for_subsystem(builder, \ + get_subsys_id(#subsys)); \ + pubsub_add_sub_(con, recv_msg_##hook_suffix, \ + get_channel_id(#channel), \ + get_message_id(#msg), get_msg_type_id("string"), \ + (flags), __FILE__, __LINE__); \ + pubsub_connector_free(con); \ + } STMT_END + +#define SEND(binding_suffix, val) \ + STMT_BEGIN { \ + msg_aux_data_t data_; \ + data_.ptr = tor_strdup(val); \ + pubsub_pub_(&pub_binding_##binding_suffix, data_); \ + } STMT_END + +DECLARE_MESSAGE(msg1); +DECLARE_MESSAGE(msg2); +DECLARE_MESSAGE(msg3); +DECLARE_MESSAGE(msg4); +DECLARE_MESSAGE(msg5); + +static smartlist_t *strings_received = NULL; +static void +recv_msg_copy_string(const msg_t *m) +{ + const char *s = m->aux_data__.ptr; + smartlist_add(strings_received, tor_strdup(s)); +} + +static void * +setup_dispatcher(const struct testcase_t *testcase) +{ + (void)testcase; + pubsub_builder_t *builder = pubsub_builder_new(); + pubsub_connector_t *con; + + { + con = pubsub_connector_for_subsystem(builder, get_subsys_id("types")); + pubsub_connector_register_type_(con, + get_msg_type_id("string"), + &stringfns, + "nowhere.c", 99); + pubsub_connector_free(con); + } + // message1 has one publisher and one subscriber. + ADD_PUBLISH(msg1, sys1, main, message1, 0); + ADD_SUBSCRIBE(msg1, sys2, main, message1, 0); + + // message2 has a publisher and a stub subscriber. + ADD_PUBLISH(msg2, sys1, main, message2, 0); + ADD_SUBSCRIBE(msg2, sys2, main, message2, DISP_FLAG_STUB); + + // message3 has a publisher and three subscribers. + ADD_PUBLISH(msg3, sys1, main, message3, 0); + ADD_SUBSCRIBE(msg3, sys2, main, message3, 0); + ADD_SUBSCRIBE(msg3, sys3, main, message3, 0); + ADD_SUBSCRIBE(msg3, sys4, main, message3, 0); + + // message4 has one publisher and two subscribers, but it's on another + // channel. + ADD_PUBLISH(msg4, sys2, other, message4, 0); + ADD_SUBSCRIBE(msg4, sys1, other, message4, 0); + ADD_SUBSCRIBE(msg4, sys3, other, message4, 0); + + // message5 has a huge number of recipients. + ADD_PUBLISH(msg5, sys3, main, message5, 0); + ADD_SUBSCRIBE(msg5, sys4, main, message5, 0); + ADD_SUBSCRIBE(msg5, sys5, main, message5, 0); + ADD_SUBSCRIBE(msg5, sys6, main, message5, 0); + ADD_SUBSCRIBE(msg5, sys7, main, message5, 0); + ADD_SUBSCRIBE(msg5, sys8, main, message5, 0); + for (int i = 0; i < 1000-5; ++i) { + char *sys; + tor_asprintf(&sys, "xsys-%d", i); + con = pubsub_connector_for_subsystem(builder, get_subsys_id(sys)); + pubsub_add_sub_(con, recv_msg_copy_string, + get_channel_id("main"), + get_message_id("message5"), + get_msg_type_id("string"), 0, "here", 100); + pubsub_connector_free(con); + tor_free(sys); + } + + return pubsub_builder_finalize(builder, NULL); +} + +static int +cleanup_dispatcher(const struct testcase_t *testcase, void *dispatcher_) +{ + (void)testcase; + dispatch_t *dispatcher = dispatcher_; + dispatch_free(dispatcher); + return 1; +} + +static const struct testcase_setup_t dispatcher_setup = { + setup_dispatcher, cleanup_dispatcher +}; + +static void +test_pubsub_msg_minimal(void *arg) +{ + dispatch_t *d = arg; + + tt_int_op(0, OP_EQ, msg_received_msg1); + SEND(msg1, "hello world"); + tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet. + + tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000)); + tt_int_op(1, OP_EQ, msg_received_msg1); // we got the message! + + done: + ; +} + +static void +test_pubsub_msg_send_to_stub(void *arg) +{ + dispatch_t *d = arg; + + tt_int_op(0, OP_EQ, msg_received_msg2); + SEND(msg2, "hello silence"); + tt_int_op(0, OP_EQ, msg_received_msg2); // hasn't actually arrived yet. + + tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000)); + tt_int_op(0, OP_EQ, msg_received_msg2); // doesn't arrive -- stub hook. + + done: + ; +} + +static void +test_pubsub_msg_cancel_msgs(void *arg) +{ + dispatch_t *d = arg; + + tt_int_op(0, OP_EQ, msg_received_msg1); + for (int i = 0; i < 100; ++i) { + SEND(msg1, "hello world"); + } + tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet. + + tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 10)); + tt_int_op(10, OP_EQ, msg_received_msg1); // we got the message 10 times. + + // At this point, the dispatcher will be freed with queued, undelivered + // messages. + done: + ; +} + +struct alertfn_target { + dispatch_t *d; + channel_id_t ch; + int count; +}; +static void +alertfn_generic(dispatch_t *d, channel_id_t ch, void *arg) +{ + struct alertfn_target *t = arg; + tt_ptr_op(d, OP_EQ, t->d); + tt_int_op(ch, OP_EQ, t->ch); + ++t->count; + done: + ; +} + +static void +test_pubsub_msg_alertfns(void *arg) +{ + dispatch_t *d = arg; + struct alertfn_target ch1_a = { d, get_channel_id("main"), 0 }; + struct alertfn_target ch2_a = { d, get_channel_id("other"), 0 }; + + tt_int_op(0, OP_EQ, + dispatch_set_alert_fn(d, get_channel_id("main"), + alertfn_generic, &ch1_a)); + tt_int_op(0, OP_EQ, + dispatch_set_alert_fn(d, get_channel_id("other"), + alertfn_generic, &ch2_a)); + + SEND(msg3, "hello"); + tt_int_op(ch1_a.count, OP_EQ, 1); + SEND(msg3, "world"); + tt_int_op(ch1_a.count, OP_EQ, 1); // only the first message sends an alert + tt_int_op(ch2_a.count, OP_EQ, 0); // no alert for 'other' + + SEND(msg4, "worse things happen in C"); + tt_int_op(ch2_a.count, OP_EQ, 1); + + // flush the first (main) channel... + tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000)); + tt_int_op(6, OP_EQ, msg_received_msg3); // 3 subscribers, 2 instances. + + // now that the main channel is flushed, sending another message on it + // starts another alert. + tt_int_op(ch1_a.count, OP_EQ, 1); + SEND(msg1, "plover"); + tt_int_op(ch1_a.count, OP_EQ, 2); + tt_int_op(ch2_a.count, OP_EQ, 1); + + done: + ; +} + +/* try more than N_FAST_FNS hooks on msg5 */ +static void +test_pubsub_msg_many_hooks(void *arg) +{ + dispatch_t *d = arg; + strings_received = smartlist_new(); + + tt_int_op(0, OP_EQ, msg_received_msg5); + SEND(msg5, "hello world"); + tt_int_op(0, OP_EQ, msg_received_msg5); + tt_int_op(0, OP_EQ, smartlist_len(strings_received)); + + tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 100000)); + tt_int_op(5, OP_EQ, msg_received_msg5); + tt_int_op(995, OP_EQ, smartlist_len(strings_received)); + + done: + SMARTLIST_FOREACH(strings_received, char *, s, tor_free(s)); + smartlist_free(strings_received); +} + +#define T(name) \ + { #name, test_pubsub_msg_ ## name , TT_FORK, \ + &dispatcher_setup, NULL } + +struct testcase_t pubsub_msg_tests[] = { + T(minimal), + T(send_to_stub), + T(cancel_msgs), + T(alertfns), + T(many_hooks), + END_OF_TESTCASES +}; |