aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--device/channels.go8
-rw-r--r--device/receive.go44
-rw-r--r--device/send.go58
3 files changed, 55 insertions, 55 deletions
diff --git a/device/channels.go b/device/channels.go
index 039d8df..40ee5c9 100644
--- a/device/channels.go
+++ b/device/channels.go
@@ -19,13 +19,13 @@ import (
// call wg.Done to remove the initial reference.
// When the refcount hits 0, the queue's channel is closed.
type outboundQueue struct {
- c chan *QueueOutboundElement
+ c chan *[]*QueueOutboundElement
wg sync.WaitGroup
}
func newOutboundQueue() *outboundQueue {
q := &outboundQueue{
- c: make(chan *QueueOutboundElement, QueueOutboundSize),
+ c: make(chan *[]*QueueOutboundElement, QueueOutboundSize),
}
q.wg.Add(1)
go func() {
@@ -37,13 +37,13 @@ func newOutboundQueue() *outboundQueue {
// A inboundQueue is similar to an outboundQueue; see those docs.
type inboundQueue struct {
- c chan *QueueInboundElement
+ c chan *[]*QueueInboundElement
wg sync.WaitGroup
}
func newInboundQueue() *inboundQueue {
q := &inboundQueue{
- c: make(chan *QueueInboundElement, QueueInboundSize),
+ c: make(chan *[]*QueueInboundElement, QueueInboundSize),
}
q.wg.Add(1)
go func() {
diff --git a/device/receive.go b/device/receive.go
index e24d29f..f0f37a1 100644
--- a/device/receive.go
+++ b/device/receive.go
@@ -220,9 +220,7 @@ func (device *Device) RoutineReceiveIncoming(maxBatchSize int, recv conn.Receive
for peer, elems := range elemsByPeer {
if peer.isRunning.Load() {
peer.queue.inbound.c <- elems
- for _, elem := range *elems {
- device.queue.decryption.c <- elem
- }
+ device.queue.decryption.c <- elems
} else {
for _, elem := range *elems {
device.PutMessageBuffer(elem.buffer)
@@ -241,26 +239,28 @@ func (device *Device) RoutineDecryption(id int) {
defer device.log.Verbosef("Routine: decryption worker %d - stopped", id)
device.log.Verbosef("Routine: decryption worker %d - started", id)
- for elem := range device.queue.decryption.c {
- // split message into fields
- counter := elem.packet[MessageTransportOffsetCounter:MessageTransportOffsetContent]
- content := elem.packet[MessageTransportOffsetContent:]
-
- // decrypt and release to consumer
- var err error
- elem.counter = binary.LittleEndian.Uint64(counter)
- // copy counter to nonce
- binary.LittleEndian.PutUint64(nonce[0x4:0xc], elem.counter)
- elem.packet, err = elem.keypair.receive.Open(
- content[:0],
- nonce[:],
- content,
- nil,
- )
- if err != nil {
- elem.packet = nil
+ for elems := range device.queue.decryption.c {
+ for _, elem := range *elems {
+ // split message into fields
+ counter := elem.packet[MessageTransportOffsetCounter:MessageTransportOffsetContent]
+ content := elem.packet[MessageTransportOffsetContent:]
+
+ // decrypt and release to consumer
+ var err error
+ elem.counter = binary.LittleEndian.Uint64(counter)
+ // copy counter to nonce
+ binary.LittleEndian.PutUint64(nonce[0x4:0xc], elem.counter)
+ elem.packet, err = elem.keypair.receive.Open(
+ content[:0],
+ nonce[:],
+ content,
+ nil,
+ )
+ if err != nil {
+ elem.packet = nil
+ }
+ elem.Unlock()
}
- elem.Unlock()
}
}
diff --git a/device/send.go b/device/send.go
index cd8a2a0..e838c4e 100644
--- a/device/send.go
+++ b/device/send.go
@@ -385,9 +385,7 @@ top:
// add to parallel and sequential queue
if peer.isRunning.Load() {
peer.queue.outbound.c <- elems
- for _, elem := range *elems {
- peer.device.queue.encryption.c <- elem
- }
+ peer.device.queue.encryption.c <- elems
} else {
for _, elem := range *elems {
peer.device.PutMessageBuffer(elem.buffer)
@@ -447,32 +445,34 @@ func (device *Device) RoutineEncryption(id int) {
defer device.log.Verbosef("Routine: encryption worker %d - stopped", id)
device.log.Verbosef("Routine: encryption worker %d - started", id)
- for elem := range device.queue.encryption.c {
- // populate header fields
- header := elem.buffer[:MessageTransportHeaderSize]
-
- fieldType := header[0:4]
- fieldReceiver := header[4:8]
- fieldNonce := header[8:16]
-
- binary.LittleEndian.PutUint32(fieldType, MessageTransportType)
- binary.LittleEndian.PutUint32(fieldReceiver, elem.keypair.remoteIndex)
- binary.LittleEndian.PutUint64(fieldNonce, elem.nonce)
-
- // pad content to multiple of 16
- paddingSize := calculatePaddingSize(len(elem.packet), int(device.tun.mtu.Load()))
- elem.packet = append(elem.packet, paddingZeros[:paddingSize]...)
-
- // encrypt content and release to consumer
-
- binary.LittleEndian.PutUint64(nonce[4:], elem.nonce)
- elem.packet = elem.keypair.send.Seal(
- header,
- nonce[:],
- elem.packet,
- nil,
- )
- elem.Unlock()
+ for elems := range device.queue.encryption.c {
+ for _, elem := range *elems {
+ // populate header fields
+ header := elem.buffer[:MessageTransportHeaderSize]
+
+ fieldType := header[0:4]
+ fieldReceiver := header[4:8]
+ fieldNonce := header[8:16]
+
+ binary.LittleEndian.PutUint32(fieldType, MessageTransportType)
+ binary.LittleEndian.PutUint32(fieldReceiver, elem.keypair.remoteIndex)
+ binary.LittleEndian.PutUint64(fieldNonce, elem.nonce)
+
+ // pad content to multiple of 16
+ paddingSize := calculatePaddingSize(len(elem.packet), int(device.tun.mtu.Load()))
+ elem.packet = append(elem.packet, paddingZeros[:paddingSize]...)
+
+ // encrypt content and release to consumer
+
+ binary.LittleEndian.PutUint64(nonce[4:], elem.nonce)
+ elem.packet = elem.keypair.send.Seal(
+ header,
+ nonce[:],
+ elem.packet,
+ nil,
+ )
+ elem.Unlock()
+ }
}
}