aboutsummaryrefslogtreecommitdiff
path: root/src/internal
diff options
context:
space:
mode:
authorAndy Pan <panjf2000@gmail.com>2020-11-19 19:09:14 +0800
committerIan Lance Taylor <iant@golang.org>2021-03-10 03:52:48 +0000
commit643d240a11b2d00e1718b02719707af0708e7519 (patch)
tree5883c80c4f42f17ea1c924e62f3dd00c88449422 /src/internal
parentd33e2192a71c33a604af247161ba1d2c1969e4c7 (diff)
downloadgo-643d240a11b2d00e1718b02719707af0708e7519.tar.gz
go-643d240a11b2d00e1718b02719707af0708e7519.zip
internal/poll: implement a pipe pool for splice() call
In scenarios where splice() is called, splice() is usually called not just once, but many times, which means that a lot of pipes will be created and destroyed frequently, costing an amount of system resources and slowing down performance, thus I suggest that we add a pipe pool for reusing pipes. Benchmark tests: goos: linux goarch: amd64 pkg: internal/poll cpu: AMD EPYC 7K62 48-Core Processor name old time/op new time/op delta SplicePipe-8 1.36µs ± 1% 0.02µs ± 0% -98.57% (p=0.001 n=7+7) SplicePipeParallel-8 747ns ± 4% 4ns ± 0% -99.41% (p=0.001 n=7+7) name old alloc/op new alloc/op delta SplicePipe-8 24.0B ± 0% 0.0B -100.00% (p=0.001 n=7+7) SplicePipeParallel-8 24.0B ± 0% 0.0B -100.00% (p=0.001 n=7+7) name old allocs/op new allocs/op delta SplicePipe-8 1.00 ± 0% 0.00 -100.00% (p=0.001 n=7+7) SplicePipeParallel-8 1.00 ± 0% 0.00 -100.00% (p=0.001 n=7+7) Fixes #42740 Change-Id: Idff654b7264342084e089b5ba796c87c380c471b Reviewed-on: https://go-review.googlesource.com/c/go/+/271537 Reviewed-by: Ian Lance Taylor <iant@golang.org> Run-TryBot: Ian Lance Taylor <iant@golang.org> TryBot-Result: Go Bot <gobot@golang.org> Trust: Brad Fitzpatrick <bradfitz@golang.org>
Diffstat (limited to 'src/internal')
-rw-r--r--src/internal/poll/export_linux_test.go22
-rw-r--r--src/internal/poll/splice_linux.go87
-rw-r--r--src/internal/poll/splice_linux_test.go96
3 files changed, 185 insertions, 20 deletions
diff --git a/src/internal/poll/export_linux_test.go b/src/internal/poll/export_linux_test.go
new file mode 100644
index 0000000000..7fba793697
--- /dev/null
+++ b/src/internal/poll/export_linux_test.go
@@ -0,0 +1,22 @@
+// Copyright 2021 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Export guts for testing on linux.
+// Since testing imports os and os imports internal/poll,
+// the internal/poll tests can not be in package poll.
+
+package poll
+
+var (
+ GetPipe = getPipe
+ PutPipe = putPipe
+ NewPipe = newPipe
+ DestroyPipe = destroyPipe
+)
+
+func GetPipeFds(p *SplicePipe) (int, int) {
+ return p.rfd, p.wfd
+}
+
+type SplicePipe = splicePipe
diff --git a/src/internal/poll/splice_linux.go b/src/internal/poll/splice_linux.go
index 968bc44a5f..971f754f43 100644
--- a/src/internal/poll/splice_linux.go
+++ b/src/internal/poll/splice_linux.go
@@ -6,6 +6,8 @@ package poll
import (
"internal/syscall/unix"
+ "runtime"
+ "sync"
"sync/atomic"
"syscall"
"unsafe"
@@ -23,23 +25,23 @@ const (
// Splice transfers at most remain bytes of data from src to dst, using the
// splice system call to minimize copies of data from and to userspace.
//
-// Splice creates a temporary pipe, to serve as a buffer for the data transfer.
+// Splice gets a pipe buffer from the pool or creates a new one if needed, to serve as a buffer for the data transfer.
// src and dst must both be stream-oriented sockets.
//
// If err != nil, sc is the system call which caused the error.
func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string, err error) {
- prfd, pwfd, sc, err := newTempPipe()
+ p, sc, err := getPipe()
if err != nil {
return 0, false, sc, err
}
- defer destroyTempPipe(prfd, pwfd)
+ defer putPipe(p)
var inPipe, n int
for err == nil && remain > 0 {
max := maxSpliceSize
if int64(max) > remain {
max = int(remain)
}
- inPipe, err = spliceDrain(pwfd, src, max)
+ inPipe, err = spliceDrain(p.wfd, src, max)
// The operation is considered handled if splice returns no
// error, or an error other than EINVAL. An EINVAL means the
// kernel does not support splice for the socket type of src.
@@ -55,10 +57,13 @@ func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string,
if err != nil || inPipe == 0 {
break
}
- n, err = splicePump(dst, prfd, inPipe)
+ p.data += inPipe
+
+ n, err = splicePump(dst, p.rfd, inPipe)
if n > 0 {
written += int64(n)
remain -= int64(n)
+ p.data -= n
}
}
if err != nil {
@@ -149,13 +154,57 @@ func splice(out int, in int, max int, flags int) (int, error) {
return int(n), err
}
+type splicePipe struct {
+ rfd int
+ wfd int
+ data int
+}
+
+// splicePipePool caches pipes to avoid high frequency construction and destruction of pipe buffers.
+// The garbage collector will free all pipes in the sync.Pool in periodically, thus we need to set up
+// a finalizer for each pipe to close the its file descriptors before the actual GC.
+var splicePipePool = sync.Pool{New: newPoolPipe}
+
+func newPoolPipe() interface{} {
+ // Discard the error which occurred during the creation of pipe buffer,
+ // redirecting the data transmission to the conventional way utilizing read() + write() as a fallback.
+ p := newPipe()
+ if p != nil {
+ runtime.SetFinalizer(p, destroyPipe)
+ }
+ return p
+}
+
+// getPipe tries to acquire a pipe buffer from the pool or create a new one with newPipe() if it gets nil from cache.
+//
+// Note that it may fail to create a new pipe buffer by newPipe(), in which case getPipe() will return a generic error
+// and system call name splice in string as the indication.
+func getPipe() (*splicePipe, string, error) {
+ v := splicePipePool.Get()
+ if v == nil {
+ return nil, "splice", syscall.EINVAL
+ }
+ return v.(*splicePipe), "", nil
+}
+
+func putPipe(p *splicePipe) {
+ // If there is still data left in the pipe,
+ // then close and discard it instead of putting it back into the pool.
+ if p.data != 0 {
+ runtime.SetFinalizer(p, nil)
+ destroyPipe(p)
+ return
+ }
+ splicePipePool.Put(p)
+}
+
var disableSplice unsafe.Pointer
-// newTempPipe sets up a temporary pipe for a splice operation.
-func newTempPipe() (prfd, pwfd int, sc string, err error) {
+// newPipe sets up a pipe for a splice operation.
+func newPipe() (sp *splicePipe) {
p := (*bool)(atomic.LoadPointer(&disableSplice))
if p != nil && *p {
- return -1, -1, "splice", syscall.EINVAL
+ return nil
}
var fds [2]int
@@ -165,9 +214,11 @@ func newTempPipe() (prfd, pwfd int, sc string, err error) {
// closed.
const flags = syscall.O_CLOEXEC | syscall.O_NONBLOCK
if err := syscall.Pipe2(fds[:], flags); err != nil {
- return -1, -1, "pipe2", err
+ return nil
}
+ sp = &splicePipe{rfd: fds[0], wfd: fds[1]}
+
if p == nil {
p = new(bool)
defer atomic.StorePointer(&disableSplice, unsafe.Pointer(p))
@@ -175,20 +226,16 @@ func newTempPipe() (prfd, pwfd int, sc string, err error) {
// F_GETPIPE_SZ was added in 2.6.35, which does not have the -EAGAIN bug.
if _, _, errno := syscall.Syscall(unix.FcntlSyscall, uintptr(fds[0]), syscall.F_GETPIPE_SZ, 0); errno != 0 {
*p = true
- destroyTempPipe(fds[0], fds[1])
- return -1, -1, "fcntl", errno
+ destroyPipe(sp)
+ return nil
}
}
- return fds[0], fds[1], "", nil
+ return
}
-// destroyTempPipe destroys a temporary pipe.
-func destroyTempPipe(prfd, pwfd int) error {
- err := CloseFunc(prfd)
- err1 := CloseFunc(pwfd)
- if err == nil {
- return err1
- }
- return err
+// destroyPipe destroys a pipe.
+func destroyPipe(p *splicePipe) {
+ CloseFunc(p.rfd)
+ CloseFunc(p.wfd)
}
diff --git a/src/internal/poll/splice_linux_test.go b/src/internal/poll/splice_linux_test.go
new file mode 100644
index 0000000000..9ea5197242
--- /dev/null
+++ b/src/internal/poll/splice_linux_test.go
@@ -0,0 +1,96 @@
+// Copyright 2021 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package poll_test
+
+import (
+ "internal/poll"
+ "runtime"
+ "syscall"
+ "testing"
+ "time"
+)
+
+// checkPipes returns true if all pipes are closed properly, false otherwise.
+func checkPipes(fds []int) bool {
+ for _, fd := range fds {
+ // Check if each pipe fd has been closed.
+ err := syscall.FcntlFlock(uintptr(fd), syscall.F_GETFD, nil)
+ if err == nil {
+ return false
+ }
+ }
+ return true
+}
+
+func TestSplicePipePool(t *testing.T) {
+ const N = 64
+ var (
+ p *poll.SplicePipe
+ ps []*poll.SplicePipe
+ fds []int
+ err error
+ )
+ for i := 0; i < N; i++ {
+ p, _, err = poll.GetPipe()
+ if err != nil {
+ t.Skip("failed to create pipe, skip this test")
+ }
+ prfd, pwfd := poll.GetPipeFds(p)
+ fds = append(fds, prfd, pwfd)
+ ps = append(ps, p)
+ }
+ for _, p = range ps {
+ poll.PutPipe(p)
+ }
+ ps = nil
+
+ var ok bool
+ // Trigger garbage collection to free the pipes in sync.Pool and check whether or not
+ // those pipe buffers have been closed as we expected.
+ for i := 0; i < 5; i++ {
+ runtime.GC()
+ time.Sleep(time.Duration(i*100+10) * time.Millisecond)
+ if ok = checkPipes(fds); ok {
+ break
+ }
+ }
+
+ if !ok {
+ t.Fatal("at least one pipe is still open")
+ }
+}
+
+func BenchmarkSplicePipe(b *testing.B) {
+ b.Run("SplicePipeWithPool", func(b *testing.B) {
+ for i := 0; i < b.N; i++ {
+ p, _, _ := poll.GetPipe()
+ poll.PutPipe(p)
+ }
+ })
+ b.Run("SplicePipeWithoutPool", func(b *testing.B) {
+ for i := 0; i < b.N; i++ {
+ p := poll.NewPipe()
+ poll.DestroyPipe(p)
+ }
+ })
+}
+
+func BenchmarkSplicePipePoolParallel(b *testing.B) {
+ b.RunParallel(func(pb *testing.PB) {
+ for pb.Next() {
+ p, _, _ := poll.GetPipe()
+ poll.PutPipe(p)
+ }
+ })
+}
+
+func BenchmarkSplicePipeNativeParallel(b *testing.B) {
+ b.RunParallel(func(pb *testing.PB) {
+ for pb.Next() {
+ p := poll.NewPipe()
+ poll.DestroyPipe(p)
+ }
+ })
+}