diff options
author | Cecylia Bocovich <cohosh@torproject.org> | 2021-05-12 09:32:07 -0400 |
---|---|---|
committer | Cecylia Bocovich <cohosh@torproject.org> | 2021-05-12 09:32:07 -0400 |
commit | 7c9005bed3e353c4e108355abd1ed4b35099f2ea (patch) | |
tree | ef77139dcb4814523dcdde8a27c04916bf59862c | |
parent | 11f0846264d4033e7a7dc7824febb6ad7140762f (diff) | |
download | snowflake-7c9005bed3e353c4e108355abd1ed4b35099f2ea.tar.gz snowflake-7c9005bed3e353c4e108355abd1ed4b35099f2ea.zip |
Ensure turbotunnel read and write loop terminate
Introduce a waitgroup and done channel to ensure that both the read and
write gorouting for turbotunnel connections terminate when the
connection is closed.
-rw-r--r-- | common/turbotunnel/clientmap.go | 1 | ||||
-rw-r--r-- | server/lib/http.go | 40 |
2 files changed, 27 insertions, 14 deletions
diff --git a/common/turbotunnel/clientmap.go b/common/turbotunnel/clientmap.go index fa12915..53d0302 100644 --- a/common/turbotunnel/clientmap.go +++ b/common/turbotunnel/clientmap.go @@ -140,5 +140,6 @@ func (inner *clientMapInner) Pop() interface{} { inner.byAge = inner.byAge[:n-1] // Remove from byAddr map. delete(inner.byAddr, record.Addr) + close(record.SendQueue) return record } diff --git a/server/lib/http.go b/server/lib/http.go index b1c453c..c612422 100644 --- a/server/lib/http.go +++ b/server/lib/http.go @@ -8,6 +8,7 @@ import ( "log" "net" "net/http" + "sync" "time" "git.torproject.org/pluggable-transports/snowflake.git/common/encapsulation" @@ -139,18 +140,21 @@ func turbotunnelMode(conn net.Conn, addr net.Addr, pconn *turbotunnel.QueuePacke // credited for the entire KCP session. clientIDAddrMap.Set(clientID, addr.String()) - errCh := make(chan error) + var wg sync.WaitGroup + wg.Add(2) + done := make(chan struct{}) // The remainder of the WebSocket stream consists of encapsulated // packets. We read them one by one and feed them into the // QueuePacketConn on which kcp.ServeConn was set up, which eventually // leads to KCP-level sessions in the acceptSessions function. go func() { + defer wg.Done() + defer close(done) // Signal the write loop to finish for { p, err := encapsulation.ReadData(conn) if err != nil { - errCh <- err - break + return } pconn.QueueIncoming(p, clientID) } @@ -159,24 +163,32 @@ func turbotunnelMode(conn net.Conn, addr net.Addr, pconn *turbotunnel.QueuePacke // At the same time, grab packets addressed to this ClientID and // encapsulate them into the downstream. go func() { + defer wg.Done() + defer conn.Close() // Signal the read loop to finish + // Buffer encapsulation.WriteData operations to keep length // prefixes in the same send as the data that follows. bw := bufio.NewWriter(conn) - for p := range pconn.OutgoingQueue(clientID) { - _, err := encapsulation.WriteData(bw, p) - if err == nil { - err = bw.Flush() - } - if err != nil { - errCh <- err - break + for { + select { + case <-done: + return + case p, ok := <-pconn.OutgoingQueue(clientID): + if !ok { + return + } + _, err := encapsulation.WriteData(bw, p) + if err == nil { + err = bw.Flush() + } + if err != nil { + return + } } } }() - // Wait until one of the above loops terminates. The closing of the - // WebSocket connection will terminate the other one. - <-errCh + wg.Wait() return nil } |