aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Fifield <david@bamsoftware.com>2023-03-10 17:33:53 -0700
committerDavid Fifield <david@bamsoftware.com>2023-03-13 11:42:44 -0600
commitb63d2272bfd262f5166c44f8f88330ed7a732009 (patch)
tree47a5e261c948575879c1ea0489a7d4ca2b70a76a
parent473cc4598702c48ae8562df54115d54a7d9ec71c (diff)
downloadsnowflake-b63d2272bfd262f5166c44f8f88330ed7a732009.tar.gz
snowflake-b63d2272bfd262f5166c44f8f88330ed7a732009.zip
Test for data race with QueuePacketConn.WriteTo and kcp-go.
For #40260.
-rw-r--r--common/turbotunnel/queuepacketconn_test.go129
1 files changed, 129 insertions, 0 deletions
diff --git a/common/turbotunnel/queuepacketconn_test.go b/common/turbotunnel/queuepacketconn_test.go
index 49ab6aa..e7eb90f 100644
--- a/common/turbotunnel/queuepacketconn_test.go
+++ b/common/turbotunnel/queuepacketconn_test.go
@@ -1,8 +1,14 @@
package turbotunnel
import (
+ "bytes"
+ "fmt"
+ "net"
+ "sync"
"testing"
"time"
+
+ "github.com/xtaci/kcp-go/v5"
)
type emptyAddr struct{}
@@ -10,6 +16,11 @@ type emptyAddr struct{}
func (_ emptyAddr) Network() string { return "empty" }
func (_ emptyAddr) String() string { return "empty" }
+type intAddr int
+
+func (i intAddr) Network() string { return "int" }
+func (i intAddr) String() string { return fmt.Sprintf("%d", i) }
+
// Run with -benchmem to see memory allocations.
func BenchmarkQueueIncoming(b *testing.B) {
conn := NewQueuePacketConn(emptyAddr{}, 1*time.Hour)
@@ -41,3 +52,121 @@ func BenchmarkWriteTo(b *testing.B) {
}
b.StopTimer()
}
+
+// DiscardPacketConn is a net.PacketConn whose ReadFrom method block forever and
+// whose WriteTo method discards whatever it is called with.
+type DiscardPacketConn struct{}
+
+func (_ DiscardPacketConn) ReadFrom(_ []byte) (int, net.Addr, error) { select {} } // block forever
+func (_ DiscardPacketConn) WriteTo(p []byte, _ net.Addr) (int, error) { return len(p), nil }
+func (_ DiscardPacketConn) Close() error { return nil }
+func (_ DiscardPacketConn) LocalAddr() net.Addr { return emptyAddr{} }
+func (_ DiscardPacketConn) SetDeadline(t time.Time) error { return nil }
+func (_ DiscardPacketConn) SetReadDeadline(t time.Time) error { return nil }
+func (_ DiscardPacketConn) SetWriteDeadline(t time.Time) error { return nil }
+
+// TranscriptPacketConn keeps a log of the []byte argument to every call to
+// WriteTo.
+type TranscriptPacketConn struct {
+ Transcript [][]byte
+ lock sync.Mutex
+ net.PacketConn
+}
+
+func NewTranscriptPacketConn(inner net.PacketConn) *TranscriptPacketConn {
+ return &TranscriptPacketConn{
+ PacketConn: inner,
+ }
+}
+
+func (c *TranscriptPacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ p2 := make([]byte, len(p))
+ copy(p2, p)
+ c.Transcript = append(c.Transcript, p2)
+
+ return c.PacketConn.WriteTo(p, addr)
+}
+
+// Tests that QueuePacketConn.WriteTo is compatible with the way kcp-go uses
+// PacketConn, allocating source buffers in a sync.Pool.
+//
+// https://bugs.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/40260
+func TestQueuePacketConnWriteToKCP(t *testing.T) {
+ // Start a goroutine to constantly exercise kcp UDPSession.tx, writing
+ // packets with payload "XXXX".
+ done := make(chan struct{}, 0)
+ defer close(done)
+ ready := make(chan struct{}, 0)
+ go func() {
+ var readyClose sync.Once
+ defer readyClose.Do(func() { close(ready) })
+ pconn := DiscardPacketConn{}
+ defer pconn.Close()
+ for {
+ select {
+ case <-done:
+ break
+ default:
+ }
+ // Create a new UDPSession, send once, then discard the
+ // UDPSession.
+ conn, err := kcp.NewConn2(intAddr(2), nil, 0, 0, pconn)
+ if err != nil {
+ panic(err)
+ }
+ _, err = conn.Write([]byte("XXXX"))
+ if err != nil {
+ panic(err)
+ }
+ // Signal the main test to start once we have done one
+ // iterator of this noisy loop.
+ readyClose.Do(func() { close(ready) })
+ }
+ }()
+
+ pconn := NewQueuePacketConn(emptyAddr{}, 1*time.Hour)
+ defer pconn.Close()
+ addr1 := intAddr(1)
+ outgoing := pconn.OutgoingQueue(addr1)
+
+ // Once the "XXXX" goroutine is started, repeatedly send a packet, wait,
+ // then retrieve it and check whether it has changed since being sent.
+ <-ready
+ for i := 0; i < 10; i++ {
+ transcript := NewTranscriptPacketConn(pconn)
+ conn, err := kcp.NewConn2(addr1, nil, 0, 0, transcript)
+ if err != nil {
+ panic(err)
+ }
+ _, err = conn.Write([]byte("hello world"))
+ if err != nil {
+ panic(err)
+ }
+
+ err = conn.Close()
+ if err != nil {
+ panic(err)
+ }
+
+ // A sleep after the Write makes buffer reuse more likely.
+ time.Sleep(100 * time.Millisecond)
+
+ if len(transcript.Transcript) == 0 {
+ panic("empty transcript")
+ }
+
+ for j, tr := range transcript.Transcript {
+ p := <-outgoing
+ // This test is meant to detect unsynchronized memory
+ // changes, so freeze the slice we just read.
+ p2 := make([]byte, len(p))
+ copy(p2, p)
+ if !bytes.Equal(p2, tr) {
+ t.Fatalf("%d %d packet changed between send and recv\nsend: %+q\nrecv: %+q", i, j, tr, p2)
+ }
+ }
+ }
+}