aboutsummaryrefslogtreecommitdiff
path: root/client/lib/snowflake.go
blob: dd78c12d6a547ed61b188ca8f1105e9e4d9cc468 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
/*
Package snowflake_client implements functionality necessary for a client to establish a connection
to a server using Snowflake.

Included in the package is a Transport type that implements the Pluggable Transports v2.1 Go API
specification. To use Snowflake, you must first create a client from a configuration:

	config := snowflake_client.ClientConfig{
		BrokerURL:   "https://snowflake-broker.example.com",
		FrontDomain: "https://friendlyfrontdomain.net",
		// ...
	}
	transport, err := snowflake_client.NewSnowflakeClient(config)
	if err != nil {
		// handle error
	}

The Dial function connects to a Snowflake server:

	conn, err := transport.Dial()
	if err != nil {
		// handle error
	}
	defer conn.Close()

*/
package snowflake_client

import (
	"context"
	"errors"
	"log"
	"math/rand"
	"net"
	"strings"
	"time"

	"git.torproject.org/pluggable-transports/snowflake.git/v2/common/event"
	"git.torproject.org/pluggable-transports/snowflake.git/v2/common/nat"
	"git.torproject.org/pluggable-transports/snowflake.git/v2/common/turbotunnel"
	"github.com/pion/webrtc/v3"
	"github.com/xtaci/kcp-go/v5"
	"github.com/xtaci/smux"
)

const (
	// ReconnectTimeout is the time a Snowflake client will wait before collecting
	// more snowflakes.
	ReconnectTimeout = 10 * time.Second
	// SnowflakeTimeout is the time a Snowflake client will wait before determining that
	// a remote snowflake has been disconnected. If no new messages are sent or received
	// in this time period, the client will terminate the connection with the remote
	// peer and collect a new snowflake.
	SnowflakeTimeout = 20 * time.Second
	// DataChannelTimeout is how long the client will wait for the OnOpen callback
	// on a newly created DataChannel.
	DataChannelTimeout = 10 * time.Second

	// WindowSize is the number of packets in the send and receive window of a KCP connection.
	WindowSize = 65535
	// StreamSize controls the maximum amount of in flight data between a client and server.
	StreamSize = 1048576 //1MB
)

type dummyAddr struct{}

func (addr dummyAddr) Network() string { return "dummy" }
func (addr dummyAddr) String() string  { return "dummy" }

// Transport is a structure with methods that conform to the Go PT v2.1 API
// https://github.com/Pluggable-Transports/Pluggable-Transports-spec/blob/master/releases/PTSpecV2.1/Pluggable%20Transport%20Specification%20v2.1%20-%20Go%20Transport%20API.pdf
type Transport struct {
	dialer *WebRTCDialer

	// EventDispatcher is the event bus for snowflake events.
	// When an important event happens, it will be distributed here.
	eventDispatcher event.SnowflakeEventDispatcher
}

// ClientConfig defines how the SnowflakeClient will connect to the broker and Snowflake proxies.
type ClientConfig struct {
	// BrokerURL is the full URL of the Snowflake broker that the client will connect to.
	BrokerURL string
	// AmpCacheURL is the full URL of a valid AMP cache. A nonzero value indicates
	// that AMP cache will be used as the rendezvous method with the broker.
	AmpCacheURL string
	// FrontDomain is a the full URL of an optional front domain that can be used with either
	// the AMP cache or HTTP domain fronting rendezvous method.
	FrontDomain string
	// ICEAddresses are a slice of ICE server URLs that will be used for NAT traversal and
	// the creation of the client's WebRTC SDP offer.
	ICEAddresses []string
	// KeepLocalAddresses is an optional setting that will prevent the removal of local or
	// invalid addresses from the client's SDP offer. This is useful for local deployments
	// and testing.
	KeepLocalAddresses bool
	// Max is the maximum number of snowflake proxy peers that the client should attempt to
	// connect to. Defaults to 1.
	Max int
	// UTLSClientID is the type of user application that snowflake should imitate.
	// If an empty value is provided, it will use Go's default TLS implementation
	UTLSClientID string
	// UTLSRemoveSNI is the flag to control whether SNI should be removed from Client Hello
	// when uTLS is used.
	UTLSRemoveSNI bool
	// BridgeFingerprint is the fingerprint of the bridge that the client will eventually
	// connect to, as specified in the Bridge line of the torrc.
	BridgeFingerprint string
}

// NewSnowflakeClient creates a new Snowflake transport client that can spawn multiple
// Snowflake connections.
//
// brokerURL and frontDomain are the urls for the broker host and domain fronting host
// iceAddresses are the STUN/TURN urls needed for WebRTC negotiation
// keepLocalAddresses is a flag to enable sending local network addresses (for testing purposes)
// max is the maximum number of snowflakes the client should gather for each SOCKS connection
func NewSnowflakeClient(config ClientConfig) (*Transport, error) {

	log.Println("\n\n\n --- Starting Snowflake Client ---")

	iceServers := parseIceServers(config.ICEAddresses)
	// chooses a random subset of servers from inputs
	rand.Seed(time.Now().UnixNano())
	rand.Shuffle(len(iceServers), func(i, j int) {
		iceServers[i], iceServers[j] = iceServers[j], iceServers[i]
	})
	if len(iceServers) > 2 {
		iceServers = iceServers[:(len(iceServers)+1)/2]
	}
	log.Printf("Using ICE servers:")
	for _, server := range iceServers {
		log.Printf("url: %v", strings.Join(server.URLs, " "))
	}

	// Rendezvous with broker using the given parameters.
	broker, err := newBrokerChannelFromConfig(config)
	if err != nil {
		return nil, err
	}
	go updateNATType(iceServers, broker)

	max := 1
	if config.Max > max {
		max = config.Max
	}
	eventsLogger := event.NewSnowflakeEventDispatcher()
	transport := &Transport{dialer: NewWebRTCDialerWithEvents(broker, iceServers, max, eventsLogger), eventDispatcher: eventsLogger}

	return transport, nil
}

// Dial creates a new Snowflake connection.
// Dial starts the collection of snowflakes and returns a SnowflakeConn that is a
// wrapper around a smux.Stream that will reliably deliver data to a Snowflake
// server through one or more snowflake proxies.
func (t *Transport) Dial() (net.Conn, error) {
	// Cleanup functions to run before returning, in case of an error.
	var cleanup []func()
	defer func() {
		// Run cleanup in reverse order, as defer does.
		for i := len(cleanup) - 1; i >= 0; i-- {
			cleanup[i]()
		}
	}()

	// Prepare to collect remote WebRTC peers.
	snowflakes, err := NewPeers(t.dialer)
	if err != nil {
		return nil, err
	}
	cleanup = append(cleanup, func() { snowflakes.End() })

	// Use a real logger to periodically output how much traffic is happening.
	snowflakes.bytesLogger = newBytesSyncLogger()

	log.Printf("---- SnowflakeConn: begin collecting snowflakes ---")
	go connectLoop(snowflakes)

	// Create a new smux session
	log.Printf("---- SnowflakeConn: starting a new session ---")
	pconn, sess, err := newSession(snowflakes)
	if err != nil {
		return nil, err
	}
	cleanup = append(cleanup, func() {
		pconn.Close()
		sess.Close()
	})

	// On the smux session we overlay a stream.
	stream, err := sess.OpenStream()
	if err != nil {
		return nil, err
	}
	// Begin exchanging data.
	log.Printf("---- SnowflakeConn: begin stream %v ---", stream.ID())
	cleanup = append(cleanup, func() { stream.Close() })

	// All good, clear the cleanup list.
	cleanup = nil
	return &SnowflakeConn{Stream: stream, sess: sess, pconn: pconn, snowflakes: snowflakes}, nil
}
func (t *Transport) AddSnowflakeEventListener(receiver event.SnowflakeEventReceiver) {
	t.eventDispatcher.AddSnowflakeEventListener(receiver)
}

func (t *Transport) RemoveSnowflakeEventListener(receiver event.SnowflakeEventReceiver) {
	t.eventDispatcher.RemoveSnowflakeEventListener(receiver)
}

// SetRendezvousMethod sets the rendezvous method to the Snowflake broker.
func (t *Transport) SetRendezvousMethod(r RendezvousMethod) {
	t.dialer.Rendezvous = r
}

// SnowflakeConn is a reliable connection to a snowflake server that implements net.Conn.
type SnowflakeConn struct {
	*smux.Stream
	sess       *smux.Session
	pconn      net.PacketConn
	snowflakes *Peers
}

// Close closes the connection.
//
// The collection of snowflake proxies for this connection is stopped.
func (conn *SnowflakeConn) Close() error {
	log.Printf("---- SnowflakeConn: closed stream %v ---", conn.ID())
	conn.Stream.Close()
	log.Printf("---- SnowflakeConn: end collecting snowflakes ---")
	conn.snowflakes.End()
	conn.pconn.Close()
	log.Printf("---- SnowflakeConn: discarding finished session ---")
	conn.sess.Close()
	return nil //TODO: return errors if any of the above do
}

