diff options
author | David Fifield <david@bamsoftware.com> | 2023-03-10 17:33:53 -0700 |
---|---|---|
committer | David Fifield <david@bamsoftware.com> | 2023-03-13 11:42:44 -0600 |
commit | b63d2272bfd262f5166c44f8f88330ed7a732009 (patch) | |
tree | 47a5e261c948575879c1ea0489a7d4ca2b70a76a | |
parent | 473cc4598702c48ae8562df54115d54a7d9ec71c (diff) | |
download | snowflake-b63d2272bfd262f5166c44f8f88330ed7a732009.tar.gz snowflake-b63d2272bfd262f5166c44f8f88330ed7a732009.zip |
Test for data race with QueuePacketConn.WriteTo and kcp-go.
For #40260.
-rw-r--r-- | common/turbotunnel/queuepacketconn_test.go | 129 |
1 files changed, 129 insertions, 0 deletions
diff --git a/common/turbotunnel/queuepacketconn_test.go b/common/turbotunnel/queuepacketconn_test.go index 49ab6aa..e7eb90f 100644 --- a/common/turbotunnel/queuepacketconn_test.go +++ b/common/turbotunnel/queuepacketconn_test.go @@ -1,8 +1,14 @@ package turbotunnel import ( + "bytes" + "fmt" + "net" + "sync" "testing" "time" + + "github.com/xtaci/kcp-go/v5" ) type emptyAddr struct{} @@ -10,6 +16,11 @@ type emptyAddr struct{} func (_ emptyAddr) Network() string { return "empty" } func (_ emptyAddr) String() string { return "empty" } +type intAddr int + +func (i intAddr) Network() string { return "int" } +func (i intAddr) String() string { return fmt.Sprintf("%d", i) } + // Run with -benchmem to see memory allocations. func BenchmarkQueueIncoming(b *testing.B) { conn := NewQueuePacketConn(emptyAddr{}, 1*time.Hour) @@ -41,3 +52,121 @@ func BenchmarkWriteTo(b *testing.B) { } b.StopTimer() } + +// DiscardPacketConn is a net.PacketConn whose ReadFrom method block forever and +// whose WriteTo method discards whatever it is called with. +type DiscardPacketConn struct{} + +func (_ DiscardPacketConn) ReadFrom(_ []byte) (int, net.Addr, error) { select {} } // block forever +func (_ DiscardPacketConn) WriteTo(p []byte, _ net.Addr) (int, error) { return len(p), nil } +func (_ DiscardPacketConn) Close() error { return nil } +func (_ DiscardPacketConn) LocalAddr() net.Addr { return emptyAddr{} } +func (_ DiscardPacketConn) SetDeadline(t time.Time) error { return nil } +func (_ DiscardPacketConn) SetReadDeadline(t time.Time) error { return nil } +func (_ DiscardPacketConn) SetWriteDeadline(t time.Time) error { return nil } + +// TranscriptPacketConn keeps a log of the []byte argument to every call to +// WriteTo. +type TranscriptPacketConn struct { + Transcript [][]byte + lock sync.Mutex + net.PacketConn +} + +func NewTranscriptPacketConn(inner net.PacketConn) *TranscriptPacketConn { + return &TranscriptPacketConn{ + PacketConn: inner, + } +} + +func (c *TranscriptPacketConn) WriteTo(p []byte, addr net.Addr) (int, error) { + c.lock.Lock() + defer c.lock.Unlock() + + p2 := make([]byte, len(p)) + copy(p2, p) + c.Transcript = append(c.Transcript, p2) + + return c.PacketConn.WriteTo(p, addr) +} + +// Tests that QueuePacketConn.WriteTo is compatible with the way kcp-go uses +// PacketConn, allocating source buffers in a sync.Pool. +// +// https://bugs.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/40260 +func TestQueuePacketConnWriteToKCP(t *testing.T) { + // Start a goroutine to constantly exercise kcp UDPSession.tx, writing + // packets with payload "XXXX". + done := make(chan struct{}, 0) + defer close(done) + ready := make(chan struct{}, 0) + go func() { + var readyClose sync.Once + defer readyClose.Do(func() { close(ready) }) + pconn := DiscardPacketConn{} + defer pconn.Close() + for { + select { + case <-done: + break + default: + } + // Create a new UDPSession, send once, then discard the + // UDPSession. + conn, err := kcp.NewConn2(intAddr(2), nil, 0, 0, pconn) + if err != nil { + panic(err) + } + _, err = conn.Write([]byte("XXXX")) + if err != nil { + panic(err) + } + // Signal the main test to start once we have done one + // iterator of this noisy loop. + readyClose.Do(func() { close(ready) }) + } + }() + + pconn := NewQueuePacketConn(emptyAddr{}, 1*time.Hour) + defer pconn.Close() + addr1 := intAddr(1) + outgoing := pconn.OutgoingQueue(addr1) + + // Once the "XXXX" goroutine is started, repeatedly send a packet, wait, + // then retrieve it and check whether it has changed since being sent. + <-ready + for i := 0; i < 10; i++ { + transcript := NewTranscriptPacketConn(pconn) + conn, err := kcp.NewConn2(addr1, nil, 0, 0, transcript) + if err != nil { + panic(err) + } + _, err = conn.Write([]byte("hello world")) + if err != nil { + panic(err) + } + + err = conn.Close() + if err != nil { + panic(err) + } + + // A sleep after the Write makes buffer reuse more likely. + time.Sleep(100 * time.Millisecond) + + if len(transcript.Transcript) == 0 { + panic("empty transcript") + } + + for j, tr := range transcript.Transcript { + p := <-outgoing + // This test is meant to detect unsynchronized memory + // changes, so freeze the slice we just read. + p2 := make([]byte, len(p)) + copy(p2, p) + if !bytes.Equal(p2, tr) { + t.Fatalf("%d %d packet changed between send and recv\nsend: %+q\nrecv: %+q", i, j, tr, p2) + } + } + } +} |