aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIan Jackson <ijackson@chiark.greenend.org.uk>2023-09-06 16:16:05 +0100
committerIan Jackson <ijackson@chiark.greenend.org.uk>2023-09-18 10:33:05 +0100
commitcca7834dab4596fef432a1bb24eb604f55cd5026 (patch)
tree5710118ca71f5ede78ea3648ac5f9c4b82043068
parent1e7c314d1c89a09ec6272e1b68d9f3d3ded805f6 (diff)
downloadarti-cca7834dab4596fef432a1bb24eb604f55cd5026.tar.gz
arti-cca7834dab4596fef432a1bb24eb604f55cd5026.zip
tor-hsservice: ipt_set: principal implementation
-rw-r--r--crates/tor-hsservice/src/ipt_set.rs105
1 files changed, 91 insertions, 14 deletions
diff --git a/crates/tor-hsservice/src/ipt_set.rs b/crates/tor-hsservice/src/ipt_set.rs
index 79e80e608..93c7315ef 100644
--- a/crates/tor-hsservice/src/ipt_set.rs
+++ b/crates/tor-hsservice/src/ipt_set.rs
@@ -1,9 +1,14 @@
//! IPT set - the principal API between the IPT manager and publisher
use std::ops::DerefMut;
+use std::sync::Arc;
+use std::sync::{Mutex, MutexGuard};
use std::time::{Duration, Instant};
-use void::Void;
+use futures::channel::mpsc;
+use futures::StreamExt as _;
+
+use derive_more::{Deref, DerefMut};
use crate::IptLocalId;
@@ -101,8 +106,15 @@ pub(crate) const IPT_PUBLISH_EXPIRY_SLOP: Duration = Duration::from_secs(300); /
/// containing a shared `PublishIptSet`, i.e. an `Option<IptSet>`.
#[derive(Debug)]
pub(crate) struct IptsManagerView {
- /// TODO HSS
- todo: Void,
+ /// Actual shared data
+ shared: Shared,
+
+ /// Notification sender
+ ///
+ /// We don't wrap the state in a postage::watch,
+ /// because the publisher needs to be able to mutably borrow the data
+ /// without re-notifying itself when it drops the guard.
+ notify: mpsc::Sender<()>,
}
/// Shared view of introduction points - IPT publisher's view
@@ -110,14 +122,55 @@ pub(crate) struct IptsManagerView {
/// This is the publishers's end of a bidirectional "channel",
/// containing a shared `PublishIptSet`, i.e. an `Option<IptSet>`.
pub(crate) struct IptsPublisherView {
- /// TODO HSS
- todo: Void,
+ /// Actual shared data
+ shared: Shared,
+
+ /// Notification receiver
+ notify: mpsc::Receiver<()>,
+}
+
+/// Core shared state
+type Shared = Arc<Mutex<PublishIptSet>>;
+
+/// Mutex guard that will notify when dropped
+///
+/// Returned by [`IptsManagerView::borrow_for_update`]
+#[derive(Deref, DerefMut)]
+struct NotifyingBorrow<'v> {
+ /// Lock guard
+ #[deref(forward)]
+ #[deref_mut(forward)]
+ guard: MutexGuard<'v, PublishIptSet>,
+
+ /// To be notified on drop
+ notify: &'v mut mpsc::Sender<()>,
}
/// Create a new shared state channel for the publication instructions
pub(crate) fn ipts_channel(initial_state: PublishIptSet) -> (IptsManagerView, IptsPublisherView) {
- drop(initial_state); // clippy::needless_pass_by_value
- todo!()
+ let shared = Arc::new(Mutex::new(initial_state));
+ // Zero buffer is right. Docs for `mpsc::channel` say:
+ // each sender gets a guaranteed slot in the channel capacity,
+ // and on top of that there are buffer “first come, first serve” slots
+ // We only have one sender and only ever want one outstanding,
+ // since we can (and would like to) coalesce notifications.
+ let (tx, rx) = mpsc::channel(0);
+ (
+ IptsManagerView {
+ shared: shared.clone(),
+ notify: tx,
+ },
+ IptsPublisherView { shared, notify: rx },
+ )
+}
+
+/// Lock the shared state and obtain a lock guard
+///
+/// Does not do any notification.
+fn lock_shared(shared: &Shared) -> MutexGuard<PublishIptSet> {
+ // Propagating panics is fine since if either the manager or the publisher crashes,
+ // the other one cannot survive.
+ shared.lock().expect("IPT set shared state poisoned")
}
impl IptsManagerView {
@@ -128,10 +181,29 @@ impl IptsManagerView {
/// The returned value is a lock guard.
/// (It is not `Send` so cannot be held across await points.)
/// The publisher will be notified when it is dropped.
- #[allow(unreachable_code)] // TODO HSS remove
pub(crate) fn borrow_for_update(&mut self) -> impl DerefMut<Target = PublishIptSet> + '_ {
- // assist type inference with bogus type
- std::convert::identity::<Box<PublishIptSet>>(void::unreachable(self.todo))
+ let guard = lock_shared(&self.shared);
+ NotifyingBorrow {
+ guard,
+ notify: &mut self.notify,
+ }
+ }
+}
+
+impl Drop for NotifyingBorrow<'_> {
+ fn drop(&mut self) {
+ // Channel full? Well, then the receiver is indeed going to wake up, so fine
+ // Channel disconnected? The publisher has crashed or terminated,
+ // but we are not in a position to fail and shut down the establisher.
+ // If our HS is shutting down, the manager will be shut down by other means.
+ let _: Result<(), mpsc::TrySendError<_>> = self.notify.try_send(());
+
+ // Now the fields will be dropped, includeing `guard`.
+ // I.e. the mutex gets unlocked. This means we notify the publisher
+ // (which might make it wake up on another thread) just *before*
+ // we release the lock, rather than just after.
+ // This is slightly suboptimal but doesn't matter here.
+ // To do better, we'd need to make the guard into an Option.
}
}
@@ -149,7 +221,14 @@ impl IptsPublisherView {
/// * `None` if the manager is shutting down and the publisher should shut down too
/// * `Some(Err(..))` if a fatal error occurred
pub(crate) async fn await_update(&mut self) -> Option<Result<(), crate::FatalError>> {
- void::unreachable(self.todo)
+ // Cancellation safety:
+ //
+ // We're using mpsc::Receiver's implementation of Stream, via StreamExt.
+ // Stream::next() must be cancellation safe or it would be lossy everywhere.
+ // So it is OK to create the future from next, here, and possibly discard it
+ // before it becomes Ready.
+ let () = self.notify.next().await?;
+ Some(Ok(()))
}
/// Look at the list of introduction points to publish
@@ -160,10 +239,8 @@ impl IptsPublisherView {
///
/// The returned value is a lock guard.
/// (It is not `Send` so cannot be held across await points.)
- #[allow(unreachable_code)] // TODO HSS remove
pub(crate) fn borrow_for_publish(&self) -> impl DerefMut<Target = PublishIptSet> + '_ {
- // assist type inference with bogus type
- std::convert::identity::<Box<PublishIptSet>>(void::unreachable(self.todo))
+ lock_shared(&self.shared)
}
}