aboutsummaryrefslogtreecommitdiff
path: root/src/cmd/go/internal/par/work.go
blob: 3f1e69adfef0e839dc76226d7b2c359806719458 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package par implements parallel execution helpers.
package par

import (
	"errors"
	"math/rand"
	"sync"
	"sync/atomic"
)

// Work manages a set of work items to be executed in parallel, at most once each.
// The items in the set must all be valid map keys.
type Work[T comparable] struct {
	f       func(T) // function to run for each item
	running int     // total number of runners

	mu      sync.Mutex
	added   map[T]bool // items added to set
	todo    []T        // items yet to be run
	wait    sync.Cond  // wait when todo is empty
	waiting int        // number of runners waiting for todo
}

func (w *Work[T]) init() {
	if w.added == nil {
		w.added = make(map[T]bool)
	}
}

// Add adds item to the work set, if it hasn't already been added.
func (w *Work[T]) Add(item T) {
	w.mu.Lock()
	w.init()
	if !w.added[item] {
		w.added[item] = true
		w.todo = append(w.todo, item)
		if w.waiting > 0 {
			w.wait.Signal()
		}
	}
	w.mu.Unlock()
}

// Do runs f in parallel on items from the work set,
// with at most n invocations of f running at a time.
// It returns when everything added to the work set has been processed.
// At least one item should have been added to the work set
// before calling Do (or else Do returns immediately),
// but it is allowed for f(item) to add new items to the set.
// Do should only be used once on a given Work.
func (w *Work[T]) Do(n int, f func(item T)) {
	if n < 1 {
		panic("par.Work.Do: n < 1")
	}
	if w.running >= 1 {
		panic("par.Work.Do: already called Do")
	}

	w.running = n
	w.f = f
	w.wait.L = &w.mu

	for i := 0; i < n-1; i++ {
		go w.runner()
	}
	w.runner()
}

// runner executes work in w until both nothing is left to do
// and all the runners are waiting for work.
// (Then all the runners return.)
func (w *Work[T]) runner() {
	for {
		// Wait for something to do.
		w.mu.Lock()
		for len(w.todo) == 0 {
			w.waiting++
			if w.waiting == w.running {
				// All done.
				w.wait.Broadcast()
				w.mu.Unlock()
				return
			}
			w.wait.Wait()
			w.waiting--
		}

		// Pick something to do at random,
		// to eliminate pathological contention
		// in case items added at about the same time
		// are most likely to contend.
		i := rand.Intn(len(w.todo))
		item := w.todo[i]
		w.todo[i] = w.todo[len(w.todo)-1]
		w.todo = w.todo[:len(w.todo)-1]
		w.mu.Unlock()

		w.f(item)
	}
}

// ErrCache is like Cache except that it also stores
// an error value alongside the cached value V.
type ErrCache[K comparable, V any] struct {
	Cache[K, errValue[V]]
}

type errValue[V any] struct {
	v   V
	err error
}

func (c *ErrCache[K, V]) Do(key K, f func() (V, error)) (V, error) {
	v := c.Cache.Do(key, func() errValue[V] {
		v, err := f()
		return errValue[V]{v, err}
	})
	return v.v, v.err
}

var ErrCacheEntryNotFound = errors.New("cache entry not found")

// Get returns the cached result associated with key.
// It returns ErrCacheEntryNotFound if there is no such result.
func (c *ErrCache[K, V]) Get(key K) (V, error) {
	v, ok := c.Cache.Get(key)
	if !ok {
		v.err = ErrCacheEntryNotFound
	}
	return v.v, v.err
}

// Cache runs an action once per key and caches the result.
type Cache[K comparable, V any] struct {
	m sync.Map
}

type cacheEntry[V any] struct {
	done   atomic.Bool
	mu     sync.Mutex
	result V
}

// Do calls the function f if and only if Do is being called for the first time with this key.
// No call to Do with a given key returns until the one call to f returns.
// Do returns the value returned by the one call to f.
func (c *Cache[K, V]) Do(key K, f func() V) V {
	entryIface, ok := c.m.Load(key)
	if !ok {
		entryIface, _ = c.m.LoadOrStore(key, new(cacheEntry[V]))
	}
	e := entryIface.(*cacheEntry[V])
	if !e.done.Load() {
		e.mu.Lock()
		if !e.done.Load() {
			e.result = f()
			e.done.Store(true)
		}
		e.mu.Unlock()
	}
	return e.result
}

// Get returns the cached result associated with key
// and reports whether there is such a result.
//
// If the result for key is being computed, Get does not wait for the computation to finish.
func (c *Cache[K, V]) Get(key K) (V, bool) {
	entryIface, ok := c.m.Load(key)
	if !ok {
		return *new(V), false
	}
	e := entryIface.(*cacheEntry[V])
	if !e.done.Load() {
		return *new(V), false
	}
	return e.result, true
}

// Clear removes all entries in the cache.
//
// Concurrent calls to Get may return old values. Concurrent calls to Do
// may return old values or store results in entries that have been deleted.
//
// TODO(jayconrod): Delete this after the package cache clearing functions
// in internal/load have been removed.
func (c *Cache[K, V]) Clear() {
	c.m.Clear()
}

// Delete removes an entry from the map. It is safe to call Delete for an
// entry that does not exist. Delete will return quickly, even if the result
// for a key is still being computed; the computation will finish, but the
// result won't be accessible through the cache.
//
// TODO(jayconrod): Delete this after the package cache clearing functions
// in internal/load have been removed.
func (c *Cache[K, V]) Delete(key K) {
	c.m.Delete(key)
}

// DeleteIf calls pred for each key in the map. If pred returns true for a key,
// DeleteIf removes the corresponding entry. If the result for a key is
// still being computed, DeleteIf will remove the entry without waiting for
// the computation to finish. The result won't be accessible through the cache.
//
// TODO(jayconrod): Delete this after the package cache clearing functions
// in internal/load have been removed.
func (c *Cache[K, V]) DeleteIf(pred func(key K) bool) {
	c.m.Range(func(key, _ any) bool {
		if key := key.(K); pred(key) {
			c.Delete(key)
		}
		return true
	})
}