aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Pu <michael.pu@uwaterloo.ca>2023-11-18 20:43:28 -0500
committerCecylia Bocovich <cohosh@torproject.org>2024-01-22 13:06:42 -0500
commit8fb17de1529281d30f1eb3c9d746de70673337fa (patch)
treeba08f5d26210de1633e890b0f7b4d187cdb6bacd
parentd0529141acb706f64e4defebd22a7d8604d831db (diff)
downloadsnowflake-8fb17de1529281d30f1eb3c9d746de70673337fa.tar.gz
snowflake-8fb17de1529281d30f1eb3c9d746de70673337fa.zip
Implement SQS rendezvous in client and broker
This features adds an additional rendezvous method to send client offers and receive proxy answers through the use of Amazon SQS queues. https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/-/issues/26151
-rw-r--r--broker/broker.go17
-rw-r--r--broker/sqs.go195
-rw-r--r--client/lib/rendezvous.go19
-rw-r--r--client/lib/rendezvous_sqs.go135
-rw-r--r--client/lib/snowflake.go6
-rw-r--r--client/snowflake.go17
-rw-r--r--doc/rendezvous-with-sqs.md45
-rw-r--r--go.mod14
-rw-r--r--go.sum28
9 files changed, 472 insertions, 4 deletions
diff --git a/broker/broker.go b/broker/broker.go
index 33f45ab..06b530a 100644
--- a/broker/broker.go
+++ b/broker/broker.go
@@ -8,9 +8,9 @@ package main
import (
"bytes"
"container/heap"
+ "context"
"crypto/tls"
"flag"
- "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/bridgefingerprint"
"io"
"log"
"net/http"
@@ -21,6 +21,8 @@ import (
"syscall"
"time"
+ "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/bridgefingerprint"
+
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/namematcher"
@@ -190,6 +192,7 @@ func main() {
var geoipDatabase string
var geoip6Database string
var bridgeListFilePath, allowedRelayPattern, presumedPatternForLegacyClient string
+ var brokerSQSQueueName, brokerSQSQueueRegion string
var disableTLS bool
var certFilename, keyFilename string
var disableGeoip bool
@@ -207,6 +210,8 @@ func main() {
flag.StringVar(&bridgeListFilePath, "bridge-list-path", "", "file path for bridgeListFile")
flag.StringVar(&allowedRelayPattern, "allowed-relay-pattern", "", "allowed pattern for relay host name")
flag.StringVar(&presumedPatternForLegacyClient, "default-relay-pattern", "", "presumed pattern for legacy client")
+ flag.StringVar(&brokerSQSQueueName, "broker-sqs-name", "", "name of broker SQS queue to listen for incoming messages on")
+ flag.StringVar(&brokerSQSQueueRegion, "broker-sqs-region", "", "name of AWS region of broker SQS queue")
flag.BoolVar(&disableTLS, "disable-tls", false, "don't use HTTPS")
flag.BoolVar(&disableGeoip, "disable-geoip", false, "don't use geoip for stats collection")
flag.StringVar(&metricsFilename, "metrics-log", "", "path to metrics logging output")
@@ -276,6 +281,16 @@ func main() {
Addr: addr,
}
+ // Run SQS Handler to continuously poll and process messages from SQS
+ if brokerSQSQueueName != "" && brokerSQSQueueRegion != "" {
+ sqsHandlerContext := context.Background()
+ sqsHandler, err := newSQSHandler(sqsHandlerContext, brokerSQSQueueName, brokerSQSQueueRegion, i)
+ if err != nil {
+ log.Fatal(err)
+ }
+ go sqsHandler.PollAndHandleMessages(sqsHandlerContext)
+ }
+
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP)
diff --git a/broker/sqs.go b/broker/sqs.go
new file mode 100644
index 0000000..f3d3e79
--- /dev/null
+++ b/broker/sqs.go
@@ -0,0 +1,195 @@
+package main
+
+import (
+ "context"
+ "log"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/config"
+ "github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/aws/aws-sdk-go-v2/service/sqs/types"
+ "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/messages"
+)
+
+const (
+ cleanupInterval = time.Second * 30
+ cleanupThreshold = -2 * time.Minute
+)
+
+type sqsHandler struct {
+ SQSClient *sqs.Client
+ SQSQueueURL *string
+ IPC *IPC
+}
+
+func (r *sqsHandler) pollMessages(context context.Context, chn chan<- *types.Message) {
+ for {
+ res, err := r.SQSClient.ReceiveMessage(context, &sqs.ReceiveMessageInput{
+ QueueUrl: r.SQSQueueURL,
+ MaxNumberOfMessages: 10,
+ WaitTimeSeconds: 15,
+ MessageAttributeNames: []string{
+ string(types.QueueAttributeNameAll),
+ },
+ })
+
+ if err != nil {
+ log.Printf("SQSHandler: encountered error while polling for messages: %v\n", err)
+ }
+
+ for _, message := range res.Messages {
+ chn <- &message
+ }
+ }
+}
+
+func (r *sqsHandler) cleanupClientQueues(context context.Context) {
+ for range time.Tick(cleanupInterval) {
+ // Runs at fixed intervals to clean up any client queues that were last changed more than 2 minutes ago
+ queueURLsList := []string{}
+ var nextToken *string
+ for {
+ res, err := r.SQSClient.ListQueues(context, &sqs.ListQueuesInput{
+ QueueNamePrefix: aws.String("snowflake-client-"),
+ MaxResults: aws.Int32(1000),
+ NextToken: nextToken,
+ })
+ if err != nil {
+ log.Printf("SQSHandler: encountered error while retrieving client queues to clean up: %v\n", err)
+ }
+ queueURLsList = append(queueURLsList, res.QueueUrls...)
+ if res.NextToken == nil {
+ break
+ } else {
+ nextToken = res.NextToken
+ }
+ }
+
+ numDeleted := 0
+ cleanupCutoff := time.Now().Add(cleanupThreshold)
+ for _, queueURL := range queueURLsList {
+ if !strings.Contains(queueURL, "snowflake-client-") {
+ continue
+ }
+ res, err := r.SQSClient.GetQueueAttributes(context, &sqs.GetQueueAttributesInput{
+ QueueUrl: aws.String(queueURL),
+ AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameLastModifiedTimestamp},
+ })
+ if err != nil {
+ // According to the AWS SQS docs, the deletion process for a queue can take up to 60 seconds. So the queue
+ // can be in the process of being deleted, but will still be returned by the ListQueues operation, but
+ // fail when we try to GetQueueAttributes for the queue
+ log.Printf("SQSHandler: encountered error while getting attribute of client queue %s. queue may already be deleted.\n", queueURL)
+ continue
+ }
+ lastModifiedInt64, err := strconv.ParseInt(res.Attributes[string(types.QueueAttributeNameLastModifiedTimestamp)], 10, 64)
+ if err != nil {
+ log.Printf("SQSHandler: encountered invalid lastModifiedTimetamp value from client queue %s: %v\n", queueURL, err)
+ continue
+ }
+ lastModified := time.Unix(lastModifiedInt64, 0)
+ if lastModified.Before(cleanupCutoff) {
+ _, err := r.SQSClient.DeleteQueue(context, &sqs.DeleteQueueInput{
+ QueueUrl: aws.String(queueURL),
+ })
+ if err != nil {
+ log.Printf("SQSHandler: encountered error when deleting client queue %s: %v\n", queueURL, err)
+ continue
+ } else {
+ numDeleted += 1
+ }
+
+ }
+ }
+ log.Printf("SQSHandler: finished running iteration of client queue cleanup. found and deleted %d client queues.\n", numDeleted)
+ }
+}
+
+func (r *sqsHandler) handleMessage(context context.Context, message *types.Message) {
+ var encPollReq []byte
+ var response []byte
+ var err error
+
+ clientID := message.MessageAttributes["ClientID"].StringValue
+ if clientID == nil {
+ log.Println("SQSHandler: got SDP offer in SQS message with no client ID. ignoring this message.")
+ return
+ }
+
+ res, err := r.SQSClient.CreateQueue(context, &sqs.CreateQueueInput{
+ QueueName: aws.String("snowflake-client-" + *clientID),
+ })
+ answerSQSURL := res.QueueUrl
+ if err != nil {
+ log.Printf("SQSHandler: error encountered when creating answer queue for client %s: %v\n", *clientID, err)
+ }
+
+ encPollReq = []byte(*message.Body)
+ arg := messages.Arg{
+ Body: encPollReq,
+ RemoteAddr: "",
+ }
+ err = r.IPC.ClientOffers(arg, &response)
+
+ if err != nil {
+ log.Printf("SQSHandler: error encountered when handling message: %v\n", err)
+ return
+ }
+
+ r.SQSClient.SendMessage(context, &sqs.SendMessageInput{
+ QueueUrl: answerSQSURL,
+ MessageBody: aws.String(string(response)),
+ })
+}
+
+func (r *sqsHandler) deleteMessage(context context.Context, message *types.Message) {
+ r.SQSClient.DeleteMessage(context, &sqs.DeleteMessageInput{
+ QueueUrl: r.SQSQueueURL,
+ ReceiptHandle: message.ReceiptHandle,
+ })
+}
+
+func newSQSHandler(context context.Context, sqsQueueName string, region string, i *IPC) (*sqsHandler, error) {
+ log.Printf("Loading SQSHandler using SQS Queue %s in region %s\n", sqsQueueName, region)
+ cfg, err := config.LoadDefaultConfig(context, config.WithRegion(region))
+ if err != nil {
+ return nil, err
+ }
+
+ client := sqs.NewFromConfig(cfg)
+
+ // Creates the queue if a queue with the same name doesn't exist. If a queue with the same name and attributes
+ // already exists, then nothing will happen. If a queue with the same name, but different attributes exists, then
+ // an error will be returned
+ res, err := client.CreateQueue(context, &sqs.CreateQueueInput{
+ QueueName: aws.String(sqsQueueName),
+ Attributes: map[string]string{
+ "MessageRetentionPeriod": strconv.FormatInt(int64((5 * time.Minute).Seconds()), 10),
+ },
+ })
+
+ if err != nil {
+ return nil, err
+ }
+
+ return &sqsHandler{
+ SQSClient: client,
+ SQSQueueURL: res.QueueUrl,
+ IPC: i,
+ }, nil
+}
+
+func (r *sqsHandler) PollAndHandleMessages(context context.Context) {
+ log.Println("SQSHandler: Starting to poll for messages at: " + *r.SQSQueueURL)
+ messagesChn := make(chan *types.Message, 2)
+ go r.pollMessages(context, messagesChn)
+ go r.cleanupClientQueues(context)
+
+ for message := range messagesChn {
+ r.handleMessage(context, message)
+ r.deleteMessage(context, message)
+ }
+}
diff --git a/client/lib/rendezvous.go b/client/lib/rendezvous.go
index a9d810e..908b2ff 100644
--- a/client/lib/rendezvous.go
+++ b/client/lib/rendezvous.go
@@ -26,7 +26,9 @@ import (
const (
brokerErrorUnexpected string = "Unexpected error, no answer."
- readLimit = 100000 //Maximum number of bytes to be read from an HTTP response
+ rendezvousErrorMsg string = "One of SQS, AmpCache, or Domain Fronting rendezvous methods must be used."
+
+ readLimit = 100000 //Maximum number of bytes to be read from an HTTP response
)
// RendezvousMethod represents a way of communicating with the broker: sending
@@ -88,14 +90,25 @@ func newBrokerChannelFromConfig(config ClientConfig) (*BrokerChannel, error) {
var rendezvous RendezvousMethod
var err error
- if config.AmpCacheURL != "" {
+ if config.SQSQueueURL != "" {
+ if config.AmpCacheURL != "" || config.BrokerURL != "" {
+ log.Fatalln("Multiple rendezvous methods specified. " + rendezvousErrorMsg)
+ }
+ if config.SQSAccessKeyID == "" || config.SQSSecretKey == "" {
+ log.Fatalln("sqsakid and sqsskey must be specified to use SQS rendezvous method.")
+ }
+ log.Println("Through SQS queue at:", config.SQSQueueURL)
+ rendezvous, err = newSQSRendezvous(config.SQSQueueURL, config.SQSAccessKeyID, config.SQSSecretKey, brokerTransport)
+ } else if config.AmpCacheURL != "" && config.BrokerURL != "" {
log.Println("Through AMP cache at:", config.AmpCacheURL)
rendezvous, err = newAMPCacheRendezvous(
config.BrokerURL, config.AmpCacheURL, config.FrontDomains,
brokerTransport)
- } else {
+ } else if config.BrokerURL != "" {
rendezvous, err = newHTTPRendezvous(
config.BrokerURL, config.FrontDomains, brokerTransport)
+ } else {
+ log.Fatalln("No rendezvous method was specified. " + rendezvousErrorMsg)
}
if err != nil {
return nil, err
diff --git a/client/lib/rendezvous_sqs.go b/client/lib/rendezvous_sqs.go
new file mode 100644
index 0000000..0ef4df4
--- /dev/null
+++ b/client/lib/rendezvous_sqs.go
@@ -0,0 +1,135 @@
+package snowflake_client
+
+import (
+ "context"
+ "crypto/rand"
+ "encoding/hex"
+ "log"
+ "net/http"
+ "net/url"
+ "regexp"
+ "time"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/config"
+ "github.com/aws/aws-sdk-go-v2/credentials"
+ "github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/aws/aws-sdk-go-v2/service/sqs/types"
+)
+
+type sqsRendezvous struct {
+ transport http.RoundTripper
+ sqsClientID string
+ sqsClient *sqs.Client
+ sqsURL *url.URL
+}
+
+func newSQSRendezvous(sqsQueue string, sqsAccessKeyId string, sqsSecretKey string, transport http.RoundTripper) (*sqsRendezvous, error) {
+ sqsURL, err := url.Parse(sqsQueue)
+ if err != nil {
+ return nil, err
+ }
+
+ var id [8]byte
+ _, err = rand.Read(id[:])
+ if err != nil {
+ log.Fatal(err)
+ }
+ clientID := hex.EncodeToString(id[:])
+
+ queueURL := sqsURL.String()
+ hostName := sqsURL.Hostname()
+
+ regionRegex, _ := regexp.Compile(`^sqs\.([\w-]+)\.amazonaws\.com$`)
+ res := regionRegex.FindStringSubmatch(hostName)
+ if len(res) < 2 {
+ log.Fatal("Could not extract AWS region from SQS URL. Ensure that the SQS Queue URL provided is valid.")
+ }
+ region := res[1]
+ cfg, err := config.LoadDefaultConfig(context.TODO(),
+ config.WithCredentialsProvider(
+ credentials.NewStaticCredentialsProvider(sqsAccessKeyId, sqsSecretKey, ""),
+ ),
+ config.WithRegion(region),
+ )
+ if err != nil {
+ log.Fatal(err)
+ }
+ client := sqs.NewFromConfig(cfg)
+
+ log.Println("Queue URL: ", queueURL)
+ log.Println("SQS Client ID: ", clientID)
+
+ return &sqsRendezvous{
+ transport: transport,
+ sqsClientID: clientID,
+ sqsClient: client,
+ sqsURL: sqsURL,
+ }, nil
+}
+
+func (r *sqsRendezvous) Exchange(encPollReq []byte) ([]byte, error) {
+ log.Println("Negotiating via SQS Queue rendezvous...")
+
+ _, err := r.sqsClient.SendMessage(context.TODO(), &sqs.SendMessageInput{
+ MessageAttributes: map[string]types.MessageAttributeValue{
+ "ClientID": {
+ DataType: aws.String("String"),
+ StringValue: aws.String(r.sqsClientID),
+ },
+ },
+ MessageBody: aws.String(string(encPollReq)),
+ QueueUrl: aws.String(r.sqsURL.String()),
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ time.Sleep(time.Second) // wait for client queue to be created by the broker
+
+ numRetries := 5
+ var responseQueueURL *string
+ for i := 0; i < numRetries; i++ {
+ // The SQS queue corresponding to the client where the SDP Answer will be placed
+ // may not be created yet. We will retry up to 5 times before we error out.
+ var res *sqs.GetQueueUrlOutput
+ res, err = r.sqsClient.GetQueueUrl(context.TODO(), &sqs.GetQueueUrlInput{
+ QueueName: aws.String("snowflake-client-" + r.sqsClientID),
+ })
+ if err != nil {
+ log.Println(err)
+ log.Printf("Attempt %d of %d to retrieve URL of response SQS queue failed.\n", i+1, numRetries)
+ time.Sleep(time.Second)
+ } else {
+ responseQueueURL = res.QueueUrl
+ break
+ }
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ var answer string
+ for i := 0; i < numRetries; i++ {
+ // Waiting for SDP Answer from proxy to be placed in SQS queue.
+ // We will retry upt to 5 times before we error out.
+ res, err := r.sqsClient.ReceiveMessage(context.TODO(), &sqs.ReceiveMessageInput{
+ QueueUrl: responseQueueURL,
+ MaxNumberOfMessages: 1,
+ WaitTimeSeconds: 20,
+ })
+ if err != nil {
+ return nil, err
+ }
+ if len(res.Messages) == 0 {
+ log.Printf("Attempt %d of %d to receive message from response SQS queue failed. No message found in queue.\n", i+1, numRetries)
+ delay := float64(i)/2.0 + 1
+ time.Sleep(time.Duration(delay*1000) * time.Millisecond)
+ } else {
+ answer = *res.Messages[0].Body
+ break
+ }
+ }
+
+ return []byte(answer), nil
+}
diff --git a/client/lib/snowflake.go b/client/lib/snowflake.go
index 4b760b0..cf9bd6e 100644
--- a/client/lib/snowflake.go
+++ b/client/lib/snowflake.go
@@ -86,6 +86,12 @@ type ClientConfig struct {
// AmpCacheURL is the full URL of a valid AMP cache. A nonzero value indicates
// that AMP cache will be used as the rendezvous method with the broker.
AmpCacheURL string
+ // SQSQueueURL is the full URL of an AWS SQS Queue. A nonzero value indicates
+ // that SQS queue will be used as the rendezvous method with the broker.
+ SQSQueueURL string
+ // Access Key ID and Secret Key of the credentials used to access the AWS SQS Qeueue
+ SQSAccessKeyID string
+ SQSSecretKey string
// FrontDomain is the full URL of an optional front domain that can be used with either
// the AMP cache or HTTP domain fronting rendezvous method.
FrontDomain string
diff --git a/client/snowflake.go b/client/snowflake.go
index 1603ef0..2fdfc1c 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -16,6 +16,8 @@ import (
"sync"
"syscall"
+ "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/proxy"
+
pt "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/goptlib"
sf "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/client/lib"
@@ -82,6 +84,15 @@ func socksAcceptLoop(ln *pt.SocksListener, config sf.ClientConfig, shutdown chan
if arg, ok := conn.Req.Args.Get("ampcache"); ok {
config.AmpCacheURL = arg
}
+ if arg, ok := conn.Req.Args.Get("sqsqueue"); ok {
+ config.SQSQueueURL = arg
+ }
+ if arg, ok := conn.Req.Args.Get("sqsakid"); ok {
+ config.SQSAccessKeyID = arg
+ }
+ if arg, ok := conn.Req.Args.Get("sqsskey"); ok {
+ config.SQSSecretKey = arg
+ }
if arg, ok := conn.Req.Args.Get("fronts"); ok {
if arg != "" {
config.FrontDomains = strings.Split(strings.TrimSpace(arg), ",")
@@ -160,6 +171,9 @@ func main() {
frontDomain := flag.String("front", "", "front domain")
frontDomainsCommas := flag.String("fronts", "", "comma-separated list of front domains")
ampCacheURL := flag.String("ampcache", "", "URL of AMP cache to use as a proxy for signaling")
+ sqsQueueURL := flag.String("sqsqueue", "", "URL of SQS Queue to use as a proxy for signaling")
+ sqsAccessKeyId := flag.String("sqsakid", "", "Access Key ID for credentials to access SQS Queue ")
+ sqsSecretKey := flag.String("sqsskey", "", "Secret Key for credentials to access SQS Queue")
logFilename := flag.String("log", "", "name of log file")
logToStateDir := flag.Bool("log-to-state-dir", false, "resolve the log file relative to tor's pt state dir")
keepLocalAddresses := flag.Bool("keep-local-addresses", false, "keep local LAN address ICE candidates")
@@ -227,6 +241,9 @@ func main() {
config := sf.ClientConfig{
BrokerURL: *brokerURL,
AmpCacheURL: *ampCacheURL,
+ SQSQueueURL: *sqsQueueURL,
+ SQSAccessKeyID: *sqsAccessKeyId,
+ SQSSecretKey: *sqsSecretKey,
FrontDomains: frontDomains,
ICEAddresses: iceAddresses,
KeepLocalAddresses: *keepLocalAddresses || *oldKeepLocalAddresses,
diff --git a/doc/rendezvous-with-sqs.md b/doc/rendezvous-with-sqs.md
new file mode 100644
index 0000000..244df53
--- /dev/null
+++ b/doc/rendezvous-with-sqs.md
@@ -0,0 +1,45 @@
+# Rendezvous with Amazon SQS
+This is a new experimental rendezvous method (in addition to the existing HTTPs and AMP cache methods).
+It leverages the Amazon SQS Queue service for a client to communicate with the broker server.
+
+## Broker
+To run the broker with this rendezvous method, use the following CLI flags (they are both required):
+- `broker-sqs-name` - name of the broker SQS queue to listen for incoming messages
+- `broker-sqs-region` - name of AWS region of the SQS queue
+
+These two parameters determine the SQS queue URL that the client needs to be run with as a CLI flag in order to communicate with the broker. For example, the following values can be used:
+
+`-broker-sqs-name snowflake-broker -broker-sqs-region us-east-1`
+
+The machine on which the broker is being run must be equiped with the correct AWS configs and credentials that would allow the broker program to create, read from, and write to the SQS queue. These are typically stored at `~/.aws/config` and `~/.aws/credentials`. However, enviornment variables may also be used as described in the [AWS Docs](https://docs.aws.amazon.com/sdkref/latest/guide/creds-config-files.html)
+
+## Client
+To run the client with this rendezvous method, use the following CLI flags (they are all required):
+- `sqsqueue` - URL of the SQS queue to use as a proxy for signalling
+- `sqsakid` - AWS Access Key ID of credentials for accessing the SQS queue
+- `sqsskey` - AWS Secrety Key of credentials for accessing the SQS queue
+
+`sqsqueue` should correspond to the URL of the SQS queue that the broker is listening on.
+For the example above, the following value can be used:
+
+`-sqsqueue https://sqs.us-east-1.amazonaws.com/893902434899/snowflake-broker -sqsakid some-aws-access-key-id -sqsskey some-aws-secret-key`
+
+*Public access to SQS queues is not allowed, so there needs to be some form of authentication to be able to access the queue. Limited permission credentials will be provided by the Snowflake team to access the corresponding SQS queue.*
+
+## Implementation Details
+```
+╭――――――――――――――――――╮ ╭――――――――――――――――――╮ ╭――――――――――――――――――╮ ╭―――――――――――――――――-―╮
+│ Client │ <=> │ Amazon SQS │ <=> │ Broker │ <=> │ Snowflake Proxy │
+╰――――――――――――――――――╯ ╰――――――――――――――――――╯ ╰――――――――――――――――――╯ ╰――――――――――――――――――-╯
+```
+
+1. On startup, the **broker** ensures that an SQS queue with the name of the `broker-sqs-name` parameter exists. It will create such a queue if it doesn’t exist. Afterwards, it will enter a loop of continuously:
+ - polling for new messages
+ - cleaning up client queues
+2. **Client** sends SDP Offer to the SQS queue at the URL provided by the `sqsqueue` parameter using a message with a unique ID corresponding to the client along with the contents of the SDP Offer
+3. The **broker** will receive this message during its polling and process it.
+ - A client SQS queue with the name `"snowflake-client" + clientID` will be created for the broker to send messages to the client. This is needed because if a queue shared between all clients was used for outgoing messages from the server, then clients would have to pick off the top message, check if it is addressed to them, and then process the message if it is. This means clients would possibly have to check many messages before they find the one addressed to them.
+ - When the broker has a response for the client, it will send a message to the client queue with the details of the SDP answer.
+ - The SDP offer message from the client is then deleted from the broker queue.
+4. The **client** will continuously poll its client queue and eventually receive the message with the SDP answer from the broker.
+5. The broker server will periodically clean up the unique SQS queues it has created for each client once the queues are no longer needed (it will delete queues that were last modified before a certain amount of time ago) \ No newline at end of file
diff --git a/go.mod b/go.mod
index 00b7410..bf6672b 100644
--- a/go.mod
+++ b/go.mod
@@ -5,6 +5,8 @@ go 1.21
require (
github.com/gorilla/websocket v1.5.1
github.com/miekg/dns v1.1.57
+ github.com/aws/aws-sdk-go-v2/config v1.25.3
+ github.com/aws/aws-sdk-go-v2/service/sqs v1.28.1
github.com/pion/ice/v2 v2.3.11
github.com/pion/sdp/v3 v3.0.6
github.com/pion/stun v0.6.1
@@ -28,6 +30,18 @@ require (
require (
github.com/andybalholm/brotli v1.0.5 // indirect
+ github.com/aws/aws-sdk-go-v2 v1.23.0 // indirect
+ github.com/aws/aws-sdk-go-v2/credentials v1.16.2 // indirect
+ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.4 // indirect
+ github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.3 // indirect
+ github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.3 // indirect
+ github.com/aws/aws-sdk-go-v2/internal/ini v1.7.1 // indirect
+ github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.1 // indirect
+ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.3 // indirect
+ github.com/aws/aws-sdk-go-v2/service/sso v1.17.2 // indirect
+ github.com/aws/aws-sdk-go-v2/service/ssooidc v1.20.0 // indirect
+ github.com/aws/aws-sdk-go-v2/service/sts v1.25.3 // indirect
+ github.com/aws/smithy-go v1.17.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cloudflare/circl v1.3.6 // indirect
diff --git a/go.sum b/go.sum
index 84e451f..ea28722 100644
--- a/go.sum
+++ b/go.sum
@@ -2,6 +2,34 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
+github.com/aws/aws-sdk-go-v2 v1.23.0 h1:PiHAzmiQQr6JULBUdvR8fKlA+UPKLT/8KbiqpFBWiAo=
+github.com/aws/aws-sdk-go-v2 v1.23.0/go.mod h1:i1XDttT4rnf6vxc9AuskLc6s7XBee8rlLilKlc03uAA=
+github.com/aws/aws-sdk-go-v2/config v1.25.3 h1:E4m9LbwJOoncDNt3e9MPLbz/saxWcGUlZVBydydD6+8=
+github.com/aws/aws-sdk-go-v2/config v1.25.3/go.mod h1:tAByZy03nH5jcq0vZmkcVoo6tRzRHEwSFx3QW4NmDw8=
+github.com/aws/aws-sdk-go-v2/credentials v1.16.2 h1:0sdZ5cwfOAipTzZ7eOL0gw4LAhk/RZnTa16cDqIt8tg=
+github.com/aws/aws-sdk-go-v2/credentials v1.16.2/go.mod h1:sDdvGhXrSVT5yzBDR7qXz+rhbpiMpUYfF3vJ01QSdrc=
+github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.4 h1:9wKDWEjwSnXZre0/O3+ZwbBl1SmlgWYBbrTV10X/H1s=
+github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.4/go.mod h1:t4i+yGHMCcUNIX1x7YVYa6bH/Do7civ5I6cG/6PMfyA=
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.3 h1:DUwbD79T8gyQ23qVXFUthjzVMTviSHi3y4z58KvghhM=
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.3/go.mod h1:7sGSz1JCKHWWBHq98m6sMtWQikmYPpxjqOydDemiVoM=
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.3 h1:AplLJCtIaUZDCbr6+gLYdsYNxne4iuaboJhVt9d+WXI=
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.3/go.mod h1:ify42Rb7nKeDDPkFjKn7q1bPscVPu/+gmHH8d2c+anU=
+github.com/aws/aws-sdk-go-v2/internal/ini v1.7.1 h1:uR9lXYjdPX0xY+NhvaJ4dD8rpSRz5VY81ccIIoNG+lw=
+github.com/aws/aws-sdk-go-v2/internal/ini v1.7.1/go.mod h1:6fQQgfuGmw8Al/3M2IgIllycxV7ZW7WCdVSqfBeUiCY=
+github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.1 h1:rpkF4n0CyFcrJUG/rNNohoTmhtWlFTRI4BsZOh9PvLs=
+github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.1/go.mod h1:l9ymW25HOqymeU2m1gbUQ3rUIsTwKs8gYHXkqDQUhiI=
+github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.3 h1:kJOolE8xBAD13xTCgOakByZkyP4D/owNmvEiioeUNAg=
+github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.3/go.mod h1:Owv1I59vaghv1Ax8zz8ELY8DN7/Y0rGS+WWAmjgi950=
+github.com/aws/aws-sdk-go-v2/service/sqs v1.28.1 h1:rfX6lA1EW6Q5zT7Cl8RG90hCdWY4VVaobnmbgl5OIy0=
+github.com/aws/aws-sdk-go-v2/service/sqs v1.28.1/go.mod h1:gGmF6hmPsYUf/kgaSw7BOqLpdVNSfMzGSar61OX812w=
+github.com/aws/aws-sdk-go-v2/service/sso v1.17.2 h1:V47N5eKgVZoRSvx2+RQ0EpAEit/pqOhqeSQFiS4OFEQ=
+github.com/aws/aws-sdk-go-v2/service/sso v1.17.2/go.mod h1:/pE21vno3q1h4bbhUOEi+6Zu/aT26UK2WKkDXd+TssQ=
+github.com/aws/aws-sdk-go-v2/service/ssooidc v1.20.0 h1:/XiEU7VIFcVWRDQLabyrSjBoKIm8UkYgsvWDuFW8Img=
+github.com/aws/aws-sdk-go-v2/service/ssooidc v1.20.0/go.mod h1:dWqm5G767qwKPuayKfzm4rjzFmVjiBFbOJrpSPnAMDs=
+github.com/aws/aws-sdk-go-v2/service/sts v1.25.3 h1:M2w4kiMGJCCM6Ljmmx/l6mmpfa3gPJVpBencfnsgvqs=
+github.com/aws/aws-sdk-go-v2/service/sts v1.25.3/go.mod h1:4EqRHDCKP78hq3zOnmFXu5k0j4bXbRFfCh/zQ6KnEfQ=
+github.com/aws/smithy-go v1.17.0 h1:wWJD7LX6PBV6etBUwO0zElG0nWN9rUhp0WdYeHSHAaI=
+github.com/aws/smithy-go v1.17.0/go.mod h1:NukqUGpCZIILqqiV0NIjeFh24kd/FAa4beRb6nbIUPE=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=