aboutsummaryrefslogtreecommitdiff
path: root/device/device.go
diff options
context:
space:
mode:
Diffstat (limited to 'device/device.go')
-rw-r--r--device/device.go39
1 files changed, 32 insertions, 7 deletions
diff --git a/device/device.go b/device/device.go
index 9e2d001..d9367e5 100644
--- a/device/device.go
+++ b/device/device.go
@@ -74,7 +74,7 @@ type Device struct {
}
queue struct {
- encryption chan *QueueOutboundElement
+ encryption *encryptionQueue
decryption chan *QueueInboundElement
handshake chan QueueHandshakeElement
}
@@ -89,6 +89,31 @@ type Device struct {
}
}
+// An encryptionQueue is a channel of QueueOutboundElements awaiting encryption.
+// An encryptionQueue is ref-counted using its wg field.
+// An encryptionQueue created with newEncryptionQueue has one reference.
+// Every additional writer must call wg.Add(1).
+// Every completed writer must call wg.Done().
+// When no further writers will be added,
+// call wg.Done to remove the initial reference.
+// When the refcount hits 0, the queue's channel is closed.
+type encryptionQueue struct {
+ c chan *QueueOutboundElement
+ wg sync.WaitGroup
+}
+
+func newEncryptionQueue() *encryptionQueue {
+ q := &encryptionQueue{
+ c: make(chan *QueueOutboundElement, QueueOutboundSize),
+ }
+ q.wg.Add(1)
+ go func() {
+ q.wg.Wait()
+ close(q.c)
+ }()
+ return q
+}
+
/* Converts the peer into a "zombie", which remains in the peer map,
* but processes no packets and does not exists in the routing table.
*
@@ -280,7 +305,7 @@ func NewDevice(tunDevice tun.Device, logger *Logger) *Device {
// create queues
device.queue.handshake = make(chan QueueHandshakeElement, QueueHandshakeSize)
- device.queue.encryption = make(chan *QueueOutboundElement, QueueOutboundSize)
+ device.queue.encryption = newEncryptionQueue()
device.queue.decryption = make(chan *QueueInboundElement, QueueInboundSize)
// prepare signals
@@ -297,7 +322,7 @@ func NewDevice(tunDevice tun.Device, logger *Logger) *Device {
cpus := runtime.NumCPU()
device.state.stopping.Wait()
for i := 0; i < cpus; i += 1 {
- device.state.stopping.Add(3)
+ device.state.stopping.Add(2) // decryption and handshake
go device.RoutineEncryption()
go device.RoutineDecryption()
go device.RoutineHandshake()
@@ -346,10 +371,6 @@ func (device *Device) FlushPacketQueues() {
if ok {
elem.Drop()
}
- case elem, ok := <-device.queue.encryption:
- if ok {
- elem.Drop()
- }
case <-device.queue.handshake:
default:
return
@@ -373,6 +394,10 @@ func (device *Device) Close() {
device.isUp.Set(false)
+ // We kept a reference to the encryption queue,
+ // in case we started any new peers that might write to it.
+ // No new peers are coming; we are done with the encryption queue.
+ device.queue.encryption.wg.Done()
close(device.signals.stop)
device.state.stopping.Wait()