summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Wang <324.andrew.wang@gmail.com>2024-01-13 16:10:12 -0500
committerCecylia Bocovich <cohosh@torproject.org>2024-01-22 13:11:03 -0500
commit9b90b77d696a241bf4a363deee633b59a1680f85 (patch)
treecebd55780ce3e2df4383aca303e56402b33a1f9e
parent32e864b71d19145096ed93dd0af4b8b900a67081 (diff)
downloadsnowflake-9b90b77d696a241bf4a363deee633b59a1680f85.tar.gz
snowflake-9b90b77d696a241bf4a363deee633b59a1680f85.zip
Add unit tests for SQS rendezvous in broker
Co-authored-by: Michael Pu <michael.pu@uwaterloo.ca>
-rw-r--r--broker/broker.go10
-rw-r--r--broker/sqs.go186
-rw-r--r--broker/sqs_test.go327
-rw-r--r--client/snowflake.go2
4 files changed, 435 insertions, 90 deletions
diff --git a/broker/broker.go b/broker/broker.go
index 06b530a..b60faff 100644
--- a/broker/broker.go
+++ b/broker/broker.go
@@ -23,6 +23,8 @@ import (
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/bridgefingerprint"
+ "github.com/aws/aws-sdk-go-v2/config"
+ "github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/namematcher"
@@ -283,8 +285,14 @@ func main() {
// Run SQS Handler to continuously poll and process messages from SQS
if brokerSQSQueueName != "" && brokerSQSQueueRegion != "" {
+ log.Printf("Loading SQSHandler using SQS Queue %s in region %s\n", brokerSQSQueueName, brokerSQSQueueRegion)
sqsHandlerContext := context.Background()
- sqsHandler, err := newSQSHandler(sqsHandlerContext, brokerSQSQueueName, brokerSQSQueueRegion, i)
+ cfg, err := config.LoadDefaultConfig(sqsHandlerContext, config.WithRegion(brokerSQSQueueRegion))
+ if err != nil {
+ log.Fatal(err)
+ }
+ client := sqs.NewFromConfig(cfg)
+ sqsHandler, err := newSQSHandler(sqsHandlerContext, client, brokerSQSQueueName, brokerSQSQueueRegion, i)
if err != nil {
log.Fatal(err)
}
diff --git a/broker/sqs.go b/broker/sqs.go
index 082f6e9..057a923 100644
--- a/broker/sqs.go
+++ b/broker/sqs.go
@@ -8,7 +8,6 @@ import (
"time"
"github.com/aws/aws-sdk-go-v2/aws"
- "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/messages"
@@ -16,96 +15,109 @@ import (
)
const (
- cleanupInterval = time.Second * 30
cleanupThreshold = -2 * time.Minute
)
type sqsHandler struct {
- SQSClient sqsclient.SQSClient
- SQSQueueURL *string
- IPC *IPC
+ SQSClient sqsclient.SQSClient
+ SQSQueueURL *string
+ IPC *IPC
+ cleanupInterval time.Duration
}
-func (r *sqsHandler) pollMessages(context context.Context, chn chan<- *types.Message) {
+func (r *sqsHandler) pollMessages(ctx context.Context, chn chan<- *types.Message) {
for {
- res, err := r.SQSClient.ReceiveMessage(context, &sqs.ReceiveMessageInput{
- QueueUrl: r.SQSQueueURL,
- MaxNumberOfMessages: 10,
- WaitTimeSeconds: 15,
- MessageAttributeNames: []string{
- string(types.QueueAttributeNameAll),
- },
- })
-
- if err != nil {
- log.Printf("SQSHandler: encountered error while polling for messages: %v\n", err)
- }
+ select {
+ case <-ctx.Done():
+ // if context is cancelled
+ return
+ default:
+ res, err := r.SQSClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
+ QueueUrl: r.SQSQueueURL,
+ MaxNumberOfMessages: 10,
+ WaitTimeSeconds: 15,
+ MessageAttributeNames: []string{
+ string(types.QueueAttributeNameAll),
+ },
+ })
+
+ if err != nil {
+ log.Printf("SQSHandler: encountered error while polling for messages: %v\n", err)
+ continue
+ }
- for _, message := range res.Messages {
- chn <- &message
+ for _, message := range res.Messages {
+ chn <- &message
+ }
}
}
}
-func (r *sqsHandler) cleanupClientQueues(context context.Context) {
- for range time.Tick(cleanupInterval) {
+func (r *sqsHandler) cleanupClientQueues(ctx context.Context) {
+ for range time.NewTicker(r.cleanupInterval).C {
// Runs at fixed intervals to clean up any client queues that were last changed more than 2 minutes ago
- queueURLsList := []string{}
- var nextToken *string
- for {
- res, err := r.SQSClient.ListQueues(context, &sqs.ListQueuesInput{
- QueueNamePrefix: aws.String("snowflake-client-"),
- MaxResults: aws.Int32(1000),
- NextToken: nextToken,
- })
- if err != nil {
- log.Printf("SQSHandler: encountered error while retrieving client queues to clean up: %v\n", err)
- }
- queueURLsList = append(queueURLsList, res.QueueUrls...)
- if res.NextToken == nil {
- break
- } else {
- nextToken = res.NextToken
+ select {
+ case <-ctx.Done():
+ // if context is cancelled
+ return
+ default:
+ queueURLsList := []string{}
+ var nextToken *string
+ for {
+ res, err := r.SQSClient.ListQueues(ctx, &sqs.ListQueuesInput{
+ QueueNamePrefix: aws.String("snowflake-client-"),
+ MaxResults: aws.Int32(1000),
+ NextToken: nextToken,
+ })
+ if err != nil {
+ log.Printf("SQSHandler: encountered error while retrieving client queues to clean up: %v\n", err)
+ }
+ queueURLsList = append(queueURLsList, res.QueueUrls...)
+ if res.NextToken == nil {
+ break
+ } else {
+ nextToken = res.NextToken
+ }
}
- }
- numDeleted := 0
- cleanupCutoff := time.Now().Add(cleanupThreshold)
- for _, queueURL := range queueURLsList {
- if !strings.Contains(queueURL, "snowflake-client-") {
- continue
- }
- res, err := r.SQSClient.GetQueueAttributes(context, &sqs.GetQueueAttributesInput{
- QueueUrl: aws.String(queueURL),
- AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameLastModifiedTimestamp},
- })
- if err != nil {
- // According to the AWS SQS docs, the deletion process for a queue can take up to 60 seconds. So the queue
- // can be in the process of being deleted, but will still be returned by the ListQueues operation, but
- // fail when we try to GetQueueAttributes for the queue
- log.Printf("SQSHandler: encountered error while getting attribute of client queue %s. queue may already be deleted.\n", queueURL)
- continue
- }
- lastModifiedInt64, err := strconv.ParseInt(res.Attributes[string(types.QueueAttributeNameLastModifiedTimestamp)], 10, 64)
- if err != nil {
- log.Printf("SQSHandler: encountered invalid lastModifiedTimetamp value from client queue %s: %v\n", queueURL, err)
- continue
- }
- lastModified := time.Unix(lastModifiedInt64, 0)
- if lastModified.Before(cleanupCutoff) {
- _, err := r.SQSClient.DeleteQueue(context, &sqs.DeleteQueueInput{
- QueueUrl: aws.String(queueURL),
+ numDeleted := 0
+ cleanupCutoff := time.Now().Add(cleanupThreshold)
+ for _, queueURL := range queueURLsList {
+ if !strings.Contains(queueURL, "snowflake-client-") {
+ continue
+ }
+ res, err := r.SQSClient.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
+ QueueUrl: aws.String(queueURL),
+ AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameLastModifiedTimestamp},
})
if err != nil {
- log.Printf("SQSHandler: encountered error when deleting client queue %s: %v\n", queueURL, err)
+ // According to the AWS SQS docs, the deletion process for a queue can take up to 60 seconds. So the queue
+ // can be in the process of being deleted, but will still be returned by the ListQueues operation, but
+ // fail when we try to GetQueueAttributes for the queue
+ log.Printf("SQSHandler: encountered error while getting attribute of client queue %s. queue may already be deleted.\n", queueURL)
+ continue
+ }
+ lastModifiedInt64, err := strconv.ParseInt(res.Attributes[string(types.QueueAttributeNameLastModifiedTimestamp)], 10, 64)
+ if err != nil {
+ log.Printf("SQSHandler: encountered invalid lastModifiedTimetamp value from client queue %s: %v\n", queueURL, err)
continue
- } else {
- numDeleted += 1
}
+ lastModified := time.Unix(lastModifiedInt64, 0)
+ if lastModified.Before(cleanupCutoff) {
+ _, err := r.SQSClient.DeleteQueue(ctx, &sqs.DeleteQueueInput{
+ QueueUrl: aws.String(queueURL),
+ })
+ if err != nil {
+ log.Printf("SQSHandler: encountered error when deleting client queue %s: %v\n", queueURL, err)
+ continue
+ } else {
+ numDeleted += 1
+ }
+ }
}
+ log.Printf("SQSHandler: finished running iteration of client queue cleanup. found and deleted %d client queues.\n", numDeleted)
}
- log.Printf("SQSHandler: finished running iteration of client queue cleanup. found and deleted %d client queues.\n", numDeleted)
}
}
@@ -123,10 +135,11 @@ func (r *sqsHandler) handleMessage(context context.Context, message *types.Messa
res, err := r.SQSClient.CreateQueue(context, &sqs.CreateQueueInput{
QueueName: aws.String("snowflake-client-" + *clientID),
})
- answerSQSURL := res.QueueUrl
if err != nil {
log.Printf("SQSHandler: error encountered when creating answer queue for client %s: %v\n", *clientID, err)
+ return
}
+ answerSQSURL := res.QueueUrl
encPollReq = []byte(*message.Body)
arg := messages.Arg{
@@ -153,15 +166,7 @@ func (r *sqsHandler) deleteMessage(context context.Context, message *types.Messa
})
}
-func newSQSHandler(context context.Context, sqsQueueName string, region string, i *IPC) (*sqsHandler, error) {
- log.Printf("Loading SQSHandler using SQS Queue %s in region %s\n", sqsQueueName, region)
- cfg, err := config.LoadDefaultConfig(context, config.WithRegion(region))
- if err != nil {
- return nil, err
- }
-
- client := sqs.NewFromConfig(cfg)
-
+func newSQSHandler(context context.Context, client sqsclient.SQSClient, sqsQueueName string, region string, i *IPC) (*sqsHandler, error) {
// Creates the queue if a queue with the same name doesn't exist. If a queue with the same name and attributes
// already exists, then nothing will happen. If a queue with the same name, but different attributes exists, then
// an error will be returned
@@ -177,20 +182,27 @@ func newSQSHandler(context context.Context, sqsQueueName string, region string,
}
return &sqsHandler{
- SQSClient: client,
- SQSQueueURL: res.QueueUrl,
- IPC: i,
+ SQSClient: client,
+ SQSQueueURL: res.QueueUrl,
+ IPC: i,
+ cleanupInterval: time.Second * 30,
}, nil
}
-func (r *sqsHandler) PollAndHandleMessages(context context.Context) {
+func (r *sqsHandler) PollAndHandleMessages(ctx context.Context) {
log.Println("SQSHandler: Starting to poll for messages at: " + *r.SQSQueueURL)
messagesChn := make(chan *types.Message, 2)
- go r.pollMessages(context, messagesChn)
- go r.cleanupClientQueues(context)
+ go r.pollMessages(ctx, messagesChn)
+ go r.cleanupClientQueues(ctx)
for message := range messagesChn {
- r.handleMessage(context, message)
- r.deleteMessage(context, message)
+ select {
+ case <-ctx.Done():
+ // if context is cancelled
+ return
+ default:
+ r.handleMessage(ctx, message)
+ r.deleteMessage(ctx, message)
+ }
}
}
diff --git a/broker/sqs_test.go b/broker/sqs_test.go
new file mode 100644
index 0000000..0596888
--- /dev/null
+++ b/broker/sqs_test.go
@@ -0,0 +1,327 @@
+package main
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "log"
+ "strconv"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/aws/aws-sdk-go-v2/service/sqs/types"
+ "github.com/golang/mock/gomock"
+ . "github.com/smartystreets/goconvey/convey"
+ "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/sqsclient"
+)
+
+func TestSQS(t *testing.T) {
+
+ Convey("Context", t, func() {
+ ctx := NewBrokerContext(NullLogger())
+ i := &IPC{ctx}
+
+ var logBuffer bytes.Buffer
+ log.SetOutput(&logBuffer)
+
+ Convey("Responds to SQS client offers...", func() {
+ ctrl := gomock.NewController(t)
+ mockSQSClient := sqsclient.NewMockSQSClient(ctrl)
+
+ brokerSQSQueueName := "example-name"
+ responseQueueURL := aws.String("https://sqs.us-east-1.amazonaws.com/testing")
+
+ runSQSHandler := func(sqsHandlerContext context.Context) {
+ mockSQSClient.EXPECT().CreateQueue(sqsHandlerContext, &sqs.CreateQueueInput{
+ QueueName: aws.String(brokerSQSQueueName),
+ Attributes: map[string]string{
+ "MessageRetentionPeriod": strconv.FormatInt(int64((5 * time.Minute).Seconds()), 10),
+ },
+ }).Return(&sqs.CreateQueueOutput{
+ QueueUrl: responseQueueURL,
+ }, nil).Times(1)
+ sqsHandler, err := newSQSHandler(sqsHandlerContext, mockSQSClient, brokerSQSQueueName, "example-region", i)
+ So(err, ShouldBeNil)
+ go sqsHandler.PollAndHandleMessages(sqsHandlerContext)
+ }
+
+ messageBody := aws.String("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}")
+ receiptHandle := "fake-receipt-handle"
+ sqsReceiveMessageInput := sqs.ReceiveMessageInput{
+ QueueUrl: responseQueueURL,
+ MaxNumberOfMessages: 10,
+ WaitTimeSeconds: 15,
+ MessageAttributeNames: []string{
+ string(types.QueueAttributeNameAll),
+ },
+ }
+ sqsDeleteMessageInput := sqs.DeleteMessageInput{
+ QueueUrl: responseQueueURL,
+ ReceiptHandle: &receiptHandle,
+ }
+
+ Convey("by ignoring it if no client id specified", func(c C) {
+ var wg sync.WaitGroup
+ wg.Add(1)
+
+ sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
+ defer sqsCancelFunc()
+ defer wg.Wait()
+ mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, &sqsReceiveMessageInput).MinTimes(1).DoAndReturn(
+ func(ctx context.Context, input *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
+ return &sqs.ReceiveMessageOutput{
+ Messages: []types.Message{
+ {
+ Body: messageBody,
+ ReceiptHandle: &receiptHandle,
+ },
+ },
+ }, nil
+ },
+ )
+ mockSQSClient.EXPECT().DeleteMessage(sqsHandlerContext, &sqsDeleteMessageInput).Times(1).Do(
+ func(ctx context.Context, input *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) {
+ defer wg.Done()
+ c.So(logBuffer.String(), ShouldContainSubstring, "SQSHandler: got SDP offer in SQS message with no client ID. ignoring this message.")
+ mockSQSClient.EXPECT().DeleteMessage(sqsHandlerContext, &sqsDeleteMessageInput).AnyTimes()
+ },
+ )
+ runSQSHandler(sqsHandlerContext)
+ })
+
+ Convey("by doing nothing if an error occurs upon receipt of the message", func(c C) {
+ var wg sync.WaitGroup
+ wg.Add(2)
+
+ sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
+ defer sqsCancelFunc()
+ defer wg.Wait()
+
+ numTimes := 0
+ // When ReceiveMessage is called for the first time, the error has not had a chance to be logged yet.
+ // Therefore, we opt to wait for the second call because we are guaranteed that the error was logged
+ // by then.
+ mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, &sqsReceiveMessageInput).MinTimes(2).DoAndReturn(
+ func(ctx context.Context, input *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
+ numTimes += 1
+ if numTimes <= 2 {
+ wg.Done()
+ if numTimes == 2 {
+ c.So(logBuffer.String(), ShouldContainSubstring, "SQSHandler: encountered error while polling for messages: error")
+ }
+ }
+ return nil, errors.New("error")
+ },
+ )
+ runSQSHandler(sqsHandlerContext)
+ })
+
+ Convey("by attempting to create a new sqs queue...", func() {
+ clientId := "fake-id"
+ sqsCreateQueueInput := sqs.CreateQueueInput{
+ QueueName: aws.String("snowflake-client-fake-id"),
+ }
+
+ expectReceiveMessageReturnsValidMessage := func(sqsHandlerContext context.Context) {
+ mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, &sqsReceiveMessageInput).AnyTimes().DoAndReturn(
+ func(ctx context.Context, input *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
+ return &sqs.ReceiveMessageOutput{
+ Messages: []types.Message{
+ {
+ Body: messageBody,
+ MessageAttributes: map[string]types.MessageAttributeValue{
+ "ClientID": {StringValue: &clientId},
+ },
+ ReceiptHandle: &receiptHandle,
+ },
+ },
+ }, nil
+ },
+ )
+ }
+
+ Convey("and does not attempt to send a message via SQS if queue creation fails.", func(c C) {
+ var wg sync.WaitGroup
+ wg.Add(2)
+
+ sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
+ defer sqsCancelFunc()
+ defer wg.Wait()
+
+ expectReceiveMessageReturnsValidMessage(sqsHandlerContext)
+ mockSQSClient.EXPECT().CreateQueue(sqsHandlerContext, &sqsCreateQueueInput).Return(nil, errors.New("error")).AnyTimes()
+ numTimes := 0
+ mockSQSClient.EXPECT().DeleteMessage(sqsHandlerContext, &sqsDeleteMessageInput).MinTimes(2).Do(
+ func(ctx context.Context, input *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) {
+ numTimes += 1
+ if numTimes <= 2 {
+ wg.Done()
+ if numTimes == 2 {
+ c.So(logBuffer.String(), ShouldContainSubstring, "SQSHandler: error encountered when creating answer queue for client fake-id: error")
+ }
+ }
+ },
+ )
+ runSQSHandler(sqsHandlerContext)
+ })
+
+ Convey("and responds with a proxy answer if available.", func(c C) {
+ var wg sync.WaitGroup
+ wg.Add(1)
+
+ sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
+ defer sqsCancelFunc()
+ defer wg.Wait()
+
+ expectReceiveMessageReturnsValidMessage(sqsHandlerContext)
+ mockSQSClient.EXPECT().CreateQueue(sqsHandlerContext, &sqsCreateQueueInput).Return(&sqs.CreateQueueOutput{
+ QueueUrl: responseQueueURL,
+ }, nil).AnyTimes()
+ mockSQSClient.EXPECT().DeleteMessage(sqsHandlerContext, &sqsDeleteMessageInput).AnyTimes()
+ numTimes := 0
+ mockSQSClient.EXPECT().SendMessage(sqsHandlerContext, gomock.Any()).MinTimes(1).DoAndReturn(
+ func(ctx context.Context, input *sqs.SendMessageInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageOutput, error) {
+ numTimes += 1
+ if numTimes == 1 {
+ c.So(input.MessageBody, ShouldEqual, aws.String("{\"answer\":\"fake answer\"}"))
+ wg.Done()
+ }
+ return &sqs.SendMessageOutput{}, nil
+ },
+ )
+ runSQSHandler(sqsHandlerContext)
+
+ snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted, 0)
+
+ offer := <-snowflake.offerChannel
+ So(offer.sdp, ShouldResemble, []byte("fake"))
+
+ snowflake.answerChannel <- "fake answer"
+ })
+ })
+ })
+
+ Convey("Cleans up SQS client queues...", func() {
+ brokerSQSQueueName := "example-name"
+ responseQueueURL := aws.String("https://sqs.us-east-1.amazonaws.com/testing")
+
+ ctrl := gomock.NewController(t)
+ mockSQSClient := sqsclient.NewMockSQSClient(ctrl)
+
+ runSQSHandler := func(sqsHandlerContext context.Context) {
+
+ mockSQSClient.EXPECT().CreateQueue(sqsHandlerContext, &sqs.CreateQueueInput{
+ QueueName: aws.String(brokerSQSQueueName),
+ Attributes: map[string]string{
+ "MessageRetentionPeriod": strconv.FormatInt(int64((5 * time.Minute).Seconds()), 10),
+ },
+ }).Return(&sqs.CreateQueueOutput{
+ QueueUrl: responseQueueURL,
+ }, nil).Times(1)
+
+ mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, gomock.Any()).AnyTimes().Return(
+ &sqs.ReceiveMessageOutput{
+ Messages: []types.Message{},
+ }, nil,
+ )
+
+ sqsHandler, err := newSQSHandler(sqsHandlerContext, mockSQSClient, brokerSQSQueueName, "example-region", i)
+ So(err, ShouldBeNil)
+ // Set the cleanup interval to 1 ns so we can immediately test the cleanup logic
+ sqsHandler.cleanupInterval = time.Nanosecond
+
+ go sqsHandler.PollAndHandleMessages(sqsHandlerContext)
+ }
+
+ Convey("does nothing if there are no open queues.", func() {
+ var wg sync.WaitGroup
+ wg.Add(1)
+ sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
+ defer wg.Wait()
+
+ mockSQSClient.EXPECT().ListQueues(sqsHandlerContext, &sqs.ListQueuesInput{
+ QueueNamePrefix: aws.String("snowflake-client-"),
+ MaxResults: aws.Int32(1000),
+ NextToken: nil,
+ }).DoAndReturn(func(ctx context.Context, input *sqs.ListQueuesInput, optFns ...func(*sqs.Options)) (*sqs.ListQueuesOutput, error) {
+ wg.Done()
+ // Cancel the handler context since we are only interested in testing one iteration of the cleanup
+ sqsCancelFunc()
+ return &sqs.ListQueuesOutput{
+ QueueUrls: []string{},
+ }, nil
+ })
+
+ runSQSHandler(sqsHandlerContext)
+ })
+
+ Convey("deletes open queue when there is one open queue.", func(c C) {
+ var wg sync.WaitGroup
+ wg.Add(1)
+ sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
+
+ clientQueueUrl1 := "https://sqs.us-east-1.amazonaws.com/snowflake-client-1"
+ clientQueueUrl2 := "https://sqs.us-east-1.amazonaws.com/snowflake-client-2"
+
+ gomock.InOrder(
+ mockSQSClient.EXPECT().ListQueues(sqsHandlerContext, &sqs.ListQueuesInput{
+ QueueNamePrefix: aws.String("snowflake-client-"),
+ MaxResults: aws.Int32(1000),
+ NextToken: nil,
+ }).Times(1).Return(&sqs.ListQueuesOutput{
+ QueueUrls: []string{
+ clientQueueUrl1,
+ clientQueueUrl2,
+ },
+ }, nil),
+ mockSQSClient.EXPECT().ListQueues(sqsHandlerContext, &sqs.ListQueuesInput{
+ QueueNamePrefix: aws.String("snowflake-client-"),
+ MaxResults: aws.Int32(1000),
+ NextToken: nil,
+ }).Times(1).DoAndReturn(func(ctx context.Context, input *sqs.ListQueuesInput, optFns ...func(*sqs.Options)) (*sqs.ListQueuesOutput, error) {
+ // Executed on second iteration of cleanupClientQueues loop. This means that one full iteration has completed and we can verify the results of that iteration
+ wg.Done()
+ sqsCancelFunc()
+ c.So(logBuffer.String(), ShouldContainSubstring, "SQSHandler: finished running iteration of client queue cleanup. found and deleted 2 client queues.")
+ return &sqs.ListQueuesOutput{
+ QueueUrls: []string{},
+ }, nil
+ }),
+ )
+
+ gomock.InOrder(
+ mockSQSClient.EXPECT().GetQueueAttributes(sqsHandlerContext, &sqs.GetQueueAttributesInput{
+ QueueUrl: aws.String(clientQueueUrl1),
+ AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameLastModifiedTimestamp},
+ }).Times(1).Return(&sqs.GetQueueAttributesOutput{
+ Attributes: map[string]string{
+ string(types.QueueAttributeNameLastModifiedTimestamp): "0",
+ }}, nil),
+
+ mockSQSClient.EXPECT().GetQueueAttributes(sqsHandlerContext, &sqs.GetQueueAttributesInput{
+ QueueUrl: aws.String(clientQueueUrl2),
+ AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameLastModifiedTimestamp},
+ }).Times(1).Return(&sqs.GetQueueAttributesOutput{
+ Attributes: map[string]string{
+ string(types.QueueAttributeNameLastModifiedTimestamp): "0",
+ }}, nil),
+ )
+
+ gomock.InOrder(
+ mockSQSClient.EXPECT().DeleteQueue(sqsHandlerContext, &sqs.DeleteQueueInput{
+ QueueUrl: aws.String(clientQueueUrl1),
+ }).Return(&sqs.DeleteQueueOutput{}, nil),
+ mockSQSClient.EXPECT().DeleteQueue(sqsHandlerContext, &sqs.DeleteQueueInput{
+ QueueUrl: aws.String(clientQueueUrl2),
+ }).Return(&sqs.DeleteQueueOutput{}, nil),
+ )
+
+ runSQSHandler(sqsHandlerContext)
+ wg.Wait()
+ })
+ })
+ })
+}
diff --git a/client/snowflake.go b/client/snowflake.go
index 2fdfc1c..c678964 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -16,8 +16,6 @@ import (
"sync"
"syscall"
- "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/proxy"
-
pt "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/goptlib"
sf "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/client/lib"