diff options
author | Andy Pan <panjf2000@gmail.com> | 2020-11-19 19:09:14 +0800 |
---|---|---|
committer | Ian Lance Taylor <iant@golang.org> | 2021-03-10 03:52:48 +0000 |
commit | 643d240a11b2d00e1718b02719707af0708e7519 (patch) | |
tree | 5883c80c4f42f17ea1c924e62f3dd00c88449422 /src/internal | |
parent | d33e2192a71c33a604af247161ba1d2c1969e4c7 (diff) | |
download | go-643d240a11b2d00e1718b02719707af0708e7519.tar.gz go-643d240a11b2d00e1718b02719707af0708e7519.zip |
internal/poll: implement a pipe pool for splice() call
In scenarios where splice() is called, splice() is usually called not just once, but many times,
which means that a lot of pipes will be created and destroyed frequently, costing an amount of system resources
and slowing down performance, thus I suggest that we add a pipe pool for reusing pipes.
Benchmark tests:
goos: linux
goarch: amd64
pkg: internal/poll
cpu: AMD EPYC 7K62 48-Core Processor
name old time/op new time/op delta
SplicePipe-8 1.36µs ± 1% 0.02µs ± 0% -98.57% (p=0.001 n=7+7)
SplicePipeParallel-8 747ns ± 4% 4ns ± 0% -99.41% (p=0.001 n=7+7)
name old alloc/op new alloc/op delta
SplicePipe-8 24.0B ± 0% 0.0B -100.00% (p=0.001 n=7+7)
SplicePipeParallel-8 24.0B ± 0% 0.0B -100.00% (p=0.001 n=7+7)
name old allocs/op new allocs/op delta
SplicePipe-8 1.00 ± 0% 0.00 -100.00% (p=0.001 n=7+7)
SplicePipeParallel-8 1.00 ± 0% 0.00 -100.00% (p=0.001 n=7+7)
Fixes #42740
Change-Id: Idff654b7264342084e089b5ba796c87c380c471b
Reviewed-on: https://go-review.googlesource.com/c/go/+/271537
Reviewed-by: Ian Lance Taylor <iant@golang.org>
Run-TryBot: Ian Lance Taylor <iant@golang.org>
TryBot-Result: Go Bot <gobot@golang.org>
Trust: Brad Fitzpatrick <bradfitz@golang.org>
Diffstat (limited to 'src/internal')
-rw-r--r-- | src/internal/poll/export_linux_test.go | 22 | ||||
-rw-r--r-- | src/internal/poll/splice_linux.go | 87 | ||||
-rw-r--r-- | src/internal/poll/splice_linux_test.go | 96 |
3 files changed, 185 insertions, 20 deletions
diff --git a/src/internal/poll/export_linux_test.go b/src/internal/poll/export_linux_test.go new file mode 100644 index 0000000000..7fba793697 --- /dev/null +++ b/src/internal/poll/export_linux_test.go @@ -0,0 +1,22 @@ +// Copyright 2021 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Export guts for testing on linux. +// Since testing imports os and os imports internal/poll, +// the internal/poll tests can not be in package poll. + +package poll + +var ( + GetPipe = getPipe + PutPipe = putPipe + NewPipe = newPipe + DestroyPipe = destroyPipe +) + +func GetPipeFds(p *SplicePipe) (int, int) { + return p.rfd, p.wfd +} + +type SplicePipe = splicePipe diff --git a/src/internal/poll/splice_linux.go b/src/internal/poll/splice_linux.go index 968bc44a5f..971f754f43 100644 --- a/src/internal/poll/splice_linux.go +++ b/src/internal/poll/splice_linux.go @@ -6,6 +6,8 @@ package poll import ( "internal/syscall/unix" + "runtime" + "sync" "sync/atomic" "syscall" "unsafe" @@ -23,23 +25,23 @@ const ( // Splice transfers at most remain bytes of data from src to dst, using the // splice system call to minimize copies of data from and to userspace. // -// Splice creates a temporary pipe, to serve as a buffer for the data transfer. +// Splice gets a pipe buffer from the pool or creates a new one if needed, to serve as a buffer for the data transfer. // src and dst must both be stream-oriented sockets. // // If err != nil, sc is the system call which caused the error. func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string, err error) { - prfd, pwfd, sc, err := newTempPipe() + p, sc, err := getPipe() if err != nil { return 0, false, sc, err } - defer destroyTempPipe(prfd, pwfd) + defer putPipe(p) var inPipe, n int for err == nil && remain > 0 { max := maxSpliceSize if int64(max) > remain { max = int(remain) } - inPipe, err = spliceDrain(pwfd, src, max) + inPipe, err = spliceDrain(p.wfd, src, max) // The operation is considered handled if splice returns no // error, or an error other than EINVAL. An EINVAL means the // kernel does not support splice for the socket type of src. @@ -55,10 +57,13 @@ func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string, if err != nil || inPipe == 0 { break } - n, err = splicePump(dst, prfd, inPipe) + p.data += inPipe + + n, err = splicePump(dst, p.rfd, inPipe) if n > 0 { written += int64(n) remain -= int64(n) + p.data -= n } } if err != nil { @@ -149,13 +154,57 @@ func splice(out int, in int, max int, flags int) (int, error) { return int(n), err } +type splicePipe struct { + rfd int + wfd int + data int +} + +// splicePipePool caches pipes to avoid high frequency construction and destruction of pipe buffers. +// The garbage collector will free all pipes in the sync.Pool in periodically, thus we need to set up +// a finalizer for each pipe to close the its file descriptors before the actual GC. +var splicePipePool = sync.Pool{New: newPoolPipe} + +func newPoolPipe() interface{} { + // Discard the error which occurred during the creation of pipe buffer, + // redirecting the data transmission to the conventional way utilizing read() + write() as a fallback. + p := newPipe() + if p != nil { + runtime.SetFinalizer(p, destroyPipe) + } + return p +} + +// getPipe tries to acquire a pipe buffer from the pool or create a new one with newPipe() if it gets nil from cache. +// +// Note that it may fail to create a new pipe buffer by newPipe(), in which case getPipe() will return a generic error +// and system call name splice in string as the indication. +func getPipe() (*splicePipe, string, error) { + v := splicePipePool.Get() + if v == nil { + return nil, "splice", syscall.EINVAL + } + return v.(*splicePipe), "", nil +} + +func putPipe(p *splicePipe) { + // If there is still data left in the pipe, + // then close and discard it instead of putting it back into the pool. + if p.data != 0 { + runtime.SetFinalizer(p, nil) + destroyPipe(p) + return + } + splicePipePool.Put(p) +} + var disableSplice unsafe.Pointer -// newTempPipe sets up a temporary pipe for a splice operation. -func newTempPipe() (prfd, pwfd int, sc string, err error) { +// newPipe sets up a pipe for a splice operation. +func newPipe() (sp *splicePipe) { p := (*bool)(atomic.LoadPointer(&disableSplice)) if p != nil && *p { - return -1, -1, "splice", syscall.EINVAL + return nil } var fds [2]int @@ -165,9 +214,11 @@ func newTempPipe() (prfd, pwfd int, sc string, err error) { // closed. const flags = syscall.O_CLOEXEC | syscall.O_NONBLOCK if err := syscall.Pipe2(fds[:], flags); err != nil { - return -1, -1, "pipe2", err + return nil } + sp = &splicePipe{rfd: fds[0], wfd: fds[1]} + if p == nil { p = new(bool) defer atomic.StorePointer(&disableSplice, unsafe.Pointer(p)) @@ -175,20 +226,16 @@ func newTempPipe() (prfd, pwfd int, sc string, err error) { // F_GETPIPE_SZ was added in 2.6.35, which does not have the -EAGAIN bug. if _, _, errno := syscall.Syscall(unix.FcntlSyscall, uintptr(fds[0]), syscall.F_GETPIPE_SZ, 0); errno != 0 { *p = true - destroyTempPipe(fds[0], fds[1]) - return -1, -1, "fcntl", errno + destroyPipe(sp) + return nil } } - return fds[0], fds[1], "", nil + return } -// destroyTempPipe destroys a temporary pipe. -func destroyTempPipe(prfd, pwfd int) error { - err := CloseFunc(prfd) - err1 := CloseFunc(pwfd) - if err == nil { - return err1 - } - return err +// destroyPipe destroys a pipe. +func destroyPipe(p *splicePipe) { + CloseFunc(p.rfd) + CloseFunc(p.wfd) } diff --git a/src/internal/poll/splice_linux_test.go b/src/internal/poll/splice_linux_test.go new file mode 100644 index 0000000000..9ea5197242 --- /dev/null +++ b/src/internal/poll/splice_linux_test.go @@ -0,0 +1,96 @@ +// Copyright 2021 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package poll_test + +import ( + "internal/poll" + "runtime" + "syscall" + "testing" + "time" +) + +// checkPipes returns true if all pipes are closed properly, false otherwise. +func checkPipes(fds []int) bool { + for _, fd := range fds { + // Check if each pipe fd has been closed. + err := syscall.FcntlFlock(uintptr(fd), syscall.F_GETFD, nil) + if err == nil { + return false + } + } + return true +} + +func TestSplicePipePool(t *testing.T) { + const N = 64 + var ( + p *poll.SplicePipe + ps []*poll.SplicePipe + fds []int + err error + ) + for i := 0; i < N; i++ { + p, _, err = poll.GetPipe() + if err != nil { + t.Skip("failed to create pipe, skip this test") + } + prfd, pwfd := poll.GetPipeFds(p) + fds = append(fds, prfd, pwfd) + ps = append(ps, p) + } + for _, p = range ps { + poll.PutPipe(p) + } + ps = nil + + var ok bool + // Trigger garbage collection to free the pipes in sync.Pool and check whether or not + // those pipe buffers have been closed as we expected. + for i := 0; i < 5; i++ { + runtime.GC() + time.Sleep(time.Duration(i*100+10) * time.Millisecond) + if ok = checkPipes(fds); ok { + break + } + } + + if !ok { + t.Fatal("at least one pipe is still open") + } +} + +func BenchmarkSplicePipe(b *testing.B) { + b.Run("SplicePipeWithPool", func(b *testing.B) { + for i := 0; i < b.N; i++ { + p, _, _ := poll.GetPipe() + poll.PutPipe(p) + } + }) + b.Run("SplicePipeWithoutPool", func(b *testing.B) { + for i := 0; i < b.N; i++ { + p := poll.NewPipe() + poll.DestroyPipe(p) + } + }) +} + +func BenchmarkSplicePipePoolParallel(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + p, _, _ := poll.GetPipe() + poll.PutPipe(p) + } + }) +} + +func BenchmarkSplicePipeNativeParallel(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + p := poll.NewPipe() + poll.DestroyPipe(p) + } + }) +} |