aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Fifield <david@bamsoftware.com>2023-09-05 05:42:15 +0000
committerShelikhoo <xiaokangwang@outlook.com>2023-10-09 16:16:05 +0100
commit6393af6bab0f7c3c95b11352d5c582d2000062fa (patch)
treeadee64e54777dcde3d09903552dc74ffcfdb577e
parenta615e8b1ab88ac91a1a5588c1d6e41698e7af043 (diff)
downloadsnowflake-6393af6bab0f7c3c95b11352d5c582d2000062fa.tar.gz
snowflake-6393af6bab0f7c3c95b11352d5c582d2000062fa.zip
Remove proxy churn measurements from broker.
We've done the analysis we planned to do on these measurements. A program to analyze the proxy churn and extract hour-by-hour intersections is available at: https://github.com/turfed/snowflake-paper/tree/main/figures/proxy-churn Closes #40280.
-rw-r--r--broker/broker.go17
-rw-r--r--broker/ipc.go1
-rw-r--r--broker/metrics.go13
-rw-r--r--common/ipsetsink/sink.go55
-rw-r--r--common/ipsetsink/sink_test.go47
-rw-r--r--common/ipsetsink/sinkcluster/common.go24
-rw-r--r--common/ipsetsink/sinkcluster/reader.go64
-rw-r--r--common/ipsetsink/sinkcluster/writer.go68
-rw-r--r--common/ipsetsink/sinkcluster/writer_test.go33
-rw-r--r--distinctcounter/counter.go37
-rw-r--r--go.mod1
-rw-r--r--go.sum2
12 files changed, 0 insertions, 362 deletions
diff --git a/broker/broker.go b/broker/broker.go
index c3541c1..33f45ab 100644
--- a/broker/broker.go
+++ b/broker/broker.go
@@ -11,8 +11,6 @@ import (
"crypto/tls"
"flag"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/bridgefingerprint"
- "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/ipsetsink"
- "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/ipsetsink/sinkcluster"
"io"
"log"
"net/http"
@@ -196,8 +194,6 @@ func main() {
var certFilename, keyFilename string
var disableGeoip bool
var metricsFilename string
- var ipCountFilename, ipCountMaskingKey string
- var ipCountInterval time.Duration
var unsafeLogging bool
flag.StringVar(&acmeEmail, "acme-email", "", "optional contact email for Let's Encrypt notifications")
@@ -214,9 +210,6 @@ func main() {
flag.BoolVar(&disableTLS, "disable-tls", false, "don't use HTTPS")
flag.BoolVar(&disableGeoip, "disable-geoip", false, "don't use geoip for stats collection")
flag.StringVar(&metricsFilename, "metrics-log", "", "path to metrics logging output")
- flag.StringVar(&ipCountFilename, "ip-count-log", "", "path to ip count logging output")
- flag.StringVar(&ipCountMaskingKey, "ip-count-mask", "", "masking key for ip count logging")
- flag.DurationVar(&ipCountInterval, "ip-count-interval", time.Hour, "time interval between each chunk")
flag.BoolVar(&unsafeLogging, "unsafe-logging", false, "prevent logs from being scrubbed")
flag.Parse()
@@ -264,16 +257,6 @@ func main() {
}
}
- if ipCountFilename != "" {
- ipCountFile, err := os.OpenFile(ipCountFilename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
-
- if err != nil {
- log.Fatal(err.Error())
- }
- ipSetSink := ipsetsink.NewIPSetSink(ipCountMaskingKey)
- ctx.metrics.distinctIPWriter = sinkcluster.NewClusterWriter(ipCountFile, ipCountInterval, ipSetSink)
- }
-
go ctx.Broker()
i := &IPC{ctx}
diff --git a/broker/ipc.go b/broker/ipc.go
index b77c579..7cbdd06 100644
--- a/broker/ipc.go
+++ b/broker/ipc.go
@@ -107,7 +107,6 @@ func (i *IPC) ProxyPolls(arg messages.Arg, response *[]byte) error {
} else {
i.ctx.metrics.lock.Lock()
i.ctx.metrics.UpdateCountryStats(remoteIP, proxyType, natType)
- i.ctx.metrics.RecordIPAddress(remoteIP)
i.ctx.metrics.lock.Unlock()
}
diff --git a/broker/metrics.go b/broker/metrics.go
index b3f9d40..8788a96 100644
--- a/broker/metrics.go
+++ b/broker/metrics.go
@@ -16,7 +16,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"gitlab.torproject.org/tpo/anti-censorship/geoip"
- "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/ipsetsink/sinkcluster"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/messages"
)
@@ -42,8 +41,6 @@ type Metrics struct {
logger *log.Logger
geoipdb *geoip.Geoip
- distinctIPWriter *sinkcluster.ClusterWriter
-
countryStats CountryStats
clientRoundtripEstimate time.Duration
proxyIdleCount uint
@@ -327,13 +324,3 @@ func initPrometheus() *PromMetrics {
return promMetrics
}
-
-func (m *Metrics) RecordIPAddress(ip string) {
- if m.distinctIPWriter != nil {
- m.distinctIPWriter.AddIPToSet(ip)
- }
-}
-
-func (m *Metrics) SetIPAddressRecorder(recorder *sinkcluster.ClusterWriter) {
- m.distinctIPWriter = recorder
-}
diff --git a/common/ipsetsink/sink.go b/common/ipsetsink/sink.go
deleted file mode 100644
index 168d061..0000000
--- a/common/ipsetsink/sink.go
+++ /dev/null
@@ -1,55 +0,0 @@
-package ipsetsink
-
-import (
- "bytes"
- "crypto/hmac"
- "encoding/binary"
- "hash"
-
- "github.com/clarkduvall/hyperloglog"
- "golang.org/x/crypto/sha3"
-)
-
-func NewIPSetSink(maskingKey string) *IPSetSink {
- countDistinct, _ := hyperloglog.NewPlus(18)
- return &IPSetSink{
- ipMaskingKey: maskingKey,
- countDistinct: countDistinct,
- }
-}
-
-type IPSetSink struct {
- ipMaskingKey string
- countDistinct *hyperloglog.HyperLogLogPlus
-}
-
-func (s *IPSetSink) maskIPAddress(ipAddress string) []byte {
- hmacIPMasker := hmac.New(func() hash.Hash {
- return sha3.New256()
- }, []byte(s.ipMaskingKey))
- hmacIPMasker.Write([]byte(ipAddress))
- return hmacIPMasker.Sum(nil)
-}
-
-func (s *IPSetSink) AddIPToSet(ipAddress string) {
- s.countDistinct.Add(truncatedHash64FromBytes{hashValue(s.maskIPAddress(ipAddress))})
-}
-
-func (s *IPSetSink) Dump() ([]byte, error) {
- return s.countDistinct.GobEncode()
-}
-
-func (s *IPSetSink) Reset() {
- s.countDistinct.Clear()
-}
-
-type hashValue []byte
-type truncatedHash64FromBytes struct {
- hashValue
-}
-
-func (c truncatedHash64FromBytes) Sum64() uint64 {
- var value uint64
- binary.Read(bytes.NewReader(c.hashValue), binary.BigEndian, &value)
- return value
-}
diff --git a/common/ipsetsink/sink_test.go b/common/ipsetsink/sink_test.go
deleted file mode 100644
index 00ae965..0000000
--- a/common/ipsetsink/sink_test.go
+++ /dev/null
@@ -1,47 +0,0 @@
-package ipsetsink
-
-import (
- "fmt"
- "github.com/clarkduvall/hyperloglog"
- "testing"
-)
-import . "github.com/smartystreets/goconvey/convey"
-
-func TestSinkInit(t *testing.T) {
- Convey("Context", t, func() {
- sink := NewIPSetSink("demo")
- sink.AddIPToSet("test1")
- sink.AddIPToSet("test2")
- data, err := sink.Dump()
- So(err, ShouldBeNil)
- structure, err := hyperloglog.NewPlus(18)
- So(err, ShouldBeNil)
- err = structure.GobDecode(data)
- So(err, ShouldBeNil)
- count := structure.Count()
- So(count, ShouldBeBetweenOrEqual, 1, 3)
- })
-}
-
-func TestSinkCounting(t *testing.T) {
- Convey("Context", t, func() {
- for itemCount := 300; itemCount <= 10000; itemCount += 200 {
- sink := NewIPSetSink("demo")
- for i := 0; i <= itemCount; i++ {
- sink.AddIPToSet(fmt.Sprintf("demo%v", i))
- }
- for i := 0; i <= itemCount; i++ {
- sink.AddIPToSet(fmt.Sprintf("demo%v", i))
- }
- data, err := sink.Dump()
- So(err, ShouldBeNil)
- structure, err := hyperloglog.NewPlus(18)
- So(err, ShouldBeNil)
- err = structure.GobDecode(data)
- So(err, ShouldBeNil)
- count := structure.Count()
- So((float64(count)/float64(itemCount))-1.0, ShouldAlmostEqual, 0, 0.01)
- }
-
- })
-}
diff --git a/common/ipsetsink/sinkcluster/common.go b/common/ipsetsink/sinkcluster/common.go
deleted file mode 100644
index 4360f70..0000000
--- a/common/ipsetsink/sinkcluster/common.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package sinkcluster
-
-/* ClusterWriter, and (ClusterCountResult).Count output a streamed IP set journal file to remember distinct IP address
-
- its format is as follows:
-
- This file should be in newline-delimited JSON format(https://jsonlines.org/).
- For each line, the format of json data should be in the format of:
- {"recordingStart":"2022-05-30T14:38:44.678610091Z","recordingEnd":"2022-05-30T14:39:48.157630926Z","recorded":""}
-
- recordingStart:datetime is the time this chunk of recording start.
-
- recordingEnd:datetime is the time this chunk of recording end.
-
- recorded is the checkpoint data generated by hyperloglog.
-*/
-
-import "time"
-
-type SinkEntry struct {
- RecordingStart time.Time `json:"recordingStart"`
- RecordingEnd time.Time `json:"recordingEnd"`
- Recorded []byte `json:"recorded"`
-}
diff --git a/common/ipsetsink/sinkcluster/reader.go b/common/ipsetsink/sinkcluster/reader.go
deleted file mode 100644
index 78d383b..0000000
--- a/common/ipsetsink/sinkcluster/reader.go
+++ /dev/null
@@ -1,64 +0,0 @@
-package sinkcluster
-
-import (
- "bufio"
- "encoding/json"
- "github.com/clarkduvall/hyperloglog"
- "io"
- "time"
-)
-
-func NewClusterCounter(from time.Time, to time.Time) *ClusterCounter {
- return &ClusterCounter{from: from, to: to}
-}
-
-type ClusterCounter struct {
- from time.Time
- to time.Time
-}
-
-type ClusterCountResult struct {
- Sum uint64
- ChunkIncluded int64
-}
-
-func (c ClusterCounter) Count(reader io.Reader) (*ClusterCountResult, error) {
- result := ClusterCountResult{}
- counter, err := hyperloglog.NewPlus(18)
- if err != nil {
- return nil, err
- }
- inputScanner := bufio.NewScanner(reader)
- for inputScanner.Scan() {
- inputLine := inputScanner.Bytes()
- sinkInfo := SinkEntry{}
- if err := json.Unmarshal(inputLine, &sinkInfo); err != nil {
- return nil, err
- }
-
- if (sinkInfo.RecordingStart.Before(c.from) && !sinkInfo.RecordingStart.Equal(c.from)) ||
- sinkInfo.RecordingEnd.After(c.to) {
- continue
- }
-
- restoredCounter, err := hyperloglog.NewPlus(18)
- if err != nil {
- return nil, err
- }
- err = restoredCounter.GobDecode(sinkInfo.Recorded)
- if err != nil {
- return nil, err
- }
- result.ChunkIncluded++
- err = counter.Merge(restoredCounter)
- if err != nil {
- return nil, err
- }
- }
- err = inputScanner.Err()
- if err != nil {
- return nil, err
- }
- result.Sum = counter.Count()
- return &result, nil
-}
diff --git a/common/ipsetsink/sinkcluster/writer.go b/common/ipsetsink/sinkcluster/writer.go
deleted file mode 100644
index 29cdb9c..0000000
--- a/common/ipsetsink/sinkcluster/writer.go
+++ /dev/null
@@ -1,68 +0,0 @@
-package sinkcluster
-
-import (
- "bytes"
- "encoding/json"
- "io"
- "log"
- "time"
-
- "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/ipsetsink"
-)
-
-func NewClusterWriter(writer WriteSyncer, writeInterval time.Duration, sink *ipsetsink.IPSetSink) *ClusterWriter {
- c := &ClusterWriter{
- writer: writer,
- lastWriteTime: time.Now(),
- writeInterval: writeInterval,
- current: sink,
- }
- return c
-}
-
-type ClusterWriter struct {
- writer WriteSyncer
- lastWriteTime time.Time
- writeInterval time.Duration
- current *ipsetsink.IPSetSink
-}
-
-type WriteSyncer interface {
- Sync() error
- io.Writer
-}
-
-func (c *ClusterWriter) WriteIPSetToDisk() {
- currentTime := time.Now()
- data, err := c.current.Dump()
- if err != nil {
- log.Println("unable able to write ipset to file:", err)
- return
- }
- entry := &SinkEntry{
- RecordingStart: c.lastWriteTime,
- RecordingEnd: currentTime,
- Recorded: data,
- }
- jsonData, err := json.Marshal(entry)
- if err != nil {
- log.Println("unable able to write ipset to file:", err)
- return
- }
- jsonData = append(jsonData, byte('\n'))
- _, err = io.Copy(c.writer, bytes.NewReader(jsonData))
- if err != nil {
- log.Println("unable able to write ipset to file:", err)
- return
- }
- c.writer.Sync()
- c.lastWriteTime = currentTime
- c.current.Reset()
-}
-
-func (c *ClusterWriter) AddIPToSet(ipAddress string) {
- if c.lastWriteTime.Add(c.writeInterval).Before(time.Now()) {
- c.WriteIPSetToDisk()
- }
- c.current.AddIPToSet(ipAddress)
-}
diff --git a/common/ipsetsink/sinkcluster/writer_test.go b/common/ipsetsink/sinkcluster/writer_test.go
deleted file mode 100644
index dd8dc9f..0000000
--- a/common/ipsetsink/sinkcluster/writer_test.go
+++ /dev/null
@@ -1,33 +0,0 @@
-package sinkcluster
-
-import (
- "bytes"
- "io"
- "testing"
- "time"
-
- "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/ipsetsink"
-
- . "github.com/smartystreets/goconvey/convey"
-)
-
-type writerStub struct {
- io.Writer
-}
-
-func (w writerStub) Sync() error {
- return nil
-}
-
-func TestSinkWriter(t *testing.T) {
-
- Convey("Context", t, func() {
- buffer := bytes.NewBuffer(nil)
- writerStubInst := &writerStub{buffer}
- sink := ipsetsink.NewIPSetSink("demo")
- clusterWriter := NewClusterWriter(writerStubInst, time.Minute, sink)
- clusterWriter.AddIPToSet("1")
- clusterWriter.WriteIPSetToDisk()
- So(buffer.Bytes(), ShouldNotBeNil)
- })
-}
diff --git a/distinctcounter/counter.go b/distinctcounter/counter.go
deleted file mode 100644
index b87c965..0000000
--- a/distinctcounter/counter.go
+++ /dev/null
@@ -1,37 +0,0 @@
-package main
-
-import (
- "flag"
- "fmt"
- "log"
- "os"
- "time"
-
- "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/ipsetsink/sinkcluster"
-)
-
-func main() {
- inputFile := flag.String("in", "", "")
- start := flag.String("from", "", "")
- end := flag.String("to", "", "")
- flag.Parse()
- startTime, err := time.Parse(time.RFC3339, *start)
- if err != nil {
- log.Fatal("unable to parse start time:", err)
- }
- endTime, err := time.Parse(time.RFC3339, *end)
- if err != nil {
- log.Fatal("unable to parse end time:", err)
- }
- fd, err := os.Open(*inputFile)
- if err != nil {
- log.Fatal("unable to open input file:", err)
- }
- counter := sinkcluster.NewClusterCounter(startTime, endTime)
- result, err := counter.Count(fd)
- if err != nil {
- log.Fatal("unable to count:", err)
- }
- fmt.Printf("sum = %v\n", result.Sum)
- fmt.Printf("chunkIncluded = %v\n", result.ChunkIncluded)
-}
diff --git a/go.mod b/go.mod
index fc3d7b9..ab5212a 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,6 @@ module gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/
go 1.21
require (
- github.com/clarkduvall/hyperloglog v0.0.0-20171127014514-a0107a5d8004
github.com/gorilla/websocket v1.5.0
github.com/pion/ice/v2 v2.3.11
github.com/pion/sdp/v3 v3.0.6
diff --git a/go.sum b/go.sum
index d8c5ff9..7297431 100644
--- a/go.sum
+++ b/go.sum
@@ -7,8 +7,6 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
-github.com/clarkduvall/hyperloglog v0.0.0-20171127014514-a0107a5d8004 h1:mK6JroY6bLiPS3s6QCYOSjRyErFc2iHNkhhmRfF0nHo=
-github.com/clarkduvall/hyperloglog v0.0.0-20171127014514-a0107a5d8004/go.mod h1:drodPoQNro6QBO6TJ/MpMZbz8Bn2eSDtRN6jpG4VGw8=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudflare/circl v1.3.3 h1:fE/Qz0QdIGqeWfnwq0RE0R7MI51s0M2E4Ga9kq5AEMs=
github.com/cloudflare/circl v1.3.3/go.mod h1:5XYMA4rFBvNIrhs50XuiBJ15vF2pZn4nnUKZrLbUZFA=