// loop through all provided STUN servers until we exhaust the list or find
// one that is compatable with RFC 5780
func updateNATType(servers []webrtc.ICEServer, broker *BrokerChannel) {

	var restrictedNAT bool
	var err error
	for _, server := range servers {
		addr := strings.TrimPrefix(server.URLs[0], "stun:")
		restrictedNAT, err = nat.CheckIfRestrictedNAT(addr)
		if err == nil {
			if restrictedNAT {
				broker.SetNATType(nat.NATRestricted)
			} else {
				broker.SetNATType(nat.NATUnrestricted)
			}
			break
		}
	}
	if err != nil {
		broker.SetNATType(nat.NATUnknown)
	}
}

// Returns a slice of webrtc.ICEServer given a slice of addresses
func parseIceServers(addresses []string) []webrtc.ICEServer {
	var servers []webrtc.ICEServer
	if len(addresses) == 0 {
		return nil
	}
	for _, url := range addresses {
		url = strings.TrimSpace(url)
		servers = append(servers, webrtc.ICEServer{
			URLs: []string{url},
		})
	}
	return servers
}

// newSession returns a new smux.Session and the net.PacketConn it is running
// over. The net.PacketConn successively connects through Snowflake proxies
// pulled from snowflakes.
func newSession(snowflakes SnowflakeCollector) (net.PacketConn, *smux.Session, error) {
	clientID := turbotunnel.NewClientID()

	// We build a persistent KCP session on a sequence of ephemeral WebRTC
	// connections. This dialContext tells RedialPacketConn how to get a new
	// WebRTC connection when the previous one dies. Inside each WebRTC
	// connection, we use encapsulationPacketConn to encode packets into a
	// stream.
	dialContext := func(ctx context.Context) (net.PacketConn, error) {
		log.Printf("redialing on same connection")
		// Obtain an available WebRTC remote. May block.
		conn := snowflakes.Pop()
		if conn == nil {
			return nil, errors.New("handler: Received invalid Snowflake")
		}
		log.Println("---- Handler: snowflake assigned ----")
		// Send the magic Turbo Tunnel token.
		_, err := conn.Write(turbotunnel.Token[:])
		if err != nil {
			return nil, err
		}
		// Send ClientID prefix.
		_, err = conn.Write(clientID[:])
		if err != nil {
			return nil, err
		}
		return newEncapsulationPacketConn(dummyAddr{}, dummyAddr{}, conn), nil
	}
	pconn := turbotunnel.NewRedialPacketConn(dummyAddr{}, dummyAddr{}, dialContext)

	// conn is built on the underlying RedialPacketConn—when one WebRTC
	// connection dies, another one will be found to take its place. The
	// sequence of packets across multiple WebRTC connections drives the KCP
	// engine.
	conn, err := kcp.NewConn2(dummyAddr{}, nil, 0, 0, pconn)
	if err != nil {
		pconn.Close()
		return nil, nil, err
	}
	// Permit coalescing the payloads of consecutive sends.
	conn.SetStreamMode(true)
	// Set the maximum send and receive window sizes to a high number
	// Removes KCP bottlenecks: https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/-/issues/40026
	conn.SetWindowSize(WindowSize, WindowSize)
	// Disable the dynamic congestion window (limit only by the
	// maximum of local and remote static windows).
	conn.SetNoDelay(
		0, // default nodelay
		0, // default interval
		0, // default resend
		1, // nc=1 => congestion window off
	)
	// On the KCP connection we overlay an smux session and stream.
	smuxConfig := smux.DefaultConfig()
	smuxConfig.Version = 2
	smuxConfig.KeepAliveTimeout = 10 * time.Minute
	smuxConfig.MaxStreamBuffer = StreamSize

	sess, err := smux.Client(conn, smuxConfig)
	if err != nil {
		conn.Close()
		pconn.Close()
		return nil, nil, err
	}

	return pconn, sess, err
}

// Maintain |SnowflakeCapacity| number of available WebRTC connections, to
// transfer to the Tor SOCKS handler when needed.
func connectLoop(snowflakes SnowflakeCollector) {
	for {
		timer := time.After(ReconnectTimeout)
		_, err := snowflakes.Collect()
		if err != nil {
			log.Printf("WebRTC: %v  Retrying...", err)
		}
		select {
		case <-timer:
			continue
		case <-snowflakes.Melted():
			log.Println("ConnectLoop: stopped.")
			return
		}
	}
}