diff options
author | Nick Mathewson <nickm@torproject.org> | 2019-01-15 09:01:20 -0500 |
---|---|---|
committer | Nick Mathewson <nickm@torproject.org> | 2019-03-25 16:35:33 -0400 |
commit | 9e60482b8073f2d43187c36c9159fd4367d7140a (patch) | |
tree | e73931e80e7f67442f2f865b630d98a338b1b3cb /src/test | |
parent | 24b945f713a713bba0ec4f0d8297b49cbc45c5a1 (diff) | |
download | tor-9e60482b8073f2d43187c36c9159fd4367d7140a.tar.gz tor-9e60482b8073f2d43187c36c9159fd4367d7140a.zip |
Pubsub: an OO layer on top of lib/dispatch
This "publish/subscribe" layer sits on top of lib/dispatch, and
tries to provide more type-safety and cross-checking for the
lower-level layer.
Even with this commit, we're still not done: more checking will come
in the next commit, and a set of usability/typesafety macros will
come after.
Diffstat (limited to 'src/test')
-rw-r--r-- | src/test/include.am | 1 | ||||
-rw-r--r-- | src/test/test.c | 1 | ||||
-rw-r--r-- | src/test/test.h | 1 | ||||
-rw-r--r-- | src/test/test_pubsub_msg.c | 305 |
4 files changed, 308 insertions, 0 deletions
diff --git a/src/test/include.am b/src/test/include.am index d4e8eb4e8c..7734b73de8 100644 --- a/src/test/include.am +++ b/src/test/include.am @@ -165,6 +165,7 @@ src_test_test_SOURCES += \ src/test/test_proto_misc.c \ src/test/test_protover.c \ src/test/test_pt.c \ + src/test/test_pubsub_msg.c \ src/test/test_relay.c \ src/test/test_relaycell.c \ src/test/test_relaycrypt.c \ diff --git a/src/test/test.c b/src/test/test.c index 4c6d9775b4..da0be41332 100644 --- a/src/test/test.c +++ b/src/test/test.c @@ -910,6 +910,7 @@ struct testgroup_t testgroups[] = { { "proto/misc/", proto_misc_tests }, { "protover/", protover_tests }, { "pt/", pt_tests }, + { "pubsub/msg/", pubsub_msg_tests }, { "relay/" , relay_tests }, { "relaycell/", relaycell_tests }, { "relaycrypt/", relaycrypt_tests }, diff --git a/src/test/test.h b/src/test/test.h index cd0ce4f6df..fb417124ce 100644 --- a/src/test/test.h +++ b/src/test/test.h @@ -253,6 +253,7 @@ extern struct testcase_t proto_http_tests[]; extern struct testcase_t proto_misc_tests[]; extern struct testcase_t protover_tests[]; extern struct testcase_t pt_tests[]; +extern struct testcase_t pubsub_msg_tests[]; extern struct testcase_t relay_tests[]; extern struct testcase_t relaycell_tests[]; extern struct testcase_t relaycrypt_tests[]; diff --git a/src/test/test_pubsub_msg.c b/src/test/test_pubsub_msg.c new file mode 100644 index 0000000000..5b771d45a5 --- /dev/null +++ b/src/test/test_pubsub_msg.c @@ -0,0 +1,305 @@ +/* Copyright (c) 2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#define DISPATCH_PRIVATE + +#include "test/test.h" + +#include "lib/dispatch/dispatch.h" +#include "lib/dispatch/dispatch_naming.h" +#include "lib/dispatch/dispatch_st.h" +#include "lib/dispatch/msgtypes.h" +#include "lib/pubsub/pubsub_flags.h" +#include "lib/pubsub/pub_binding_st.h" +#include "lib/pubsub/pubsub_build.h" +#include "lib/pubsub/pubsub_builder_st.h" +#include "lib/pubsub/pubsub_connect.h" +#include "lib/pubsub/pubsub_publish.h" + +#include "lib/log/escape.h" +#include "lib/malloc/malloc.h" +#include "lib/string/printf.h" + +#include <stdio.h> +#include <string.h> + +static char * +ex_str_fmt(msg_aux_data_t aux) +{ + return esc_for_log(aux.ptr); +} +static void +ex_str_free(msg_aux_data_t aux) +{ + tor_free_(aux.ptr); +} +static dispatch_typefns_t stringfns = { + .free_fn = ex_str_free, + .fmt_fn = ex_str_fmt +}; + +// We're using the lowest-level publish/subscribe logic here, to avoid the +// pubsub_macros.h macros and just test the dispatch core. We'll use a string +// type for everything. + +#define DECLARE_MESSAGE(suffix) \ + static pub_binding_t pub_binding_##suffix; \ + static int msg_received_##suffix = 0; \ + static void recv_msg_##suffix(const msg_t *m) { \ + (void)m; \ + ++msg_received_##suffix; \ + } \ + EAT_SEMICOLON + +#define ADD_PUBLISH(binding_suffix, subsys, channel, msg, flags) \ + STMT_BEGIN { \ + con = pubsub_connector_for_subsystem(builder, \ + get_subsys_id(#subsys)); \ + pubsub_add_pub_(con, &pub_binding_##binding_suffix, \ + get_channel_id(#channel), \ + get_message_id(#msg), get_msg_type_id("string"), \ + (flags), __FILE__, __LINE__); \ + pubsub_connector_free(con); \ + } STMT_END + +#define ADD_SUBSCRIBE(hook_suffix, subsys, channel, msg, flags) \ + STMT_BEGIN { \ + con = pubsub_connector_for_subsystem(builder, \ + get_subsys_id(#subsys)); \ + pubsub_add_sub_(con, recv_msg_##hook_suffix, \ + get_channel_id(#channel), \ + get_message_id(#msg), get_msg_type_id("string"), \ + (flags), __FILE__, __LINE__); \ + pubsub_connector_free(con); \ + } STMT_END + +#define SEND(binding_suffix, val) \ + STMT_BEGIN { \ + msg_aux_data_t data_; \ + data_.ptr = tor_strdup(val); \ + pubsub_pub_(&pub_binding_##binding_suffix, data_); \ + } STMT_END + +DECLARE_MESSAGE(msg1); +DECLARE_MESSAGE(msg2); +DECLARE_MESSAGE(msg3); +DECLARE_MESSAGE(msg4); +DECLARE_MESSAGE(msg5); + +static smartlist_t *strings_received = NULL; +static void +recv_msg_copy_string(const msg_t *m) +{ + const char *s = m->aux_data__.ptr; + smartlist_add(strings_received, tor_strdup(s)); +} + +static void * +setup_dispatcher(const struct testcase_t *testcase) +{ + (void)testcase; + pubsub_builder_t *builder = pubsub_builder_new(); + pubsub_connector_t *con; + + { + con = pubsub_connector_for_subsystem(builder, get_subsys_id("types")); + pubsub_connector_define_type_(con, + get_msg_type_id("string"), + &stringfns, + "nowhere.c", 99); + pubsub_connector_free(con); + } + // message1 has one publisher and one subscriber. + ADD_PUBLISH(msg1, sys1, main, message1, 0); + ADD_SUBSCRIBE(msg1, sys2, main, message1, 0); + + // message2 has a publisher and a stub subscriber. + ADD_PUBLISH(msg2, sys1, main, message2, 0); + ADD_SUBSCRIBE(msg2, sys2, main, message2, DISP_FLAG_STUB); + + // message3 has a publisher and three subscribers. + ADD_PUBLISH(msg3, sys1, main, message3, 0); + ADD_SUBSCRIBE(msg3, sys2, main, message3, 0); + ADD_SUBSCRIBE(msg3, sys3, main, message3, 0); + ADD_SUBSCRIBE(msg3, sys4, main, message3, 0); + + // message4 has one publisher and two subscribers, but it's on another + // channel. + ADD_PUBLISH(msg4, sys2, other, message4, 0); + ADD_SUBSCRIBE(msg4, sys1, other, message4, 0); + ADD_SUBSCRIBE(msg4, sys3, other, message4, 0); + + // message5 has a huge number of recipients. + ADD_PUBLISH(msg5, sys3, main, message5, 0); + ADD_SUBSCRIBE(msg5, sys4, main, message5, 0); + ADD_SUBSCRIBE(msg5, sys5, main, message5, 0); + ADD_SUBSCRIBE(msg5, sys6, main, message5, 0); + ADD_SUBSCRIBE(msg5, sys7, main, message5, 0); + ADD_SUBSCRIBE(msg5, sys8, main, message5, 0); + for (int i = 0; i < 1000-5; ++i) { + char *sys; + tor_asprintf(&sys, "xsys-%d", i); + con = pubsub_connector_for_subsystem(builder, get_subsys_id(sys)); + pubsub_add_sub_(con, recv_msg_copy_string, + get_channel_id("main"), + get_message_id("message5"), + get_msg_type_id("string"), 0, "here", 100); + pubsub_connector_free(con); + tor_free(sys); + } + + return pubsub_builder_finalize(builder); +} + +static int +cleanup_dispatcher(const struct testcase_t *testcase, void *dispatcher_) +{ + (void)testcase; + dispatch_t *dispatcher = dispatcher_; + dispatch_free(dispatcher); + return 1; +} + +static const struct testcase_setup_t dispatcher_setup = { + setup_dispatcher, cleanup_dispatcher +}; + +static void +test_pubsub_msg_minimal(void *arg) +{ + dispatch_t *d = arg; + + tt_int_op(0, OP_EQ, msg_received_msg1); + SEND(msg1, "hello world"); + tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet. + + tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000)); + tt_int_op(1, OP_EQ, msg_received_msg1); // we got the message! + + done: + ; +} + +static void +test_pubsub_msg_send_to_stub(void *arg) +{ + dispatch_t *d = arg; + + tt_int_op(0, OP_EQ, msg_received_msg2); + SEND(msg2, "hello silence"); + tt_int_op(0, OP_EQ, msg_received_msg2); // hasn't actually arrived yet. + + tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000)); + tt_int_op(0, OP_EQ, msg_received_msg2); // doesn't arrive -- stub hook. + + done: + ; +} + +static void +test_pubsub_msg_cancel_msgs(void *arg) +{ + dispatch_t *d = arg; + + tt_int_op(0, OP_EQ, msg_received_msg1); + for (int i = 0; i < 100; ++i) { + SEND(msg1, "hello world"); + } + tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet. + + tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 10)); + tt_int_op(10, OP_EQ, msg_received_msg1); // we got the message 10 times. + + // At this point, the dispatcher will be freed with queued, undelivered + // messages. + done: + ; +} + +struct alertfn_target { + dispatch_t *d; + channel_id_t ch; + int count; +}; +static void +alertfn_generic(dispatch_t *d, channel_id_t ch, void *arg) +{ + struct alertfn_target *t = arg; + tt_ptr_op(d, OP_EQ, t->d); + tt_int_op(ch, OP_EQ, t->ch); + ++t->count; + done: + ; +} + +static void +test_pubsub_msg_alertfns(void *arg) +{ + dispatch_t *d = arg; + struct alertfn_target ch1_a = { d, get_channel_id("main"), 0 }; + struct alertfn_target ch2_a = { d, get_channel_id("other"), 0 }; + + tt_int_op(0, OP_EQ, + dispatch_set_alert_fn(d, get_channel_id("main"), + alertfn_generic, &ch1_a)); + tt_int_op(0, OP_EQ, + dispatch_set_alert_fn(d, get_channel_id("other"), + alertfn_generic, &ch2_a)); + + SEND(msg3, "hello"); + tt_int_op(ch1_a.count, OP_EQ, 1); + SEND(msg3, "world"); + tt_int_op(ch1_a.count, OP_EQ, 1); // only the first message sends an alert + tt_int_op(ch2_a.count, OP_EQ, 0); // no alert for 'other' + + SEND(msg4, "worse things happen in C"); + tt_int_op(ch2_a.count, OP_EQ, 1); + + // flush the first (main) channel... + tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000)); + tt_int_op(6, OP_EQ, msg_received_msg3); // 3 subscribers, 2 instances. + + // now that the main channel is flushed, sending another message on it + // starts another alert. + tt_int_op(ch1_a.count, OP_EQ, 1); + SEND(msg1, "plover"); + tt_int_op(ch1_a.count, OP_EQ, 2); + tt_int_op(ch2_a.count, OP_EQ, 1); + + done: + ; +} + +/* try more than N_FAST_FNS hooks on msg5 */ +static void +test_pubsub_msg_many_hooks(void *arg) +{ + dispatch_t *d = arg; + strings_received = smartlist_new(); + + tt_int_op(0, OP_EQ, msg_received_msg5); + SEND(msg5, "hello world"); + tt_int_op(0, OP_EQ, msg_received_msg5); + tt_int_op(0, OP_EQ, smartlist_len(strings_received)); + + tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 100000)); + tt_int_op(5, OP_EQ, msg_received_msg5); + tt_int_op(995, OP_EQ, smartlist_len(strings_received)); + + done: + SMARTLIST_FOREACH(strings_received, char *, s, tor_free(s)); + smartlist_free(strings_received); +} + +#define T(name) \ + { #name, test_pubsub_msg_ ## name , TT_FORK, \ + &dispatcher_setup, NULL } + +struct testcase_t pubsub_msg_tests[] = { + T(minimal), + T(send_to_stub), + T(cancel_msgs), + T(alertfns), + T(many_hooks), + END_OF_TESTCASES +}; |