diff options
author | Anthony Chang <anthony20093@gmail.com> | 2024-01-29 18:24:25 -0500 |
---|---|---|
committer | Cecylia Bocovich <cohosh@torproject.org> | 2024-01-31 14:34:29 -0500 |
commit | dbecefa7d2a6a6c0d59f85e62919dcce1230a22a (patch) | |
tree | ad50468654781a4a84d1e9c14cdc65574dcb943c | |
parent | 26ceb6e20dd4e46268c3183516417ad7330fa1fc (diff) | |
download | snowflake-dbecefa7d2a6a6c0d59f85e62919dcce1230a22a.tar.gz snowflake-dbecefa7d2a6a6c0d59f85e62919dcce1230a22a.zip |
Move RendezvousMethod field to messages.Arg
-rw-r--r-- | broker/amp.go | 7 | ||||
-rw-r--r-- | broker/http.go | 7 | ||||
-rw-r--r-- | broker/ipc.go | 15 | ||||
-rw-r--r-- | broker/metrics.go | 36 | ||||
-rw-r--r-- | broker/sqs.go | 7 | ||||
-rw-r--r-- | common/messages/ipc.go | 13 |
6 files changed, 45 insertions, 40 deletions
diff --git a/broker/amp.go b/broker/amp.go index b2def12..4c7a036 100644 --- a/broker/amp.go +++ b/broker/amp.go @@ -34,10 +34,11 @@ func ampClientOffers(i *IPC, w http.ResponseWriter, r *http.Request) { encPollReq, err = amp.DecodePath(path) if err == nil { arg := messages.Arg{ - Body: encPollReq, - RemoteAddr: "", + Body: encPollReq, + RemoteAddr: "", + RendezvousMethod: messages.RendezvousAmpCache, } - err = i.ClientOffers(arg, &response, RendezvousAmpCache) + err = i.ClientOffers(arg, &response) } else { response, err = (&messages.ClientPollResponse{ Error: "cannot decode URL path", diff --git a/broker/http.go b/broker/http.go index ca9488f..2b68747 100644 --- a/broker/http.go +++ b/broker/http.go @@ -167,12 +167,13 @@ func clientOffers(i *IPC, w http.ResponseWriter, r *http.Request) { } arg := messages.Arg{ - Body: body, - RemoteAddr: "", + Body: body, + RemoteAddr: "", + RendezvousMethod: messages.RendezvousHttp, } var response []byte - err = i.ClientOffers(arg, &response, RendezvousHttp) + err = i.ClientOffers(arg, &response) if err != nil { log.Println(err) w.WriteHeader(http.StatusInternalServerError) diff --git a/broker/ipc.go b/broker/ipc.go index c61df1b..9116a1a 100644 --- a/broker/ipc.go +++ b/broker/ipc.go @@ -161,7 +161,8 @@ func sendClientResponse(resp *messages.ClientPollResponse, response *[]byte) err } } -func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte, rendezvousMethod RendezvousMethod) error { +func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte) error { + startTime := time.Now() req, err := messages.DecodeClientPollRequest(arg.Body) @@ -195,12 +196,12 @@ func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte, rendezvousMethod snowflake.offerChannel <- offer } else { i.ctx.metrics.lock.Lock() - i.ctx.metrics.clientDeniedCount[rendezvousMethod]++ - i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "denied", "rendezvous_method": string(rendezvousMethod)}).Inc() + i.ctx.metrics.clientDeniedCount[arg.RendezvousMethod]++ + i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "denied", "rendezvous_method": string(arg.RendezvousMethod)}).Inc() if offer.natType == NATUnrestricted { - i.ctx.metrics.clientUnrestrictedDeniedCount[rendezvousMethod]++ + i.ctx.metrics.clientUnrestrictedDeniedCount[arg.RendezvousMethod]++ } else { - i.ctx.metrics.clientRestrictedDeniedCount[rendezvousMethod]++ + i.ctx.metrics.clientRestrictedDeniedCount[arg.RendezvousMethod]++ } i.ctx.metrics.lock.Unlock() resp := &messages.ClientPollResponse{Error: messages.StrNoProxies} @@ -211,8 +212,8 @@ func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte, rendezvousMethod select { case answer := <-snowflake.answerChannel: i.ctx.metrics.lock.Lock() - i.ctx.metrics.clientProxyMatchCount[rendezvousMethod]++ - i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "matched", "rendezvous_method": string(rendezvousMethod)}).Inc() + i.ctx.metrics.clientProxyMatchCount[arg.RendezvousMethod]++ + i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "matched", "rendezvous_method": string(arg.RendezvousMethod)}).Inc() i.ctx.metrics.lock.Unlock() resp := &messages.ClientPollResponse{Answer: answer} err = sendClientResponse(resp, response) diff --git a/broker/metrics.go b/broker/metrics.go index 5b0d3e5..5dfcd6c 100644 --- a/broker/metrics.go +++ b/broker/metrics.go @@ -36,14 +36,6 @@ type CountryStats struct { counts map[string]int } -type RendezvousMethod string - -const ( - RendezvousHttp RendezvousMethod = "http" - RendezvousAmpCache RendezvousMethod = "ampcache" - RendezvousSqs RendezvousMethod = "sqs" -) - // Implements Observable type Metrics struct { logger *log.Logger @@ -52,10 +44,10 @@ type Metrics struct { countryStats CountryStats clientRoundtripEstimate time.Duration proxyIdleCount uint - clientDeniedCount map[RendezvousMethod]uint - clientRestrictedDeniedCount map[RendezvousMethod]uint - clientUnrestrictedDeniedCount map[RendezvousMethod]uint - clientProxyMatchCount map[RendezvousMethod]uint + clientDeniedCount map[messages.RendezvousMethod]uint + clientRestrictedDeniedCount map[messages.RendezvousMethod]uint + clientUnrestrictedDeniedCount map[messages.RendezvousMethod]uint + clientProxyMatchCount map[messages.RendezvousMethod]uint proxyPollWithRelayURLExtension uint proxyPollWithoutRelayURLExtension uint @@ -160,10 +152,10 @@ func (m *Metrics) LoadGeoipDatabases(geoipDB string, geoip6DB string) error { func NewMetrics(metricsLogger *log.Logger) (*Metrics, error) { m := new(Metrics) - m.clientDeniedCount = make(map[RendezvousMethod]uint) - m.clientRestrictedDeniedCount = make(map[RendezvousMethod]uint) - m.clientUnrestrictedDeniedCount = make(map[RendezvousMethod]uint) - m.clientProxyMatchCount = make(map[RendezvousMethod]uint) + m.clientDeniedCount = make(map[messages.RendezvousMethod]uint) + m.clientRestrictedDeniedCount = make(map[messages.RendezvousMethod]uint) + m.clientUnrestrictedDeniedCount = make(map[messages.RendezvousMethod]uint) + m.clientProxyMatchCount = make(map[messages.RendezvousMethod]uint) m.countryStats = CountryStats{ counts: make(map[string]int), @@ -215,7 +207,7 @@ func (m *Metrics) printMetrics() { m.logger.Println("client-unrestricted-denied-count", binCount(sumMapValues(&m.clientUnrestrictedDeniedCount))) m.logger.Println("client-snowflake-match-count", binCount(sumMapValues(&m.clientProxyMatchCount))) - for _, rendezvousMethod := range [3]RendezvousMethod{RendezvousHttp, RendezvousAmpCache, RendezvousSqs} { + for _, rendezvousMethod := range [3]messages.RendezvousMethod{messages.RendezvousHttp, messages.RendezvousAmpCache, messages.RendezvousSqs} { m.logger.Printf("client-%s-denied-count %d\n", rendezvousMethod, binCount(m.clientDeniedCount[rendezvousMethod])) m.logger.Printf("client-%s-restricted-denied-count %d\n", rendezvousMethod, binCount(m.clientRestrictedDeniedCount[rendezvousMethod])) m.logger.Printf("client-%s-unrestricted-denied-count %d\n", rendezvousMethod, binCount(m.clientUnrestrictedDeniedCount[rendezvousMethod])) @@ -231,13 +223,13 @@ func (m *Metrics) printMetrics() { // Restores all metrics to original values func (m *Metrics) zeroMetrics() { m.proxyIdleCount = 0 - m.clientDeniedCount = make(map[RendezvousMethod]uint) - m.clientRestrictedDeniedCount = make(map[RendezvousMethod]uint) - m.clientUnrestrictedDeniedCount = make(map[RendezvousMethod]uint) + m.clientDeniedCount = make(map[messages.RendezvousMethod]uint) + m.clientRestrictedDeniedCount = make(map[messages.RendezvousMethod]uint) + m.clientUnrestrictedDeniedCount = make(map[messages.RendezvousMethod]uint) m.proxyPollRejectedWithRelayURLExtension = 0 m.proxyPollWithRelayURLExtension = 0 m.proxyPollWithoutRelayURLExtension = 0 - m.clientProxyMatchCount = make(map[RendezvousMethod]uint) + m.clientProxyMatchCount = make(map[messages.RendezvousMethod]uint) m.countryStats.counts = make(map[string]int) for pType := range m.countryStats.proxies { m.countryStats.proxies[pType] = make(map[string]bool) @@ -253,7 +245,7 @@ func binCount(count uint) uint { return uint((math.Ceil(float64(count) / 8)) * 8) } -func sumMapValues(m *map[RendezvousMethod]uint) uint { +func sumMapValues(m *map[messages.RendezvousMethod]uint) uint { var s uint = 0 for _, v := range *m { s += v diff --git a/broker/sqs.go b/broker/sqs.go index 59fa4c7..42e5dd3 100644 --- a/broker/sqs.go +++ b/broker/sqs.go @@ -145,10 +145,11 @@ func (r *sqsHandler) handleMessage(context context.Context, message *types.Messa encPollReq = []byte(*message.Body) arg := messages.Arg{ - Body: encPollReq, - RemoteAddr: "", + Body: encPollReq, + RemoteAddr: "", + RendezvousMethod: messages.RendezvousSqs, } - err = r.IPC.ClientOffers(arg, &response, RendezvousSqs) + err = r.IPC.ClientOffers(arg, &response) if err != nil { log.Printf("SQSHandler: error encountered when handling message: %v\n", err) diff --git a/common/messages/ipc.go b/common/messages/ipc.go index 3250742..2a61b9d 100644 --- a/common/messages/ipc.go +++ b/common/messages/ipc.go @@ -4,9 +4,18 @@ import ( "errors" ) +type RendezvousMethod string + +const ( + RendezvousHttp RendezvousMethod = "http" + RendezvousAmpCache RendezvousMethod = "ampcache" + RendezvousSqs RendezvousMethod = "sqs" +) + type Arg struct { - Body []byte - RemoteAddr string + Body []byte + RemoteAddr string + RendezvousMethod RendezvousMethod } var ( |