aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Borg <jakob@kastelo.net>2024-02-10 19:16:27 +0100
committerGitHub <noreply@github.com>2024-02-10 19:16:27 +0100
commit96c30f83870f12ad35477cf440cf57206324e0d3 (patch)
tree2ee2c13202244aa49091e3480dc2627f149e0902
parentfc8b353011c1cd8b5bd6216fcdc60e3777a00c64 (diff)
downloadsyncthing-96c30f83870f12ad35477cf440cf57206324e0d3.tar.gz
syncthing-96c30f83870f12ad35477cf440cf57206324e0d3.zip
lib/model, lib/protocol: Remove FileInfoBatch reuse behavior (#9399)
-rw-r--r--lib/db/util.go12
-rw-r--r--lib/model/indexhandler_test.go84
-rw-r--r--lib/protocol/protocol.go28
-rw-r--r--lib/protocol/wireformat.go14
4 files changed, 119 insertions, 19 deletions
diff --git a/lib/db/util.go b/lib/db/util.go
index c67ca5508..5e9adb267 100644
--- a/lib/db/util.go
+++ b/lib/db/util.go
@@ -22,11 +22,10 @@ type FileInfoBatch struct {
flushFn func([]protocol.FileInfo) error
}
+// NewFileInfoBatch returns a new FileInfoBatch that calls fn when it's time
+// to flush.
func NewFileInfoBatch(fn func([]protocol.FileInfo) error) *FileInfoBatch {
- return &FileInfoBatch{
- infos: make([]protocol.FileInfo, 0, MaxBatchSizeFiles),
- flushFn: fn,
- }
+ return &FileInfoBatch{flushFn: fn}
}
func (b *FileInfoBatch) SetFlushFunc(fn func([]protocol.FileInfo) error) {
@@ -34,6 +33,9 @@ func (b *FileInfoBatch) SetFlushFunc(fn func([]protocol.FileInfo) error) {
}
func (b *FileInfoBatch) Append(f protocol.FileInfo) {
+ if b.infos == nil {
+ b.infos = make([]protocol.FileInfo, 0, MaxBatchSizeFiles)
+ }
b.infos = append(b.infos, f)
b.size += f.ProtoSize()
}
@@ -61,7 +63,7 @@ func (b *FileInfoBatch) Flush() error {
}
func (b *FileInfoBatch) Reset() {
- b.infos = b.infos[:0]
+ b.infos = nil
b.size = 0
}
diff --git a/lib/model/indexhandler_test.go b/lib/model/indexhandler_test.go
new file mode 100644
index 000000000..ba571028d
--- /dev/null
+++ b/lib/model/indexhandler_test.go
@@ -0,0 +1,84 @@
+// Copyright (C) 2024 The Syncthing Authors.
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this file,
+// You can obtain one at https://mozilla.org/MPL/2.0/.
+
+package model_test
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "testing"
+
+ "github.com/syncthing/syncthing/lib/db"
+ "github.com/syncthing/syncthing/lib/model/mocks"
+ "github.com/syncthing/syncthing/lib/protocol"
+ protomock "github.com/syncthing/syncthing/lib/protocol/mocks"
+ "github.com/syncthing/syncthing/lib/testutil"
+)
+
+func TestIndexhandlerConcurrency(t *testing.T) {
+ // Verify that sending a lot of index update messages using the
+ // FileInfoBatch works and doesn't trigger the race detector.
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ ar, aw := io.Pipe()
+ br, bw := io.Pipe()
+ ci := &protomock.ConnectionInfo{}
+
+ m1 := &mocks.Model{}
+ c1 := protocol.NewConnection(protocol.EmptyDeviceID, ar, bw, testutil.NoopCloser{}, m1, ci, protocol.CompressionNever, nil, nil)
+ c1.Start()
+ defer c1.Close(io.EOF)
+
+ m2 := &mocks.Model{}
+ c2 := protocol.NewConnection(protocol.EmptyDeviceID, br, aw, testutil.NoopCloser{}, m2, ci, protocol.CompressionNever, nil, nil)
+ c2.Start()
+ defer c2.Close(io.EOF)
+
+ c1.ClusterConfig(protocol.ClusterConfig{})
+ c2.ClusterConfig(protocol.ClusterConfig{})
+ c1.Index(ctx, "foo", nil)
+ c2.Index(ctx, "foo", nil)
+
+ const msgs = 5e2
+ const files = 1e3
+
+ recvd := 0
+ m2.IndexUpdateCalls(func(_ protocol.Connection, idxUp *protocol.IndexUpdate) error {
+ for j := 0; j < files; j++ {
+ if n := idxUp.Files[j].Name; n != fmt.Sprintf("f%d-%d", recvd, j) {
+ t.Error("wrong filename", n)
+ }
+ }
+ recvd++
+ return nil
+ })
+
+ b1 := db.NewFileInfoBatch(func(fs []protocol.FileInfo) error {
+ return c1.IndexUpdate(ctx, "foo", fs)
+ })
+ for i := 0; i < msgs; i++ {
+ for j := 0; j < files; j++ {
+ b1.Append(protocol.FileInfo{
+ Name: fmt.Sprintf("f%d-%d", i, j),
+ Blocks: []protocol.BlockInfo{{Hash: make([]byte, 32)}},
+ })
+ }
+ if err := b1.Flush(); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ c1.Close(io.EOF)
+ c2.Close(io.EOF)
+ <-c1.Closed()
+ <-c2.Closed()
+
+ if recvd != msgs-1 {
+ t.Error("didn't receive all expected messages")
+ }
+}
diff --git a/lib/protocol/protocol.go b/lib/protocol/protocol.go
index ec408b6dc..fe5de5d7a 100644
--- a/lib/protocol/protocol.go
+++ b/lib/protocol/protocol.go
@@ -154,15 +154,35 @@ type RequestResponse interface {
}
type Connection interface {
- Start()
- SetFolderPasswords(passwords map[string]string)
- Close(err error)
- DeviceID() DeviceID
+ // Send an index message. The connection will read and marshal the
+ // parameters asynchronously, so they should not be modified after
+ // calling Index().
Index(ctx context.Context, folder string, files []FileInfo) error
+
+ // Send an index update message. The connection will read and marshal
+ // the parameters asynchronously, so they should not be modified after
+ // calling IndexUpdate().
IndexUpdate(ctx context.Context, folder string, files []FileInfo) error
+
+ // Send a request message. The connection will read and marshal the
+ // parameters asynchronously, so they should not be modified after
+ // calling Request().
Request(ctx context.Context, folder string, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error)
+
+ // Send a cluster configuration message. The connection will read and
+ // marshal the message asynchronously, so it should not be modified
+ // after calling ClusterConfig().
ClusterConfig(config ClusterConfig)
+
+ // Send a download progress message. The connection will read and
+ // marshal the parameters asynchronously, so they should not be modified
+ // after calling DownloadProgress().
DownloadProgress(ctx context.Context, folder string, updates []FileDownloadProgressUpdate)
+
+ Start()
+ SetFolderPasswords(passwords map[string]string)
+ Close(err error)
+ DeviceID() DeviceID
Statistics() Statistics
Closed() <-chan struct{}
ConnectionInfo
diff --git a/lib/protocol/wireformat.go b/lib/protocol/wireformat.go
index cca53cab1..f97a1ed08 100644
--- a/lib/protocol/wireformat.go
+++ b/lib/protocol/wireformat.go
@@ -14,25 +14,19 @@ type wireFormatConnection struct {
}
func (c wireFormatConnection) Index(ctx context.Context, folder string, fs []FileInfo) error {
- var myFs = make([]FileInfo, len(fs))
- copy(myFs, fs)
-
for i := range fs {
- myFs[i].Name = norm.NFC.String(filepath.ToSlash(myFs[i].Name))
+ fs[i].Name = norm.NFC.String(filepath.ToSlash(fs[i].Name))
}
- return c.Connection.Index(ctx, folder, myFs)
+ return c.Connection.Index(ctx, folder, fs)
}
func (c wireFormatConnection) IndexUpdate(ctx context.Context, folder string, fs []FileInfo) error {
- var myFs = make([]FileInfo, len(fs))
- copy(myFs, fs)
-
for i := range fs {
- myFs[i].Name = norm.NFC.String(filepath.ToSlash(myFs[i].Name))
+ fs[i].Name = norm.NFC.String(filepath.ToSlash(fs[i].Name))
}
- return c.Connection.IndexUpdate(ctx, folder, myFs)
+ return c.Connection.IndexUpdate(ctx, folder, fs)
}
func (c wireFormatConnection) Request(ctx context.Context, folder string, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {