diff --git a/lib/model/fakeconns_test.go b/lib/model/fakeconns_test.go index 8b51741a1..6aa6d196a 100644 --- a/lib/model/fakeconns_test.go +++ b/lib/model/fakeconns_test.go @@ -14,6 +14,7 @@ import ( "github.com/syncthing/syncthing/lib/protocol" protocolmocks "github.com/syncthing/syncthing/lib/protocol/mocks" + "github.com/syncthing/syncthing/lib/rand" "github.com/syncthing/syncthing/lib/scanner" ) @@ -36,10 +37,11 @@ func newFakeConnection(id protocol.DeviceID, model Model) *fakeConnection { f.CloseCalls(func(err error) { f.closeOnce.Do(func() { close(f.closed) + model.Closed(f, err) }) - model.Closed(f, err) f.ClosedReturns(f.closed) }) + f.StringReturns(rand.String(8)) return f } diff --git a/lib/model/indexhandler.go b/lib/model/indexhandler.go index a227e7f41..ae11bbf48 100644 --- a/lib/model/indexhandler.go +++ b/lib/model/indexhandler.go @@ -12,8 +12,6 @@ import ( "sync" "time" - "github.com/thejerf/suture/v4" - "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/db" "github.com/syncthing/syncthing/lib/events" @@ -28,7 +26,6 @@ type indexHandler struct { folderIsReceiveEncrypted bool prevSequence int64 evLogger events.Logger - token suture.ServiceToken cond *sync.Cond paused bool @@ -373,11 +370,10 @@ func (s *indexHandler) String() string { } type indexHandlerRegistry struct { - sup *suture.Supervisor evLogger events.Logger conn protocol.Connection downloads *deviceDownloadState - indexHandlers map[string]*indexHandler + indexHandlers *serviceMap[string, *indexHandler] startInfos map[string]*clusterConfigDeviceInfo folderStates map[string]*indexHandlerFolderState mut sync.Mutex @@ -389,27 +385,16 @@ type indexHandlerFolderState struct { runner service } -func newIndexHandlerRegistry(conn protocol.Connection, downloads *deviceDownloadState, closed chan struct{}, parentSup *suture.Supervisor, evLogger events.Logger) *indexHandlerRegistry { +func newIndexHandlerRegistry(conn protocol.Connection, downloads *deviceDownloadState, evLogger events.Logger) *indexHandlerRegistry { r := &indexHandlerRegistry{ + evLogger: evLogger, conn: conn, downloads: downloads, - evLogger: evLogger, - indexHandlers: make(map[string]*indexHandler), + indexHandlers: newServiceMap[string, *indexHandler](evLogger), startInfos: make(map[string]*clusterConfigDeviceInfo), folderStates: make(map[string]*indexHandlerFolderState), mut: sync.Mutex{}, } - r.sup = suture.New(r.String(), svcutil.SpecWithDebugLogger(l)) - ourToken := parentSup.Add(r.sup) - r.sup.Add(svcutil.AsService(func(ctx context.Context) error { - select { - case <-ctx.Done(): - return ctx.Err() - case <-closed: - parentSup.Remove(ourToken) - } - return nil - }, fmt.Sprintf("%v/waitForClosed", r))) return r } @@ -417,20 +402,18 @@ func (r *indexHandlerRegistry) String() string { return fmt.Sprintf("indexHandlerRegistry/%v", r.conn.DeviceID().Short()) } -func (r *indexHandlerRegistry) GetSupervisor() *suture.Supervisor { - return r.sup +func (r *indexHandlerRegistry) Serve(ctx context.Context) error { + // Running the index handler registry means running the individual index + // handler children. + return r.indexHandlers.Serve(ctx) } func (r *indexHandlerRegistry) startLocked(folder config.FolderConfiguration, fset *db.FileSet, runner service, startInfo *clusterConfigDeviceInfo) { - if is, ok := r.indexHandlers[folder.ID]; ok { - r.sup.RemoveAndWait(is.token, 0) - delete(r.indexHandlers, folder.ID) - } + r.indexHandlers.RemoveAndWait(folder.ID, 0) delete(r.startInfos, folder.ID) is := newIndexHandler(r.conn, r.downloads, folder, fset, runner, startInfo, r.evLogger) - is.token = r.sup.Add(is) - r.indexHandlers[folder.ID] = is + r.indexHandlers.Add(folder.ID, is) // This new connection might help us get in sync. runner.SchedulePull() @@ -444,9 +427,7 @@ func (r *indexHandlerRegistry) AddIndexInfo(folder string, startInfo *clusterCon r.mut.Lock() defer r.mut.Unlock() - if is, ok := r.indexHandlers[folder]; ok { - r.sup.RemoveAndWait(is.token, 0) - delete(r.indexHandlers, folder) + if r.indexHandlers.RemoveAndWait(folder, 0) { l.Debugf("Removed index sender for device %v and folder %v due to added pending", r.conn.DeviceID().Short(), folder) } folderState, ok := r.folderStates[folder] @@ -465,10 +446,7 @@ func (r *indexHandlerRegistry) Remove(folder string) { defer r.mut.Unlock() l.Debugf("Removing index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder) - if is, ok := r.indexHandlers[folder]; ok { - r.sup.RemoveAndWait(is.token, 0) - delete(r.indexHandlers, folder) - } + r.indexHandlers.RemoveAndWait(folder, 0) delete(r.startInfos, folder) l.Debugf("Removed index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder) } @@ -480,13 +458,12 @@ func (r *indexHandlerRegistry) RemoveAllExcept(except map[string]remoteFolderSta r.mut.Lock() defer r.mut.Unlock() - for folder, is := range r.indexHandlers { + r.indexHandlers.Each(func(folder string, is *indexHandler) { if _, ok := except[folder]; !ok { - r.sup.RemoveAndWait(is.token, 0) - delete(r.indexHandlers, folder) + r.indexHandlers.RemoveAndWait(folder, 0) l.Debugf("Removed index handler for device %v and folder %v (removeAllExcept)", r.conn.DeviceID().Short(), folder) } - } + }) for folder := range r.startInfos { if _, ok := except[folder]; !ok { delete(r.startInfos, folder) @@ -518,7 +495,7 @@ func (r *indexHandlerRegistry) RegisterFolderState(folder config.FolderConfigura func (r *indexHandlerRegistry) folderPausedLocked(folder string) { l.Debugf("Pausing index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder) delete(r.folderStates, folder) - if is, ok := r.indexHandlers[folder]; ok { + if is, ok := r.indexHandlers.Get(folder); ok { is.pause() l.Debugf("Paused index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder) } else { @@ -536,11 +513,10 @@ func (r *indexHandlerRegistry) folderRunningLocked(folder config.FolderConfigura runner: runner, } - is, isOk := r.indexHandlers[folder.ID] + is, isOk := r.indexHandlers.Get(folder.ID) if info, ok := r.startInfos[folder.ID]; ok { if isOk { - r.sup.RemoveAndWait(is.token, 0) - delete(r.indexHandlers, folder.ID) + r.indexHandlers.RemoveAndWait(folder.ID, 0) l.Debugf("Removed index handler for device %v and folder %v in resume", r.conn.DeviceID().Short(), folder.ID) } r.startLocked(folder, fset, runner, info) @@ -557,7 +533,7 @@ func (r *indexHandlerRegistry) folderRunningLocked(folder config.FolderConfigura func (r *indexHandlerRegistry) ReceiveIndex(folder string, fs []protocol.FileInfo, update bool, op string) error { r.mut.Lock() defer r.mut.Unlock() - is, isOk := r.indexHandlers[folder] + is, isOk := r.indexHandlers.Get(folder) if !isOk { l.Infof("%v for nonexistent or paused folder %q", op, folder) return fmt.Errorf("%s: %w", folder, ErrFolderMissing) diff --git a/lib/model/model.go b/lib/model/model.go index 3f0acdd4c..0fafd0d03 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -165,7 +165,7 @@ type model struct { helloMessages map[protocol.DeviceID]protocol.Hello deviceDownloads map[protocol.DeviceID]*deviceDownloadState remoteFolderStates map[protocol.DeviceID]map[string]remoteFolderState // deviceID -> folders - indexHandlers map[protocol.DeviceID]*indexHandlerRegistry + indexHandlers *serviceMap[protocol.DeviceID, *indexHandlerRegistry] // for testing only foldersRunning atomic.Int32 @@ -248,12 +248,13 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio helloMessages: make(map[protocol.DeviceID]protocol.Hello), deviceDownloads: make(map[protocol.DeviceID]*deviceDownloadState), remoteFolderStates: make(map[protocol.DeviceID]map[string]remoteFolderState), - indexHandlers: make(map[protocol.DeviceID]*indexHandlerRegistry), + indexHandlers: newServiceMap[protocol.DeviceID, *indexHandlerRegistry](evLogger), } for devID := range cfg.Devices() { m.deviceStatRefs[devID] = stats.NewDeviceStatisticsReference(m.db, devID) } m.Add(m.progressEmitter) + m.Add(m.indexHandlers) m.Add(svcutil.AsService(m.serve, m.String())) return m @@ -487,9 +488,9 @@ func (m *model) removeFolder(cfg config.FolderConfiguration) { } m.cleanupFolderLocked(cfg) - for _, r := range m.indexHandlers { + m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) { r.Remove(cfg.ID) - } + }) m.fmut.Unlock() m.pmut.RUnlock() @@ -563,9 +564,9 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF // Care needs to be taken because we already hold fmut and the lock order // must be the same everywhere. As fmut is acquired first, this is fine. m.pmut.RLock() - for _, indexRegistry := range m.indexHandlers { - indexRegistry.RegisterFolderState(to, fset, m.folderRunners[to.ID]) - } + m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) { + r.RegisterFolderState(to, fset, m.folderRunners[to.ID]) + }) m.pmut.RUnlock() var infoMsg string @@ -601,9 +602,9 @@ func (m *model) newFolder(cfg config.FolderConfiguration, cacheIgnoredFiles bool // Care needs to be taken because we already hold fmut and the lock order // must be the same everywhere. As fmut is acquired first, this is fine. m.pmut.RLock() - for _, indexRegistry := range m.indexHandlers { - indexRegistry.RegisterFolderState(cfg, fset, m.folderRunners[cfg.ID]) - } + m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) { + r.RegisterFolderState(cfg, fset, m.folderRunners[cfg.ID]) + }) m.pmut.RUnlock() return nil @@ -1138,7 +1139,7 @@ func (m *model) handleIndex(conn protocol.Connection, folder string, fs []protoc } m.pmut.RLock() - indexHandler, ok := m.indexHandlers[deviceID] + indexHandler, ok := m.indexHandlers.Get(deviceID) m.pmut.RUnlock() if !ok { // This should be impossible, as an index handler always exists for an @@ -1170,7 +1171,7 @@ func (m *model) ClusterConfig(conn protocol.Connection, cm protocol.ClusterConfi l.Debugf("Handling ClusterConfig from %v", deviceID.Short()) m.pmut.RLock() - indexHandlerRegistry, ok := m.indexHandlers[deviceID] + indexHandlerRegistry, ok := m.indexHandlers.Get(deviceID) m.pmut.RUnlock() if !ok { panic("bug: ClusterConfig called on closed or nonexistent connection") @@ -1792,7 +1793,7 @@ func (m *model) Closed(conn protocol.Connection, err error) { delete(m.remoteFolderStates, device) closed := m.closed[device] delete(m.closed, device) - delete(m.indexHandlers, device) + m.indexHandlers.RemoveAndWait(device, 0) m.pmut.Unlock() m.progressEmitter.temporaryIndexUnsubscribe(conn) @@ -2251,11 +2252,11 @@ func (m *model) AddConnection(conn protocol.Connection, hello protocol.Hello) { closed := make(chan struct{}) m.closed[deviceID] = closed m.deviceDownloads[deviceID] = newDeviceDownloadState() - indexRegistry := newIndexHandlerRegistry(conn, m.deviceDownloads[deviceID], closed, m.Supervisor, m.evLogger) + indexRegistry := newIndexHandlerRegistry(conn, m.deviceDownloads[deviceID], m.evLogger) for id, fcfg := range m.folderCfgs { indexRegistry.RegisterFolderState(fcfg, m.folderFiles[id], m.folderRunners[id]) } - m.indexHandlers[deviceID] = indexRegistry + m.indexHandlers.Add(deviceID, indexRegistry) m.fmut.RUnlock() // 0: default, <0: no limiting switch { diff --git a/lib/model/model_test.go b/lib/model/model_test.go index 96cb31ae9..daaa76fde 100644 --- a/lib/model/model_test.go +++ b/lib/model/model_test.go @@ -1337,8 +1337,9 @@ func TestAutoAcceptEnc(t *testing.T) { // Earlier tests might cause the connection to get closed, thus ClusterConfig // would panic. clusterConfig := func(deviceID protocol.DeviceID, cm protocol.ClusterConfig) { - m.AddConnection(newFakeConnection(deviceID, m), protocol.Hello{}) - m.ClusterConfig(&protocolmocks.Connection{DeviceIDStub: func() protocol.DeviceID { return deviceID }}, cm) + conn := newFakeConnection(deviceID, m) + m.AddConnection(conn, protocol.Hello{}) + m.ClusterConfig(conn, cm) } clusterConfig(device1, basicCC()) diff --git a/lib/model/service_map.go b/lib/model/service_map.go new file mode 100644 index 000000000..bef67203f --- /dev/null +++ b/lib/model/service_map.go @@ -0,0 +1,103 @@ +// Copyright (C) 2023 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at https://mozilla.org/MPL/2.0/. + +package model + +import ( + "context" + "fmt" + "time" + + "github.com/syncthing/syncthing/lib/events" + "github.com/syncthing/syncthing/lib/svcutil" + "github.com/thejerf/suture/v4" +) + +// A serviceMap is a utility map of arbitrary keys to a suture.Service of +// some kind, where adding and removing services ensures they are properly +// started and stopped on the given Supervisor. The serviceMap is itself a +// suture.Service and should be added to a Supervisor. +// Not safe for concurrent use. +type serviceMap[K comparable, S suture.Service] struct { + services map[K]S + tokens map[K]suture.ServiceToken + supervisor *suture.Supervisor + eventLogger events.Logger +} + +func newServiceMap[K comparable, S suture.Service](eventLogger events.Logger) *serviceMap[K, S] { + m := &serviceMap[K, S]{ + services: make(map[K]S), + tokens: make(map[K]suture.ServiceToken), + eventLogger: eventLogger, + } + m.supervisor = suture.New(m.String(), svcutil.SpecWithDebugLogger(l)) + return m +} + +// Add adds a service to the map, starting it on the supervisor. If there is +// already a service at the given key, it is removed first. +func (s *serviceMap[K, S]) Add(k K, v S) { + if tok, ok := s.tokens[k]; ok { + // There is already a service at this key, remove it first. + s.supervisor.Remove(tok) + s.eventLogger.Log(events.Failure, fmt.Sprintf("%s replaced service at key %v", s, k)) + } + s.services[k] = v + s.tokens[k] = s.supervisor.Add(v) +} + +// Get returns the service at the given key, or the empty value and false if +// there is no service at that key. +func (s *serviceMap[K, S]) Get(k K) (v S, ok bool) { + v, ok = s.services[k] + return +} + +// Remove removes the service at the given key, stopping it on the supervisor. +// If there is no service at the given key, nothing happens. The return value +// indicates whether a service was removed. +func (s *serviceMap[K, S]) Remove(k K) (found bool) { + if tok, ok := s.tokens[k]; ok { + found = true + s.supervisor.Remove(tok) + } + delete(s.services, k) + delete(s.tokens, k) + return +} + +// RemoveAndWait removes the service at the given key, stopping it on the +// supervisor. If there is no service at the given key, nothing happens. The +// return value indicates whether a service was removed. +func (s *serviceMap[K, S]) RemoveAndWait(k K, timeout time.Duration) (found bool) { + if tok, ok := s.tokens[k]; ok { + found = true + s.supervisor.RemoveAndWait(tok, timeout) + } + delete(s.services, k) + delete(s.tokens, k) + return found +} + +// Each calls the given function for each service in the map. +func (s *serviceMap[K, S]) Each(fn func(K, S)) { + for key, svc := range s.services { + fn(key, svc) + } +} + +// Suture implementation + +func (s *serviceMap[K, S]) Serve(ctx context.Context) error { + return s.supervisor.Serve(ctx) +} + +func (s *serviceMap[K, S]) String() string { + var kv K + var sv S + return fmt.Sprintf("serviceMap[%T, %T]@%p", kv, sv, s) +} diff --git a/lib/model/service_map_test.go b/lib/model/service_map_test.go new file mode 100644 index 000000000..3f7c9bae5 --- /dev/null +++ b/lib/model/service_map_test.go @@ -0,0 +1,156 @@ +// Copyright (C) 2023 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at https://mozilla.org/MPL/2.0/. + +package model + +import ( + "context" + "strings" + "testing" + + "github.com/syncthing/syncthing/lib/events" + "github.com/thejerf/suture/v4" +) + +func TestServiceMap(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + sup := suture.NewSimple("TestServiceMap") + sup.ServeBackground(ctx) + + t.Run("SimpleAddRemove", func(t *testing.T) { + t.Parallel() + + sm := newServiceMap[string, *dummyService](events.NoopLogger) + sup.Add(sm) + + // Add two services. They should start. + + d1 := newDummyService() + d2 := newDummyService() + + sm.Add("d1", d1) + sm.Add("d2", d2) + + <-d1.started + <-d2.started + + // Remove them. They should stop. + + if !sm.Remove("d1") { + t.Errorf("Remove failed") + } + if !sm.Remove("d2") { + t.Errorf("Remove failed") + } + + <-d1.stopped + <-d2.stopped + }) + + t.Run("OverwriteImpliesRemove", func(t *testing.T) { + t.Parallel() + + sm := newServiceMap[string, *dummyService](events.NoopLogger) + sup.Add(sm) + + d1 := newDummyService() + d2 := newDummyService() + + // Add d1, it should start. + + sm.Add("k", d1) + <-d1.started + + // Add d2, with the same key. The previous one should stop as we're + // doing a replace. + + sm.Add("k", d2) + <-d1.stopped + <-d2.started + + if !sm.Remove("k") { + t.Errorf("Remove failed") + } + + <-d2.stopped + }) + + t.Run("IterateWithRemoveAndWait", func(t *testing.T) { + t.Parallel() + + sm := newServiceMap[string, *dummyService](events.NoopLogger) + sup.Add(sm) + + // Add four services. + + d1 := newDummyService() + d2 := newDummyService() + d3 := newDummyService() + d4 := newDummyService() + + sm.Add("keep1", d1) + sm.Add("remove2", d2) + sm.Add("keep3", d3) + sm.Add("remove4", d4) + + <-d1.started + <-d2.started + <-d3.started + <-d4.started + + // Remove two of them from within the iterator. + + sm.Each(func(k string, v *dummyService) { + if strings.HasPrefix(k, "remove") { + sm.RemoveAndWait(k, 0) + } + }) + + // They should have stopped. + + <-d2.stopped + <-d4.stopped + + // They should not be in the map anymore. + + if _, ok := sm.Get("remove2"); ok { + t.Errorf("Service still in map") + } + if _, ok := sm.Get("remove4"); ok { + t.Errorf("Service still in map") + } + + // The other two should still be running. + + if _, ok := sm.Get("keep1"); !ok { + t.Errorf("Service not in map") + } + if _, ok := sm.Get("keep3"); !ok { + t.Errorf("Service not in map") + } + }) +} + +type dummyService struct { + started chan struct{} + stopped chan struct{} +} + +func newDummyService() *dummyService { + return &dummyService{ + started: make(chan struct{}), + stopped: make(chan struct{}), + } +} + +func (d *dummyService) Serve(ctx context.Context) error { + close(d.started) + defer close(d.stopped) + <-ctx.Done() + return nil +}