aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Matloob <matloob@golang.org>2020-06-25 19:11:28 -0400
committerMichael Matloob <matloob@golang.org>2020-08-19 20:09:14 +0000
commit18239be10a0b5caa1b3222b228ff590b1c036382 (patch)
tree68268b3b479581073af1a9f6c49154cb37fefd52
parent64350f1eabeb688e997c6cd0c103e21c02ab2a46 (diff)
downloadgo-18239be10a0b5caa1b3222b228ff590b1c036382.tar.gz
go-18239be10a0b5caa1b3222b228ff590b1c036382.zip
cmd/go/internal: remove some users of par.Work
par.Work is used in a number of places as a parallel work queue. This change replaces it with goroutines and channels in a number of simpler places where it's used. This is the same CL as golang.org/cl/240062 and golang.org/cl/248326 except for the following changes in convert.go (all line numbers from this CL), as well as fixing up imports in download.go: - On line 44, the "*" before modules.Versions is removed (we were trying to assign to a nil value on lines 72 and 73). - Line 64 is new, and ensures that we receive on the semaphore channel once the goroutine function exits. (The previous versions of this CL only received at the end of the function, ignoring the return point in the branch in the middle of the function.) - The semaphore channel receive right before line 74 is gone, replaced with the deferred receive above. - The if block at line 83 is new, accounting for cases where modfetch.ImportRepoRev returned an error in the goroutine, so that versions[i] is ignored. Change-Id: I0e33670bb2eb0a1e4d7a5fa693a471e61ffbc8b7 Reviewed-on: https://go-review.googlesource.com/c/go/+/249020 Run-TryBot: Michael Matloob <matloob@golang.org> TryBot-Result: Gobot Gobot <gobot@golang.org> Reviewed-by: Jay Conrod <jayconrod@google.com>
-rw-r--r--src/cmd/go/internal/modcmd/download.go69
-rw-r--r--src/cmd/go/internal/modcmd/graph.go19
-rw-r--r--src/cmd/go/internal/modconv/convert.go59
-rw-r--r--src/cmd/go/internal/modget/get.go41
-rw-r--r--src/cmd/go/internal/modload/list.go37
5 files changed, 134 insertions, 91 deletions
diff --git a/src/cmd/go/internal/modcmd/download.go b/src/cmd/go/internal/modcmd/download.go
index 946e8ed3cf..2f42d4306b 100644
--- a/src/cmd/go/internal/modcmd/download.go
+++ b/src/cmd/go/internal/modcmd/download.go
@@ -8,12 +8,12 @@ import (
"context"
"encoding/json"
"os"
+ "runtime"
"cmd/go/internal/base"
"cmd/go/internal/cfg"
- "cmd/go/internal/modfetch"
"cmd/go/internal/modload"
- "cmd/go/internal/par"
+ "cmd/go/internal/modfetch"
"cmd/go/internal/work"
"golang.org/x/mod/module"
@@ -102,33 +102,7 @@ func runDownload(ctx context.Context, cmd *base.Command, args []string) {
}
}
- var mods []*moduleJSON
- var work par.Work
- listU := false
- listVersions := false
- for _, info := range modload.ListModules(ctx, args, listU, listVersions) {
- if info.Replace != nil {
- info = info.Replace
- }
- if info.Version == "" && info.Error == nil {
- // main module or module replaced with file path.
- // Nothing to download.
- continue
- }
- m := &moduleJSON{
- Path: info.Path,
- Version: info.Version,
- }
- mods = append(mods, m)
- if info.Error != nil {
- m.Error = info.Error.Err
- continue
- }
- work.Add(m)
- }
-
- work.Do(10, func(item interface{}) {
- m := item.(*moduleJSON)
+ downloadModule := func(m *moduleJSON) {
var err error
m.Info, err = modfetch.InfoFile(m.Path, m.Version)
if err != nil {
@@ -157,7 +131,42 @@ func runDownload(ctx context.Context, cmd *base.Command, args []string) {
m.Error = err.Error()
return
}
- })
+ }
+
+ var mods []*moduleJSON
+ listU := false
+ listVersions := false
+ type token struct{}
+ sem := make(chan token, runtime.GOMAXPROCS(0))
+ for _, info := range modload.ListModules(ctx, args, listU, listVersions) {
+ if info.Replace != nil {
+ info = info.Replace
+ }
+ if info.Version == "" && info.Error == nil {
+ // main module or module replaced with file path.
+ // Nothing to download.
+ continue
+ }
+ m := &moduleJSON{
+ Path: info.Path,
+ Version: info.Version,
+ }
+ mods = append(mods, m)
+ if info.Error != nil {
+ m.Error = info.Error.Err
+ continue
+ }
+ sem <- token{}
+ go func() {
+ downloadModule(m)
+ <-sem
+ }()
+ }
+
+ // Fill semaphore channel to wait for goroutines to finish.
+ for n := cap(sem); n > 0; n-- {
+ sem <- token{}
+ }
if *downloadJSON {
for _, m := range mods {
diff --git a/src/cmd/go/internal/modcmd/graph.go b/src/cmd/go/internal/modcmd/graph.go
index 4853503fd4..6da12b9cab 100644
--- a/src/cmd/go/internal/modcmd/graph.go
+++ b/src/cmd/go/internal/modcmd/graph.go
@@ -15,7 +15,6 @@ import (
"cmd/go/internal/base"
"cmd/go/internal/cfg"
"cmd/go/internal/modload"
- "cmd/go/internal/par"
"cmd/go/internal/work"
"golang.org/x/mod/module"
@@ -59,23 +58,25 @@ func runGraph(ctx context.Context, cmd *base.Command, args []string) {
return m.Path + "@" + m.Version
}
- // Note: using par.Work only to manage work queue.
- // No parallelism here, so no locking.
var out []string
var deps int // index in out where deps start
- var work par.Work
- work.Add(modload.Target)
- work.Do(1, func(item interface{}) {
- m := item.(module.Version)
+ seen := map[module.Version]bool{modload.Target: true}
+ queue := []module.Version{modload.Target}
+ for len(queue) > 0 {
+ var m module.Version
+ m, queue = queue[0], queue[1:]
list, _ := reqs.Required(m)
for _, r := range list {
- work.Add(r)
+ if !seen[r] {
+ queue = append(queue, r)
+ seen[r] = true
+ }
out = append(out, format(m)+" "+format(r)+"\n")
}
if m == modload.Target {
deps = len(out)
}
- })
+ }
sort.Slice(out[deps:], func(i, j int) bool {
return out[deps+i][0] < out[deps+j][0]
diff --git a/src/cmd/go/internal/modconv/convert.go b/src/cmd/go/internal/modconv/convert.go
index f465a9f395..5d4165c944 100644
--- a/src/cmd/go/internal/modconv/convert.go
+++ b/src/cmd/go/internal/modconv/convert.go
@@ -7,13 +7,12 @@ package modconv
import (
"fmt"
"os"
+ "runtime"
"sort"
"strings"
- "sync"
"cmd/go/internal/base"
"cmd/go/internal/modfetch"
- "cmd/go/internal/par"
"golang.org/x/mod/modfile"
"golang.org/x/mod/module"
@@ -42,46 +41,54 @@ func ConvertLegacyConfig(f *modfile.File, file string, data []byte) error {
// Convert requirements block, which may use raw SHA1 hashes as versions,
// to valid semver requirement list, respecting major versions.
- var (
- work par.Work
- mu sync.Mutex
- need = make(map[string]string)
- replace = make(map[string]*modfile.Replace)
- )
+ versions := make([]module.Version, len(mf.Require))
+ replace := make(map[string]*modfile.Replace)
for _, r := range mf.Replace {
replace[r.New.Path] = r
replace[r.Old.Path] = r
}
- for _, r := range mf.Require {
+
+ type token struct{}
+ sem := make(chan token, runtime.GOMAXPROCS(0))
+ for i, r := range mf.Require {
m := r.Mod
if m.Path == "" {
continue
}
if re, ok := replace[m.Path]; ok {
- work.Add(re.New)
- continue
+ m = re.New
}
- work.Add(r.Mod)
+ sem <- token{}
+ go func(i int, m module.Version) {
+ defer func() { <-sem }()
+ repo, info, err := modfetch.ImportRepoRev(m.Path, m.Version)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "go: converting %s: stat %s@%s: %v\n", base.ShortPath(file), m.Path, m.Version, err)
+ return
+ }
+
+ path := repo.ModulePath()
+ versions[i].Path = path
+ versions[i].Version = info.Version
+ }(i, m)
+ }
+ // Fill semaphore channel to wait for all tasks to finish.
+ for n := cap(sem); n > 0; n-- {
+ sem <- token{}
}
- work.Do(10, func(item interface{}) {
- r := item.(module.Version)
- repo, info, err := modfetch.ImportRepoRev(r.Path, r.Version)
- if err != nil {
- fmt.Fprintf(os.Stderr, "go: converting %s: stat %s@%s: %v\n", base.ShortPath(file), r.Path, r.Version, err)
- return
+ need := map[string]string{}
+ for _, v := range versions {
+ if v.Path == "" {
+ continue
}
- mu.Lock()
- path := repo.ModulePath()
// Don't use semver.Max here; need to preserve +incompatible suffix.
- if v, ok := need[path]; !ok || semver.Compare(v, info.Version) < 0 {
- need[path] = info.Version
+ if needv, ok := need[v.Path]; !ok || semver.Compare(needv, v.Version) < 0 {
+ need[v.Path] = v.Version
}
- mu.Unlock()
- })
-
- var paths []string
+ }
+ paths := make([]string, 0, len(need))
for path := range need {
paths = append(paths, path)
}
diff --git a/src/cmd/go/internal/modget/get.go b/src/cmd/go/internal/modget/get.go
index 93a6bb54d5..d02c9a8da5 100644
--- a/src/cmd/go/internal/modget/get.go
+++ b/src/cmd/go/internal/modget/get.go
@@ -11,6 +11,7 @@ import (
"fmt"
"os"
"path/filepath"
+ "runtime"
"sort"
"strings"
"sync"
@@ -21,7 +22,6 @@ import (
"cmd/go/internal/load"
"cmd/go/internal/modload"
"cmd/go/internal/mvs"
- "cmd/go/internal/par"
"cmd/go/internal/search"
"cmd/go/internal/work"
@@ -725,18 +725,8 @@ func runGet(ctx context.Context, cmd *base.Command, args []string) {
// reported. A map from module paths to queries is returned, which includes
// queries and modOnly.
func runQueries(ctx context.Context, cache map[querySpec]*query, queries []*query, modOnly map[string]*query) map[string]*query {
- var lookup par.Work
- for _, q := range queries {
- if cached := cache[q.querySpec]; cached != nil {
- *q = *cached
- } else {
- cache[q.querySpec] = q
- lookup.Add(q)
- }
- }
- lookup.Do(10, func(item interface{}) {
- q := item.(*query)
+ runQuery := func(q *query) {
if q.vers == "none" {
// Wait for downgrade step.
q.m = module.Version{Path: q.path, Version: "none"}
@@ -747,7 +737,32 @@ func runQueries(ctx context.Context, cache map[querySpec]*query, queries []*quer
base.Errorf("go get %s: %v", q.arg, err)
}
q.m = m
- })
+ }
+
+ type token struct{}
+ sem := make(chan token, runtime.GOMAXPROCS(0))
+ for _, q := range queries {
+ if cached := cache[q.querySpec]; cached != nil {
+ *q = *cached
+ } else {
+ sem <- token{}
+ go func(q *query) {
+ runQuery(q)
+ <-sem
+ }(q)
+ }
+ }
+
+ // Fill semaphore channel to wait for goroutines to finish.
+ for n := cap(sem); n > 0; n-- {
+ sem <- token{}
+ }
+
+ // Add to cache after concurrent section to avoid races...
+ for _, q := range queries {
+ cache[q.querySpec] = q
+ }
+
base.ExitIfErrors()
byPath := make(map[string]*query)
diff --git a/src/cmd/go/internal/modload/list.go b/src/cmd/go/internal/modload/list.go
index 4768516e90..8db4d64706 100644
--- a/src/cmd/go/internal/modload/list.go
+++ b/src/cmd/go/internal/modload/list.go
@@ -9,12 +9,12 @@ import (
"errors"
"fmt"
"os"
+ "runtime"
"strings"
"cmd/go/internal/base"
"cmd/go/internal/cfg"
"cmd/go/internal/modinfo"
- "cmd/go/internal/par"
"cmd/go/internal/search"
"golang.org/x/mod/module"
@@ -22,24 +22,35 @@ import (
func ListModules(ctx context.Context, args []string, listU, listVersions bool) []*modinfo.ModulePublic {
mods := listModules(ctx, args, listVersions)
+
+ type token struct{}
+ sem := make(chan token, runtime.GOMAXPROCS(0))
if listU || listVersions {
- var work par.Work
for _, m := range mods {
- work.Add(m)
+ add := func(m *modinfo.ModulePublic) {
+ sem <- token{}
+ go func() {
+ if listU {
+ addUpdate(m)
+ }
+ if listVersions {
+ addVersions(m)
+ }
+ <-sem
+ }()
+ }
+
+ add(m)
if m.Replace != nil {
- work.Add(m.Replace)
+ add(m.Replace)
}
}
- work.Do(10, func(item interface{}) {
- m := item.(*modinfo.ModulePublic)
- if listU {
- addUpdate(m)
- }
- if listVersions {
- addVersions(m)
- }
- })
}
+ // Fill semaphore channel to wait for all tasks to finish.
+ for n := cap(sem); n > 0; n-- {
+ sem <- token{}
+ }
+
return mods
}