aboutsummaryrefslogtreecommitdiff
path: root/src/test/test_pubsub_msg.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/test_pubsub_msg.c')
-rw-r--r--src/test/test_pubsub_msg.c305
1 files changed, 305 insertions, 0 deletions
diff --git a/src/test/test_pubsub_msg.c b/src/test/test_pubsub_msg.c
new file mode 100644
index 0000000000..3054db885d
--- /dev/null
+++ b/src/test/test_pubsub_msg.c
@@ -0,0 +1,305 @@
+/* Copyright (c) 2018-2020, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#define DISPATCH_PRIVATE
+
+#include "test/test.h"
+
+#include "lib/dispatch/dispatch.h"
+#include "lib/dispatch/dispatch_naming.h"
+#include "lib/dispatch/dispatch_st.h"
+#include "lib/dispatch/msgtypes.h"
+#include "lib/pubsub/pubsub_flags.h"
+#include "lib/pubsub/pub_binding_st.h"
+#include "lib/pubsub/pubsub_build.h"
+#include "lib/pubsub/pubsub_builder_st.h"
+#include "lib/pubsub/pubsub_connect.h"
+#include "lib/pubsub/pubsub_publish.h"
+
+#include "lib/log/escape.h"
+#include "lib/malloc/malloc.h"
+#include "lib/string/printf.h"
+
+#include <stdio.h>
+#include <string.h>
+
+static char *
+ex_str_fmt(msg_aux_data_t aux)
+{
+ return esc_for_log(aux.ptr);
+}
+static void
+ex_str_free(msg_aux_data_t aux)
+{
+ tor_free_(aux.ptr);
+}
+static dispatch_typefns_t stringfns = {
+ .free_fn = ex_str_free,
+ .fmt_fn = ex_str_fmt
+};
+
+// We're using the lowest-level publish/subscribe logic here, to avoid the
+// pubsub_macros.h macros and just test the dispatch core. We'll use a string
+// type for everything.
+
+#define DECLARE_MESSAGE(suffix) \
+ static pub_binding_t pub_binding_##suffix; \
+ static int msg_received_##suffix = 0; \
+ static void recv_msg_##suffix(const msg_t *m) { \
+ (void)m; \
+ ++msg_received_##suffix; \
+ } \
+ EAT_SEMICOLON
+
+#define ADD_PUBLISH(binding_suffix, subsys, channel, msg, flags) \
+ STMT_BEGIN { \
+ con = pubsub_connector_for_subsystem(builder, \
+ get_subsys_id(#subsys)); \
+ pubsub_add_pub_(con, &pub_binding_##binding_suffix, \
+ get_channel_id(#channel), \
+ get_message_id(#msg), get_msg_type_id("string"), \
+ (flags), __FILE__, __LINE__); \
+ pubsub_connector_free(con); \
+ } STMT_END
+
+#define ADD_SUBSCRIBE(hook_suffix, subsys, channel, msg, flags) \
+ STMT_BEGIN { \
+ con = pubsub_connector_for_subsystem(builder, \
+ get_subsys_id(#subsys)); \
+ pubsub_add_sub_(con, recv_msg_##hook_suffix, \
+ get_channel_id(#channel), \
+ get_message_id(#msg), get_msg_type_id("string"), \
+ (flags), __FILE__, __LINE__); \
+ pubsub_connector_free(con); \
+ } STMT_END
+
+#define SEND(binding_suffix, val) \
+ STMT_BEGIN { \
+ msg_aux_data_t data_; \
+ data_.ptr = tor_strdup(val); \
+ pubsub_pub_(&pub_binding_##binding_suffix, data_); \
+ } STMT_END
+
+DECLARE_MESSAGE(msg1);
+DECLARE_MESSAGE(msg2);
+DECLARE_MESSAGE(msg3);
+DECLARE_MESSAGE(msg4);
+DECLARE_MESSAGE(msg5);
+
+static smartlist_t *strings_received = NULL;
+static void
+recv_msg_copy_string(const msg_t *m)
+{
+ const char *s = m->aux_data__.ptr;
+ smartlist_add(strings_received, tor_strdup(s));
+}
+
+static void *
+setup_dispatcher(const struct testcase_t *testcase)
+{
+ (void)testcase;
+ pubsub_builder_t *builder = pubsub_builder_new();
+ pubsub_connector_t *con;
+
+ {
+ con = pubsub_connector_for_subsystem(builder, get_subsys_id("types"));
+ pubsub_connector_register_type_(con,
+ get_msg_type_id("string"),
+ &stringfns,
+ "nowhere.c", 99);
+ pubsub_connector_free(con);
+ }
+ // message1 has one publisher and one subscriber.
+ ADD_PUBLISH(msg1, sys1, main, message1, 0);
+ ADD_SUBSCRIBE(msg1, sys2, main, message1, 0);
+
+ // message2 has a publisher and a stub subscriber.
+ ADD_PUBLISH(msg2, sys1, main, message2, 0);
+ ADD_SUBSCRIBE(msg2, sys2, main, message2, DISP_FLAG_STUB);
+
+ // message3 has a publisher and three subscribers.
+ ADD_PUBLISH(msg3, sys1, main, message3, 0);
+ ADD_SUBSCRIBE(msg3, sys2, main, message3, 0);
+ ADD_SUBSCRIBE(msg3, sys3, main, message3, 0);
+ ADD_SUBSCRIBE(msg3, sys4, main, message3, 0);
+
+ // message4 has one publisher and two subscribers, but it's on another
+ // channel.
+ ADD_PUBLISH(msg4, sys2, other, message4, 0);
+ ADD_SUBSCRIBE(msg4, sys1, other, message4, 0);
+ ADD_SUBSCRIBE(msg4, sys3, other, message4, 0);
+
+ // message5 has a huge number of recipients.
+ ADD_PUBLISH(msg5, sys3, main, message5, 0);
+ ADD_SUBSCRIBE(msg5, sys4, main, message5, 0);
+ ADD_SUBSCRIBE(msg5, sys5, main, message5, 0);
+ ADD_SUBSCRIBE(msg5, sys6, main, message5, 0);
+ ADD_SUBSCRIBE(msg5, sys7, main, message5, 0);
+ ADD_SUBSCRIBE(msg5, sys8, main, message5, 0);
+ for (int i = 0; i < 1000-5; ++i) {
+ char *sys;
+ tor_asprintf(&sys, "xsys-%d", i);
+ con = pubsub_connector_for_subsystem(builder, get_subsys_id(sys));
+ pubsub_add_sub_(con, recv_msg_copy_string,
+ get_channel_id("main"),
+ get_message_id("message5"),
+ get_msg_type_id("string"), 0, "here", 100);
+ pubsub_connector_free(con);
+ tor_free(sys);
+ }
+
+ return pubsub_builder_finalize(builder, NULL);
+}
+
+static int
+cleanup_dispatcher(const struct testcase_t *testcase, void *dispatcher_)
+{
+ (void)testcase;
+ dispatch_t *dispatcher = dispatcher_;
+ dispatch_free(dispatcher);
+ return 1;
+}
+
+static const struct testcase_setup_t dispatcher_setup = {
+ setup_dispatcher, cleanup_dispatcher
+};
+
+static void
+test_pubsub_msg_minimal(void *arg)
+{
+ dispatch_t *d = arg;
+
+ tt_int_op(0, OP_EQ, msg_received_msg1);
+ SEND(msg1, "hello world");
+ tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet.
+
+ tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
+ tt_int_op(1, OP_EQ, msg_received_msg1); // we got the message!
+
+ done:
+ ;
+}
+
+static void
+test_pubsub_msg_send_to_stub(void *arg)
+{
+ dispatch_t *d = arg;
+
+ tt_int_op(0, OP_EQ, msg_received_msg2);
+ SEND(msg2, "hello silence");
+ tt_int_op(0, OP_EQ, msg_received_msg2); // hasn't actually arrived yet.
+
+ tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
+ tt_int_op(0, OP_EQ, msg_received_msg2); // doesn't arrive -- stub hook.
+
+ done:
+ ;
+}
+
+static void
+test_pubsub_msg_cancel_msgs(void *arg)
+{
+ dispatch_t *d = arg;
+
+ tt_int_op(0, OP_EQ, msg_received_msg1);
+ for (int i = 0; i < 100; ++i) {
+ SEND(msg1, "hello world");
+ }
+ tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet.
+
+ tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 10));
+ tt_int_op(10, OP_EQ, msg_received_msg1); // we got the message 10 times.
+
+ // At this point, the dispatcher will be freed with queued, undelivered
+ // messages.
+ done:
+ ;
+}
+
+struct alertfn_target {
+ dispatch_t *d;
+ channel_id_t ch;
+ int count;
+};
+static void
+alertfn_generic(dispatch_t *d, channel_id_t ch, void *arg)
+{
+ struct alertfn_target *t = arg;
+ tt_ptr_op(d, OP_EQ, t->d);
+ tt_int_op(ch, OP_EQ, t->ch);
+ ++t->count;
+ done:
+ ;
+}
+
+static void
+test_pubsub_msg_alertfns(void *arg)
+{
+ dispatch_t *d = arg;
+ struct alertfn_target ch1_a = { d, get_channel_id("main"), 0 };
+ struct alertfn_target ch2_a = { d, get_channel_id("other"), 0 };
+
+ tt_int_op(0, OP_EQ,
+ dispatch_set_alert_fn(d, get_channel_id("main"),
+ alertfn_generic, &ch1_a));
+ tt_int_op(0, OP_EQ,
+ dispatch_set_alert_fn(d, get_channel_id("other"),
+ alertfn_generic, &ch2_a));
+
+ SEND(msg3, "hello");
+ tt_int_op(ch1_a.count, OP_EQ, 1);
+ SEND(msg3, "world");
+ tt_int_op(ch1_a.count, OP_EQ, 1); // only the first message sends an alert
+ tt_int_op(ch2_a.count, OP_EQ, 0); // no alert for 'other'
+
+ SEND(msg4, "worse things happen in C");
+ tt_int_op(ch2_a.count, OP_EQ, 1);
+
+ // flush the first (main) channel...
+ tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
+ tt_int_op(6, OP_EQ, msg_received_msg3); // 3 subscribers, 2 instances.
+
+ // now that the main channel is flushed, sending another message on it
+ // starts another alert.
+ tt_int_op(ch1_a.count, OP_EQ, 1);
+ SEND(msg1, "plover");
+ tt_int_op(ch1_a.count, OP_EQ, 2);
+ tt_int_op(ch2_a.count, OP_EQ, 1);
+
+ done:
+ ;
+}
+
+/* try more than N_FAST_FNS hooks on msg5 */
+static void
+test_pubsub_msg_many_hooks(void *arg)
+{
+ dispatch_t *d = arg;
+ strings_received = smartlist_new();
+
+ tt_int_op(0, OP_EQ, msg_received_msg5);
+ SEND(msg5, "hello world");
+ tt_int_op(0, OP_EQ, msg_received_msg5);
+ tt_int_op(0, OP_EQ, smartlist_len(strings_received));
+
+ tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 100000));
+ tt_int_op(5, OP_EQ, msg_received_msg5);
+ tt_int_op(995, OP_EQ, smartlist_len(strings_received));
+
+ done:
+ SMARTLIST_FOREACH(strings_received, char *, s, tor_free(s));
+ smartlist_free(strings_received);
+}
+
+#define T(name) \
+ { #name, test_pubsub_msg_ ## name , TT_FORK, \
+ &dispatcher_setup, NULL }
+
+struct testcase_t pubsub_msg_tests[] = {
+ T(minimal),
+ T(send_to_stub),
+ T(cancel_msgs),
+ T(alertfns),
+ T(many_hooks),
+ END_OF_TESTCASES
+};