lib/model: Retain index info for new folders/devs (ref #7100) (#7133)

This commit is contained in:
Simon Frei 2020-11-20 15:53:13 +01:00 committed by GitHub
parent 24af89c8e2
commit db1f20603a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 59 additions and 59 deletions

View File

@ -242,12 +242,6 @@ func (r *indexSenderRegistry) add(folder config.FolderConfiguration, fset *db.Fi
r.mut.Unlock()
}
func (r *indexSenderRegistry) addNew(folder config.FolderConfiguration, fset *db.FileSet) {
r.mut.Lock()
r.startLocked(folder.ID, fset, 0)
r.mut.Unlock()
}
func (r *indexSenderRegistry) addLocked(folder config.FolderConfiguration, fset *db.FileSet, startInfo *indexSenderStartInfo) {
myIndexID := fset.IndexID(protocol.LocalDeviceID)
mySequence := fset.Sequence(protocol.LocalDeviceID)
@ -282,7 +276,7 @@ func (r *indexSenderRegistry) addLocked(folder config.FolderConfiguration, fset
l.Infof("Device %v folder %s has mismatching index ID for us (%v != %v)", r.deviceID, folder.Description(), startInfo.local.IndexID, myIndexID)
startSequence = 0
} else {
l.Debugf("Device %v folder %s is not delta index compatible", r.deviceID, folder.Description())
l.Debugf("Device %v folder %s has no index ID for us", r.deviceID, folder.Description())
}
// This is the other side's description of themselves. We
@ -296,6 +290,7 @@ func (r *indexSenderRegistry) addLocked(folder config.FolderConfiguration, fset
// do not support delta indexes and we should clear any
// information we have from them before accepting their
// index, which will presumably be a full index.
l.Debugf("Device %v folder %s does not announce an index ID", r.deviceID, folder.Description())
fset.Drop(r.deviceID)
} else if startInfo.remote.IndexID != theirIndexID {
// The index ID we have on file is not what they're
@ -308,22 +303,18 @@ func (r *indexSenderRegistry) addLocked(folder config.FolderConfiguration, fset
fset.SetIndexID(r.deviceID, startInfo.remote.IndexID)
}
r.startLocked(folder.ID, fset, startSequence)
}
func (r *indexSenderRegistry) startLocked(folderID string, fset *db.FileSet, startSequence int64) {
if is, ok := r.indexSenders[folderID]; ok {
if is, ok := r.indexSenders[folder.ID]; ok {
r.sup.RemoveAndWait(is.token, 0)
delete(r.indexSenders, folderID)
delete(r.indexSenders, folder.ID)
}
if _, ok := r.startInfos[folderID]; ok {
delete(r.startInfos, folderID)
if _, ok := r.startInfos[folder.ID]; ok {
delete(r.startInfos, folder.ID)
}
is := &indexSender{
conn: r.conn,
connClosed: r.closed,
folder: folderID,
folder: folder.ID,
fset: fset,
prevSequence: startSequence,
evLogger: r.evLogger,
@ -331,13 +322,13 @@ func (r *indexSenderRegistry) startLocked(folderID string, fset *db.FileSet, sta
resumeChan: make(chan *db.FileSet),
}
is.token = r.sup.Add(is)
r.indexSenders[folderID] = is
r.indexSenders[folder.ID] = is
}
// addPaused stores the given info to start an index sender once resume is called
// addPending stores the given info to start an index sender once resume is called
// for this folder.
// If an index sender is already running, it will be stopped.
func (r *indexSenderRegistry) addPaused(folder config.FolderConfiguration, startInfo *indexSenderStartInfo) {
func (r *indexSenderRegistry) addPending(folder config.FolderConfiguration, startInfo *indexSenderStartInfo) {
r.mut.Lock()
defer r.mut.Unlock()

View File

@ -254,13 +254,16 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio
func (m *model) serve(ctx context.Context) error {
// Add and start folders
cacheIgnoredFiles := m.cfg.Options().CacheIgnoredFiles
clusterConfigDevices := make(deviceIDSet, len(m.cfg.Devices()))
for _, folderCfg := range m.cfg.Folders() {
if folderCfg.Paused {
folderCfg.CreateRoot()
continue
}
m.newFolder(folderCfg, cacheIgnoredFiles)
clusterConfigDevices.add(folderCfg.DeviceIDs())
}
m.resendClusterConfig(clusterConfigDevices.AsSlice())
m.cfg.Subscribe(m)
close(m.started)
@ -519,13 +522,9 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF
// In case the folder was newly shared with us we already got a
// cluster config and wont necessarily get another soon - start
// sending indexes if connected.
isNew := !from.SharedWith(indexSenders.deviceID)
if isNew {
indexSenders.addNew(to, fset)
}
if to.Paused {
indexSenders.pause(to.ID)
} else if !isNew && (fsetNil || from.Paused) {
} else if !from.SharedWith(indexSenders.deviceID) || fsetNil || from.Paused {
indexSenders.resume(to, fset)
}
}
@ -554,20 +553,10 @@ func (m *model) newFolder(cfg config.FolderConfiguration, cacheIgnoredFiles bool
// Cluster configs might be received and processed before reaching this
// point, i.e. before the folder is started. If that's the case, start
// index senders here.
localSequenceZero := fset.Sequence(protocol.LocalDeviceID) == 0
m.pmut.RLock()
for _, id := range cfg.DeviceIDs() {
if is, ok := m.indexSenders[id]; ok {
if localSequenceZero && fset.Sequence(id) == 0 {
// In case this folder was shared to us and
// newly added, add a new index sender.
is.addNew(cfg, fset)
} else {
// For existing folders we stored the index data from
// the cluster config, so resume based on that - if
// we didn't get a cluster config yet, it's a noop.
is.resume(cfg, fset)
}
is.resume(cfg, fset)
}
}
m.pmut.RUnlock()
@ -1175,6 +1164,7 @@ func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.Devi
continue
}
m.cfg.AddOrUpdatePendingFolder(folder.ID, folder.Label, deviceID)
indexSenders.addPending(cfg, ccDeviceInfos[folder.ID])
changed = true
m.evLogger.Log(events.FolderRejected, map[string]string{
"folder": folder.ID,
@ -1192,7 +1182,7 @@ func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.Devi
}
if cfg.Paused {
indexSenders.addPaused(cfg, ccDeviceInfos[folder.ID])
indexSenders.addPending(cfg, ccDeviceInfos[folder.ID])
continue
}
@ -1234,7 +1224,7 @@ func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.Devi
// Shouldn't happen because !cfg.Paused, but might happen
// if the folder is about to be unpaused, but not yet.
l.Debugln("ccH: no fset", folder.ID)
indexSenders.addPaused(cfg, ccDeviceInfos[folder.ID])
indexSenders.addPending(cfg, ccDeviceInfos[folder.ID])
continue
}
@ -2482,7 +2472,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
// Go through the folder configs and figure out if we need to restart or not.
// Tracks devices affected by any configuration change to resend ClusterConfig.
clusterConfigDevices := make(map[protocol.DeviceID]struct{}, len(from.Devices)+len(to.Devices))
clusterConfigDevices := make(deviceIDSet, len(from.Devices)+len(to.Devices))
fromFolders := mapFolders(from.Folders)
toFolders := mapFolders(to.Folders)
@ -2495,7 +2485,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
l.Infoln("Adding folder", cfg.Description())
m.newFolder(cfg, to.Options.CacheIgnoredFiles)
}
clusterConfigDevices = addDeviceIDsToMap(clusterConfigDevices, cfg.DeviceIDs())
clusterConfigDevices.add(cfg.DeviceIDs())
}
}
@ -2504,7 +2494,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
if !ok {
// The folder was removed.
m.removeFolder(fromCfg)
clusterConfigDevices = addDeviceIDsToMap(clusterConfigDevices, fromCfg.DeviceIDs())
clusterConfigDevices.add(fromCfg.DeviceIDs())
continue
}
@ -2516,8 +2506,8 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
// Check if anything differs that requires a restart.
if !reflect.DeepEqual(fromCfg.RequiresRestartOnly(), toCfg.RequiresRestartOnly()) || from.Options.CacheIgnoredFiles != to.Options.CacheIgnoredFiles {
m.restartFolder(fromCfg, toCfg, to.Options.CacheIgnoredFiles)
clusterConfigDevices = addDeviceIDsToMap(clusterConfigDevices, fromCfg.DeviceIDs())
clusterConfigDevices = addDeviceIDsToMap(clusterConfigDevices, toCfg.DeviceIDs())
clusterConfigDevices.add(fromCfg.DeviceIDs())
clusterConfigDevices.add(toCfg.DeviceIDs())
}
// Emit the folder pause/resume event
@ -2591,11 +2581,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
}
m.pmut.RUnlock()
// Generating cluster-configs acquires fmut -> must happen outside of pmut.
ids := make([]protocol.DeviceID, 0, len(clusterConfigDevices))
for id := range clusterConfigDevices {
ids = append(ids, id)
}
m.resendClusterConfig(ids)
m.resendClusterConfig(clusterConfigDevices.AsSlice())
m.globalRequestLimiter.setCapacity(1024 * to.Options.MaxConcurrentIncomingRequestKiB())
m.folderIOLimiter.setCapacity(to.Options.MaxFolderConcurrency())
@ -2801,13 +2787,22 @@ func sanitizePath(path string) string {
return strings.TrimSpace(b.String())
}
func addDeviceIDsToMap(m map[protocol.DeviceID]struct{}, s []protocol.DeviceID) map[protocol.DeviceID]struct{} {
for _, id := range s {
if _, ok := m[id]; !ok {
m[id] = struct{}{}
type deviceIDSet map[protocol.DeviceID]struct{}
func (s deviceIDSet) add(ids []protocol.DeviceID) {
for _, id := range ids {
if _, ok := s[id]; !ok {
s[id] = struct{}{}
}
}
return m
}
func (s deviceIDSet) AsSlice() []protocol.DeviceID {
ids := make([]protocol.DeviceID, 0, len(s))
for id := range s {
ids = append(ids, id)
}
return ids
}
func encryptionTokenPath(cfg config.FolderConfiguration) string {

View File

@ -1275,9 +1275,6 @@ func TestRequestIndexSenderPause(t *testing.T) {
}
func TestRequestIndexSenderClusterConfigBeforeStart(t *testing.T) {
done := make(chan struct{})
defer close(done)
ldb := db.NewLowlevel(backend.OpenMemory())
w, fcfg := tmpDefaultWrapper()
tfs := fcfg.Filesystem()
@ -1294,11 +1291,14 @@ func TestRequestIndexSenderClusterConfigBeforeStart(t *testing.T) {
m.evCancel()
<-m.stopped
// Add connection (sends cluster config) before starting the new model
// Add connection (sends incoming cluster config) before starting the new model
m = newModel(w, myID, "syncthing", "dev", ldb, nil)
defer cleanupModel(m)
fc := addFakeConn(m, device1)
indexChan := make(chan []protocol.FileInfo)
done := make(chan struct{})
defer close(done) // Must be the last thing to be deferred, thus first to run.
indexChan := make(chan []protocol.FileInfo, 1)
ccChan := make(chan protocol.ClusterConfig, 1)
fc.mut.Lock()
fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) {
select {
@ -1306,16 +1306,30 @@ func TestRequestIndexSenderClusterConfigBeforeStart(t *testing.T) {
case <-done:
}
}
fc.clusterConfigFn = func(cc protocol.ClusterConfig) {
select {
case ccChan <- cc:
case <-done:
}
}
fc.mut.Unlock()
m.ServeBackground()
<-m.started
timeout := time.After(5 * time.Second)
// Check that cluster-config is resent after adding folders when starting model
select {
case <-timeout:
t.Fatal("timed out before receiving cluster-config")
case <-ccChan:
}
// Check that an index is sent for the newly added item
must(t, tfs.Mkdir(dir2, 0777))
m.ScanFolders()
select {
case <-time.After(5 * time.Second):
case <-timeout:
t.Fatal("timed out before receiving index")
case <-indexChan:
}