diff options
author | Cecylia Bocovich <cohosh@torproject.org> | 2023-10-28 16:04:09 -0400 |
---|---|---|
committer | Cecylia Bocovich <cohosh@torproject.org> | 2023-10-30 12:42:45 -0400 |
commit | 354cb65432a4e3fff393407749f5240b0bb97df6 (patch) | |
tree | 91ad785d102b7841e50ec654cb37c42f20f0bbcb | |
parent | 83a7422fe61546fd05c1a71d62f9a384f9f9e374 (diff) | |
download | snowflake-354cb65432a4e3fff393407749f5240b0bb97df6.tar.gz snowflake-354cb65432a4e3fff393407749f5240b0bb97df6.zip |
Move creation of periodic stats task inside proxy library
This adds a new type of SnowflakeEvent. EventOnProxyStats is triggered
by the periodic task run at SummaryInterval and produces an event with a
proxy stats output string.
-rw-r--r-- | common/event/interface.go | 9 | ||||
-rw-r--r-- | proxy/lib/pt_event_logger.go | 41 | ||||
-rw-r--r-- | proxy/lib/snowflake.go | 12 | ||||
-rw-r--r-- | proxy/main.go | 11 |
4 files changed, 54 insertions, 19 deletions
diff --git a/common/event/interface.go b/common/event/interface.go index a7fa532..d54e9d3 100644 --- a/common/event/interface.go +++ b/common/event/interface.go @@ -76,6 +76,15 @@ func (e EventOnProxyConnectionOver) String() string { return fmt.Sprintf("Proxy connection closed (↑ %d, ↓ %d)", e.InboundTraffic, e.OutboundTraffic) } +type EventOnProxyStats struct { + SnowflakeEvent + StatString string +} + +func (e EventOnProxyStats) String() string { + return e.StatString +} + type EventOnCurrentNATTypeDetermined struct { SnowflakeEvent CurNATType string diff --git a/proxy/lib/pt_event_logger.go b/proxy/lib/pt_event_logger.go index 0c5a8a9..5af9f28 100644 --- a/proxy/lib/pt_event_logger.go +++ b/proxy/lib/pt_event_logger.go @@ -1,54 +1,67 @@ package snowflake_proxy import ( - "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/task" + "fmt" "io" "log" "time" + "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/task" + "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/event" ) -func NewProxyEventLogger(logPeriod time.Duration, output io.Writer) event.SnowflakeEventReceiver { +func NewProxyEventLogger(output io.Writer) event.SnowflakeEventReceiver { logger := log.New(output, "", log.LstdFlags|log.LUTC) - el := &logEventLogger{logPeriod: logPeriod, logger: logger} - el.task = &task.Periodic{Interval: logPeriod, Execute: el.logTick} - el.task.WaitThenStart() - return el + return &proxyEventLogger{logger: logger} +} + +type proxyEventLogger struct { + logger *log.Logger } -type logEventLogger struct { +func (p *proxyEventLogger) OnNewSnowflakeEvent(e event.SnowflakeEvent) { + p.logger.Println(e.String()) +} + +type periodicProxyStats struct { inboundSum int64 outboundSum int64 connectionCount int logPeriod time.Duration task *task.Periodic - logger *log.Logger + dispatcher event.SnowflakeEventDispatcher +} + +func newPeriodicProxyStats(logPeriod time.Duration, dispatcher event.SnowflakeEventDispatcher) *periodicProxyStats { + el := &periodicProxyStats{logPeriod: logPeriod, dispatcher: dispatcher} + el.task = &task.Periodic{Interval: logPeriod, Execute: el.logTick} + el.task.WaitThenStart() + return el } -func (p *logEventLogger) OnNewSnowflakeEvent(e event.SnowflakeEvent) { +func (p *periodicProxyStats) OnNewSnowflakeEvent(e event.SnowflakeEvent) { switch e.(type) { case event.EventOnProxyConnectionOver: e := e.(event.EventOnProxyConnectionOver) p.inboundSum += e.InboundTraffic p.outboundSum += e.OutboundTraffic p.connectionCount += 1 - default: - p.logger.Println(e.String()) } } -func (p *logEventLogger) logTick() error { +func (p *periodicProxyStats) logTick() error { inbound, inboundUnit := formatTraffic(p.inboundSum) outbound, outboundUnit := formatTraffic(p.outboundSum) - p.logger.Printf("In the last %v, there were %v connections. Traffic Relayed ↓ %v %v, ↑ %v %v.\n", + statString := fmt.Sprintf("In the last %v, there were %v connections. Traffic Relayed ↓ %v %v, ↑ %v %v.\n", p.logPeriod.String(), p.connectionCount, inbound, inboundUnit, outbound, outboundUnit) + p.dispatcher.OnNewSnowflakeEvent(&event.EventOnProxyStats{StatString: statString}) p.outboundSum = 0 p.inboundSum = 0 p.connectionCount = 0 return nil } -func (p *logEventLogger) Close() error { +func (p *periodicProxyStats) Close() error { return p.task.Close() } diff --git a/proxy/lib/snowflake.go b/proxy/lib/snowflake.go index 53a502d..a0d630a 100644 --- a/proxy/lib/snowflake.go +++ b/proxy/lib/snowflake.go @@ -139,6 +139,14 @@ type SnowflakeProxy struct { ProxyType string EventDispatcher event.SnowflakeEventDispatcher shutdown chan struct{} + + // DisableStatsLogger indicates whether proxy stats will be logged periodically + DisableStatsLogger bool + // SummaryInterval is the time interval at which proxy stats will be logged + SummaryInterval time.Duration + + periodicProxyStats *periodicProxyStats + bytesLogger bytesLogger } // Checks whether an IP address is a remote address for the client @@ -654,6 +662,10 @@ func (sf *SnowflakeProxy) Start() error { sf.EventDispatcher = event.NewSnowflakeEventDispatcher() } + if !sf.DisableStatsLogger { + sf.periodicProxyStats = newPeriodicProxyStats(sf.SummaryInterval, sf.EventDispatcher) + } + broker, err = newSignalingServer(sf.BrokerURL, sf.KeepLocalAddresses) if err != nil { return fmt.Errorf("error configuring broker: %s", err) diff --git a/proxy/main.go b/proxy/main.go index 66dbc8f..3713515 100644 --- a/proxy/main.go +++ b/proxy/main.go @@ -31,7 +31,7 @@ func main() { allowNonTLSRelay := flag.Bool("allow-non-tls-relay", false, "allow relay without tls encryption") NATTypeMeasurementInterval := flag.Duration("nat-retest-interval", time.Hour*24, "the time interval in second before NAT type is retested, 0s disables retest. Valid time units are \"s\", \"m\", \"h\". ") - SummaryInterval := flag.Duration("summary-interval", time.Hour, + summaryInterval := flag.Duration("summary-interval", time.Hour, "the time interval to output summary, 0s disables summaries. Valid time units are \"s\", \"m\", \"h\". ") disableStatsLogger := flag.Bool("disable-stats-logger", false, "disable the exposing mechanism for stats using logs") enableMetrics := flag.Bool("metrics", false, "enable the exposing mechanism for stats using metrics") @@ -96,6 +96,9 @@ func main() { RelayDomainNamePattern: *allowedRelayHostNamePattern, AllowNonTLSRelay: *allowNonTLSRelay, + + DisableStatsLogger: *disableStatsLogger, + SummaryInterval: *summaryInterval, } var logOutput = ioutil.Discard @@ -124,10 +127,8 @@ func main() { log.SetOutput(&safelog.LogScrubber{Output: logOutput}) } - if !*disableStatsLogger { - periodicEventLogger := sf.NewProxyEventLogger(*SummaryInterval, eventlogOutput) - eventLogger.AddSnowflakeEventListener(periodicEventLogger) - } + proxyEventLogger := sf.NewProxyEventLogger(eventlogOutput) + eventLogger.AddSnowflakeEventListener(proxyEventLogger) if *enableMetrics { metrics := sf.NewMetrics() |