diff options
author | Andrew Wang <324.andrew.wang@gmail.com> | 2024-01-13 16:10:12 -0500 |
---|---|---|
committer | Cecylia Bocovich <cohosh@torproject.org> | 2024-01-22 13:11:03 -0500 |
commit | 9b90b77d696a241bf4a363deee633b59a1680f85 (patch) | |
tree | cebd55780ce3e2df4383aca303e56402b33a1f9e | |
parent | 32e864b71d19145096ed93dd0af4b8b900a67081 (diff) | |
download | snowflake-9b90b77d696a241bf4a363deee633b59a1680f85.tar.gz snowflake-9b90b77d696a241bf4a363deee633b59a1680f85.zip |
Add unit tests for SQS rendezvous in broker
Co-authored-by: Michael Pu <michael.pu@uwaterloo.ca>
-rw-r--r-- | broker/broker.go | 10 | ||||
-rw-r--r-- | broker/sqs.go | 186 | ||||
-rw-r--r-- | broker/sqs_test.go | 327 | ||||
-rw-r--r-- | client/snowflake.go | 2 |
4 files changed, 435 insertions, 90 deletions
diff --git a/broker/broker.go b/broker/broker.go index 06b530a..b60faff 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -23,6 +23,8 @@ import ( "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/bridgefingerprint" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/namematcher" @@ -283,8 +285,14 @@ func main() { // Run SQS Handler to continuously poll and process messages from SQS if brokerSQSQueueName != "" && brokerSQSQueueRegion != "" { + log.Printf("Loading SQSHandler using SQS Queue %s in region %s\n", brokerSQSQueueName, brokerSQSQueueRegion) sqsHandlerContext := context.Background() - sqsHandler, err := newSQSHandler(sqsHandlerContext, brokerSQSQueueName, brokerSQSQueueRegion, i) + cfg, err := config.LoadDefaultConfig(sqsHandlerContext, config.WithRegion(brokerSQSQueueRegion)) + if err != nil { + log.Fatal(err) + } + client := sqs.NewFromConfig(cfg) + sqsHandler, err := newSQSHandler(sqsHandlerContext, client, brokerSQSQueueName, brokerSQSQueueRegion, i) if err != nil { log.Fatal(err) } diff --git a/broker/sqs.go b/broker/sqs.go index 082f6e9..057a923 100644 --- a/broker/sqs.go +++ b/broker/sqs.go @@ -8,7 +8,6 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/messages" @@ -16,96 +15,109 @@ import ( ) const ( - cleanupInterval = time.Second * 30 cleanupThreshold = -2 * time.Minute ) type sqsHandler struct { - SQSClient sqsclient.SQSClient - SQSQueueURL *string - IPC *IPC + SQSClient sqsclient.SQSClient + SQSQueueURL *string + IPC *IPC + cleanupInterval time.Duration } -func (r *sqsHandler) pollMessages(context context.Context, chn chan<- *types.Message) { +func (r *sqsHandler) pollMessages(ctx context.Context, chn chan<- *types.Message) { for { - res, err := r.SQSClient.ReceiveMessage(context, &sqs.ReceiveMessageInput{ - QueueUrl: r.SQSQueueURL, - MaxNumberOfMessages: 10, - WaitTimeSeconds: 15, - MessageAttributeNames: []string{ - string(types.QueueAttributeNameAll), - }, - }) - - if err != nil { - log.Printf("SQSHandler: encountered error while polling for messages: %v\n", err) - } + select { + case <-ctx.Done(): + // if context is cancelled + return + default: + res, err := r.SQSClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ + QueueUrl: r.SQSQueueURL, + MaxNumberOfMessages: 10, + WaitTimeSeconds: 15, + MessageAttributeNames: []string{ + string(types.QueueAttributeNameAll), + }, + }) + + if err != nil { + log.Printf("SQSHandler: encountered error while polling for messages: %v\n", err) + continue + } - for _, message := range res.Messages { - chn <- &message + for _, message := range res.Messages { + chn <- &message + } } } } -func (r *sqsHandler) cleanupClientQueues(context context.Context) { - for range time.Tick(cleanupInterval) { +func (r *sqsHandler) cleanupClientQueues(ctx context.Context) { + for range time.NewTicker(r.cleanupInterval).C { // Runs at fixed intervals to clean up any client queues that were last changed more than 2 minutes ago - queueURLsList := []string{} - var nextToken *string - for { - res, err := r.SQSClient.ListQueues(context, &sqs.ListQueuesInput{ - QueueNamePrefix: aws.String("snowflake-client-"), - MaxResults: aws.Int32(1000), - NextToken: nextToken, - }) - if err != nil { - log.Printf("SQSHandler: encountered error while retrieving client queues to clean up: %v\n", err) - } - queueURLsList = append(queueURLsList, res.QueueUrls...) - if res.NextToken == nil { - break - } else { - nextToken = res.NextToken + select { + case <-ctx.Done(): + // if context is cancelled + return + default: + queueURLsList := []string{} + var nextToken *string + for { + res, err := r.SQSClient.ListQueues(ctx, &sqs.ListQueuesInput{ + QueueNamePrefix: aws.String("snowflake-client-"), + MaxResults: aws.Int32(1000), + NextToken: nextToken, + }) + if err != nil { + log.Printf("SQSHandler: encountered error while retrieving client queues to clean up: %v\n", err) + } + queueURLsList = append(queueURLsList, res.QueueUrls...) + if res.NextToken == nil { + break + } else { + nextToken = res.NextToken + } } - } - numDeleted := 0 - cleanupCutoff := time.Now().Add(cleanupThreshold) - for _, queueURL := range queueURLsList { - if !strings.Contains(queueURL, "snowflake-client-") { - continue - } - res, err := r.SQSClient.GetQueueAttributes(context, &sqs.GetQueueAttributesInput{ - QueueUrl: aws.String(queueURL), - AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameLastModifiedTimestamp}, - }) - if err != nil { - // According to the AWS SQS docs, the deletion process for a queue can take up to 60 seconds. So the queue - // can be in the process of being deleted, but will still be returned by the ListQueues operation, but - // fail when we try to GetQueueAttributes for the queue - log.Printf("SQSHandler: encountered error while getting attribute of client queue %s. queue may already be deleted.\n", queueURL) - continue - } - lastModifiedInt64, err := strconv.ParseInt(res.Attributes[string(types.QueueAttributeNameLastModifiedTimestamp)], 10, 64) - if err != nil { - log.Printf("SQSHandler: encountered invalid lastModifiedTimetamp value from client queue %s: %v\n", queueURL, err) - continue - } - lastModified := time.Unix(lastModifiedInt64, 0) - if lastModified.Before(cleanupCutoff) { - _, err := r.SQSClient.DeleteQueue(context, &sqs.DeleteQueueInput{ - QueueUrl: aws.String(queueURL), + numDeleted := 0 + cleanupCutoff := time.Now().Add(cleanupThreshold) + for _, queueURL := range queueURLsList { + if !strings.Contains(queueURL, "snowflake-client-") { + continue + } + res, err := r.SQSClient.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ + QueueUrl: aws.String(queueURL), + AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameLastModifiedTimestamp}, }) if err != nil { - log.Printf("SQSHandler: encountered error when deleting client queue %s: %v\n", queueURL, err) + // According to the AWS SQS docs, the deletion process for a queue can take up to 60 seconds. So the queue + // can be in the process of being deleted, but will still be returned by the ListQueues operation, but + // fail when we try to GetQueueAttributes for the queue + log.Printf("SQSHandler: encountered error while getting attribute of client queue %s. queue may already be deleted.\n", queueURL) + continue + } + lastModifiedInt64, err := strconv.ParseInt(res.Attributes[string(types.QueueAttributeNameLastModifiedTimestamp)], 10, 64) + if err != nil { + log.Printf("SQSHandler: encountered invalid lastModifiedTimetamp value from client queue %s: %v\n", queueURL, err) continue - } else { - numDeleted += 1 } + lastModified := time.Unix(lastModifiedInt64, 0) + if lastModified.Before(cleanupCutoff) { + _, err := r.SQSClient.DeleteQueue(ctx, &sqs.DeleteQueueInput{ + QueueUrl: aws.String(queueURL), + }) + if err != nil { + log.Printf("SQSHandler: encountered error when deleting client queue %s: %v\n", queueURL, err) + continue + } else { + numDeleted += 1 + } + } } + log.Printf("SQSHandler: finished running iteration of client queue cleanup. found and deleted %d client queues.\n", numDeleted) } - log.Printf("SQSHandler: finished running iteration of client queue cleanup. found and deleted %d client queues.\n", numDeleted) } } @@ -123,10 +135,11 @@ func (r *sqsHandler) handleMessage(context context.Context, message *types.Messa res, err := r.SQSClient.CreateQueue(context, &sqs.CreateQueueInput{ QueueName: aws.String("snowflake-client-" + *clientID), }) - answerSQSURL := res.QueueUrl if err != nil { log.Printf("SQSHandler: error encountered when creating answer queue for client %s: %v\n", *clientID, err) + return } + answerSQSURL := res.QueueUrl encPollReq = []byte(*message.Body) arg := messages.Arg{ @@ -153,15 +166,7 @@ func (r *sqsHandler) deleteMessage(context context.Context, message *types.Messa }) } -func newSQSHandler(context context.Context, sqsQueueName string, region string, i *IPC) (*sqsHandler, error) { - log.Printf("Loading SQSHandler using SQS Queue %s in region %s\n", sqsQueueName, region) - cfg, err := config.LoadDefaultConfig(context, config.WithRegion(region)) - if err != nil { - return nil, err - } - - client := sqs.NewFromConfig(cfg) - +func newSQSHandler(context context.Context, client sqsclient.SQSClient, sqsQueueName string, region string, i *IPC) (*sqsHandler, error) { // Creates the queue if a queue with the same name doesn't exist. If a queue with the same name and attributes // already exists, then nothing will happen. If a queue with the same name, but different attributes exists, then // an error will be returned @@ -177,20 +182,27 @@ func newSQSHandler(context context.Context, sqsQueueName string, region string, } return &sqsHandler{ - SQSClient: client, - SQSQueueURL: res.QueueUrl, - IPC: i, + SQSClient: client, + SQSQueueURL: res.QueueUrl, + IPC: i, + cleanupInterval: time.Second * 30, }, nil } -func (r *sqsHandler) PollAndHandleMessages(context context.Context) { +func (r *sqsHandler) PollAndHandleMessages(ctx context.Context) { log.Println("SQSHandler: Starting to poll for messages at: " + *r.SQSQueueURL) messagesChn := make(chan *types.Message, 2) - go r.pollMessages(context, messagesChn) - go r.cleanupClientQueues(context) + go r.pollMessages(ctx, messagesChn) + go r.cleanupClientQueues(ctx) for message := range messagesChn { - r.handleMessage(context, message) - r.deleteMessage(context, message) + select { + case <-ctx.Done(): + // if context is cancelled + return + default: + r.handleMessage(ctx, message) + r.deleteMessage(ctx, message) + } } } diff --git a/broker/sqs_test.go b/broker/sqs_test.go new file mode 100644 index 0000000..0596888 --- /dev/null +++ b/broker/sqs_test.go @@ -0,0 +1,327 @@ +package main + +import ( + "bytes" + "context" + "errors" + "log" + "strconv" + "sync" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + "github.com/golang/mock/gomock" + . "github.com/smartystreets/goconvey/convey" + "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/sqsclient" +) + +func TestSQS(t *testing.T) { + + Convey("Context", t, func() { + ctx := NewBrokerContext(NullLogger()) + i := &IPC{ctx} + + var logBuffer bytes.Buffer + log.SetOutput(&logBuffer) + + Convey("Responds to SQS client offers...", func() { + ctrl := gomock.NewController(t) + mockSQSClient := sqsclient.NewMockSQSClient(ctrl) + + brokerSQSQueueName := "example-name" + responseQueueURL := aws.String("https://sqs.us-east-1.amazonaws.com/testing") + + runSQSHandler := func(sqsHandlerContext context.Context) { + mockSQSClient.EXPECT().CreateQueue(sqsHandlerContext, &sqs.CreateQueueInput{ + QueueName: aws.String(brokerSQSQueueName), + Attributes: map[string]string{ + "MessageRetentionPeriod": strconv.FormatInt(int64((5 * time.Minute).Seconds()), 10), + }, + }).Return(&sqs.CreateQueueOutput{ + QueueUrl: responseQueueURL, + }, nil).Times(1) + sqsHandler, err := newSQSHandler(sqsHandlerContext, mockSQSClient, brokerSQSQueueName, "example-region", i) + So(err, ShouldBeNil) + go sqsHandler.PollAndHandleMessages(sqsHandlerContext) + } + + messageBody := aws.String("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}") + receiptHandle := "fake-receipt-handle" + sqsReceiveMessageInput := sqs.ReceiveMessageInput{ + QueueUrl: responseQueueURL, + MaxNumberOfMessages: 10, + WaitTimeSeconds: 15, + MessageAttributeNames: []string{ + string(types.QueueAttributeNameAll), + }, + } + sqsDeleteMessageInput := sqs.DeleteMessageInput{ + QueueUrl: responseQueueURL, + ReceiptHandle: &receiptHandle, + } + + Convey("by ignoring it if no client id specified", func(c C) { + var wg sync.WaitGroup + wg.Add(1) + + sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background()) + defer sqsCancelFunc() + defer wg.Wait() + mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, &sqsReceiveMessageInput).MinTimes(1).DoAndReturn( + func(ctx context.Context, input *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) { + return &sqs.ReceiveMessageOutput{ + Messages: []types.Message{ + { + Body: messageBody, + ReceiptHandle: &receiptHandle, + }, + }, + }, nil + }, + ) + mockSQSClient.EXPECT().DeleteMessage(sqsHandlerContext, &sqsDeleteMessageInput).Times(1).Do( + func(ctx context.Context, input *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) { + defer wg.Done() + c.So(logBuffer.String(), ShouldContainSubstring, "SQSHandler: got SDP offer in SQS message with no client ID. ignoring this message.") + mockSQSClient.EXPECT().DeleteMessage(sqsHandlerContext, &sqsDeleteMessageInput).AnyTimes() + }, + ) + runSQSHandler(sqsHandlerContext) + }) + + Convey("by doing nothing if an error occurs upon receipt of the message", func(c C) { + var wg sync.WaitGroup + wg.Add(2) + + sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background()) + defer sqsCancelFunc() + defer wg.Wait() + + numTimes := 0 + // When ReceiveMessage is called for the first time, the error has not had a chance to be logged yet. + // Therefore, we opt to wait for the second call because we are guaranteed that the error was logged + // by then. + mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, &sqsReceiveMessageInput).MinTimes(2).DoAndReturn( + func(ctx context.Context, input *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) { + numTimes += 1 + if numTimes <= 2 { + wg.Done() + if numTimes == 2 { + c.So(logBuffer.String(), ShouldContainSubstring, "SQSHandler: encountered error while polling for messages: error") + } + } + return nil, errors.New("error") + }, + ) + runSQSHandler(sqsHandlerContext) + }) + + Convey("by attempting to create a new sqs queue...", func() { + clientId := "fake-id" + sqsCreateQueueInput := sqs.CreateQueueInput{ + QueueName: aws.String("snowflake-client-fake-id"), + } + + expectReceiveMessageReturnsValidMessage := func(sqsHandlerContext context.Context) { + mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, &sqsReceiveMessageInput).AnyTimes().DoAndReturn( + func(ctx context.Context, input *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) { + return &sqs.ReceiveMessageOutput{ + Messages: []types.Message{ + { + Body: messageBody, + MessageAttributes: map[string]types.MessageAttributeValue{ + "ClientID": {StringValue: &clientId}, + }, + ReceiptHandle: &receiptHandle, + }, + }, + }, nil + }, + ) + } + + Convey("and does not attempt to send a message via SQS if queue creation fails.", func(c C) { + var wg sync.WaitGroup + wg.Add(2) + + sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background()) + defer sqsCancelFunc() + defer wg.Wait() + + expectReceiveMessageReturnsValidMessage(sqsHandlerContext) + mockSQSClient.EXPECT().CreateQueue(sqsHandlerContext, &sqsCreateQueueInput).Return(nil, errors.New("error")).AnyTimes() + numTimes := 0 + mockSQSClient.EXPECT().DeleteMessage(sqsHandlerContext, &sqsDeleteMessageInput).MinTimes(2).Do( + func(ctx context.Context, input *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) { + numTimes += 1 + if numTimes <= 2 { + wg.Done() + if numTimes == 2 { + c.So(logBuffer.String(), ShouldContainSubstring, "SQSHandler: error encountered when creating answer queue for client fake-id: error") + } + } + }, + ) + runSQSHandler(sqsHandlerContext) + }) + + Convey("and responds with a proxy answer if available.", func(c C) { + var wg sync.WaitGroup + wg.Add(1) + + sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background()) + defer sqsCancelFunc() + defer wg.Wait() + + expectReceiveMessageReturnsValidMessage(sqsHandlerContext) + mockSQSClient.EXPECT().CreateQueue(sqsHandlerContext, &sqsCreateQueueInput).Return(&sqs.CreateQueueOutput{ + QueueUrl: responseQueueURL, + }, nil).AnyTimes() + mockSQSClient.EXPECT().DeleteMessage(sqsHandlerContext, &sqsDeleteMessageInput).AnyTimes() + numTimes := 0 + mockSQSClient.EXPECT().SendMessage(sqsHandlerContext, gomock.Any()).MinTimes(1).DoAndReturn( + func(ctx context.Context, input *sqs.SendMessageInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageOutput, error) { + numTimes += 1 + if numTimes == 1 { + c.So(input.MessageBody, ShouldEqual, aws.String("{\"answer\":\"fake answer\"}")) + wg.Done() + } + return &sqs.SendMessageOutput{}, nil + }, + ) + runSQSHandler(sqsHandlerContext) + + snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted, 0) + + offer := <-snowflake.offerChannel + So(offer.sdp, ShouldResemble, []byte("fake")) + + snowflake.answerChannel <- "fake answer" + }) + }) + }) + + Convey("Cleans up SQS client queues...", func() { + brokerSQSQueueName := "example-name" + responseQueueURL := aws.String("https://sqs.us-east-1.amazonaws.com/testing") + + ctrl := gomock.NewController(t) + mockSQSClient := sqsclient.NewMockSQSClient(ctrl) + + runSQSHandler := func(sqsHandlerContext context.Context) { + + mockSQSClient.EXPECT().CreateQueue(sqsHandlerContext, &sqs.CreateQueueInput{ + QueueName: aws.String(brokerSQSQueueName), + Attributes: map[string]string{ + "MessageRetentionPeriod": strconv.FormatInt(int64((5 * time.Minute).Seconds()), 10), + }, + }).Return(&sqs.CreateQueueOutput{ + QueueUrl: responseQueueURL, + }, nil).Times(1) + + mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, gomock.Any()).AnyTimes().Return( + &sqs.ReceiveMessageOutput{ + Messages: []types.Message{}, + }, nil, + ) + + sqsHandler, err := newSQSHandler(sqsHandlerContext, mockSQSClient, brokerSQSQueueName, "example-region", i) + So(err, ShouldBeNil) + // Set the cleanup interval to 1 ns so we can immediately test the cleanup logic + sqsHandler.cleanupInterval = time.Nanosecond + + go sqsHandler.PollAndHandleMessages(sqsHandlerContext) + } + + Convey("does nothing if there are no open queues.", func() { + var wg sync.WaitGroup + wg.Add(1) + sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background()) + defer wg.Wait() + + mockSQSClient.EXPECT().ListQueues(sqsHandlerContext, &sqs.ListQueuesInput{ + QueueNamePrefix: aws.String("snowflake-client-"), + MaxResults: aws.Int32(1000), + NextToken: nil, + }).DoAndReturn(func(ctx context.Context, input *sqs.ListQueuesInput, optFns ...func(*sqs.Options)) (*sqs.ListQueuesOutput, error) { + wg.Done() + // Cancel the handler context since we are only interested in testing one iteration of the cleanup + sqsCancelFunc() + return &sqs.ListQueuesOutput{ + QueueUrls: []string{}, + }, nil + }) + + runSQSHandler(sqsHandlerContext) + }) + + Convey("deletes open queue when there is one open queue.", func(c C) { + var wg sync.WaitGroup + wg.Add(1) + sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background()) + + clientQueueUrl1 := "https://sqs.us-east-1.amazonaws.com/snowflake-client-1" + clientQueueUrl2 := "https://sqs.us-east-1.amazonaws.com/snowflake-client-2" + + gomock.InOrder( + mockSQSClient.EXPECT().ListQueues(sqsHandlerContext, &sqs.ListQueuesInput{ + QueueNamePrefix: aws.String("snowflake-client-"), + MaxResults: aws.Int32(1000), + NextToken: nil, + }).Times(1).Return(&sqs.ListQueuesOutput{ + QueueUrls: []string{ + clientQueueUrl1, + clientQueueUrl2, + }, + }, nil), + mockSQSClient.EXPECT().ListQueues(sqsHandlerContext, &sqs.ListQueuesInput{ + QueueNamePrefix: aws.String("snowflake-client-"), + MaxResults: aws.Int32(1000), + NextToken: nil, + }).Times(1).DoAndReturn(func(ctx context.Context, input *sqs.ListQueuesInput, optFns ...func(*sqs.Options)) (*sqs.ListQueuesOutput, error) { + // Executed on second iteration of cleanupClientQueues loop. This means that one full iteration has completed and we can verify the results of that iteration + wg.Done() + sqsCancelFunc() + c.So(logBuffer.String(), ShouldContainSubstring, "SQSHandler: finished running iteration of client queue cleanup. found and deleted 2 client queues.") + return &sqs.ListQueuesOutput{ + QueueUrls: []string{}, + }, nil + }), + ) + + gomock.InOrder( + mockSQSClient.EXPECT().GetQueueAttributes(sqsHandlerContext, &sqs.GetQueueAttributesInput{ + QueueUrl: aws.String(clientQueueUrl1), + AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameLastModifiedTimestamp}, + }).Times(1).Return(&sqs.GetQueueAttributesOutput{ + Attributes: map[string]string{ + string(types.QueueAttributeNameLastModifiedTimestamp): "0", + }}, nil), + + mockSQSClient.EXPECT().GetQueueAttributes(sqsHandlerContext, &sqs.GetQueueAttributesInput{ + QueueUrl: aws.String(clientQueueUrl2), + AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameLastModifiedTimestamp}, + }).Times(1).Return(&sqs.GetQueueAttributesOutput{ + Attributes: map[string]string{ + string(types.QueueAttributeNameLastModifiedTimestamp): "0", + }}, nil), + ) + + gomock.InOrder( + mockSQSClient.EXPECT().DeleteQueue(sqsHandlerContext, &sqs.DeleteQueueInput{ + QueueUrl: aws.String(clientQueueUrl1), + }).Return(&sqs.DeleteQueueOutput{}, nil), + mockSQSClient.EXPECT().DeleteQueue(sqsHandlerContext, &sqs.DeleteQueueInput{ + QueueUrl: aws.String(clientQueueUrl2), + }).Return(&sqs.DeleteQueueOutput{}, nil), + ) + + runSQSHandler(sqsHandlerContext) + wg.Wait() + }) + }) + }) +} diff --git a/client/snowflake.go b/client/snowflake.go index 2fdfc1c..c678964 100644 --- a/client/snowflake.go +++ b/client/snowflake.go @@ -16,8 +16,6 @@ import ( "sync" "syscall" - "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/proxy" - pt "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/goptlib" sf "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/client/lib" |