aboutsummaryrefslogtreecommitdiff
path: root/src/cmd/go/internal/par/queue.go
blob: 180bc75e3430041c77c52466ec021cfb1d6c12b1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package par

import "fmt"

// Queue manages a set of work items to be executed in parallel. The number of
// active work items is limited, and excess items are queued sequentially.
//
// A Queue must be created with NewQueue, which allocates st and seeds it with
// the initial state.
type Queue struct {
	// maxActive is the upper bound on the number of work items running at once.
	maxActive int
	// st is a 1-buffered channel holding the queue's single queueState value.
	// Receiving from it takes exclusive ownership of the state; sending the
	// (possibly updated) value back releases it.
	st        chan queueState
}

// queueState is the mutable state of a Queue. It is passed by value through
// Queue.st, so only the goroutine that currently holds it may read or modify it.
type queueState struct {
	active  int // number of goroutines processing work; always nonzero when len(backlog) > 0
	backlog []func() // work items waiting for a free worker, consumed from the front (FIFO)
	idle    chan struct{} // if non-nil, closed when active becomes 0
}

// NewQueue constructs a Queue that runs at most maxActive work items
// concurrently.
//
// maxActive must be positive; NewQueue panics otherwise.
func NewQueue(maxActive int) *Queue {
	if maxActive <= 0 {
		panic(fmt.Sprintf("par.NewQueue called with nonpositive limit (%d)", maxActive))
	}

	// The state lives in a 1-buffered channel. Seed it with the zero state
	// (no workers, empty backlog, no idle channel) so that the first receive
	// in Add or Idle does not block.
	state := make(chan queueState, 1)
	state <- queueState{}

	return &Queue{maxActive: maxActive, st: state}
}

// Add adds f as a work item in the queue.
//
// Add returns immediately, but the queue will be marked as non-idle until after
// f (and any subsequently-added work) has completed.
//
// If fewer than maxActive workers are running, Add starts a new worker
// goroutine that runs f and then keeps draining the backlog in FIFO order
// until it is empty. Otherwise f is appended to the backlog for an
// already-running worker to pick up.
func (q *Queue) Add(f func()) {
	// Receiving from q.st takes exclusive ownership of the state; every path
	// below sends it back before returning or blocking on work.
	st := <-q.st
	if st.active == q.maxActive {
		// All workers are busy: enqueue f for one of them to run later.
		st.backlog = append(st.backlog, f)
		q.st <- st
		return
	}
	if st.active == 0 {
		// Mark q as non-idle. A channel left over from the previous idle
		// period has already been closed, so drop it and let the next call
		// to Idle allocate a fresh one.
		st.idle = nil
	}
	st.active++
	q.st <- st

	go func() {
		for {
			f()

			st := <-q.st
			if len(st.backlog) == 0 {
				// Nothing left to run: retire this worker, and signal Idle
				// waiters if it was the last one active.
				if st.active--; st.active == 0 && st.idle != nil {
					close(st.idle)
				}
				q.st <- st
				return
			}

			// Pop the oldest backlog item. Clear the vacated slot first:
			// reslicing alone would leave the closure (and everything it
			// captures) reachable through the backing array until append
			// eventually reallocates it.
			f = st.backlog[0]
			st.backlog[0] = nil
			st.backlog = st.backlog[1:]
			q.st <- st
		}
	}()
}

// Idle reports when q has drained: the returned channel is closed once there
// is no active or enqueued work outstanding.
//
// If q is already idle the returned channel is closed on return. Calls made
// during the same busy (or idle) period share one channel.
func (q *Queue) Idle() <-chan struct{} {
	// Take ownership of the state; it is handed back just before returning.
	state := <-q.st

	done := state.idle
	if done == nil {
		// First request since the queue last changed from idle to busy (or
		// since creation): allocate the channel now, and close it right away
		// if no worker is running.
		done = make(chan struct{})
		if state.active == 0 {
			close(done)
		}
		state.idle = done
	}

	q.st <- state
	return done
}