aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCecylia Bocovich <cohosh@torproject.org>2021-05-12 09:32:07 -0400
committerCecylia Bocovich <cohosh@torproject.org>2021-05-12 09:32:07 -0400
commit7c9005bed3e353c4e108355abd1ed4b35099f2ea (patch)
treeef77139dcb4814523dcdde8a27c04916bf59862c
parent11f0846264d4033e7a7dc7824febb6ad7140762f (diff)
downloadsnowflake-7c9005bed3e353c4e108355abd1ed4b35099f2ea.tar.gz
snowflake-7c9005bed3e353c4e108355abd1ed4b35099f2ea.zip
Ensure turbotunnel read and write loop terminate
Introduce a waitgroup and done channel to ensure that both the read and write gorouting for turbotunnel connections terminate when the connection is closed.
-rw-r--r--common/turbotunnel/clientmap.go1
-rw-r--r--server/lib/http.go40
2 files changed, 27 insertions, 14 deletions
diff --git a/common/turbotunnel/clientmap.go b/common/turbotunnel/clientmap.go
index fa12915..53d0302 100644
--- a/common/turbotunnel/clientmap.go
+++ b/common/turbotunnel/clientmap.go
@@ -140,5 +140,6 @@ func (inner *clientMapInner) Pop() interface{} {
inner.byAge = inner.byAge[:n-1]
// Remove from byAddr map.
delete(inner.byAddr, record.Addr)
+ close(record.SendQueue)
return record
}
diff --git a/server/lib/http.go b/server/lib/http.go
index b1c453c..c612422 100644
--- a/server/lib/http.go
+++ b/server/lib/http.go
@@ -8,6 +8,7 @@ import (
"log"
"net"
"net/http"
+ "sync"
"time"
"git.torproject.org/pluggable-transports/snowflake.git/common/encapsulation"
@@ -139,18 +140,21 @@ func turbotunnelMode(conn net.Conn, addr net.Addr, pconn *turbotunnel.QueuePacke
// credited for the entire KCP session.
clientIDAddrMap.Set(clientID, addr.String())
- errCh := make(chan error)
+ var wg sync.WaitGroup
+ wg.Add(2)
+ done := make(chan struct{})
// The remainder of the WebSocket stream consists of encapsulated
// packets. We read them one by one and feed them into the
// QueuePacketConn on which kcp.ServeConn was set up, which eventually
// leads to KCP-level sessions in the acceptSessions function.
go func() {
+ defer wg.Done()
+ defer close(done) // Signal the write loop to finish
for {
p, err := encapsulation.ReadData(conn)
if err != nil {
- errCh <- err
- break
+ return
}
pconn.QueueIncoming(p, clientID)
}
@@ -159,24 +163,32 @@ func turbotunnelMode(conn net.Conn, addr net.Addr, pconn *turbotunnel.QueuePacke
// At the same time, grab packets addressed to this ClientID and
// encapsulate them into the downstream.
go func() {
+ defer wg.Done()
+ defer conn.Close() // Signal the read loop to finish
+
// Buffer encapsulation.WriteData operations to keep length
// prefixes in the same send as the data that follows.
bw := bufio.NewWriter(conn)
- for p := range pconn.OutgoingQueue(clientID) {
- _, err := encapsulation.WriteData(bw, p)
- if err == nil {
- err = bw.Flush()
- }
- if err != nil {
- errCh <- err
- break
+ for {
+ select {
+ case <-done:
+ return
+ case p, ok := <-pconn.OutgoingQueue(clientID):
+ if !ok {
+ return
+ }
+ _, err := encapsulation.WriteData(bw, p)
+ if err == nil {
+ err = bw.Flush()
+ }
+ if err != nil {
+ return
+ }
}
}
}()
- // Wait until one of the above loops terminates. The closing of the
- // WebSocket connection will terminate the other one.
- <-errCh
+ wg.Wait()
return nil
}