summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnthony Chang <anthony20093@gmail.com>2024-01-29 18:24:25 -0500
committerCecylia Bocovich <cohosh@torproject.org>2024-01-31 14:34:29 -0500
commitdbecefa7d2a6a6c0d59f85e62919dcce1230a22a (patch)
treead50468654781a4a84d1e9c14cdc65574dcb943c
parent26ceb6e20dd4e46268c3183516417ad7330fa1fc (diff)
downloadsnowflake-dbecefa7d2a6a6c0d59f85e62919dcce1230a22a.tar.gz
snowflake-dbecefa7d2a6a6c0d59f85e62919dcce1230a22a.zip
Move RendezvousMethod field to messages.Arg
-rw-r--r--broker/amp.go7
-rw-r--r--broker/http.go7
-rw-r--r--broker/ipc.go15
-rw-r--r--broker/metrics.go36
-rw-r--r--broker/sqs.go7
-rw-r--r--common/messages/ipc.go13
6 files changed, 45 insertions, 40 deletions
diff --git a/broker/amp.go b/broker/amp.go
index b2def12..4c7a036 100644
--- a/broker/amp.go
+++ b/broker/amp.go
@@ -34,10 +34,11 @@ func ampClientOffers(i *IPC, w http.ResponseWriter, r *http.Request) {
encPollReq, err = amp.DecodePath(path)
if err == nil {
arg := messages.Arg{
- Body: encPollReq,
- RemoteAddr: "",
+ Body: encPollReq,
+ RemoteAddr: "",
+ RendezvousMethod: messages.RendezvousAmpCache,
}
- err = i.ClientOffers(arg, &response, RendezvousAmpCache)
+ err = i.ClientOffers(arg, &response)
} else {
response, err = (&messages.ClientPollResponse{
Error: "cannot decode URL path",
diff --git a/broker/http.go b/broker/http.go
index ca9488f..2b68747 100644
--- a/broker/http.go
+++ b/broker/http.go
@@ -167,12 +167,13 @@ func clientOffers(i *IPC, w http.ResponseWriter, r *http.Request) {
}
arg := messages.Arg{
- Body: body,
- RemoteAddr: "",
+ Body: body,
+ RemoteAddr: "",
+ RendezvousMethod: messages.RendezvousHttp,
}
var response []byte
- err = i.ClientOffers(arg, &response, RendezvousHttp)
+ err = i.ClientOffers(arg, &response)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
diff --git a/broker/ipc.go b/broker/ipc.go
index c61df1b..9116a1a 100644
--- a/broker/ipc.go
+++ b/broker/ipc.go
@@ -161,7 +161,8 @@ func sendClientResponse(resp *messages.ClientPollResponse, response *[]byte) err
}
}
-func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte, rendezvousMethod RendezvousMethod) error {
+func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte) error {
+
startTime := time.Now()
req, err := messages.DecodeClientPollRequest(arg.Body)
@@ -195,12 +196,12 @@ func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte, rendezvousMethod
snowflake.offerChannel <- offer
} else {
i.ctx.metrics.lock.Lock()
- i.ctx.metrics.clientDeniedCount[rendezvousMethod]++
- i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "denied", "rendezvous_method": string(rendezvousMethod)}).Inc()
+ i.ctx.metrics.clientDeniedCount[arg.RendezvousMethod]++
+ i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "denied", "rendezvous_method": string(arg.RendezvousMethod)}).Inc()
if offer.natType == NATUnrestricted {
- i.ctx.metrics.clientUnrestrictedDeniedCount[rendezvousMethod]++
+ i.ctx.metrics.clientUnrestrictedDeniedCount[arg.RendezvousMethod]++
} else {
- i.ctx.metrics.clientRestrictedDeniedCount[rendezvousMethod]++
+ i.ctx.metrics.clientRestrictedDeniedCount[arg.RendezvousMethod]++
}
i.ctx.metrics.lock.Unlock()
resp := &messages.ClientPollResponse{Error: messages.StrNoProxies}
@@ -211,8 +212,8 @@ func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte, rendezvousMethod
select {
case answer := <-snowflake.answerChannel:
i.ctx.metrics.lock.Lock()
- i.ctx.metrics.clientProxyMatchCount[rendezvousMethod]++
- i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "matched", "rendezvous_method": string(rendezvousMethod)}).Inc()
+ i.ctx.metrics.clientProxyMatchCount[arg.RendezvousMethod]++
+ i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "matched", "rendezvous_method": string(arg.RendezvousMethod)}).Inc()
i.ctx.metrics.lock.Unlock()
resp := &messages.ClientPollResponse{Answer: answer}
err = sendClientResponse(resp, response)
diff --git a/broker/metrics.go b/broker/metrics.go
index 5b0d3e5..5dfcd6c 100644
--- a/broker/metrics.go
+++ b/broker/metrics.go
@@ -36,14 +36,6 @@ type CountryStats struct {
counts map[string]int
}
-type RendezvousMethod string
-
-const (
- RendezvousHttp RendezvousMethod = "http"
- RendezvousAmpCache RendezvousMethod = "ampcache"
- RendezvousSqs RendezvousMethod = "sqs"
-)
-
// Implements Observable
type Metrics struct {
logger *log.Logger
@@ -52,10 +44,10 @@ type Metrics struct {
countryStats CountryStats
clientRoundtripEstimate time.Duration
proxyIdleCount uint
- clientDeniedCount map[RendezvousMethod]uint
- clientRestrictedDeniedCount map[RendezvousMethod]uint
- clientUnrestrictedDeniedCount map[RendezvousMethod]uint
- clientProxyMatchCount map[RendezvousMethod]uint
+ clientDeniedCount map[messages.RendezvousMethod]uint
+ clientRestrictedDeniedCount map[messages.RendezvousMethod]uint
+ clientUnrestrictedDeniedCount map[messages.RendezvousMethod]uint
+ clientProxyMatchCount map[messages.RendezvousMethod]uint
proxyPollWithRelayURLExtension uint
proxyPollWithoutRelayURLExtension uint
@@ -160,10 +152,10 @@ func (m *Metrics) LoadGeoipDatabases(geoipDB string, geoip6DB string) error {
func NewMetrics(metricsLogger *log.Logger) (*Metrics, error) {
m := new(Metrics)
- m.clientDeniedCount = make(map[RendezvousMethod]uint)
- m.clientRestrictedDeniedCount = make(map[RendezvousMethod]uint)
- m.clientUnrestrictedDeniedCount = make(map[RendezvousMethod]uint)
- m.clientProxyMatchCount = make(map[RendezvousMethod]uint)
+ m.clientDeniedCount = make(map[messages.RendezvousMethod]uint)
+ m.clientRestrictedDeniedCount = make(map[messages.RendezvousMethod]uint)
+ m.clientUnrestrictedDeniedCount = make(map[messages.RendezvousMethod]uint)
+ m.clientProxyMatchCount = make(map[messages.RendezvousMethod]uint)
m.countryStats = CountryStats{
counts: make(map[string]int),
@@ -215,7 +207,7 @@ func (m *Metrics) printMetrics() {
m.logger.Println("client-unrestricted-denied-count", binCount(sumMapValues(&m.clientUnrestrictedDeniedCount)))
m.logger.Println("client-snowflake-match-count", binCount(sumMapValues(&m.clientProxyMatchCount)))
- for _, rendezvousMethod := range [3]RendezvousMethod{RendezvousHttp, RendezvousAmpCache, RendezvousSqs} {
+ for _, rendezvousMethod := range [3]messages.RendezvousMethod{messages.RendezvousHttp, messages.RendezvousAmpCache, messages.RendezvousSqs} {
m.logger.Printf("client-%s-denied-count %d\n", rendezvousMethod, binCount(m.clientDeniedCount[rendezvousMethod]))
m.logger.Printf("client-%s-restricted-denied-count %d\n", rendezvousMethod, binCount(m.clientRestrictedDeniedCount[rendezvousMethod]))
m.logger.Printf("client-%s-unrestricted-denied-count %d\n", rendezvousMethod, binCount(m.clientUnrestrictedDeniedCount[rendezvousMethod]))
@@ -231,13 +223,13 @@ func (m *Metrics) printMetrics() {
// Restores all metrics to original values
func (m *Metrics) zeroMetrics() {
m.proxyIdleCount = 0
- m.clientDeniedCount = make(map[RendezvousMethod]uint)
- m.clientRestrictedDeniedCount = make(map[RendezvousMethod]uint)
- m.clientUnrestrictedDeniedCount = make(map[RendezvousMethod]uint)
+ m.clientDeniedCount = make(map[messages.RendezvousMethod]uint)
+ m.clientRestrictedDeniedCount = make(map[messages.RendezvousMethod]uint)
+ m.clientUnrestrictedDeniedCount = make(map[messages.RendezvousMethod]uint)
m.proxyPollRejectedWithRelayURLExtension = 0
m.proxyPollWithRelayURLExtension = 0
m.proxyPollWithoutRelayURLExtension = 0
- m.clientProxyMatchCount = make(map[RendezvousMethod]uint)
+ m.clientProxyMatchCount = make(map[messages.RendezvousMethod]uint)
m.countryStats.counts = make(map[string]int)
for pType := range m.countryStats.proxies {
m.countryStats.proxies[pType] = make(map[string]bool)
@@ -253,7 +245,7 @@ func binCount(count uint) uint {
return uint((math.Ceil(float64(count) / 8)) * 8)
}
-func sumMapValues(m *map[RendezvousMethod]uint) uint {
+func sumMapValues(m *map[messages.RendezvousMethod]uint) uint {
var s uint = 0
for _, v := range *m {
s += v
diff --git a/broker/sqs.go b/broker/sqs.go
index 59fa4c7..42e5dd3 100644
--- a/broker/sqs.go
+++ b/broker/sqs.go
@@ -145,10 +145,11 @@ func (r *sqsHandler) handleMessage(context context.Context, message *types.Messa
encPollReq = []byte(*message.Body)
arg := messages.Arg{
- Body: encPollReq,
- RemoteAddr: "",
+ Body: encPollReq,
+ RemoteAddr: "",
+ RendezvousMethod: messages.RendezvousSqs,
}
- err = r.IPC.ClientOffers(arg, &response, RendezvousSqs)
+ err = r.IPC.ClientOffers(arg, &response)
if err != nil {
log.Printf("SQSHandler: error encountered when handling message: %v\n", err)
diff --git a/common/messages/ipc.go b/common/messages/ipc.go
index 3250742..2a61b9d 100644
--- a/common/messages/ipc.go
+++ b/common/messages/ipc.go
@@ -4,9 +4,18 @@ import (
"errors"
)
+type RendezvousMethod string
+
+const (
+ RendezvousHttp RendezvousMethod = "http"
+ RendezvousAmpCache RendezvousMethod = "ampcache"
+ RendezvousSqs RendezvousMethod = "sqs"
+)
+
type Arg struct {
- Body []byte
- RemoteAddr string
+ Body []byte
+ RemoteAddr string
+ RendezvousMethod RendezvousMethod
}
var (