diff options
author | David Fifield <david@bamsoftware.com> | 2023-09-05 05:42:15 +0000 |
---|---|---|
committer | Shelikhoo <xiaokangwang@outlook.com> | 2023-10-09 16:16:05 +0100 |
commit | 6393af6bab0f7c3c95b11352d5c582d2000062fa (patch) | |
tree | adee64e54777dcde3d09903552dc74ffcfdb577e | |
parent | a615e8b1ab88ac91a1a5588c1d6e41698e7af043 (diff) | |
download | snowflake-6393af6bab0f7c3c95b11352d5c582d2000062fa.tar.gz snowflake-6393af6bab0f7c3c95b11352d5c582d2000062fa.zip |
Remove proxy churn measurements from broker.
We've done the analysis we planned to do on these measurements.
A program to analyze the proxy churn and extract hour-by-hour
intersections is available at:
https://github.com/turfed/snowflake-paper/tree/main/figures/proxy-churn
Closes #40280.
-rw-r--r-- | broker/broker.go | 17 | ||||
-rw-r--r-- | broker/ipc.go | 1 | ||||
-rw-r--r-- | broker/metrics.go | 13 | ||||
-rw-r--r-- | common/ipsetsink/sink.go | 55 | ||||
-rw-r--r-- | common/ipsetsink/sink_test.go | 47 | ||||
-rw-r--r-- | common/ipsetsink/sinkcluster/common.go | 24 | ||||
-rw-r--r-- | common/ipsetsink/sinkcluster/reader.go | 64 | ||||
-rw-r--r-- | common/ipsetsink/sinkcluster/writer.go | 68 | ||||
-rw-r--r-- | common/ipsetsink/sinkcluster/writer_test.go | 33 | ||||
-rw-r--r-- | distinctcounter/counter.go | 37 | ||||
-rw-r--r-- | go.mod | 1 | ||||
-rw-r--r-- | go.sum | 2 |
12 files changed, 0 insertions, 362 deletions
diff --git a/broker/broker.go b/broker/broker.go index c3541c1..33f45ab 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -11,8 +11,6 @@ import ( "crypto/tls" "flag" "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/bridgefingerprint" - "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/ipsetsink" - "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/ipsetsink/sinkcluster" "io" "log" "net/http" @@ -196,8 +194,6 @@ func main() { var certFilename, keyFilename string var disableGeoip bool var metricsFilename string - var ipCountFilename, ipCountMaskingKey string - var ipCountInterval time.Duration var unsafeLogging bool flag.StringVar(&acmeEmail, "acme-email", "", "optional contact email for Let's Encrypt notifications") @@ -214,9 +210,6 @@ func main() { flag.BoolVar(&disableTLS, "disable-tls", false, "don't use HTTPS") flag.BoolVar(&disableGeoip, "disable-geoip", false, "don't use geoip for stats collection") flag.StringVar(&metricsFilename, "metrics-log", "", "path to metrics logging output") - flag.StringVar(&ipCountFilename, "ip-count-log", "", "path to ip count logging output") - flag.StringVar(&ipCountMaskingKey, "ip-count-mask", "", "masking key for ip count logging") - flag.DurationVar(&ipCountInterval, "ip-count-interval", time.Hour, "time interval between each chunk") flag.BoolVar(&unsafeLogging, "unsafe-logging", false, "prevent logs from being scrubbed") flag.Parse() @@ -264,16 +257,6 @@ func main() { } } - if ipCountFilename != "" { - ipCountFile, err := os.OpenFile(ipCountFilename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - - if err != nil { - log.Fatal(err.Error()) - } - ipSetSink := ipsetsink.NewIPSetSink(ipCountMaskingKey) - ctx.metrics.distinctIPWriter = sinkcluster.NewClusterWriter(ipCountFile, ipCountInterval, ipSetSink) - } - go ctx.Broker() i := &IPC{ctx} diff --git a/broker/ipc.go b/broker/ipc.go index b77c579..7cbdd06 100644 --- a/broker/ipc.go +++ b/broker/ipc.go @@ -107,7 +107,6 @@ func (i *IPC) ProxyPolls(arg messages.Arg, response *[]byte) error { } else { i.ctx.metrics.lock.Lock() i.ctx.metrics.UpdateCountryStats(remoteIP, proxyType, natType) - i.ctx.metrics.RecordIPAddress(remoteIP) i.ctx.metrics.lock.Unlock() } diff --git a/broker/metrics.go b/broker/metrics.go index b3f9d40..8788a96 100644 --- a/broker/metrics.go +++ b/broker/metrics.go @@ -16,7 +16,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "gitlab.torproject.org/tpo/anti-censorship/geoip" - "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/ipsetsink/sinkcluster" "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/messages" ) @@ -42,8 +41,6 @@ type Metrics struct { logger *log.Logger geoipdb *geoip.Geoip - distinctIPWriter *sinkcluster.ClusterWriter - countryStats CountryStats clientRoundtripEstimate time.Duration proxyIdleCount uint @@ -327,13 +324,3 @@ func initPrometheus() *PromMetrics { return promMetrics } - -func (m *Metrics) RecordIPAddress(ip string) { - if m.distinctIPWriter != nil { - m.distinctIPWriter.AddIPToSet(ip) - } -} - -func (m *Metrics) SetIPAddressRecorder(recorder *sinkcluster.ClusterWriter) { - m.distinctIPWriter = recorder -} diff --git a/common/ipsetsink/sink.go b/common/ipsetsink/sink.go deleted file mode 100644 index 168d061..0000000 --- a/common/ipsetsink/sink.go +++ /dev/null @@ -1,55 +0,0 @@ -package ipsetsink - -import ( - "bytes" - "crypto/hmac" - "encoding/binary" - "hash" - - "github.com/clarkduvall/hyperloglog" - "golang.org/x/crypto/sha3" -) - -func NewIPSetSink(maskingKey string) *IPSetSink { - countDistinct, _ := hyperloglog.NewPlus(18) - return &IPSetSink{ - ipMaskingKey: maskingKey, - countDistinct: countDistinct, - } -} - -type IPSetSink struct { - ipMaskingKey string - countDistinct *hyperloglog.HyperLogLogPlus -} - -func (s *IPSetSink) maskIPAddress(ipAddress string) []byte { - hmacIPMasker := hmac.New(func() hash.Hash { - return sha3.New256() - }, []byte(s.ipMaskingKey)) - hmacIPMasker.Write([]byte(ipAddress)) - return hmacIPMasker.Sum(nil) -} - -func (s *IPSetSink) AddIPToSet(ipAddress string) { - s.countDistinct.Add(truncatedHash64FromBytes{hashValue(s.maskIPAddress(ipAddress))}) -} - -func (s *IPSetSink) Dump() ([]byte, error) { - return s.countDistinct.GobEncode() -} - -func (s *IPSetSink) Reset() { - s.countDistinct.Clear() -} - -type hashValue []byte -type truncatedHash64FromBytes struct { - hashValue -} - -func (c truncatedHash64FromBytes) Sum64() uint64 { - var value uint64 - binary.Read(bytes.NewReader(c.hashValue), binary.BigEndian, &value) - return value -} diff --git a/common/ipsetsink/sink_test.go b/common/ipsetsink/sink_test.go deleted file mode 100644 index 00ae965..0000000 --- a/common/ipsetsink/sink_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package ipsetsink - -import ( - "fmt" - "github.com/clarkduvall/hyperloglog" - "testing" -) -import . "github.com/smartystreets/goconvey/convey" - -func TestSinkInit(t *testing.T) { - Convey("Context", t, func() { - sink := NewIPSetSink("demo") - sink.AddIPToSet("test1") - sink.AddIPToSet("test2") - data, err := sink.Dump() - So(err, ShouldBeNil) - structure, err := hyperloglog.NewPlus(18) - So(err, ShouldBeNil) - err = structure.GobDecode(data) - So(err, ShouldBeNil) - count := structure.Count() - So(count, ShouldBeBetweenOrEqual, 1, 3) - }) -} - -func TestSinkCounting(t *testing.T) { - Convey("Context", t, func() { - for itemCount := 300; itemCount <= 10000; itemCount += 200 { - sink := NewIPSetSink("demo") - for i := 0; i <= itemCount; i++ { - sink.AddIPToSet(fmt.Sprintf("demo%v", i)) - } - for i := 0; i <= itemCount; i++ { - sink.AddIPToSet(fmt.Sprintf("demo%v", i)) - } - data, err := sink.Dump() - So(err, ShouldBeNil) - structure, err := hyperloglog.NewPlus(18) - So(err, ShouldBeNil) - err = structure.GobDecode(data) - So(err, ShouldBeNil) - count := structure.Count() - So((float64(count)/float64(itemCount))-1.0, ShouldAlmostEqual, 0, 0.01) - } - - }) -} diff --git a/common/ipsetsink/sinkcluster/common.go b/common/ipsetsink/sinkcluster/common.go deleted file mode 100644 index 4360f70..0000000 --- a/common/ipsetsink/sinkcluster/common.go +++ /dev/null @@ -1,24 +0,0 @@ -package sinkcluster - -/* ClusterWriter, and (ClusterCountResult).Count output a streamed IP set journal file to remember distinct IP address - - its format is as follows: - - This file should be in newline-delimited JSON format(https://jsonlines.org/). - For each line, the format of json data should be in the format of: - {"recordingStart":"2022-05-30T14:38:44.678610091Z","recordingEnd":"2022-05-30T14:39:48.157630926Z","recorded":""} - - recordingStart:datetime is the time this chunk of recording start. - - recordingEnd:datetime is the time this chunk of recording end. - - recorded is the checkpoint data generated by hyperloglog. -*/ - -import "time" - -type SinkEntry struct { - RecordingStart time.Time `json:"recordingStart"` - RecordingEnd time.Time `json:"recordingEnd"` - Recorded []byte `json:"recorded"` -} diff --git a/common/ipsetsink/sinkcluster/reader.go b/common/ipsetsink/sinkcluster/reader.go deleted file mode 100644 index 78d383b..0000000 --- a/common/ipsetsink/sinkcluster/reader.go +++ /dev/null @@ -1,64 +0,0 @@ -package sinkcluster - -import ( - "bufio" - "encoding/json" - "github.com/clarkduvall/hyperloglog" - "io" - "time" -) - -func NewClusterCounter(from time.Time, to time.Time) *ClusterCounter { - return &ClusterCounter{from: from, to: to} -} - -type ClusterCounter struct { - from time.Time - to time.Time -} - -type ClusterCountResult struct { - Sum uint64 - ChunkIncluded int64 -} - -func (c ClusterCounter) Count(reader io.Reader) (*ClusterCountResult, error) { - result := ClusterCountResult{} - counter, err := hyperloglog.NewPlus(18) - if err != nil { - return nil, err - } - inputScanner := bufio.NewScanner(reader) - for inputScanner.Scan() { - inputLine := inputScanner.Bytes() - sinkInfo := SinkEntry{} - if err := json.Unmarshal(inputLine, &sinkInfo); err != nil { - return nil, err - } - - if (sinkInfo.RecordingStart.Before(c.from) && !sinkInfo.RecordingStart.Equal(c.from)) || - sinkInfo.RecordingEnd.After(c.to) { - continue - } - - restoredCounter, err := hyperloglog.NewPlus(18) - if err != nil { - return nil, err - } - err = restoredCounter.GobDecode(sinkInfo.Recorded) - if err != nil { - return nil, err - } - result.ChunkIncluded++ - err = counter.Merge(restoredCounter) - if err != nil { - return nil, err - } - } - err = inputScanner.Err() - if err != nil { - return nil, err - } - result.Sum = counter.Count() - return &result, nil -} diff --git a/common/ipsetsink/sinkcluster/writer.go b/common/ipsetsink/sinkcluster/writer.go deleted file mode 100644 index 29cdb9c..0000000 --- a/common/ipsetsink/sinkcluster/writer.go +++ /dev/null @@ -1,68 +0,0 @@ -package sinkcluster - -import ( - "bytes" - "encoding/json" - "io" - "log" - "time" - - "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/ipsetsink" -) - -func NewClusterWriter(writer WriteSyncer, writeInterval time.Duration, sink *ipsetsink.IPSetSink) *ClusterWriter { - c := &ClusterWriter{ - writer: writer, - lastWriteTime: time.Now(), - writeInterval: writeInterval, - current: sink, - } - return c -} - -type ClusterWriter struct { - writer WriteSyncer - lastWriteTime time.Time - writeInterval time.Duration - current *ipsetsink.IPSetSink -} - -type WriteSyncer interface { - Sync() error - io.Writer -} - -func (c *ClusterWriter) WriteIPSetToDisk() { - currentTime := time.Now() - data, err := c.current.Dump() - if err != nil { - log.Println("unable able to write ipset to file:", err) - return - } - entry := &SinkEntry{ - RecordingStart: c.lastWriteTime, - RecordingEnd: currentTime, - Recorded: data, - } - jsonData, err := json.Marshal(entry) - if err != nil { - log.Println("unable able to write ipset to file:", err) - return - } - jsonData = append(jsonData, byte('\n')) - _, err = io.Copy(c.writer, bytes.NewReader(jsonData)) - if err != nil { - log.Println("unable able to write ipset to file:", err) - return - } - c.writer.Sync() - c.lastWriteTime = currentTime - c.current.Reset() -} - -func (c *ClusterWriter) AddIPToSet(ipAddress string) { - if c.lastWriteTime.Add(c.writeInterval).Before(time.Now()) { - c.WriteIPSetToDisk() - } - c.current.AddIPToSet(ipAddress) -} diff --git a/common/ipsetsink/sinkcluster/writer_test.go b/common/ipsetsink/sinkcluster/writer_test.go deleted file mode 100644 index dd8dc9f..0000000 --- a/common/ipsetsink/sinkcluster/writer_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package sinkcluster - -import ( - "bytes" - "io" - "testing" - "time" - - "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/ipsetsink" - - . "github.com/smartystreets/goconvey/convey" -) - -type writerStub struct { - io.Writer -} - -func (w writerStub) Sync() error { - return nil -} - -func TestSinkWriter(t *testing.T) { - - Convey("Context", t, func() { - buffer := bytes.NewBuffer(nil) - writerStubInst := &writerStub{buffer} - sink := ipsetsink.NewIPSetSink("demo") - clusterWriter := NewClusterWriter(writerStubInst, time.Minute, sink) - clusterWriter.AddIPToSet("1") - clusterWriter.WriteIPSetToDisk() - So(buffer.Bytes(), ShouldNotBeNil) - }) -} diff --git a/distinctcounter/counter.go b/distinctcounter/counter.go deleted file mode 100644 index b87c965..0000000 --- a/distinctcounter/counter.go +++ /dev/null @@ -1,37 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "log" - "os" - "time" - - "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/ipsetsink/sinkcluster" -) - -func main() { - inputFile := flag.String("in", "", "") - start := flag.String("from", "", "") - end := flag.String("to", "", "") - flag.Parse() - startTime, err := time.Parse(time.RFC3339, *start) - if err != nil { - log.Fatal("unable to parse start time:", err) - } - endTime, err := time.Parse(time.RFC3339, *end) - if err != nil { - log.Fatal("unable to parse end time:", err) - } - fd, err := os.Open(*inputFile) - if err != nil { - log.Fatal("unable to open input file:", err) - } - counter := sinkcluster.NewClusterCounter(startTime, endTime) - result, err := counter.Count(fd) - if err != nil { - log.Fatal("unable to count:", err) - } - fmt.Printf("sum = %v\n", result.Sum) - fmt.Printf("chunkIncluded = %v\n", result.ChunkIncluded) -} @@ -3,7 +3,6 @@ module gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/ go 1.21 require ( - github.com/clarkduvall/hyperloglog v0.0.0-20171127014514-a0107a5d8004 github.com/gorilla/websocket v1.5.0 github.com/pion/ice/v2 v2.3.11 github.com/pion/sdp/v3 v3.0.6 @@ -7,8 +7,6 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/clarkduvall/hyperloglog v0.0.0-20171127014514-a0107a5d8004 h1:mK6JroY6bLiPS3s6QCYOSjRyErFc2iHNkhhmRfF0nHo= -github.com/clarkduvall/hyperloglog v0.0.0-20171127014514-a0107a5d8004/go.mod h1:drodPoQNro6QBO6TJ/MpMZbz8Bn2eSDtRN6jpG4VGw8= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudflare/circl v1.3.3 h1:fE/Qz0QdIGqeWfnwq0RE0R7MI51s0M2E4Ga9kq5AEMs= github.com/cloudflare/circl v1.3.3/go.mod h1:5XYMA4rFBvNIrhs50XuiBJ15vF2pZn4nnUKZrLbUZFA= |