author     Jakob Borg <jakob@kastelo.net>  2023-08-21 18:39:13 +0200
committer  GitHub <noreply@github.com>     2023-08-21 18:39:13 +0200
commit     40b3b9ad1571f3ae141d03efd1869441a5f0d491 (patch)
tree       98d89a12a846447bbcb1649fbdc2b1798d3287a9
parent     c2c6133aa576004641971e48d8596e523b1052ee (diff)
lib/model: Clean up index handler life cycle (fixes #9021) (#9038)
Co-authored-by: Simon Frei <freisim93@gmail.com>
-rw-r--r--  lib/model/fakeconns_test.go   |   4
-rw-r--r--  lib/model/indexhandler.go     |  62
-rw-r--r--  lib/model/model.go            |  31
-rw-r--r--  lib/model/model_test.go       |   5
-rw-r--r--  lib/model/service_map.go      | 103
-rw-r--r--  lib/model/service_map_test.go | 156
6 files changed, 300 insertions(+), 61 deletions(-)
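The shape of the cleanup, in one before/after pair taken from the hunks below: every call site previously did its own suture token bookkeeping, and the new generic serviceMap type absorbs that into a single call.

    // before: token bookkeeping repeated at each call site
    if is, ok := r.indexHandlers[folder]; ok {
        r.sup.RemoveAndWait(is.token, 0)
        delete(r.indexHandlers, folder)
    }

    // after: the serviceMap owns both the map entry and the token
    r.indexHandlers.RemoveAndWait(folder, 0)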
diff --git a/lib/model/fakeconns_test.go b/lib/model/fakeconns_test.go
index 8b51741a1..6aa6d196a 100644
--- a/lib/model/fakeconns_test.go
+++ b/lib/model/fakeconns_test.go
@@ -14,6 +14,7 @@ import (
"github.com/syncthing/syncthing/lib/protocol"
protocolmocks "github.com/syncthing/syncthing/lib/protocol/mocks"
+ "github.com/syncthing/syncthing/lib/rand"
"github.com/syncthing/syncthing/lib/scanner"
)
@@ -36,10 +37,11 @@ func newFakeConnection(id protocol.DeviceID, model Model) *fakeConnection {
f.CloseCalls(func(err error) {
f.closeOnce.Do(func() {
close(f.closed)
+ model.Closed(f, err)
})
- model.Closed(f, err)
f.ClosedReturns(f.closed)
})
+ f.StringReturns(rand.String(8))
return f
}
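The fakeConnection change is small but load-bearing: model.Closed used to run on every Close call, outside the sync.Once, so a double Close could notify the model twice; now it runs inside the Once, strictly after the channel is closed. A minimal, self-contained sketch of that pattern (fakeConn and onClosed are illustrative names, not the real API):

    package main

    import (
        "fmt"
        "sync"
    )

    type fakeConn struct {
        closeOnce sync.Once
        closed    chan struct{}
    }

    // Close runs at most once; the callback fires inside the same Once,
    // after the closed channel is closed, so a second Close is a no-op.
    func (c *fakeConn) Close(onClosed func()) {
        c.closeOnce.Do(func() {
            close(c.closed)
            onClosed()
        })
    }

    func main() {
        c := &fakeConn{closed: make(chan struct{})}
        c.Close(func() { fmt.Println("notified once") })
        c.Close(func() { fmt.Println("never printed") })
        <-c.closed // already closed, does not block
    }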
diff --git a/lib/model/indexhandler.go b/lib/model/indexhandler.go
index a227e7f41..ae11bbf48 100644
--- a/lib/model/indexhandler.go
+++ b/lib/model/indexhandler.go
@@ -12,8 +12,6 @@ import (
"sync"
"time"
- "github.com/thejerf/suture/v4"
-
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/db"
"github.com/syncthing/syncthing/lib/events"
@@ -28,7 +26,6 @@ type indexHandler struct {
folderIsReceiveEncrypted bool
prevSequence int64
evLogger events.Logger
- token suture.ServiceToken
cond *sync.Cond
paused bool
@@ -373,11 +370,10 @@ func (s *indexHandler) String() string {
}
type indexHandlerRegistry struct {
- sup *suture.Supervisor
evLogger events.Logger
conn protocol.Connection
downloads *deviceDownloadState
- indexHandlers map[string]*indexHandler
+ indexHandlers *serviceMap[string, *indexHandler]
startInfos map[string]*clusterConfigDeviceInfo
folderStates map[string]*indexHandlerFolderState
mut sync.Mutex
@@ -389,27 +385,16 @@ type indexHandlerFolderState struct {
runner service
}
-func newIndexHandlerRegistry(conn protocol.Connection, downloads *deviceDownloadState, closed chan struct{}, parentSup *suture.Supervisor, evLogger events.Logger) *indexHandlerRegistry {
+func newIndexHandlerRegistry(conn protocol.Connection, downloads *deviceDownloadState, evLogger events.Logger) *indexHandlerRegistry {
r := &indexHandlerRegistry{
+ evLogger: evLogger,
conn: conn,
downloads: downloads,
- evLogger: evLogger,
- indexHandlers: make(map[string]*indexHandler),
+ indexHandlers: newServiceMap[string, *indexHandler](evLogger),
startInfos: make(map[string]*clusterConfigDeviceInfo),
folderStates: make(map[string]*indexHandlerFolderState),
mut: sync.Mutex{},
}
- r.sup = suture.New(r.String(), svcutil.SpecWithDebugLogger(l))
- ourToken := parentSup.Add(r.sup)
- r.sup.Add(svcutil.AsService(func(ctx context.Context) error {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-closed:
- parentSup.Remove(ourToken)
- }
- return nil
- }, fmt.Sprintf("%v/waitForClosed", r)))
return r
}
@@ -417,20 +402,18 @@ func (r *indexHandlerRegistry) String() string {
return fmt.Sprintf("indexHandlerRegistry/%v", r.conn.DeviceID().Short())
}
-func (r *indexHandlerRegistry) GetSupervisor() *suture.Supervisor {
- return r.sup
+func (r *indexHandlerRegistry) Serve(ctx context.Context) error {
+ // Running the index handler registry means running the individual index
+ // handler children.
+ return r.indexHandlers.Serve(ctx)
}
func (r *indexHandlerRegistry) startLocked(folder config.FolderConfiguration, fset *db.FileSet, runner service, startInfo *clusterConfigDeviceInfo) {
- if is, ok := r.indexHandlers[folder.ID]; ok {
- r.sup.RemoveAndWait(is.token, 0)
- delete(r.indexHandlers, folder.ID)
- }
+ r.indexHandlers.RemoveAndWait(folder.ID, 0)
delete(r.startInfos, folder.ID)
is := newIndexHandler(r.conn, r.downloads, folder, fset, runner, startInfo, r.evLogger)
- is.token = r.sup.Add(is)
- r.indexHandlers[folder.ID] = is
+ r.indexHandlers.Add(folder.ID, is)
// This new connection might help us get in sync.
runner.SchedulePull()
@@ -444,9 +427,7 @@ func (r *indexHandlerRegistry) AddIndexInfo(folder string, startInfo *clusterCon
r.mut.Lock()
defer r.mut.Unlock()
- if is, ok := r.indexHandlers[folder]; ok {
- r.sup.RemoveAndWait(is.token, 0)
- delete(r.indexHandlers, folder)
+ if r.indexHandlers.RemoveAndWait(folder, 0) {
l.Debugf("Removed index sender for device %v and folder %v due to added pending", r.conn.DeviceID().Short(), folder)
}
folderState, ok := r.folderStates[folder]
@@ -465,10 +446,7 @@ func (r *indexHandlerRegistry) Remove(folder string) {
defer r.mut.Unlock()
l.Debugf("Removing index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
- if is, ok := r.indexHandlers[folder]; ok {
- r.sup.RemoveAndWait(is.token, 0)
- delete(r.indexHandlers, folder)
- }
+ r.indexHandlers.RemoveAndWait(folder, 0)
delete(r.startInfos, folder)
l.Debugf("Removed index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
}
@@ -480,13 +458,12 @@ func (r *indexHandlerRegistry) RemoveAllExcept(except map[string]remoteFolderSta
r.mut.Lock()
defer r.mut.Unlock()
- for folder, is := range r.indexHandlers {
+ r.indexHandlers.Each(func(folder string, is *indexHandler) {
if _, ok := except[folder]; !ok {
- r.sup.RemoveAndWait(is.token, 0)
- delete(r.indexHandlers, folder)
+ r.indexHandlers.RemoveAndWait(folder, 0)
l.Debugf("Removed index handler for device %v and folder %v (removeAllExcept)", r.conn.DeviceID().Short(), folder)
}
- }
+ })
for folder := range r.startInfos {
if _, ok := except[folder]; !ok {
delete(r.startInfos, folder)
@@ -518,7 +495,7 @@ func (r *indexHandlerRegistry) RegisterFolderState(folder config.FolderConfigura
func (r *indexHandlerRegistry) folderPausedLocked(folder string) {
l.Debugf("Pausing index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
delete(r.folderStates, folder)
- if is, ok := r.indexHandlers[folder]; ok {
+ if is, ok := r.indexHandlers.Get(folder); ok {
is.pause()
l.Debugf("Paused index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
} else {
@@ -536,11 +513,10 @@ func (r *indexHandlerRegistry) folderRunningLocked(folder config.FolderConfigura
runner: runner,
}
- is, isOk := r.indexHandlers[folder.ID]
+ is, isOk := r.indexHandlers.Get(folder.ID)
if info, ok := r.startInfos[folder.ID]; ok {
if isOk {
- r.sup.RemoveAndWait(is.token, 0)
- delete(r.indexHandlers, folder.ID)
+ r.indexHandlers.RemoveAndWait(folder.ID, 0)
l.Debugf("Removed index handler for device %v and folder %v in resume", r.conn.DeviceID().Short(), folder.ID)
}
r.startLocked(folder, fset, runner, info)
@@ -557,7 +533,7 @@ func (r *indexHandlerRegistry) folderRunningLocked(folder config.FolderConfigura
func (r *indexHandlerRegistry) ReceiveIndex(folder string, fs []protocol.FileInfo, update bool, op string) error {
r.mut.Lock()
defer r.mut.Unlock()
- is, isOk := r.indexHandlers[folder]
+ is, isOk := r.indexHandlers.Get(folder)
if !isOk {
l.Infof("%v for nonexistent or paused folder %q", op, folder)
return fmt.Errorf("%s: %w", folder, ErrFolderMissing)
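With the registry itself now a suture.Service, the old waitForClosed helper and the parent-supervisor token both disappear: whoever supervises the registry bounds the lifetimes of all its index handlers. A self-contained sketch of that delegation pattern, using plain suture (the registry type here is illustrative, not the real one):

    package main

    import (
        "context"

        "github.com/thejerf/suture/v4"
    )

    // registry's Serve simply delegates to an inner supervisor, so
    // cancelling the registry's context stops every child with it.
    type registry struct {
        children *suture.Supervisor
    }

    func (r *registry) Serve(ctx context.Context) error {
        return r.children.Serve(ctx) // returns once ctx is cancelled
    }

    func main() {
        r := &registry{children: suture.NewSimple("children")}
        ctx, cancel := context.WithCancel(context.Background())
        cancel()         // cancel immediately, just to demonstrate
        _ = r.Serve(ctx) // would otherwise block until cancellation
    }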
diff --git a/lib/model/model.go b/lib/model/model.go
index 3f0acdd4c..0fafd0d03 100644
--- a/lib/model/model.go
+++ b/lib/model/model.go
@@ -165,7 +165,7 @@ type model struct {
helloMessages map[protocol.DeviceID]protocol.Hello
deviceDownloads map[protocol.DeviceID]*deviceDownloadState
remoteFolderStates map[protocol.DeviceID]map[string]remoteFolderState // deviceID -> folders
- indexHandlers map[protocol.DeviceID]*indexHandlerRegistry
+ indexHandlers *serviceMap[protocol.DeviceID, *indexHandlerRegistry]
// for testing only
foldersRunning atomic.Int32
@@ -248,12 +248,13 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio
helloMessages: make(map[protocol.DeviceID]protocol.Hello),
deviceDownloads: make(map[protocol.DeviceID]*deviceDownloadState),
remoteFolderStates: make(map[protocol.DeviceID]map[string]remoteFolderState),
- indexHandlers: make(map[protocol.DeviceID]*indexHandlerRegistry),
+ indexHandlers: newServiceMap[protocol.DeviceID, *indexHandlerRegistry](evLogger),
}
for devID := range cfg.Devices() {
m.deviceStatRefs[devID] = stats.NewDeviceStatisticsReference(m.db, devID)
}
m.Add(m.progressEmitter)
+ m.Add(m.indexHandlers)
m.Add(svcutil.AsService(m.serve, m.String()))
return m
@@ -487,9 +488,9 @@ func (m *model) removeFolder(cfg config.FolderConfiguration) {
}
m.cleanupFolderLocked(cfg)
- for _, r := range m.indexHandlers {
+ m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) {
r.Remove(cfg.ID)
- }
+ })
m.fmut.Unlock()
m.pmut.RUnlock()
@@ -563,9 +564,9 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF
// Care needs to be taken because we already hold fmut and the lock order
// must be the same everywhere. As fmut is acquired first, this is fine.
m.pmut.RLock()
- for _, indexRegistry := range m.indexHandlers {
- indexRegistry.RegisterFolderState(to, fset, m.folderRunners[to.ID])
- }
+ m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) {
+ r.RegisterFolderState(to, fset, m.folderRunners[to.ID])
+ })
m.pmut.RUnlock()
var infoMsg string
@@ -601,9 +602,9 @@ func (m *model) newFolder(cfg config.FolderConfiguration, cacheIgnoredFiles bool
// Care needs to be taken because we already hold fmut and the lock order
// must be the same everywhere. As fmut is acquired first, this is fine.
m.pmut.RLock()
- for _, indexRegistry := range m.indexHandlers {
- indexRegistry.RegisterFolderState(cfg, fset, m.folderRunners[cfg.ID])
- }
+ m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) {
+ r.RegisterFolderState(cfg, fset, m.folderRunners[cfg.ID])
+ })
m.pmut.RUnlock()
return nil
@@ -1138,7 +1139,7 @@ func (m *model) handleIndex(conn protocol.Connection, folder string, fs []protoc
}
m.pmut.RLock()
- indexHandler, ok := m.indexHandlers[deviceID]
+ indexHandler, ok := m.indexHandlers.Get(deviceID)
m.pmut.RUnlock()
if !ok {
// This should be impossible, as an index handler always exists for an
@@ -1170,7 +1171,7 @@ func (m *model) ClusterConfig(conn protocol.Connection, cm protocol.ClusterConfi
l.Debugf("Handling ClusterConfig from %v", deviceID.Short())
m.pmut.RLock()
- indexHandlerRegistry, ok := m.indexHandlers[deviceID]
+ indexHandlerRegistry, ok := m.indexHandlers.Get(deviceID)
m.pmut.RUnlock()
if !ok {
panic("bug: ClusterConfig called on closed or nonexistent connection")
@@ -1792,7 +1793,7 @@ func (m *model) Closed(conn protocol.Connection, err error) {
delete(m.remoteFolderStates, device)
closed := m.closed[device]
delete(m.closed, device)
- delete(m.indexHandlers, device)
+ m.indexHandlers.RemoveAndWait(device, 0)
m.pmut.Unlock()
m.progressEmitter.temporaryIndexUnsubscribe(conn)
@@ -2251,11 +2252,11 @@ func (m *model) AddConnection(conn protocol.Connection, hello protocol.Hello) {
closed := make(chan struct{})
m.closed[deviceID] = closed
m.deviceDownloads[deviceID] = newDeviceDownloadState()
- indexRegistry := newIndexHandlerRegistry(conn, m.deviceDownloads[deviceID], closed, m.Supervisor, m.evLogger)
+ indexRegistry := newIndexHandlerRegistry(conn, m.deviceDownloads[deviceID], m.evLogger)
for id, fcfg := range m.folderCfgs {
indexRegistry.RegisterFolderState(fcfg, m.folderFiles[id], m.folderRunners[id])
}
- m.indexHandlers[deviceID] = indexRegistry
+ m.indexHandlers.Add(deviceID, indexRegistry)
m.fmut.RUnlock()
// 0: default, <0: no limiting
switch {
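Read together, the model.go hunks reduce the registry life cycle to three calls, all verbatim from the hunks above:

    // NewModel: the map runs under the model's own supervisor.
    m.Add(m.indexHandlers)

    // AddConnection: one registry per device, started by the map.
    indexRegistry := newIndexHandlerRegistry(conn, m.deviceDownloads[deviceID], m.evLogger)
    m.indexHandlers.Add(deviceID, indexRegistry)

    // Closed: stop the registry and wait for it; no token juggling.
    m.indexHandlers.RemoveAndWait(device, 0)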
diff --git a/lib/model/model_test.go b/lib/model/model_test.go
index 96cb31ae9..daaa76fde 100644
--- a/lib/model/model_test.go
+++ b/lib/model/model_test.go
@@ -1337,8 +1337,9 @@ func TestAutoAcceptEnc(t *testing.T) {
// Earlier tests might cause the connection to get closed, thus ClusterConfig
// would panic.
clusterConfig := func(deviceID protocol.DeviceID, cm protocol.ClusterConfig) {
- m.AddConnection(newFakeConnection(deviceID, m), protocol.Hello{})
- m.ClusterConfig(&protocolmocks.Connection{DeviceIDStub: func() protocol.DeviceID { return deviceID }}, cm)
+ conn := newFakeConnection(deviceID, m)
+ m.AddConnection(conn, protocol.Hello{})
+ m.ClusterConfig(conn, cm)
}
clusterConfig(device1, basicCC())
diff --git a/lib/model/service_map.go b/lib/model/service_map.go
new file mode 100644
index 000000000..bef67203f
--- /dev/null
+++ b/lib/model/service_map.go
@@ -0,0 +1,103 @@
+// Copyright (C) 2023 The Syncthing Authors.
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this file,
+// You can obtain one at https://mozilla.org/MPL/2.0/.
+
+package model
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/syncthing/syncthing/lib/events"
+ "github.com/syncthing/syncthing/lib/svcutil"
+ "github.com/thejerf/suture/v4"
+)
+
+// A serviceMap is a utility map of arbitrary keys to a suture.Service of
+// some kind, where adding and removing services ensures they are properly
+// started and stopped on the given Supervisor. The serviceMap is itself a
+// suture.Service and should be added to a Supervisor.
+// Not safe for concurrent use.
+type serviceMap[K comparable, S suture.Service] struct {
+ services map[K]S
+ tokens map[K]suture.ServiceToken
+ supervisor *suture.Supervisor
+ eventLogger events.Logger
+}
+
+func newServiceMap[K comparable, S suture.Service](eventLogger events.Logger) *serviceMap[K, S] {
+ m := &serviceMap[K, S]{
+ services: make(map[K]S),
+ tokens: make(map[K]suture.ServiceToken),
+ eventLogger: eventLogger,
+ }
+ m.supervisor = suture.New(m.String(), svcutil.SpecWithDebugLogger(l))
+ return m
+}
+
+// Add adds a service to the map, starting it on the supervisor. If there is
+// already a service at the given key, it is removed first.
+func (s *serviceMap[K, S]) Add(k K, v S) {
+ if tok, ok := s.tokens[k]; ok {
+ // There is already a service at this key, remove it first.
+ s.supervisor.Remove(tok)
+ s.eventLogger.Log(events.Failure, fmt.Sprintf("%s replaced service at key %v", s, k))
+ }
+ s.services[k] = v
+ s.tokens[k] = s.supervisor.Add(v)
+}
+
+// Get returns the service at the given key, or the empty value and false if
+// there is no service at that key.
+func (s *serviceMap[K, S]) Get(k K) (v S, ok bool) {
+ v, ok = s.services[k]
+ return
+}
+
+// Remove removes the service at the given key, stopping it on the supervisor.
+// If there is no service at the given key, nothing happens. The return value
+// indicates whether a service was removed.
+func (s *serviceMap[K, S]) Remove(k K) (found bool) {
+ if tok, ok := s.tokens[k]; ok {
+ found = true
+ s.supervisor.Remove(tok)
+ }
+ delete(s.services, k)
+ delete(s.tokens, k)
+ return
+}
+
+// RemoveAndWait removes the service at the given key, stopping it on the
+// supervisor, and waits for it to stop or for the timeout to elapse; a
+// zero timeout waits without limit. If there is no service at the given
+// key, nothing happens. The return value indicates whether a service was
+// removed.
+func (s *serviceMap[K, S]) RemoveAndWait(k K, timeout time.Duration) (found bool) {
+ if tok, ok := s.tokens[k]; ok {
+ found = true
+ s.supervisor.RemoveAndWait(tok, timeout)
+ }
+ delete(s.services, k)
+ delete(s.tokens, k)
+ return found
+}
+
+// Each calls the given function for each service in the map.
+func (s *serviceMap[K, S]) Each(fn func(K, S)) {
+ for key, svc := range s.services {
+ fn(key, svc)
+ }
+}
+
+// Suture implementation
+
+func (s *serviceMap[K, S]) Serve(ctx context.Context) error {
+ return s.supervisor.Serve(ctx)
+}
+
+func (s *serviceMap[K, S]) String() string {
+ var kv K
+ var sv S
+ return fmt.Sprintf("serviceMap[%T, %T]@%p", kv, sv, s)
+}
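A hedged usage sketch for the type above, assuming only the API shown in this file (the printer service is illustrative). Children only run while the map itself is being served, normally by adding it to a parent supervisor. The RemoveAndWait calls in this commit pass a timeout of 0, which in suture v4 appears to mean an unbounded wait, since suture only arms a timer for positive timeouts.

    package model

    import (
        "context"
        "fmt"

        "github.com/syncthing/syncthing/lib/events"
    )

    type printer struct{ name string }

    func (p *printer) Serve(ctx context.Context) error {
        fmt.Println(p.name, "started")
        <-ctx.Done()
        fmt.Println(p.name, "stopped")
        return ctx.Err()
    }

    func exampleServiceMap(ctx context.Context) {
        sm := newServiceMap[string, *printer](events.NoopLogger)
        go sm.Serve(ctx) // normally: parentSupervisor.Add(sm)

        sm.Add("a", &printer{name: "a"}) // starts on the map's supervisor
        if p, ok := sm.Get("a"); ok {
            fmt.Println("have", p.name)
        }
        sm.RemoveAndWait("a", 0) // stops "a" and waits for it
    }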
diff --git a/lib/model/service_map_test.go b/lib/model/service_map_test.go
new file mode 100644
index 000000000..3f7c9bae5
--- /dev/null
+++ b/lib/model/service_map_test.go
@@ -0,0 +1,156 @@
+// Copyright (C) 2023 The Syncthing Authors.
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this file,
+// You can obtain one at https://mozilla.org/MPL/2.0/.
+
+package model
+
+import (
+ "context"
+ "strings"
+ "testing"
+
+ "github.com/syncthing/syncthing/lib/events"
+ "github.com/thejerf/suture/v4"
+)
+
+func TestServiceMap(t *testing.T) {
+ t.Parallel()
+ ctx, cancel := context.WithCancel(context.Background())
+ t.Cleanup(cancel)
+ sup := suture.NewSimple("TestServiceMap")
+ sup.ServeBackground(ctx)
+
+ t.Run("SimpleAddRemove", func(t *testing.T) {
+ t.Parallel()
+
+ sm := newServiceMap[string, *dummyService](events.NoopLogger)
+ sup.Add(sm)
+
+ // Add two services. They should start.
+
+ d1 := newDummyService()
+ d2 := newDummyService()
+
+ sm.Add("d1", d1)
+ sm.Add("d2", d2)
+
+ <-d1.started
+ <-d2.started
+
+ // Remove them. They should stop.
+
+ if !sm.Remove("d1") {
+ t.Errorf("Remove failed")
+ }
+ if !sm.Remove("d2") {
+ t.Errorf("Remove failed")
+ }
+
+ <-d1.stopped
+ <-d2.stopped
+ })
+
+ t.Run("OverwriteImpliesRemove", func(t *testing.T) {
+ t.Parallel()
+
+ sm := newServiceMap[string, *dummyService](events.NoopLogger)
+ sup.Add(sm)
+
+ d1 := newDummyService()
+ d2 := newDummyService()
+
+ // Add d1, it should start.
+
+ sm.Add("k", d1)
+ <-d1.started
+
+ // Add d2, with the same key. The previous one should stop as we're
+ // doing a replace.
+
+ sm.Add("k", d2)
+ <-d1.stopped
+ <-d2.started
+
+ if !sm.Remove("k") {
+ t.Errorf("Remove failed")
+ }
+
+ <-d2.stopped
+ })
+
+ t.Run("IterateWithRemoveAndWait", func(t *testing.T) {
+ t.Parallel()
+
+ sm := newServiceMap[string, *dummyService](events.NoopLogger)
+ sup.Add(sm)
+
+ // Add four services.
+
+ d1 := newDummyService()
+ d2 := newDummyService()
+ d3 := newDummyService()
+ d4 := newDummyService()
+
+ sm.Add("keep1", d1)
+ sm.Add("remove2", d2)
+ sm.Add("keep3", d3)
+ sm.Add("remove4", d4)
+
+ <-d1.started
+ <-d2.started
+ <-d3.started
+ <-d4.started
+
+ // Remove two of them from within the iterator.
+
+ sm.Each(func(k string, v *dummyService) {
+ if strings.HasPrefix(k, "remove") {
+ sm.RemoveAndWait(k, 0)
+ }
+ })
+
+ // They should have stopped.
+
+ <-d2.stopped
+ <-d4.stopped
+
+ // They should not be in the map anymore.
+
+ if _, ok := sm.Get("remove2"); ok {
+ t.Errorf("Service still in map")
+ }
+ if _, ok := sm.Get("remove4"); ok {
+ t.Errorf("Service still in map")
+ }
+
+ // The other two should still be running.
+
+ if _, ok := sm.Get("keep1"); !ok {
+ t.Errorf("Service not in map")
+ }
+ if _, ok := sm.Get("keep3"); !ok {
+ t.Errorf("Service not in map")
+ }
+ })
+}
+
+type dummyService struct {
+ started chan struct{}
+ stopped chan struct{}
+}
+
+func newDummyService() *dummyService {
+ return &dummyService{
+ started: make(chan struct{}),
+ stopped: make(chan struct{}),
+ }
+}
+
+func (d *dummyService) Serve(ctx context.Context) error {
+ close(d.started)
+ defer close(d.stopped)
+ <-ctx.Done()
+ return nil
+}