diff options
author | gabi-250 <gabi@torproject.org> | 2024-04-25 15:45:51 +0000 |
---|---|---|
committer | gabi-250 <gabi@torproject.org> | 2024-04-25 15:45:51 +0000 |
commit | f4b5a00bab127581b1781c1511b9b5bf8fe80b70 (patch) | |
tree | 4e87859998e00ca96c457ca7a5625a3e06767075 | |
parent | fd0f03eb28fb76d29e19960039a40857d16996cb (diff) | |
parent | 14f0ee82d1f8ffbbbead3e72340d2c98f05a73fc (diff) | |
download | arti-f4b5a00bab127581b1781c1511b9b5bf8fe80b70.tar.gz arti-f4b5a00bab127581b1781c1511b9b5bf8fe80b70.zip |
Merge branch 'b1373' into 'main'
Circuit reactor: document some requirements and assumptions
Closes #1373
See merge request tpo/core/arti!2089
-rw-r--r-- | crates/tor-proto/src/circuit/reactor.rs | 52 |
1 files changed, 42 insertions, 10 deletions
diff --git a/crates/tor-proto/src/circuit/reactor.rs b/crates/tor-proto/src/circuit/reactor.rs index edd4b86a6..3242b1c00 100644 --- a/crates/tor-proto/src/circuit/reactor.rs +++ b/crates/tor-proto/src/circuit/reactor.rs @@ -934,6 +934,17 @@ impl Reactor { i, cell ); + // We know the stream has a non-empty SENDME + // window because we accept at most one cell per + // stream below, and only when the relevant + // stream has a non-empty window and after + // clearing the backlog here. + // + // TODO prop340: We need to be careful here when + // adding fragmentation; e.g. this argument + // fails to hold if we allow later breaking a + // DATA message into multiple messages and cells + // for packing. self.send_relay_cell(cx, hop_num, early, cell)?; if !self.channel.poll_ready(cx)? { break 'outer; @@ -953,7 +964,18 @@ impl Reactor { // Do the stream and hop send windows allow us to obtain and // send something? // - // FIXME(eta): not everything counts toward congestion control! + // NOTE: not everything counts toward congestion + // control. However, we can't easily remove + // this check: + // * We need to be careful not to buffer more + // than ONE message per stream for the call to + // `send_relay_cell` above, and potentially + // closing the stream below, to be correct. + // * We need to be careful about allowing + // messages that *don't* count to be sent before + // messages that *do*; e.g. we wouldn't want to + // accept and send an END message on a stream where we + // still have DATA messages queued. if send_window.window() > 0 && hop.sendwindow.window() > 0 { match Pin::new(rx).poll_next(cx) { Poll::Ready(Some(m)) => { @@ -964,8 +986,12 @@ impl Reactor { } Poll::Ready(None) => { // Stream receiver was dropped; close the stream. - // We can't close it here though due to borrowck; that - // will happen later. + // + // We know there are no queued messages for the stream + // since we already flushed above. + // + // We can't close it here due to + // borrowck; that will happen later. streams_to_close.push((hop_num, id)); } Poll::Pending => {} @@ -1481,7 +1507,13 @@ impl Reactor { Ok(()) } - /// Encode the relay cell `cell`, encrypt it, and send it to the 'hop'th hop. + /// Encode `msg`, encrypt it, and send it to the 'hop'th hop. + /// + /// If there is insufficient outgoing *circuit-level* SENDME window, the + /// `msg` is enqueued in the hop's `outbound` queue instead. + /// + /// If there is insufficient outgoing *stream-level* SENDME window, + /// an error is returned instead. /// /// Does not check whether the cell is well-formed or reasonable. fn send_relay_cell( @@ -1489,10 +1521,10 @@ impl Reactor { cx: &mut Context<'_>, hop: HopNum, early: bool, - cell: AnyRelayMsgOuter, + msg: AnyRelayMsgOuter, ) -> Result<()> { - let c_t_w = sendme::cmd_counts_towards_windows(cell.cmd()); - let stream_id = cell.stream_id(); + let c_t_w = sendme::cmd_counts_towards_windows(msg.cmd()); + let stream_id = msg.stream_id(); // Check whether the hop send window is empty, if this cell counts towards windows. // NOTE(eta): It is imperative this happens *before* calling encrypt() below, otherwise // we'll have cells rejected due to a protocol violation! (Cells have to be @@ -1507,13 +1539,13 @@ impl Reactor { "{}: having to use onto hop {} queue for cell: {:?}", self.unique_id, hop.display(), - cell + msg ); - circhop.outbound.push_back((early, cell)); + circhop.outbound.push_back((early, msg)); return Ok(()); } } - let mut body: RelayCellBody = cell + let mut body: RelayCellBody = msg .encode(&mut rand::thread_rng()) .map_err(|e| Error::from_cell_enc(e, "relay cell body"))? .into(); |