summaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/or/circuitbuild.c11
-rw-r--r--src/core/or/circuitlist.c33
-rw-r--r--src/core/or/connection_or.c28
-rw-r--r--src/core/or/ocirc_event.c112
-rw-r--r--src/core/or/ocirc_event.h35
-rw-r--r--src/core/or/orconn_event.c90
-rw-r--r--src/core/or/orconn_event.h31
7 files changed, 182 insertions, 158 deletions
diff --git a/src/core/or/circuitbuild.c b/src/core/or/circuitbuild.c
index 3a4e729429..ff809c01cf 100644
--- a/src/core/or/circuitbuild.c
+++ b/src/core/or/circuitbuild.c
@@ -522,14 +522,13 @@ origin_circuit_get_guard_state(origin_circuit_t *circ)
static void
circuit_chan_publish(const origin_circuit_t *circ, const channel_t *chan)
{
- ocirc_event_msg_t msg;
+ ocirc_chan_msg_t *msg = tor_malloc(sizeof(*msg));
- msg.type = OCIRC_MSGTYPE_CHAN;
- msg.u.chan.gid = circ->global_identifier;
- msg.u.chan.chan = chan->global_identifier;
- msg.u.chan.onehop = circ->build_state->onehop_tunnel;
+ msg->gid = circ->global_identifier;
+ msg->chan = chan->global_identifier;
+ msg->onehop = circ->build_state->onehop_tunnel;
- ocirc_event_publish(&msg);
+ ocirc_chan_publish(msg);
}
/** Start establishing the first hop of our circuit. Figure out what
diff --git a/src/core/or/circuitlist.c b/src/core/or/circuitlist.c
index ebbe7f0824..9ee9f93c99 100644
--- a/src/core/or/circuitlist.c
+++ b/src/core/or/circuitlist.c
@@ -496,17 +496,16 @@ int
circuit_event_status(origin_circuit_t *circ, circuit_status_event_t tp,
int reason_code)
{
- ocirc_event_msg_t msg;
+ ocirc_cevent_msg_t *msg = tor_malloc(sizeof(*msg));
tor_assert(circ);
- msg.type = OCIRC_MSGTYPE_CEVENT;
- msg.u.cevent.gid = circ->global_identifier;
- msg.u.cevent.evtype = tp;
- msg.u.cevent.reason = reason_code;
- msg.u.cevent.onehop = circ->build_state->onehop_tunnel;
+ msg->gid = circ->global_identifier;
+ msg->evtype = tp;
+ msg->reason = reason_code;
+ msg->onehop = circ->build_state->onehop_tunnel;
- ocirc_event_publish(&msg);
+ ocirc_cevent_publish(msg);
return control_event_circuit_status(circ, tp, reason_code);
}
@@ -514,26 +513,25 @@ circuit_event_status(origin_circuit_t *circ, circuit_status_event_t tp,
* Helper function to publish a state change message
*
* circuit_set_state() calls this to notify subscribers about a change
- * of the state of an origin circuit.
+ * of the state of an origin circuit. @a circ must be an origin
+ * circuit.
**/
static void
circuit_state_publish(const circuit_t *circ)
{
- ocirc_event_msg_t msg;
+ ocirc_state_msg_t *msg = tor_malloc(sizeof(*msg));
const origin_circuit_t *ocirc;
- if (!CIRCUIT_IS_ORIGIN(circ))
- return;
+ tor_assert(CIRCUIT_IS_ORIGIN(circ));
ocirc = CONST_TO_ORIGIN_CIRCUIT(circ);
/* Only inbound OR circuits can be in this state, not origin circuits. */
tor_assert(circ->state != CIRCUIT_STATE_ONIONSKIN_PENDING);
- msg.type = OCIRC_MSGTYPE_STATE;
- msg.u.state.gid = ocirc->global_identifier;
- msg.u.state.state = circ->state;
- msg.u.state.onehop = ocirc->build_state->onehop_tunnel;
+ msg->gid = ocirc->global_identifier;
+ msg->state = circ->state;
+ msg->onehop = ocirc->build_state->onehop_tunnel;
- ocirc_event_publish(&msg);
+ ocirc_state_publish(msg);
}
/** Change the state of <b>circ</b> to <b>state</b>, adding it to or removing
@@ -565,7 +563,8 @@ circuit_set_state(circuit_t *circ, uint8_t state)
if (state == CIRCUIT_STATE_GUARD_WAIT || state == CIRCUIT_STATE_OPEN)
tor_assert(!circ->n_chan_create_cell);
circ->state = state;
- circuit_state_publish(circ);
+ if (CIRCUIT_IS_ORIGIN(circ))
+ circuit_state_publish(circ);
}
/** Append to <b>out</b> all circuits in state CHAN_WAIT waiting for
diff --git a/src/core/or/connection_or.c b/src/core/or/connection_or.c
index 830e09fd54..4c93351e31 100644
--- a/src/core/or/connection_or.c
+++ b/src/core/or/connection_or.c
@@ -414,13 +414,12 @@ void
connection_or_event_status(or_connection_t *conn, or_conn_status_event_t tp,
int reason)
{
- orconn_event_msg_t msg;
+ orconn_status_msg_t *msg = tor_malloc(sizeof(*msg));
- msg.type = ORCONN_MSGTYPE_STATUS;
- msg.u.status.gid = conn->base_.global_identifier;
- msg.u.status.status = tp;
- msg.u.status.reason = reason;
- orconn_event_publish(&msg);
+ msg->gid = conn->base_.global_identifier;
+ msg->status = tp;
+ msg->reason = reason;
+ orconn_status_publish(msg);
control_event_or_conn_status(conn, tp, reason);
}
@@ -433,26 +432,25 @@ connection_or_event_status(or_connection_t *conn, or_conn_status_event_t tp,
static void
connection_or_state_publish(const or_connection_t *conn, uint8_t state)
{
- orconn_event_msg_t msg;
+ orconn_state_msg_t *msg = tor_malloc(sizeof(*msg));
- msg.type = ORCONN_MSGTYPE_STATE;
- msg.u.state.gid = conn->base_.global_identifier;
+ msg->gid = conn->base_.global_identifier;
if (conn->is_pt) {
/* Do extra decoding because conn->proxy_type indicates the proxy
* protocol that tor uses to talk with the transport plugin,
* instead of PROXY_PLUGGABLE. */
tor_assert_nonfatal(conn->proxy_type != PROXY_NONE);
- msg.u.state.proxy_type = PROXY_PLUGGABLE;
+ msg->proxy_type = PROXY_PLUGGABLE;
} else {
- msg.u.state.proxy_type = conn->proxy_type;
+ msg->proxy_type = conn->proxy_type;
}
- msg.u.state.state = state;
+ msg->state = state;
if (conn->chan) {
- msg.u.state.chan = TLS_CHAN_TO_BASE(conn->chan)->global_identifier;
+ msg->chan = TLS_CHAN_TO_BASE(conn->chan)->global_identifier;
} else {
- msg.u.state.chan = 0;
+ msg->chan = 0;
}
- orconn_event_publish(&msg);
+ orconn_state_publish(msg);
}
/** Call this to change or_connection_t states, so the owning channel_tls_t can
diff --git a/src/core/or/ocirc_event.c b/src/core/or/ocirc_event.c
index 4a6fc748c9..3cb9147134 100644
--- a/src/core/or/ocirc_event.c
+++ b/src/core/or/ocirc_event.c
@@ -26,59 +26,103 @@
#include "core/or/origin_circuit_st.h"
#include "lib/subsys/subsys.h"
-/** List of subscribers */
-static smartlist_t *ocirc_event_rcvrs;
+DECLARE_PUBLISH(ocirc_state);
+DECLARE_PUBLISH(ocirc_chan);
+DECLARE_PUBLISH(ocirc_cevent);
+
+static void
+ocirc_event_free(msg_aux_data_t u)
+{
+ tor_free_(u.ptr);
+}
+
+static char *
+ocirc_state_fmt(msg_aux_data_t u)
+{
+ ocirc_state_msg_t *msg = (ocirc_state_msg_t *)u.ptr;
+ char *s = NULL;
+
+ tor_asprintf(&s, "<gid=%"PRIu32" state=%d onehop=%d>",
+ msg->gid, msg->state, msg->onehop);
+ return s;
+}
+
+static char *
+ocirc_chan_fmt(msg_aux_data_t u)
+{
+ ocirc_chan_msg_t *msg = (ocirc_chan_msg_t *)u.ptr;
+ char *s = NULL;
+
+ tor_asprintf(&s, "<gid=%"PRIu32" chan=%"PRIu64" onehop=%d>",
+ msg->gid, msg->chan, msg->onehop);
+ return s;
+}
+
+static char *
+ocirc_cevent_fmt(msg_aux_data_t u)
+{
+ ocirc_cevent_msg_t *msg = (ocirc_cevent_msg_t *)u.ptr;
+ char *s = NULL;
+
+ tor_asprintf(&s, "<gid=%"PRIu32" evtype=%d reason=%d onehop=%d>",
+ msg->gid, msg->evtype, msg->reason, msg->onehop);
+ return s;
+}
+
+static dispatch_typefns_t ocirc_state_fns = {
+ .free_fn = ocirc_event_free,
+ .fmt_fn = ocirc_state_fmt,
+};
+
+static dispatch_typefns_t ocirc_chan_fns = {
+ .free_fn = ocirc_event_free,
+ .fmt_fn = ocirc_chan_fmt,
+};
+
+static dispatch_typefns_t ocirc_cevent_fns = {
+ .free_fn = ocirc_event_free,
+ .fmt_fn = ocirc_cevent_fmt,
+};
-/** Initialize subscriber list */
static int
-ocirc_event_init(void)
+ocirc_add_pubsub(struct pubsub_connector_t *connector)
{
- ocirc_event_rcvrs = smartlist_new();
+ if (DISPATCH_REGISTER_TYPE(connector, ocirc_state, &ocirc_state_fns))
+ return -1;
+ if (DISPATCH_REGISTER_TYPE(connector, ocirc_chan, &ocirc_chan_fns))
+ return -1;
+ if (DISPATCH_REGISTER_TYPE(connector, ocirc_cevent, &ocirc_cevent_fns))
+ return -1;
+ if (DISPATCH_ADD_PUB(connector, ocirc, ocirc_state))
+ return -1;
+ if (DISPATCH_ADD_PUB(connector, ocirc, ocirc_chan))
+ return -1;
+ if (DISPATCH_ADD_PUB(connector, ocirc, ocirc_cevent))
+ return -1;
return 0;
}
-/** Free subscriber list */
-static void
-ocirc_event_fini(void)
+void
+ocirc_state_publish(ocirc_state_msg_t *msg)
{
- smartlist_free(ocirc_event_rcvrs);
+ PUBLISH(ocirc_state, msg);
}
-/**
- * Subscribe to messages about origin circuit events
- *
- * Register a callback function to receive messages about origin
- * circuits. The publisher calls this function synchronously.
- **/
void
-ocirc_event_subscribe(ocirc_event_rcvr_t fn)
+ocirc_chan_publish(ocirc_chan_msg_t *msg)
{
- tor_assert(fn);
- /* Don't duplicate subscriptions. */
- if (smartlist_contains(ocirc_event_rcvrs, fn))
- return;
-
- smartlist_add(ocirc_event_rcvrs, fn);
+ PUBLISH(ocirc_chan, msg);
}
-/**
- * Publish a message about OR connection events
- *
- * This calls the subscriber receiver function synchronously.
- **/
void
-ocirc_event_publish(const ocirc_event_msg_t *msg)
+ocirc_cevent_publish(ocirc_cevent_msg_t *msg)
{
- SMARTLIST_FOREACH_BEGIN(ocirc_event_rcvrs, ocirc_event_rcvr_t, fn) {
- tor_assert(fn);
- (*fn)(msg);
- } SMARTLIST_FOREACH_END(fn);
+ PUBLISH(ocirc_cevent, msg);
}
const subsys_fns_t sys_ocirc_event = {
.name = "ocirc_event",
.supported = true,
.level = -32,
- .initialize = ocirc_event_init,
- .shutdown = ocirc_event_fini,
+ .add_pubsub = ocirc_add_pubsub,
};
diff --git a/src/core/or/ocirc_event.h b/src/core/or/ocirc_event.h
index 59ec9e27cb..8e9494874f 100644
--- a/src/core/or/ocirc_event.h
+++ b/src/core/or/ocirc_event.h
@@ -12,6 +12,7 @@
#include <stdbool.h>
#include "lib/cc/torint.h"
+#include "lib/pubsub/pubsub.h"
/** Used to indicate the type of a circuit event passed to the controller.
* The various types are defined in control-spec.txt */
@@ -30,6 +31,8 @@ typedef struct ocirc_state_msg_t {
bool onehop; /**< one-hop circuit? */
} ocirc_state_msg_t;
+DECLARE_MESSAGE(ocirc_state, ocirc_state, ocirc_state_msg_t *);
+
/**
* Message when a channel gets associated to a circuit.
*
@@ -44,6 +47,8 @@ typedef struct ocirc_chan_msg_t {
bool onehop; /**< one-hop circuit? */
} ocirc_chan_msg_t;
+DECLARE_MESSAGE(ocirc_chan, ocirc_chan, ocirc_chan_msg_t *);
+
/**
* Message for origin circuit status event
*
@@ -56,34 +61,12 @@ typedef struct ocirc_cevent_msg_t {
bool onehop; /**< one-hop circuit? */
} ocirc_cevent_msg_t;
-/** Discriminant values for origin circuit event message */
-typedef enum ocirc_msgtype_t {
- OCIRC_MSGTYPE_STATE,
- OCIRC_MSGTYPE_CHAN,
- OCIRC_MSGTYPE_CEVENT,
-} ocirc_msgtype_t;
-
-/** Discriminated union for the actual message */
-typedef struct ocirc_event_msg_t {
- int type;
- union {
- ocirc_state_msg_t state;
- ocirc_chan_msg_t chan;
- ocirc_cevent_msg_t cevent;
- } u;
-} ocirc_event_msg_t;
-
-/**
- * Receiver function pointer for origin circuit subscribers
- *
- * This function gets called synchronously by the publisher.
- **/
-typedef void (*ocirc_event_rcvr_t)(const ocirc_event_msg_t *);
-
-void ocirc_event_subscribe(ocirc_event_rcvr_t fn);
+DECLARE_MESSAGE(ocirc_cevent, ocirc_cevent, ocirc_cevent_msg_t *);
#ifdef OCIRC_EVENT_PRIVATE
-void ocirc_event_publish(const ocirc_event_msg_t *msg);
+void ocirc_state_publish(ocirc_state_msg_t *msg);
+void ocirc_chan_publish(ocirc_chan_msg_t *msg);
+void ocirc_cevent_publish(ocirc_cevent_msg_t *msg);
#endif
#endif /* !defined(TOR_OCIRC_EVENT_H) */
diff --git a/src/core/or/orconn_event.c b/src/core/or/orconn_event.c
index 9fb34bd1ff..86f112fc09 100644
--- a/src/core/or/orconn_event.c
+++ b/src/core/or/orconn_event.c
@@ -17,65 +17,83 @@
**/
#include "core/or/or.h"
+#include "lib/pubsub/pubsub.h"
#include "lib/subsys/subsys.h"
#define ORCONN_EVENT_PRIVATE
#include "core/or/orconn_event.h"
#include "core/or/orconn_event_sys.h"
-/** List of subscribers */
-static smartlist_t *orconn_event_rcvrs;
+DECLARE_PUBLISH(orconn_state);
+DECLARE_PUBLISH(orconn_status);
-/** Initialize subscriber list */
-static int
-orconn_event_init(void)
+static void
+orconn_event_free(msg_aux_data_t u)
{
- orconn_event_rcvrs = smartlist_new();
- return 0;
+ tor_free_(u.ptr);
}
-/** Free subscriber list */
-static void
-orconn_event_fini(void)
+static char *
+orconn_state_fmt(msg_aux_data_t u)
{
- smartlist_free(orconn_event_rcvrs);
+ orconn_state_msg_t *msg = (orconn_state_msg_t *)u.ptr;
+ char *s = NULL;
+
+ tor_asprintf(&s, "<gid=%"PRIu64" chan=%"PRIu64" proxy_type=%d state=%d>",
+ msg->gid, msg->chan, msg->proxy_type, msg->state);
+ return s;
}
-/**
- * Subscribe to messages about OR connection events
- *
- * Register a callback function to receive messages about ORCONNs.
- * The publisher calls this function synchronously.
- **/
-void
-orconn_event_subscribe(orconn_event_rcvr_t fn)
+static char *
+orconn_status_fmt(msg_aux_data_t u)
{
- tor_assert(fn);
- /* Don't duplicate subscriptions. */
- if (smartlist_contains(orconn_event_rcvrs, fn))
- return;
+ orconn_status_msg_t *msg = (orconn_status_msg_t *)u.ptr;
+ char *s = NULL;
- smartlist_add(orconn_event_rcvrs, fn);
+ tor_asprintf(&s, "<gid=%"PRIu64" status=%d reason=%d>",
+ msg->gid, msg->status, msg->reason);
+ return s;
+}
+
+static dispatch_typefns_t orconn_state_fns = {
+ .free_fn = orconn_event_free,
+ .fmt_fn = orconn_state_fmt,
+};
+
+static dispatch_typefns_t orconn_status_fns = {
+ .free_fn = orconn_event_free,
+ .fmt_fn = orconn_status_fmt,
+};
+
+static int
+orconn_add_pubsub(struct pubsub_connector_t *connector)
+{
+ if (DISPATCH_REGISTER_TYPE(connector, orconn_state, &orconn_state_fns))
+ return -1;
+ if (DISPATCH_REGISTER_TYPE(connector, orconn_status, &orconn_status_fns))
+ return -1;
+ if (DISPATCH_ADD_PUB(connector, orconn, orconn_state) != 0)
+ return -1;
+ if (DISPATCH_ADD_PUB(connector, orconn, orconn_status) != 0)
+ return -1;
+ return 0;
+}
+
+void
+orconn_state_publish(orconn_state_msg_t *msg)
+{
+ PUBLISH(orconn_state, msg);
}
-/**
- * Publish a message about OR connection events
- *
- * This calls the subscriber receiver function synchronously.
- **/
void
-orconn_event_publish(const orconn_event_msg_t *msg)
+orconn_status_publish(orconn_status_msg_t *msg)
{
- SMARTLIST_FOREACH_BEGIN(orconn_event_rcvrs, orconn_event_rcvr_t, fn) {
- tor_assert(fn);
- (*fn)(msg);
- } SMARTLIST_FOREACH_END(fn);
+ PUBLISH(orconn_status, msg);
}
const subsys_fns_t sys_orconn_event = {
.name = "orconn_event",
.supported = true,
.level = -33,
- .initialize = orconn_event_init,
- .shutdown = orconn_event_fini,
+ .add_pubsub = orconn_add_pubsub,
};
diff --git a/src/core/or/orconn_event.h b/src/core/or/orconn_event.h
index d6635793db..fb67a7d183 100644
--- a/src/core/or/orconn_event.h
+++ b/src/core/or/orconn_event.h
@@ -16,6 +16,8 @@
#ifndef TOR_ORCONN_EVENT_H
#define TOR_ORCONN_EVENT_H
+#include "lib/pubsub/pubsub.h"
+
/**
* @name States of OR connections
*
@@ -62,12 +64,6 @@ typedef enum or_conn_status_event_t {
OR_CONN_EVENT_NEW = 4,
} or_conn_status_event_t;
-/** Discriminant values for orconn event message */
-typedef enum orconn_msgtype_t {
- ORCONN_MSGTYPE_STATE,
- ORCONN_MSGTYPE_STATUS,
-} orconn_msgtype_t;
-
/**
* Message for orconn state update
*
@@ -83,6 +79,8 @@ typedef struct orconn_state_msg_t {
uint8_t state; /**< new connection state */
} orconn_state_msg_t;
+DECLARE_MESSAGE(orconn_state, orconn_state, orconn_state_msg_t *);
+
/**
* Message for orconn status event
*
@@ -95,26 +93,11 @@ typedef struct orconn_status_msg_t {
int reason; /**< reason */
} orconn_status_msg_t;
-/** Discriminated union for the actual message */
-typedef struct orconn_event_msg_t {
- int type;
- union {
- orconn_state_msg_t state;
- orconn_status_msg_t status;
- } u;
-} orconn_event_msg_t;
-
-/**
- * Receiver function pointer for OR subscribers
- *
- * This function gets called synchronously by the publisher.
- **/
-typedef void (*orconn_event_rcvr_t)(const orconn_event_msg_t *);
-
-void orconn_event_subscribe(orconn_event_rcvr_t);
+DECLARE_MESSAGE(orconn_status, orconn_status, orconn_status_msg_t *);
#ifdef ORCONN_EVENT_PRIVATE
-void orconn_event_publish(const orconn_event_msg_t *);
+void orconn_state_publish(orconn_state_msg_t *);
+void orconn_status_publish(orconn_status_msg_t *);
#endif
#endif /* !defined(TOR_ORCONN_EVENT_H) */