aboutsummaryrefslogtreecommitdiff
path: root/device/channels.go
diff options
context:
space:
mode:
Diffstat (limited to 'device/channels.go')
-rw-r--r--device/channels.go52
1 files changed, 28 insertions, 24 deletions
diff --git a/device/channels.go b/device/channels.go
index bf78868..6f4370a 100644
--- a/device/channels.go
+++ b/device/channels.go
@@ -83,21 +83,23 @@ func newAutodrainingInboundQueue(device *Device) *autodrainingInboundQueue {
q := &autodrainingInboundQueue{
c: make(chan *QueueInboundElement, QueueInboundSize),
}
- runtime.SetFinalizer(q, func(q *autodrainingInboundQueue) {
- for {
- select {
- case elem := <-q.c:
- elem.Lock()
- device.PutMessageBuffer(elem.buffer)
- device.PutInboundElement(elem)
- default:
- return
- }
- }
- })
+ runtime.SetFinalizer(q, device.flushInboundQueue)
return q
}
+func (device *Device) flushInboundQueue(q *autodrainingInboundQueue) {
+ for {
+ select {
+ case elem := <-q.c:
+ elem.Lock()
+ device.PutMessageBuffer(elem.buffer)
+ device.PutInboundElement(elem)
+ default:
+ return
+ }
+ }
+}
+
type autodrainingOutboundQueue struct {
c chan *QueueOutboundElement
}
@@ -111,17 +113,19 @@ func newAutodrainingOutboundQueue(device *Device) *autodrainingOutboundQueue {
q := &autodrainingOutboundQueue{
c: make(chan *QueueOutboundElement, QueueOutboundSize),
}
- runtime.SetFinalizer(q, func(q *autodrainingOutboundQueue) {
- for {
- select {
- case elem := <-q.c:
- elem.Lock()
- device.PutMessageBuffer(elem.buffer)
- device.PutOutboundElement(elem)
- default:
- return
- }
- }
- })
+ runtime.SetFinalizer(q, device.flushOutboundQueue)
return q
}
+
+func (device *Device) flushOutboundQueue(q *autodrainingOutboundQueue) {
+ for {
+ select {
+ case elem := <-q.c:
+ elem.Lock()
+ device.PutMessageBuffer(elem.buffer)
+ device.PutOutboundElement(elem)
+ default:
+ return
+ }
+ }
+}