aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVort <vvort@yandex.ru>2023-03-27 19:02:10 +0300
committerShelikhoo <xiaokangwang@outlook.com>2023-06-19 17:44:45 +0100
commitea01c92cf1a9a13c1058b377ec547b43dfc164e1 (patch)
treeb382ef2c49aca6a6c1575c21ffa94d59de156db1
parentf8eb86f24d659f8d41dc67bd12bf496dbd35c0b5 (diff)
downloadsnowflake-ea01c92cf1a9a13c1058b377ec547b43dfc164e1.tar.gz
snowflake-ea01c92cf1a9a13c1058b377ec547b43dfc164e1.zip
Implement DataChannel flow control
-rw-r--r--proxy/lib/snowflake.go11
-rw-r--r--proxy/lib/webrtcconn.go20
2 files changed, 30 insertions, 1 deletions
diff --git a/proxy/lib/snowflake.go b/proxy/lib/snowflake.go
index ac47e1a..b46a9b4 100644
--- a/proxy/lib/snowflake.go
+++ b/proxy/lib/snowflake.go
@@ -83,6 +83,8 @@ const (
sessionIDLength = 16
)
+const bufferedAmountLowThreshold uint64 = 256 * 1024 // 256 KB
+
var broker *SignalingServer
var currentNATTypeAccess = &sync.RWMutex{}
@@ -408,6 +410,15 @@ func (sf *SnowflakeProxy) makePeerConnectionFromOffer(sdp *webrtc.SessionDescrip
pr, pw := io.Pipe()
conn := newWebRTCConn(pc, dc, pr, sf.EventDispatcher)
+ dc.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold)
+
+ dc.OnBufferedAmountLow(func() {
+ select {
+ case conn.sendMoreCh <- struct{}{}:
+ default:
+ }
+ })
+
dc.OnOpen(func() {
log.Printf("Data Channel %s-%d open\n", dc.Label(), dc.ID())
diff --git a/proxy/lib/webrtcconn.go b/proxy/lib/webrtcconn.go
index d37cf51..ee656cb 100644
--- a/proxy/lib/webrtcconn.go
+++ b/proxy/lib/webrtcconn.go
@@ -16,6 +16,8 @@ import (
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/event"
)
+const maxBufferedAmount uint64 = 512 * 1024 // 512 KB
+
var remoteIPPatterns = []*regexp.Regexp{
/* IPv4 */
regexp.MustCompile(`(?m)^c=IN IP4 ([\d.]+)(?:(?:\/\d+)?\/\d+)?(:? |\r?\n)`),
@@ -31,18 +33,23 @@ type webRTCConn struct {
lock sync.Mutex // Synchronization for DataChannel destruction
once sync.Once // Synchronization for PeerConnection destruction
+ isClosing bool
+
bytesLogger bytesLogger
eventLogger event.SnowflakeEventReceiver
inactivityTimeout time.Duration
activity chan struct{}
+ sendMoreCh chan struct{}
cancelTimeoutLoop context.CancelFunc
}
func newWebRTCConn(pc *webrtc.PeerConnection, dc *webrtc.DataChannel, pr *io.PipeReader, eventLogger event.SnowflakeEventReceiver) *webRTCConn {
conn := &webRTCConn{pc: pc, dc: dc, pr: pr, eventLogger: eventLogger}
+ conn.isClosing = false
conn.bytesLogger = newBytesSyncLogger()
conn.activity = make(chan struct{}, 100)
+ conn.sendMoreCh = make(chan struct{}, 1)
conn.inactivityTimeout = 30 * time.Second
ctx, cancel := context.WithCancel(context.Background())
conn.cancelTimeoutLoop = cancel
@@ -76,16 +83,27 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
func (c *webRTCConn) Write(b []byte) (int, error) {
c.bytesLogger.AddInbound(int64(len(b)))
- c.activity <- struct{}{}
+ select {
+ case c.activity <- struct{}{}:
+ default:
+ }
c.lock.Lock()
defer c.lock.Unlock()
if c.dc != nil {
c.dc.Send(b)
+ if !c.isClosing && c.dc.BufferedAmount()+uint64(len(b)) > maxBufferedAmount {
+ <-c.sendMoreCh
+ }
}
return len(b), nil
}
func (c *webRTCConn) Close() (err error) {
+ c.isClosing = true
+ select {
+ case c.sendMoreCh <- struct{}{}:
+ default:
+ }
c.once.Do(func() {
c.cancelTimeoutLoop()
err = c.pc.Close()