aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgabi-250 <gabi@torproject.org>2024-04-25 15:45:51 +0000
committergabi-250 <gabi@torproject.org>2024-04-25 15:45:51 +0000
commitf4b5a00bab127581b1781c1511b9b5bf8fe80b70 (patch)
tree4e87859998e00ca96c457ca7a5625a3e06767075
parentfd0f03eb28fb76d29e19960039a40857d16996cb (diff)
parent14f0ee82d1f8ffbbbead3e72340d2c98f05a73fc (diff)
downloadarti-f4b5a00bab127581b1781c1511b9b5bf8fe80b70.tar.gz
arti-f4b5a00bab127581b1781c1511b9b5bf8fe80b70.zip
Merge branch 'b1373' into 'main'
Circuit reactor: document some requirements and assumptions Closes #1373 See merge request tpo/core/arti!2089
-rw-r--r--crates/tor-proto/src/circuit/reactor.rs52
1 files changed, 42 insertions, 10 deletions
diff --git a/crates/tor-proto/src/circuit/reactor.rs b/crates/tor-proto/src/circuit/reactor.rs
index edd4b86a6..3242b1c00 100644
--- a/crates/tor-proto/src/circuit/reactor.rs
+++ b/crates/tor-proto/src/circuit/reactor.rs
@@ -934,6 +934,17 @@ impl Reactor {
i,
cell
);
+ // We know the stream has a non-empty SENDME
+ // window because we accept at most one cell per
+ // stream below, and only when the relevant
+ // stream has a non-empty window and after
+ // clearing the backlog here.
+ //
+ // TODO prop340: We need to be careful here when
+ // adding fragmentation; e.g. this argument
+ // fails to hold if we allow later breaking a
+ // DATA message into multiple messages and cells
+ // for packing.
self.send_relay_cell(cx, hop_num, early, cell)?;
if !self.channel.poll_ready(cx)? {
break 'outer;
@@ -953,7 +964,18 @@ impl Reactor {
// Do the stream and hop send windows allow us to obtain and
// send something?
//
- // FIXME(eta): not everything counts toward congestion control!
+ // NOTE: not everything counts toward congestion
+ // control. However, we can't easily remove
+ // this check:
+ // * We need to be careful not to buffer more
+ // than ONE message per stream for the call to
+ // `send_relay_cell` above, and potentially
+ // closing the stream below, to be correct.
+ // * We need to be careful about allowing
+ // messages that *don't* count to be sent before
+ // messages that *do*; e.g. we wouldn't want to
+ // accept and send an END message on a stream where we
+ // still have DATA messages queued.
if send_window.window() > 0 && hop.sendwindow.window() > 0 {
match Pin::new(rx).poll_next(cx) {
Poll::Ready(Some(m)) => {
@@ -964,8 +986,12 @@ impl Reactor {
}
Poll::Ready(None) => {
// Stream receiver was dropped; close the stream.
- // We can't close it here though due to borrowck; that
- // will happen later.
+ //
+ // We know there are no queued messages for the stream
+ // since we already flushed above.
+ //
+ // We can't close it here due to
+ // borrowck; that will happen later.
streams_to_close.push((hop_num, id));
}
Poll::Pending => {}
@@ -1481,7 +1507,13 @@ impl Reactor {
Ok(())
}
- /// Encode the relay cell `cell`, encrypt it, and send it to the 'hop'th hop.
+ /// Encode `msg`, encrypt it, and send it to the 'hop'th hop.
+ ///
+ /// If there is insufficient outgoing *circuit-level* SENDME window, the
+ /// `msg` is enqueued in the hop's `outbound` queue instead.
+ ///
+ /// If there is insufficient outgoing *stream-level* SENDME window,
+ /// an error is returned instead.
///
/// Does not check whether the cell is well-formed or reasonable.
fn send_relay_cell(
@@ -1489,10 +1521,10 @@ impl Reactor {
cx: &mut Context<'_>,
hop: HopNum,
early: bool,
- cell: AnyRelayMsgOuter,
+ msg: AnyRelayMsgOuter,
) -> Result<()> {
- let c_t_w = sendme::cmd_counts_towards_windows(cell.cmd());
- let stream_id = cell.stream_id();
+ let c_t_w = sendme::cmd_counts_towards_windows(msg.cmd());
+ let stream_id = msg.stream_id();
// Check whether the hop send window is empty, if this cell counts towards windows.
// NOTE(eta): It is imperative this happens *before* calling encrypt() below, otherwise
// we'll have cells rejected due to a protocol violation! (Cells have to be
@@ -1507,13 +1539,13 @@ impl Reactor {
"{}: having to use onto hop {} queue for cell: {:?}",
self.unique_id,
hop.display(),
- cell
+ msg
);
- circhop.outbound.push_back((early, cell));
+ circhop.outbound.push_back((early, msg));
return Ok(());
}
}
- let mut body: RelayCellBody = cell
+ let mut body: RelayCellBody = msg
.encode(&mut rand::thread_rng())
.map_err(|e| Error::from_cell_enc(e, "relay cell body"))?
.into();