diff --git a/lib/model/model.go b/lib/model/model.go index bd87968d9..0b6fc24b8 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -157,8 +157,7 @@ type model struct { folderEncryptionPasswordTokens map[string][]byte // folder -> encryption token (may be missing, and only for encryption type folders) folderEncryptionFailures map[string]map[protocol.DeviceID]error // folder -> device -> error regarding encryption consistency (may be missing) - // fields protected by pmut - pmut sync.RWMutex + // fields also protected by fmut connections map[string]protocol.Connection // connection ID -> connection deviceConnIDs map[protocol.DeviceID][]string // device -> connection IDs (invariant: if the key exists, the value is len >= 1, with the primary connection at the start of the slice) promotedConnID map[protocol.DeviceID]string // device -> latest promoted connection ID @@ -238,8 +237,7 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, ldb *db.Lowlevel, protec folderEncryptionPasswordTokens: make(map[string][]byte), folderEncryptionFailures: make(map[string]map[protocol.DeviceID]error), - // fields protected by pmut - pmut: sync.NewRWMutex(), + // ditto connections: make(map[string]protocol.Connection), deviceConnIDs: make(map[protocol.DeviceID][]string), promotedConnID: make(map[protocol.DeviceID]string), @@ -312,13 +310,13 @@ func (m *model) initFolders(cfg config.Configuration) error { } func (m *model) closeAllConnectionsAndWait() { - m.pmut.RLock() + m.fmut.RLock() closed := make([]chan struct{}, 0, len(m.connections)) for connID, conn := range m.connections { closed = append(closed, m.closed[connID]) go conn.Close(errStopped) } - m.pmut.RUnlock() + m.fmut.RUnlock() for _, c := range closed { <-c } @@ -338,7 +336,6 @@ func (m *model) StartDeadlockDetector(timeout time.Duration) { l.Infof("Starting deadlock detector with %v timeout", timeout) detector := newDeadlockDetector(timeout, m.evLogger, m.fatal) detector.Watch("fmut", m.fmut) - detector.Watch("pmut", m.pmut) } // Need to hold lock on m.fmut when calling this. @@ -472,7 +469,6 @@ func (m *model) removeFolder(cfg config.FolderConfiguration) { // We need to hold both fmut and pmut and must acquire locks in the same // order always. (The locks can be *released* in any order.) m.fmut.Lock() - m.pmut.RLock() isPathUnique := true for folderID, folderCfg := range m.folderCfgs { @@ -498,7 +494,6 @@ func (m *model) removeFolder(cfg config.FolderConfiguration) { }) m.fmut.Unlock() - m.pmut.RUnlock() // Remove it from the database db.DropFolder(m.db, cfg.ID) @@ -563,15 +558,11 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF m.addAndStartFolderLocked(to, fset, cacheIgnoredFiles) } - // Care needs to be taken because we already hold fmut and the lock order - // must be the same everywhere. As fmut is acquired first, this is fine. - m.pmut.RLock() runner, _ := m.folderRunners.Get(to.ID) m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) error { r.RegisterFolderState(to, fset, runner) return nil }) - m.pmut.RUnlock() var infoMsg string switch { @@ -603,15 +594,11 @@ func (m *model) newFolder(cfg config.FolderConfiguration, cacheIgnoredFiles bool // Cluster configs might be received and processed before reaching this // point, i.e. before the folder is started. If that's the case, start // index senders here. - // Care needs to be taken because we already hold fmut and the lock order - // must be the same everywhere. As fmut is acquired first, this is fine. - m.pmut.RLock() m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) error { runner, _ := m.folderRunners.Get(cfg.ID) r.RegisterFolderState(cfg, fset, runner) return nil }) - m.pmut.RUnlock() return nil } @@ -645,11 +632,11 @@ func (m *model) UsageReportingStats(report *contract.Report, version int, previe blockStatsMut.Unlock() // Transport stats - m.pmut.RLock() + m.fmut.RLock() for _, conn := range m.connections { report.TransportStats[conn.Transport()]++ } - m.pmut.RUnlock() + m.fmut.RUnlock() // Ignore stats var seenPrefix [3]bool @@ -736,8 +723,8 @@ type ConnectionInfo struct { // ConnectionStats returns a map with connection statistics for each device. func (m *model) ConnectionStats() map[string]interface{} { - m.pmut.RLock() - defer m.pmut.RUnlock() + m.fmut.RLock() + defer m.fmut.RUnlock() res := make(map[string]interface{}) devs := m.cfg.Devices() @@ -812,8 +799,6 @@ func (m *model) ConnectionStats() map[string]interface{} { func (m *model) DeviceStatistics() (map[protocol.DeviceID]stats.DeviceStatistics, error) { m.fmut.RLock() defer m.fmut.RUnlock() - m.pmut.RLock() - defer m.pmut.RUnlock() res := make(map[protocol.DeviceID]stats.DeviceStatistics, len(m.deviceStatRefs)) for id, sr := range m.deviceStatRefs { stats, err := sr.GetStatistics() @@ -965,10 +950,10 @@ func (m *model) folderCompletion(device protocol.DeviceID, folder string) (Folde } defer snap.Release() - m.pmut.RLock() + m.fmut.RLock() state := m.remoteFolderStates[device][folder] downloaded := m.deviceDownloads[device].BytesDownloaded(folder) - m.pmut.RUnlock() + m.fmut.RUnlock() need := snap.NeedSize(device) need.Bytes -= downloaded @@ -1191,9 +1176,9 @@ func (m *model) handleIndex(conn protocol.Connection, folder string, fs []protoc return fmt.Errorf("%s: %w", folder, ErrFolderPaused) } - m.pmut.RLock() + m.fmut.RLock() indexHandler, ok := m.getIndexHandlerPRLocked(conn) - m.pmut.RUnlock() + m.fmut.RUnlock() if !ok { // This should be impossible, as an index handler is registered when // we send a cluster config, and that is what triggers index @@ -1306,9 +1291,9 @@ func (m *model) ClusterConfig(conn protocol.Connection, cm protocol.ClusterConfi return err } - m.pmut.Lock() + m.fmut.Lock() m.remoteFolderStates[deviceID] = states - m.pmut.Unlock() + m.fmut.Unlock() m.evLogger.Log(events.ClusterConfigReceived, ClusterConfigReceivedEventData{ Device: deviceID, @@ -1317,11 +1302,11 @@ func (m *model) ClusterConfig(conn protocol.Connection, cm protocol.ClusterConfi if len(tempIndexFolders) > 0 { var connOK bool var conn protocol.Connection - m.pmut.RLock() + m.fmut.RLock() if connIDs, connIDOK := m.deviceConnIDs[deviceID]; connIDOK { conn, connOK = m.connections[connIDs[0]] } - m.pmut.RUnlock() + m.fmut.RUnlock() // In case we've got ClusterConfig, and the connection disappeared // from infront of our nose. if connOK { @@ -1354,11 +1339,8 @@ func (m *model) ensureIndexHandler(conn protocol.Connection) *indexHandlerRegist deviceID := conn.DeviceID() connID := conn.ConnectionID() - // We must acquire fmut first when acquiring both locks. - m.fmut.RLock() - defer m.fmut.RUnlock() - m.pmut.Lock() - defer m.pmut.Unlock() + m.fmut.Lock() + defer m.fmut.Unlock() indexHandlerRegistry, ok := m.indexHandlers.Get(deviceID) if ok && indexHandlerRegistry.conn.ConnectionID() == connID { @@ -1650,13 +1632,13 @@ func (m *model) sendClusterConfig(ids []protocol.DeviceID) { return } ccConns := make([]protocol.Connection, 0, len(ids)) - m.pmut.RLock() + m.fmut.RLock() for _, id := range ids { if connIDs, ok := m.deviceConnIDs[id]; ok { ccConns = append(ccConns, m.connections[connIDs[0]]) } } - m.pmut.RUnlock() + m.fmut.RUnlock() // Generating cluster-configs acquires fmut -> must happen outside of pmut. for _, conn := range ccConns { cm, passwords := m.generateClusterConfig(conn.DeviceID()) @@ -1893,10 +1875,10 @@ func (m *model) Closed(conn protocol.Connection, err error) { connID := conn.ConnectionID() deviceID := conn.DeviceID() - m.pmut.Lock() + m.fmut.Lock() conn, ok := m.connections[connID] if !ok { - m.pmut.Unlock() + m.fmut.Unlock() return } @@ -1927,7 +1909,7 @@ func (m *model) Closed(conn protocol.Connection, err error) { m.deviceConnIDs[deviceID] = remainingConns } - m.pmut.Unlock() + m.fmut.Unlock() if wait != nil { <-wait } @@ -2029,9 +2011,9 @@ func (m *model) Request(conn protocol.Connection, folder, name string, _, size i // Restrict parallel requests by connection/device - m.pmut.RLock() + m.fmut.RLock() limiter := m.connRequestLimiters[deviceID] - m.pmut.RUnlock() + m.fmut.RUnlock() // The requestResponse releases the bytes to the buffer pool and the // limiters when its Close method is called. @@ -2218,9 +2200,9 @@ func (m *model) GetMtimeMapping(folder string, file string) (fs.MtimeMapping, er // Connection returns if we are connected to the given device. func (m *model) ConnectedTo(deviceID protocol.DeviceID) bool { - m.pmut.RLock() + m.fmut.RLock() _, ok := m.deviceConnIDs[deviceID] - m.pmut.RUnlock() + m.fmut.RUnlock() return ok } @@ -2353,7 +2335,7 @@ func (m *model) AddConnection(conn protocol.Connection, hello protocol.Hello) { connID := conn.ConnectionID() closed := make(chan struct{}) - m.pmut.Lock() + m.fmut.Lock() m.connections[connID] = conn m.closed[connID] = closed @@ -2384,7 +2366,7 @@ func (m *model) AddConnection(conn protocol.Connection, hello protocol.Hello) { l.Infof(`Additional connection (+%d) for device %s at %s`, len(m.deviceConnIDs[deviceID])-1, deviceID.Short(), conn) } - m.pmut.Unlock() + m.fmut.Unlock() if (deviceCfg.Name == "" || m.cfg.Options().OverwriteRemoteDevNames) && hello.DeviceName != "" { m.cfg.Modify(func(cfg *config.Configuration) { @@ -2415,11 +2397,8 @@ func (m *model) scheduleConnectionPromotion() { // be called after adding new connections, and after closing a primary // device connection. func (m *model) promoteConnections() { - m.fmut.RLock() // for generateClusterConfigFRLocked - defer m.fmut.RUnlock() - - m.pmut.Lock() // for most other things - defer m.pmut.Unlock() + m.fmut.Lock() + defer m.fmut.Unlock() for deviceID, connIDs := range m.deviceConnIDs { cm, passwords := m.generateClusterConfigFRLocked(deviceID) @@ -2464,9 +2443,9 @@ func (m *model) DownloadProgress(conn protocol.Connection, folder string, update return nil } - m.pmut.RLock() + m.fmut.RLock() downloads := m.deviceDownloads[deviceID] - m.pmut.RUnlock() + m.fmut.RUnlock() downloads.Update(folder, updates) state := downloads.GetBlockCounts(folder) @@ -2511,8 +2490,8 @@ func (m *model) requestGlobal(ctx context.Context, deviceID protocol.DeviceID, f // ("primary") connection, which is dedicated to index data, and pick a // random one of the others. func (m *model) requestConnectionForDevice(deviceID protocol.DeviceID) (protocol.Connection, bool) { - m.pmut.RLock() - defer m.pmut.RUnlock() + m.fmut.RLock() + defer m.fmut.RUnlock() connIDs, ok := m.deviceConnIDs[deviceID] if !ok { @@ -2903,12 +2882,10 @@ func (m *model) Availability(folder string, file protocol.FileInfo, block protoc // get heavily modified on Close()), but also must acquire fmut before // pmut. (The locks can be *released* in any order.) m.fmut.RLock() - m.pmut.RLock() - defer m.pmut.RUnlock() + defer m.fmut.RUnlock() fs, ok := m.folderFiles[folder] cfg := m.folderCfgs[folder] - m.fmut.RUnlock() if !ok { return nil, ErrFolderMissing @@ -2924,8 +2901,8 @@ func (m *model) Availability(folder string, file protocol.FileInfo, block protoc } func (m *model) availabilityInSnapshot(cfg config.FolderConfiguration, snap *db.Snapshot, file protocol.FileInfo, block protocol.BlockInfo) []Availability { - m.pmut.RLock() - defer m.pmut.RUnlock() + m.fmut.RLock() + defer m.fmut.RUnlock() return m.availabilityInSnapshotPRlocked(cfg, snap, file, block) } @@ -3113,9 +3090,9 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool { } if toCfg.MaxRequestKiB != fromCfg.MaxRequestKiB { - m.pmut.Lock() + m.fmut.Lock() m.setConnRequestLimitersPLocked(toCfg) - m.pmut.Unlock() + m.fmut.Unlock() } } @@ -3129,7 +3106,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool { } m.fmut.Unlock() - m.pmut.RLock() + m.fmut.RLock() for _, id := range closeDevices { delete(clusterConfigDevices, id) if conns, ok := m.deviceConnIDs[id]; ok { @@ -3146,7 +3123,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool { } } } - m.pmut.RUnlock() + m.fmut.RUnlock() // Generating cluster-configs acquires fmut -> must happen outside of pmut. m.sendClusterConfig(clusterConfigDevices.AsSlice()) diff --git a/lib/model/model_test.go b/lib/model/model_test.go index 3fbe60d57..5fa684521 100644 --- a/lib/model/model_test.go +++ b/lib/model/model_test.go @@ -902,13 +902,13 @@ func TestIssue5063(t *testing.T) { defer cleanupModel(m) defer cancel() - m.pmut.Lock() + m.fmut.Lock() for _, c := range m.connections { conn := c.(*fakeConnection) conn.CloseCalls(func(_ error) {}) defer m.Closed(c, errStopped) // to unblock deferred m.Stop() } - m.pmut.Unlock() + m.fmut.Unlock() wg := sync.WaitGroup{} @@ -2973,7 +2973,7 @@ func TestConnCloseOnRestart(t *testing.T) { ci := &protocolmocks.ConnectionInfo{} ci.ConnectionIDReturns(srand.String(16)) m.AddConnection(protocol.NewConnection(device1, br, nw, testutil.NoopCloser{}, m, ci, protocol.CompressionNever, nil, m.keyGen), protocol.Hello{}) - m.pmut.RLock() + m.fmut.RLock() if len(m.closed) != 1 { t.Fatalf("Expected just one conn (len(m.closed) == %v)", len(m.closed)) } @@ -2981,7 +2981,7 @@ func TestConnCloseOnRestart(t *testing.T) { for _, c := range m.closed { closed = c } - m.pmut.RUnlock() + m.fmut.RUnlock() waiter, err := w.RemoveDevice(device1) if err != nil { @@ -3074,12 +3074,12 @@ func TestDevicePause(t *testing.T) { sub := m.evLogger.Subscribe(events.DevicePaused) defer sub.Unsubscribe() - m.pmut.RLock() + m.fmut.RLock() var closed chan struct{} for _, c := range m.closed { closed = c } - m.pmut.RUnlock() + m.fmut.RUnlock() pauseDevice(t, m.cfg, device1, true)