diff options
author | Vort <vvort@yandex.ru> | 2023-03-27 19:02:10 +0300 |
---|---|---|
committer | Shelikhoo <xiaokangwang@outlook.com> | 2023-06-19 17:44:45 +0100 |
commit | ea01c92cf1a9a13c1058b377ec547b43dfc164e1 (patch) | |
tree | b382ef2c49aca6a6c1575c21ffa94d59de156db1 | |
parent | f8eb86f24d659f8d41dc67bd12bf496dbd35c0b5 (diff) | |
download | snowflake-ea01c92cf1a9a13c1058b377ec547b43dfc164e1.tar.gz snowflake-ea01c92cf1a9a13c1058b377ec547b43dfc164e1.zip |
Implement DataChannel flow control
-rw-r--r-- | proxy/lib/snowflake.go | 11 | ||||
-rw-r--r-- | proxy/lib/webrtcconn.go | 20 |
2 files changed, 30 insertions, 1 deletions
diff --git a/proxy/lib/snowflake.go b/proxy/lib/snowflake.go index ac47e1a..b46a9b4 100644 --- a/proxy/lib/snowflake.go +++ b/proxy/lib/snowflake.go @@ -83,6 +83,8 @@ const ( sessionIDLength = 16 ) +const bufferedAmountLowThreshold uint64 = 256 * 1024 // 256 KB + var broker *SignalingServer var currentNATTypeAccess = &sync.RWMutex{} @@ -408,6 +410,15 @@ func (sf *SnowflakeProxy) makePeerConnectionFromOffer(sdp *webrtc.SessionDescrip pr, pw := io.Pipe() conn := newWebRTCConn(pc, dc, pr, sf.EventDispatcher) + dc.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold) + + dc.OnBufferedAmountLow(func() { + select { + case conn.sendMoreCh <- struct{}{}: + default: + } + }) + dc.OnOpen(func() { log.Printf("Data Channel %s-%d open\n", dc.Label(), dc.ID()) diff --git a/proxy/lib/webrtcconn.go b/proxy/lib/webrtcconn.go index d37cf51..ee656cb 100644 --- a/proxy/lib/webrtcconn.go +++ b/proxy/lib/webrtcconn.go @@ -16,6 +16,8 @@ import ( "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/event" ) +const maxBufferedAmount uint64 = 512 * 1024 // 512 KB + var remoteIPPatterns = []*regexp.Regexp{ /* IPv4 */ regexp.MustCompile(`(?m)^c=IN IP4 ([\d.]+)(?:(?:\/\d+)?\/\d+)?(:? |\r?\n)`), @@ -31,18 +33,23 @@ type webRTCConn struct { lock sync.Mutex // Synchronization for DataChannel destruction once sync.Once // Synchronization for PeerConnection destruction + isClosing bool + bytesLogger bytesLogger eventLogger event.SnowflakeEventReceiver inactivityTimeout time.Duration activity chan struct{} + sendMoreCh chan struct{} cancelTimeoutLoop context.CancelFunc } func newWebRTCConn(pc *webrtc.PeerConnection, dc *webrtc.DataChannel, pr *io.PipeReader, eventLogger event.SnowflakeEventReceiver) *webRTCConn { conn := &webRTCConn{pc: pc, dc: dc, pr: pr, eventLogger: eventLogger} + conn.isClosing = false conn.bytesLogger = newBytesSyncLogger() conn.activity = make(chan struct{}, 100) + conn.sendMoreCh = make(chan struct{}, 1) conn.inactivityTimeout = 30 * time.Second ctx, cancel := context.WithCancel(context.Background()) conn.cancelTimeoutLoop = cancel @@ -76,16 +83,27 @@ func (c *webRTCConn) Read(b []byte) (int, error) { func (c *webRTCConn) Write(b []byte) (int, error) { c.bytesLogger.AddInbound(int64(len(b))) - c.activity <- struct{}{} + select { + case c.activity <- struct{}{}: + default: + } c.lock.Lock() defer c.lock.Unlock() if c.dc != nil { c.dc.Send(b) + if !c.isClosing && c.dc.BufferedAmount()+uint64(len(b)) > maxBufferedAmount { + <-c.sendMoreCh + } } return len(b), nil } func (c *webRTCConn) Close() (err error) { + c.isClosing = true + select { + case c.sendMoreCh <- struct{}{}: + default: + } c.once.Do(func() { c.cancelTimeoutLoop() err = c.pc.Close() |