aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Fifield <david@bamsoftware.com>2022-10-01 11:43:29 -0600
committerDavid Fifield <david@bamsoftware.com>2022-12-14 23:02:26 -0700
commitc6fabb212d7eff3f58021eb0f806376772c2bd4d (patch)
treeb5d17f5044f2605a171e57f178c844609671ec54
parent53e381e45d63b0d0ccebbeece71b2471900c7b6a (diff)
downloadsnowflake-c6fabb212d7eff3f58021eb0f806376772c2bd4d.tar.gz
snowflake-c6fabb212d7eff3f58021eb0f806376772c2bd4d.zip
Use multiple parallel KCP state machines in the server.
To distribute CPU load. https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/-/issues/40200
-rw-r--r--server/lib/http.go52
-rw-r--r--server/lib/snowflake.go48
2 files changed, 73 insertions, 27 deletions
diff --git a/server/lib/http.go b/server/lib/http.go
index 4bd4fa8..3a01884 100644
--- a/server/lib/http.go
+++ b/server/lib/http.go
@@ -3,6 +3,10 @@ package snowflake_server
import (
"bufio"
"bytes"
+ "crypto/hmac"
+ "crypto/rand"
+ "crypto/sha256"
+ "encoding/binary"
"fmt"
"io"
"log"
@@ -49,9 +53,45 @@ var upgrader = websocket.Upgrader{
var clientIDAddrMap = newClientIDMap(clientIDAddrMapCapacity)
type httpHandler struct {
- // pconn is the adapter layer between stream-oriented WebSocket
- // connections and the packet-oriented KCP layer.
- pconn *turbotunnel.QueuePacketConn
+ // pconns is the adapter layer between stream-oriented WebSocket
+ // connections and the packet-oriented KCP layer. There are multiple of
+ // these, corresponding to the multiple kcp.ServeConn in
+ // Transport.Listen. Clients are assigned to a particular instance by a
+ // hash of ClientID, indexed by a hash of the ClientID, in order to
+ // distribute KCP processing load across CPU cores.
+ pconns []*turbotunnel.QueuePacketConn
+
+ // clientIDLookupKey is a secret key used to tweak the hash-based
+ // assignement of ClientID to pconn, in order to avoid manipulation of
+ // hash assignments.
+ clientIDLookupKey []byte
+}
+
+// newHTTPHandler creates a new http.Handler that exchanges encapsulated packets
+// over incoming WebSocket connections.
+func newHTTPHandler(localAddr net.Addr, numInstances int) *httpHandler {
+ pconns := make([]*turbotunnel.QueuePacketConn, 0, numInstances)
+ for i := 0; i < numInstances; i++ {
+ pconns = append(pconns, turbotunnel.NewQueuePacketConn(localAddr, clientMapTimeout))
+ }
+
+ clientIDLookupKey := make([]byte, 16)
+ _, err := rand.Read(clientIDLookupKey)
+ if err != nil {
+ panic(err)
+ }
+
+ return &httpHandler{
+ pconns: pconns,
+ clientIDLookupKey: clientIDLookupKey,
+ }
+}
+
+// lookupPacketConn returns the element of pconns that corresponds to client ID,
+// according to the hash-based mapping.
+func (handler *httpHandler) lookupPacketConn(clientID turbotunnel.ClientID) *turbotunnel.QueuePacketConn {
+ s := hmac.New(sha256.New, handler.clientIDLookupKey).Sum(clientID[:])
+ return handler.pconns[binary.LittleEndian.Uint64(s)%uint64(len(handler.pconns))]
}
func (handler *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -82,7 +122,7 @@ func (handler *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch {
case bytes.Equal(token[:], turbotunnel.Token[:]):
- err = turbotunnelMode(conn, addr, handler.pconn)
+ err = handler.turbotunnelMode(conn, addr)
default:
// We didn't find a matching token, which means that we are
// dealing with a client that doesn't know about such things.
@@ -100,7 +140,7 @@ func (handler *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// turbotunnelMode handles clients that sent turbotunnel.Token at the start of
// their stream. These clients expect to send and receive encapsulated packets,
// with a long-lived session identified by ClientID.
-func turbotunnelMode(conn net.Conn, addr net.Addr, pconn *turbotunnel.QueuePacketConn) error {
+func (handler *httpHandler) turbotunnelMode(conn net.Conn, addr net.Addr) error {
// Read the ClientID prefix. Every packet encapsulated in this WebSocket
// connection pertains to the same ClientID.
var clientID turbotunnel.ClientID
@@ -120,6 +160,8 @@ func turbotunnelMode(conn net.Conn, addr net.Addr, pconn *turbotunnel.QueuePacke
// credited for the entire KCP session.
clientIDAddrMap.Set(clientID, addr)
+ pconn := handler.lookupPacketConn(clientID)
+
var wg sync.WaitGroup
wg.Add(2)
done := make(chan struct{})
diff --git a/server/lib/snowflake.go b/server/lib/snowflake.go
index 5f995ac..2af5dbb 100644
--- a/server/lib/snowflake.go
+++ b/server/lib/snowflake.go
@@ -55,6 +55,11 @@ const (
WindowSize = 65535
// StreamSize controls the maximum amount of in flight data between a client and server.
StreamSize = 1048576 //1MB
+
+ // numKCPInstances is the number of parallel KCP state machines to run.
+ // Clients are assigned to a particular KCP instance by a hash of their
+ // ClientID.
+ numKCPInstances = 2
)
// Transport is a structure with methods that conform to the Go PT v2.1 API
@@ -76,17 +81,13 @@ func (t *Transport) Listen(addr net.Addr) (*SnowflakeListener, error) {
addr: addr,
queue: make(chan net.Conn, 65534),
closed: make(chan struct{}),
+ ln: make([]*kcp.Listener, 0, numKCPInstances),
}
- handler := httpHandler{
- // pconn is shared among all connections to this server. It
- // overlays packet-based client sessions on top of ephemeral
- // WebSocket connections.
- pconn: turbotunnel.NewQueuePacketConn(addr, clientMapTimeout),
- }
+ handler := newHTTPHandler(addr, numKCPInstances)
server := &http.Server{
Addr: addr.String(),
- Handler: &handler,
+ Handler: handler,
ReadTimeout: requestTimeout,
}
// We need to override server.TLSConfig.GetCertificate--but first
@@ -139,25 +140,26 @@ func (t *Transport) Listen(addr net.Addr) (*SnowflakeListener, error) {
listener.server = server
- // Start a KCP engine, set up to read and write its packets over the
+ // Start the KCP engines, set up to read and write its packets over the
// WebSocket connections that arrive at the web server.
// handler.ServeHTTP is responsible for encapsulation/decapsulation of
// packets on behalf of KCP. KCP takes those packets and turns them into
// sessions which appear in the acceptSessions function.
- ln, err := kcp.ServeConn(nil, 0, 0, handler.pconn)
- if err != nil {
- server.Close()
- return nil, err
- }
- go func() {
- defer ln.Close()
- err := listener.acceptSessions(ln)
+ for i, pconn := range handler.pconns {
+ ln, err := kcp.ServeConn(nil, 0, 0, pconn)
if err != nil {
- log.Printf("acceptSessions: %v", err)
+ server.Close()
+ return nil, err
}
- }()
-
- listener.ln = ln
+ go func() {
+ defer ln.Close()
+ err := listener.acceptSessions(ln)
+ if err != nil {
+ log.Printf("acceptSessions %d: %v", i, err)
+ }
+ }()
+ listener.ln = append(listener.ln, ln)
+ }
return listener, nil
@@ -167,7 +169,7 @@ type SnowflakeListener struct {
addr net.Addr
queue chan net.Conn
server *http.Server
- ln *kcp.Listener
+ ln []*kcp.Listener
closed chan struct{}
closeOnce sync.Once
}
@@ -196,7 +198,9 @@ func (l *SnowflakeListener) Close() error {
l.closeOnce.Do(func() {
close(l.closed)
l.server.Close()
- l.ln.Close()
+ for _, ln := range l.ln {
+ ln.Close()
+ }
})
return nil
}