diff options
author | Ian Jackson <ijackson@chiark.greenend.org.uk> | 2023-09-06 16:16:05 +0100 |
---|---|---|
committer | Ian Jackson <ijackson@chiark.greenend.org.uk> | 2023-09-18 10:33:05 +0100 |
commit | cca7834dab4596fef432a1bb24eb604f55cd5026 (patch) | |
tree | 5710118ca71f5ede78ea3648ac5f9c4b82043068 | |
parent | 1e7c314d1c89a09ec6272e1b68d9f3d3ded805f6 (diff) | |
download | arti-cca7834dab4596fef432a1bb24eb604f55cd5026.tar.gz arti-cca7834dab4596fef432a1bb24eb604f55cd5026.zip |
tor-hsservice: ipt_set: principal implementation
-rw-r--r-- | crates/tor-hsservice/src/ipt_set.rs | 105 |
1 files changed, 91 insertions, 14 deletions
diff --git a/crates/tor-hsservice/src/ipt_set.rs b/crates/tor-hsservice/src/ipt_set.rs index 79e80e608..93c7315ef 100644 --- a/crates/tor-hsservice/src/ipt_set.rs +++ b/crates/tor-hsservice/src/ipt_set.rs @@ -1,9 +1,14 @@ //! IPT set - the principal API between the IPT manager and publisher use std::ops::DerefMut; +use std::sync::Arc; +use std::sync::{Mutex, MutexGuard}; use std::time::{Duration, Instant}; -use void::Void; +use futures::channel::mpsc; +use futures::StreamExt as _; + +use derive_more::{Deref, DerefMut}; use crate::IptLocalId; @@ -101,8 +106,15 @@ pub(crate) const IPT_PUBLISH_EXPIRY_SLOP: Duration = Duration::from_secs(300); / /// containing a shared `PublishIptSet`, i.e. an `Option<IptSet>`. #[derive(Debug)] pub(crate) struct IptsManagerView { - /// TODO HSS - todo: Void, + /// Actual shared data + shared: Shared, + + /// Notification sender + /// + /// We don't wrap the state in a postage::watch, + /// because the publisher needs to be able to mutably borrow the data + /// without re-notifying itself when it drops the guard. + notify: mpsc::Sender<()>, } /// Shared view of introduction points - IPT publisher's view @@ -110,14 +122,55 @@ pub(crate) struct IptsManagerView { /// This is the publishers's end of a bidirectional "channel", /// containing a shared `PublishIptSet`, i.e. an `Option<IptSet>`. pub(crate) struct IptsPublisherView { - /// TODO HSS - todo: Void, + /// Actual shared data + shared: Shared, + + /// Notification receiver + notify: mpsc::Receiver<()>, +} + +/// Core shared state +type Shared = Arc<Mutex<PublishIptSet>>; + +/// Mutex guard that will notify when dropped +/// +/// Returned by [`IptsManagerView::borrow_for_update`] +#[derive(Deref, DerefMut)] +struct NotifyingBorrow<'v> { + /// Lock guard + #[deref(forward)] + #[deref_mut(forward)] + guard: MutexGuard<'v, PublishIptSet>, + + /// To be notified on drop + notify: &'v mut mpsc::Sender<()>, } /// Create a new shared state channel for the publication instructions pub(crate) fn ipts_channel(initial_state: PublishIptSet) -> (IptsManagerView, IptsPublisherView) { - drop(initial_state); // clippy::needless_pass_by_value - todo!() + let shared = Arc::new(Mutex::new(initial_state)); + // Zero buffer is right. Docs for `mpsc::channel` say: + // each sender gets a guaranteed slot in the channel capacity, + // and on top of that there are buffer “first come, first serve” slots + // We only have one sender and only ever want one outstanding, + // since we can (and would like to) coalesce notifications. + let (tx, rx) = mpsc::channel(0); + ( + IptsManagerView { + shared: shared.clone(), + notify: tx, + }, + IptsPublisherView { shared, notify: rx }, + ) +} + +/// Lock the shared state and obtain a lock guard +/// +/// Does not do any notification. +fn lock_shared(shared: &Shared) -> MutexGuard<PublishIptSet> { + // Propagating panics is fine since if either the manager or the publisher crashes, + // the other one cannot survive. + shared.lock().expect("IPT set shared state poisoned") } impl IptsManagerView { @@ -128,10 +181,29 @@ impl IptsManagerView { /// The returned value is a lock guard. /// (It is not `Send` so cannot be held across await points.) /// The publisher will be notified when it is dropped. - #[allow(unreachable_code)] // TODO HSS remove pub(crate) fn borrow_for_update(&mut self) -> impl DerefMut<Target = PublishIptSet> + '_ { - // assist type inference with bogus type - std::convert::identity::<Box<PublishIptSet>>(void::unreachable(self.todo)) + let guard = lock_shared(&self.shared); + NotifyingBorrow { + guard, + notify: &mut self.notify, + } + } +} + +impl Drop for NotifyingBorrow<'_> { + fn drop(&mut self) { + // Channel full? Well, then the receiver is indeed going to wake up, so fine + // Channel disconnected? The publisher has crashed or terminated, + // but we are not in a position to fail and shut down the establisher. + // If our HS is shutting down, the manager will be shut down by other means. + let _: Result<(), mpsc::TrySendError<_>> = self.notify.try_send(()); + + // Now the fields will be dropped, includeing `guard`. + // I.e. the mutex gets unlocked. This means we notify the publisher + // (which might make it wake up on another thread) just *before* + // we release the lock, rather than just after. + // This is slightly suboptimal but doesn't matter here. + // To do better, we'd need to make the guard into an Option. } } @@ -149,7 +221,14 @@ impl IptsPublisherView { /// * `None` if the manager is shutting down and the publisher should shut down too /// * `Some(Err(..))` if a fatal error occurred pub(crate) async fn await_update(&mut self) -> Option<Result<(), crate::FatalError>> { - void::unreachable(self.todo) + // Cancellation safety: + // + // We're using mpsc::Receiver's implementation of Stream, via StreamExt. + // Stream::next() must be cancellation safe or it would be lossy everywhere. + // So it is OK to create the future from next, here, and possibly discard it + // before it becomes Ready. + let () = self.notify.next().await?; + Some(Ok(())) } /// Look at the list of introduction points to publish @@ -160,10 +239,8 @@ impl IptsPublisherView { /// /// The returned value is a lock guard. /// (It is not `Send` so cannot be held across await points.) - #[allow(unreachable_code)] // TODO HSS remove pub(crate) fn borrow_for_publish(&self) -> impl DerefMut<Target = PublishIptSet> + '_ { - // assist type inference with bogus type - std::convert::identity::<Box<PublishIptSet>>(void::unreachable(self.todo)) + lock_shared(&self.shared) } } |