aboutsummaryrefslogtreecommitdiff
path: root/device/receive.go
diff options
context:
space:
mode:
Diffstat (limited to 'device/receive.go')
-rw-r--r--device/receive.go43
1 files changed, 23 insertions, 20 deletions
diff --git a/device/receive.go b/device/receive.go
index f0f37a1..4b32dc5 100644
--- a/device/receive.go
+++ b/device/receive.go
@@ -27,7 +27,6 @@ type QueueHandshakeElement struct {
}
type QueueInboundElement struct {
- sync.Mutex
buffer *[MaxMessageSize]byte
packet []byte
counter uint64
@@ -35,6 +34,11 @@ type QueueInboundElement struct {
endpoint conn.Endpoint
}
+type QueueInboundElementsContainer struct {
+ sync.Mutex
+ elems []*QueueInboundElement
+}
+
// clearPointers clears elem fields that contain pointers.
// This makes the garbage collector's life easier and
// avoids accidentally keeping other objects around unnecessarily.
@@ -87,7 +91,7 @@ func (device *Device) RoutineReceiveIncoming(maxBatchSize int, recv conn.Receive
count int
endpoints = make([]conn.Endpoint, maxBatchSize)
deathSpiral int
- elemsByPeer = make(map[*Peer]*[]*QueueInboundElement, maxBatchSize)
+ elemsByPeer = make(map[*Peer]*QueueInboundElementsContainer, maxBatchSize)
)
for i := range bufsArrs {
@@ -170,15 +174,14 @@ func (device *Device) RoutineReceiveIncoming(maxBatchSize int, recv conn.Receive
elem.keypair = keypair
elem.endpoint = endpoints[i]
elem.counter = 0
- elem.Mutex = sync.Mutex{}
- elem.Lock()
elemsForPeer, ok := elemsByPeer[peer]
if !ok {
- elemsForPeer = device.GetInboundElementsSlice()
+ elemsForPeer = device.GetInboundElementsContainer()
+ elemsForPeer.Lock()
elemsByPeer[peer] = elemsForPeer
}
- *elemsForPeer = append(*elemsForPeer, elem)
+ elemsForPeer.elems = append(elemsForPeer.elems, elem)
bufsArrs[i] = device.GetMessageBuffer()
bufs[i] = bufsArrs[i][:]
continue
@@ -217,16 +220,16 @@ func (device *Device) RoutineReceiveIncoming(maxBatchSize int, recv conn.Receive
default:
}
}
- for peer, elems := range elemsByPeer {
+ for peer, elemsContainer := range elemsByPeer {
if peer.isRunning.Load() {
- peer.queue.inbound.c <- elems
- device.queue.decryption.c <- elems
+ peer.queue.inbound.c <- elemsContainer
+ device.queue.decryption.c <- elemsContainer
} else {
- for _, elem := range *elems {
+ for _, elem := range elemsContainer.elems {
device.PutMessageBuffer(elem.buffer)
device.PutInboundElement(elem)
}
- device.PutInboundElementsSlice(elems)
+ device.PutInboundElementsContainer(elemsContainer)
}
delete(elemsByPeer, peer)
}
@@ -239,8 +242,8 @@ func (device *Device) RoutineDecryption(id int) {
defer device.log.Verbosef("Routine: decryption worker %d - stopped", id)
device.log.Verbosef("Routine: decryption worker %d - started", id)
- for elems := range device.queue.decryption.c {
- for _, elem := range *elems {
+ for elemsContainer := range device.queue.decryption.c {
+ for _, elem := range elemsContainer.elems {
// split message into fields
counter := elem.packet[MessageTransportOffsetCounter:MessageTransportOffsetContent]
content := elem.packet[MessageTransportOffsetContent:]
@@ -259,8 +262,8 @@ func (device *Device) RoutineDecryption(id int) {
if err != nil {
elem.packet = nil
}
- elem.Unlock()
}
+ elemsContainer.Unlock()
}
}
@@ -437,12 +440,12 @@ func (peer *Peer) RoutineSequentialReceiver(maxBatchSize int) {
bufs := make([][]byte, 0, maxBatchSize)
- for elems := range peer.queue.inbound.c {
- if elems == nil {
+ for elemsContainer := range peer.queue.inbound.c {
+ if elemsContainer == nil {
return
}
- for _, elem := range *elems {
- elem.Lock()
+ elemsContainer.Lock()
+ for _, elem := range elemsContainer.elems {
if elem.packet == nil {
// decryption failed
continue
@@ -515,11 +518,11 @@ func (peer *Peer) RoutineSequentialReceiver(maxBatchSize int) {
device.log.Errorf("Failed to write packets to TUN device: %v", err)
}
}
- for _, elem := range *elems {
+ for _, elem := range elemsContainer.elems {
device.PutMessageBuffer(elem.buffer)
device.PutInboundElement(elem)
}
bufs = bufs[:0]
- device.PutInboundElementsSlice(elems)
+ device.PutInboundElementsContainer(elemsContainer)
}
}