From 48c3b87eb824deb1cb3178a7cdd42276dbc70d2d Mon Sep 17 00:00:00 2001 From: Josh Bleecher Snyder Date: Mon, 11 Jan 2021 17:34:02 -0800 Subject: device: use channel close to shut down and drain decryption channel This is similar to commit e1fa1cc5560020e67d33aa7e74674853671cf0a0, but for the decryption channel. It is an alternative fix to f9f655567930a4cd78d40fa4ba0d58503335ae6a. Signed-off-by: Josh Bleecher Snyder --- device/device.go | 37 +++++++++++++++++++++++++------------ 1 file changed, 25 insertions(+), 12 deletions(-) (limited to 'device/device.go') diff --git a/device/device.go b/device/device.go index d37fe6f..9a9b1b3 100644 --- a/device/device.go +++ b/device/device.go @@ -76,7 +76,7 @@ type Device struct { queue struct { encryption *encryptionQueue - decryption chan *QueueInboundElement + decryption *decryptionQueue handshake chan QueueHandshakeElement } @@ -115,6 +115,24 @@ func newEncryptionQueue() *encryptionQueue { return q } +// A decryptionQueue is similar to an encryptionQueue; see those docs. +type decryptionQueue struct { + c chan *QueueInboundElement + wg sync.WaitGroup +} + +func newDecryptionQueue() *decryptionQueue { + q := &decryptionQueue{ + c: make(chan *QueueInboundElement, QueueInboundSize), + } + q.wg.Add(1) + go func() { + q.wg.Wait() + close(q.c) + }() + return q +} + /* Converts the peer into a "zombie", which remains in the peer map, * but processes no packets and does not exists in the routing table. * @@ -308,7 +326,7 @@ func NewDevice(tunDevice tun.Device, logger *Logger) *Device { device.queue.handshake = make(chan QueueHandshakeElement, QueueHandshakeSize) device.queue.encryption = newEncryptionQueue() - device.queue.decryption = make(chan *QueueInboundElement, QueueInboundSize) + device.queue.decryption = newDecryptionQueue() // prepare signals @@ -369,13 +387,6 @@ func (device *Device) RemoveAllPeers() { func (device *Device) FlushPacketQueues() { for { select { - case elem, ok := <-device.queue.decryption: - if ok { - if !elem.IsDropped() { - elem.Drop() - device.PutMessageBuffer(elem.buffer) - } - } case <-device.queue.handshake: default: return @@ -399,10 +410,11 @@ func (device *Device) Close() { device.isUp.Set(false) - // We kept a reference to the encryption queue, - // in case we started any new peers that might write to it. - // No new peers are coming; we are done with the encryption queue. + // We kept a reference to the encryption and decryption queues, + // in case we started any new peers that might write to them. + // No new peers are coming; we are done with these queues. device.queue.encryption.wg.Done() + device.queue.decryption.wg.Done() close(device.signals.stop) device.state.stopping.Wait() @@ -549,6 +561,7 @@ func (device *Device) BindUpdate() error { // start receiving routines device.net.stopping.Add(2) + device.queue.decryption.wg.Add(2) // each RoutineReceiveIncoming goroutine writes to device.queue.decryption go device.RoutineReceiveIncoming(ipv4.Version, netc.bind) go device.RoutineReceiveIncoming(ipv6.Version, netc.bind) -- cgit v1.2.3-54-g00ecf