// Copyright (C) 2014 The Syncthing Authors. // // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this file, // You can obtain one at http://mozilla.org/MPL/2.0/. package model import ( "bufio" "bytes" "crypto/tls" "encoding/json" "errors" "fmt" "io" "net" "os" "path/filepath" "reflect" "runtime" "sort" "strings" stdsync "sync" "time" "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/connections" "github.com/syncthing/syncthing/lib/db" "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/fs" "github.com/syncthing/syncthing/lib/ignore" "github.com/syncthing/syncthing/lib/osutil" "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/scanner" "github.com/syncthing/syncthing/lib/stats" "github.com/syncthing/syncthing/lib/symlinks" "github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/versioner" "github.com/thejerf/suture" ) // How many files to send in each Index/IndexUpdate message. const ( indexTargetSize = 250 * 1024 // Aim for making index messages no larger than 250 KiB (uncompressed) indexBatchSize = 1000 // Either way, don't include more files than this ) type service interface { BringToFront(string) DelayScan(d time.Duration) IndexUpdated() // Remote index was updated notification Jobs() ([]string, []string) // In progress, Queued Scan(subs []string) error Serve() Stop() getState() (folderState, time.Time, error) setState(state folderState) clearError() setError(err error) } type Availability struct { ID protocol.DeviceID `json:"id"` FromTemporary bool `json:"fromTemporary"` } type Model struct { *suture.Supervisor cfg *config.Wrapper db *db.Instance finder *db.BlockFinder progressEmitter *ProgressEmitter id protocol.DeviceID shortID protocol.ShortID cacheIgnoredFiles bool protectedFiles []string deviceName string clientName string clientVersion string folderCfgs map[string]config.FolderConfiguration // folder -> cfg folderFiles map[string]*db.FileSet // folder -> files folderDevices map[string][]protocol.DeviceID // folder -> deviceIDs deviceFolders map[protocol.DeviceID][]string // deviceID -> folders deviceStatRefs map[protocol.DeviceID]*stats.DeviceStatisticsReference // deviceID -> statsRef folderIgnores map[string]*ignore.Matcher // folder -> matcher object folderRunners map[string]service // folder -> puller or scanner folderRunnerTokens map[string][]suture.ServiceToken // folder -> tokens for puller or scanner folderStatRefs map[string]*stats.FolderStatisticsReference // folder -> statsRef fmut sync.RWMutex // protects the above conn map[protocol.DeviceID]connections.Connection closed map[protocol.DeviceID]chan struct{} helloMessages map[protocol.DeviceID]protocol.HelloResult devicePaused map[protocol.DeviceID]bool deviceDownloads map[protocol.DeviceID]*deviceDownloadState pmut sync.RWMutex // protects the above } type folderFactory func(*Model, config.FolderConfiguration, versioner.Versioner, *fs.MtimeFS) service var ( symlinkWarning = stdsync.Once{} folderFactories = make(map[config.FolderType]folderFactory, 0) ) // errors returned by the CheckFolderHealth method var ( errFolderPathEmpty = errors.New("folder path empty") errFolderPathMissing = errors.New("folder path missing") errFolderMarkerMissing = errors.New("folder marker missing") errHomeDiskNoSpace = errors.New("home disk has insufficient free space") errFolderNoSpace = errors.New("folder has insufficient free space") errUnsupportedSymlink = errors.New("symlink not supported") errInvalidFilename = errors.New("filename is invalid") errDeviceUnknown = errors.New("unknown device") errDevicePaused = errors.New("device is paused") errDeviceIgnored = errors.New("device is ignored") ) // NewModel creates and starts a new model. The model starts in read-only mode, // where it sends index information to connected peers and responds to requests // for file data without altering the local folder in any way. func NewModel(cfg *config.Wrapper, id protocol.DeviceID, deviceName, clientName, clientVersion string, ldb *db.Instance, protectedFiles []string) *Model { m := &Model{ Supervisor: suture.New("model", suture.Spec{ Log: func(line string) { l.Debugln(line) }, }), cfg: cfg, db: ldb, finder: db.NewBlockFinder(ldb), progressEmitter: NewProgressEmitter(cfg), id: id, shortID: id.Short(), cacheIgnoredFiles: cfg.Options().CacheIgnoredFiles, protectedFiles: protectedFiles, deviceName: deviceName, clientName: clientName, clientVersion: clientVersion, folderCfgs: make(map[string]config.FolderConfiguration), folderFiles: make(map[string]*db.FileSet), folderDevices: make(map[string][]protocol.DeviceID), deviceFolders: make(map[protocol.DeviceID][]string), deviceStatRefs: make(map[protocol.DeviceID]*stats.DeviceStatisticsReference), folderIgnores: make(map[string]*ignore.Matcher), folderRunners: make(map[string]service), folderRunnerTokens: make(map[string][]suture.ServiceToken), folderStatRefs: make(map[string]*stats.FolderStatisticsReference), conn: make(map[protocol.DeviceID]connections.Connection), closed: make(map[protocol.DeviceID]chan struct{}), helloMessages: make(map[protocol.DeviceID]protocol.HelloResult), devicePaused: make(map[protocol.DeviceID]bool), deviceDownloads: make(map[protocol.DeviceID]*deviceDownloadState), fmut: sync.NewRWMutex(), pmut: sync.NewRWMutex(), } if cfg.Options().ProgressUpdateIntervalS > -1 { go m.progressEmitter.Serve() } cfg.Subscribe(m) return m } // StartDeadlockDetector starts a deadlock detector on the models locks which // causes panics in case the locks cannot be acquired in the given timeout // period. func (m *Model) StartDeadlockDetector(timeout time.Duration) { l.Infof("Starting deadlock detector with %v timeout", timeout) deadlockDetect(m.fmut, timeout, "fmut") deadlockDetect(m.pmut, timeout, "pmut") } // StartFolder constructs the folder service and starts it. func (m *Model) StartFolder(folder string) { m.fmut.Lock() folderType := m.startFolderLocked(folder) m.fmut.Unlock() l.Infoln("Ready to synchronize", folder, fmt.Sprintf("(%s)", folderType)) } func (m *Model) startFolderLocked(folder string) config.FolderType { cfg, ok := m.folderCfgs[folder] if !ok { panic("cannot start nonexistent folder " + folder) } _, ok = m.folderRunners[folder] if ok { panic("cannot start already running folder " + folder) } folderFactory, ok := folderFactories[cfg.Type] if !ok { panic(fmt.Sprintf("unknown folder type 0x%x", cfg.Type)) } fs := m.folderFiles[folder] // Find any devices for which we hold the index in the db, but the folder // is not shared, and drop it. expected := mapDevices(cfg.DeviceIDs()) for _, available := range fs.ListDevices() { if _, ok := expected[available]; !ok { l.Debugln("dropping", folder, "state for", available) fs.Replace(available, nil) } } v, ok := fs.Sequence(protocol.LocalDeviceID), true indexHasFiles := ok && v > 0 if !indexHasFiles { // It's a blank folder, so this may the first time we're looking at // it. Attempt to create and tag with our marker as appropriate. We // don't really do anything with errors at this point except warn - // if these things don't work, we still want to start the folder and // it'll show up as errored later. if _, err := os.Stat(cfg.Path()); os.IsNotExist(err) { if err := osutil.MkdirAll(cfg.Path(), 0700); err != nil { l.Warnln("Creating folder:", err) } } if err := cfg.CreateMarker(); err != nil { l.Warnln("Creating folder marker:", err) } } var ver versioner.Versioner if len(cfg.Versioning.Type) > 0 { versionerFactory, ok := versioner.Factories[cfg.Versioning.Type] if !ok { l.Fatalf("Requested versioning type %q that does not exist", cfg.Versioning.Type) } ver = versionerFactory(folder, cfg.Path(), cfg.Versioning.Params) if service, ok := ver.(suture.Service); ok { // The versioner implements the suture.Service interface, so // expects to be run in the background in addition to being called // when files are going to be archived. token := m.Add(service) m.folderRunnerTokens[folder] = append(m.folderRunnerTokens[folder], token) } } p := folderFactory(m, cfg, ver, fs.MtimeFS()) m.folderRunners[folder] = p m.warnAboutOverwritingProtectedFiles(folder) token := m.Add(p) m.folderRunnerTokens[folder] = append(m.folderRunnerTokens[folder], token) return cfg.Type } func (m *Model) warnAboutOverwritingProtectedFiles(folder string) { if m.folderCfgs[folder].Type == config.FolderTypeReadOnly { return } folderLocation := m.folderCfgs[folder].Path() ignores := m.folderIgnores[folder] var filesAtRisk []string for _, protectedFilePath := range m.protectedFiles { // check if file is synced in this folder if !strings.HasPrefix(protectedFilePath, folderLocation) { continue } // check if file is ignored if ignores.Match(protectedFilePath).IsIgnored() { continue } filesAtRisk = append(filesAtRisk, protectedFilePath) } if len(filesAtRisk) > 0 { l.Warnln("Some protected files may be overwritten and cause issues. See https://docs.syncthing.net/users/config.html#syncing-configuration-files for more information. The at risk files are:", strings.Join(filesAtRisk, ", ")) } } func (m *Model) AddFolder(cfg config.FolderConfiguration) { if len(cfg.ID) == 0 { panic("cannot add empty folder id") } m.fmut.Lock() m.addFolderLocked(cfg) m.fmut.Unlock() } func (m *Model) addFolderLocked(cfg config.FolderConfiguration) { m.folderCfgs[cfg.ID] = cfg m.folderFiles[cfg.ID] = db.NewFileSet(cfg.ID, m.db) m.folderDevices[cfg.ID] = make([]protocol.DeviceID, len(cfg.Devices)) for i, device := range cfg.Devices { m.folderDevices[cfg.ID][i] = device.DeviceID m.deviceFolders[device.DeviceID] = append(m.deviceFolders[device.DeviceID], cfg.ID) } ignores := ignore.New(m.cacheIgnoredFiles) if err := ignores.Load(filepath.Join(cfg.Path(), ".stignore")); err != nil && !os.IsNotExist(err) { l.Warnln("Loading ignores:", err) } m.folderIgnores[cfg.ID] = ignores } func (m *Model) RemoveFolder(folder string) { m.fmut.Lock() m.pmut.Lock() m.tearDownFolderLocked(folder) // Remove it from the database db.DropFolder(m.db, folder) m.pmut.Unlock() m.fmut.Unlock() } func (m *Model) tearDownFolderLocked(folder string) { // Stop the services running for this folder for _, id := range m.folderRunnerTokens[folder] { m.Remove(id) } // Close connections to affected devices for _, dev := range m.folderDevices[folder] { if conn, ok := m.conn[dev]; ok { closeRawConn(conn) } } // Clean up our config maps delete(m.folderCfgs, folder) delete(m.folderFiles, folder) delete(m.folderDevices, folder) delete(m.folderIgnores, folder) delete(m.folderRunners, folder) delete(m.folderRunnerTokens, folder) delete(m.folderStatRefs, folder) for dev, folders := range m.deviceFolders { m.deviceFolders[dev] = stringSliceWithout(folders, folder) } } func (m *Model) RestartFolder(cfg config.FolderConfiguration) { if len(cfg.ID) == 0 { panic("cannot add empty folder id") } m.fmut.Lock() m.pmut.Lock() m.tearDownFolderLocked(cfg.ID) m.addFolderLocked(cfg) folderType := m.startFolderLocked(cfg.ID) m.pmut.Unlock() m.fmut.Unlock() l.Infoln("Restarted folder", cfg.ID, fmt.Sprintf("(%s)", folderType)) } type ConnectionInfo struct { protocol.Statistics Connected bool Paused bool Address string ClientVersion string Type string } func (info ConnectionInfo) MarshalJSON() ([]byte, error) { return json.Marshal(map[string]interface{}{ "at": info.At, "inBytesTotal": info.InBytesTotal, "outBytesTotal": info.OutBytesTotal, "connected": info.Connected, "paused": info.Paused, "address": info.Address, "clientVersion": info.ClientVersion, "type": info.Type, }) } // ConnectionStats returns a map with connection statistics for each device. func (m *Model) ConnectionStats() map[string]interface{} { m.fmut.RLock() m.pmut.RLock() res := make(map[string]interface{}) devs := m.cfg.Devices() conns := make(map[string]ConnectionInfo, len(devs)) for device := range devs { hello := m.helloMessages[device] versionString := hello.ClientVersion if hello.ClientName != "syncthing" { versionString = hello.ClientName + " " + hello.ClientVersion } ci := ConnectionInfo{ ClientVersion: strings.TrimSpace(versionString), Paused: m.devicePaused[device], } if conn, ok := m.conn[device]; ok { ci.Type = conn.Type ci.Connected = ok ci.Statistics = conn.Statistics() if addr := conn.RemoteAddr(); addr != nil { ci.Address = addr.String() } } conns[device.String()] = ci } res["connections"] = conns m.pmut.RUnlock() m.fmut.RUnlock() in, out := protocol.TotalInOut() res["total"] = ConnectionInfo{ Statistics: protocol.Statistics{ At: time.Now(), InBytesTotal: in, OutBytesTotal: out, }, } return res } // DeviceStatistics returns statistics about each device func (m *Model) DeviceStatistics() map[string]stats.DeviceStatistics { var res = make(map[string]stats.DeviceStatistics) for id := range m.cfg.Devices() { res[id.String()] = m.deviceStatRef(id).GetStatistics() } return res } // FolderStatistics returns statistics about each folder func (m *Model) FolderStatistics() map[string]stats.FolderStatistics { var res = make(map[string]stats.FolderStatistics) for id := range m.cfg.Folders() { res[id] = m.folderStatRef(id).GetStatistics() } return res } type FolderCompletion struct { CompletionPct float64 NeedBytes int64 GlobalBytes int64 NeedDeletes int64 } // Completion returns the completion status, in percent, for the given device // and folder. func (m *Model) Completion(device protocol.DeviceID, folder string) FolderCompletion { m.fmut.RLock() rf, ok := m.folderFiles[folder] m.fmut.RUnlock() if !ok { return FolderCompletion{} // Folder doesn't exist, so we hardly have any of it } tot := rf.GlobalSize().Bytes if tot == 0 { // Folder is empty, so we have all of it return FolderCompletion{ CompletionPct: 100, } } m.pmut.RLock() counts := m.deviceDownloads[device].GetBlockCounts(folder) m.pmut.RUnlock() var need, fileNeed, downloaded, deletes int64 rf.WithNeedTruncated(device, func(f db.FileIntf) bool { ft := f.(db.FileInfoTruncated) // If the file is deleted, we account it only in the deleted column. if ft.Deleted { deletes++ return true } // This might might be more than it really is, because some blocks can be of a smaller size. downloaded = int64(counts[ft.Name] * protocol.BlockSize) fileNeed = ft.FileSize() - downloaded if fileNeed < 0 { fileNeed = 0 } need += fileNeed return true }) needRatio := float64(need) / float64(tot) completionPct := 100 * (1 - needRatio) // If the completion is 100% but there are deletes we need to handle, // drop it down a notch. Hack for consumers that look only at the // percentage (our own GUI does the same calculation as here on it's own // and needs the same fixup). if need == 0 && deletes > 0 { completionPct = 95 // chosen by fair dice roll } l.Debugf("%v Completion(%s, %q): %f (%d / %d = %f)", m, device, folder, completionPct, need, tot, needRatio) return FolderCompletion{ CompletionPct: completionPct, NeedBytes: need, GlobalBytes: tot, NeedDeletes: deletes, } } func addSizeOfFile(s *db.Counts, f db.FileIntf) { switch { case f.IsDeleted(): s.Deleted++ case f.IsDirectory(): s.Directories++ case f.IsSymlink(): s.Symlinks++ default: s.Files++ } s.Bytes += f.FileSize() return } // GlobalSize returns the number of files, deleted files and total bytes for all // files in the global model. func (m *Model) GlobalSize(folder string) db.Counts { m.fmut.RLock() defer m.fmut.RUnlock() if rf, ok := m.folderFiles[folder]; ok { return rf.GlobalSize() } return db.Counts{} } // LocalSize returns the number of files, deleted files and total bytes for all // files in the local folder. func (m *Model) LocalSize(folder string) db.Counts { m.fmut.RLock() defer m.fmut.RUnlock() if rf, ok := m.folderFiles[folder]; ok { return rf.LocalSize() } return db.Counts{} } // NeedSize returns the number and total size of currently needed files. func (m *Model) NeedSize(folder string) db.Counts { m.fmut.RLock() defer m.fmut.RUnlock() var result db.Counts if rf, ok := m.folderFiles[folder]; ok { ignores := m.folderIgnores[folder] cfg := m.folderCfgs[folder] rf.WithNeedTruncated(protocol.LocalDeviceID, func(f db.FileIntf) bool { if shouldIgnore(f, ignores, cfg.IgnoreDelete) { return true } addSizeOfFile(&result, f) return true }) } result.Bytes -= m.progressEmitter.BytesCompleted(folder) l.Debugf("%v NeedSize(%q): %v", m, folder, result) return result } // NeedFolderFiles returns paginated list of currently needed files in // progress, queued, and to be queued on next puller iteration, as well as the // total number of files currently needed. func (m *Model) NeedFolderFiles(folder string, page, perpage int) ([]db.FileInfoTruncated, []db.FileInfoTruncated, []db.FileInfoTruncated, int) { m.fmut.RLock() defer m.fmut.RUnlock() total := 0 rf, ok := m.folderFiles[folder] if !ok { return nil, nil, nil, 0 } var progress, queued, rest []db.FileInfoTruncated var seen map[string]struct{} skip := (page - 1) * perpage get := perpage runner, ok := m.folderRunners[folder] if ok { allProgressNames, allQueuedNames := runner.Jobs() var progressNames, queuedNames []string progressNames, skip, get = getChunk(allProgressNames, skip, get) queuedNames, skip, get = getChunk(allQueuedNames, skip, get) progress = make([]db.FileInfoTruncated, len(progressNames)) queued = make([]db.FileInfoTruncated, len(queuedNames)) seen = make(map[string]struct{}, len(progressNames)+len(queuedNames)) for i, name := range progressNames { if f, ok := rf.GetGlobalTruncated(name); ok { progress[i] = f seen[name] = struct{}{} } } for i, name := range queuedNames { if f, ok := rf.GetGlobalTruncated(name); ok { queued[i] = f seen[name] = struct{}{} } } } rest = make([]db.FileInfoTruncated, 0, perpage) ignores := m.folderIgnores[folder] cfg := m.folderCfgs[folder] rf.WithNeedTruncated(protocol.LocalDeviceID, func(f db.FileIntf) bool { if shouldIgnore(f, ignores, cfg.IgnoreDelete) { return true } total++ if skip > 0 { skip-- return true } if get > 0 { ft := f.(db.FileInfoTruncated) if _, ok := seen[ft.Name]; !ok { rest = append(rest, ft) get-- } } return true }) return progress, queued, rest, total } // Index is called when a new device is connected and we receive their full index. // Implements the protocol.Model interface. func (m *Model) Index(deviceID protocol.DeviceID, folder string, fs []protocol.FileInfo) { l.Debugf("IDX(in): %s %q: %d files", deviceID, folder, len(fs)) if !m.folderSharedWith(folder, deviceID) { l.Debugf("Unexpected folder ID %q sent from device %q; ensure that the folder exists and that this device is selected under \"Share With\" in the folder configuration.", folder, deviceID) return } m.fmut.RLock() files, ok := m.folderFiles[folder] runner := m.folderRunners[folder] m.fmut.RUnlock() if runner != nil { // Runner may legitimately not be set if this is the "cleanup" Index // message at startup. defer runner.IndexUpdated() } if !ok { l.Fatalf("Index for nonexistent folder %q", folder) } m.pmut.RLock() m.deviceDownloads[deviceID].Update(folder, makeForgetUpdate(fs)) m.pmut.RUnlock() files.Replace(deviceID, fs) events.Default.Log(events.RemoteIndexUpdated, map[string]interface{}{ "device": deviceID.String(), "folder": folder, "items": len(fs), "version": files.Sequence(deviceID), }) } // IndexUpdate is called for incremental updates to connected devices' indexes. // Implements the protocol.Model interface. func (m *Model) IndexUpdate(deviceID protocol.DeviceID, folder string, fs []protocol.FileInfo) { l.Debugf("%v IDXUP(in): %s / %q: %d files", m, deviceID, folder, len(fs)) if !m.folderSharedWith(folder, deviceID) { l.Debugf("Update for unexpected folder ID %q sent from device %q; ensure that the folder exists and that this device is selected under \"Share With\" in the folder configuration.", folder, deviceID) return } m.fmut.RLock() files := m.folderFiles[folder] runner, ok := m.folderRunners[folder] m.fmut.RUnlock() if !ok { l.Fatalf("IndexUpdate for nonexistent folder %q", folder) } m.pmut.RLock() m.deviceDownloads[deviceID].Update(folder, makeForgetUpdate(fs)) m.pmut.RUnlock() files.Update(deviceID, fs) events.Default.Log(events.RemoteIndexUpdated, map[string]interface{}{ "device": deviceID.String(), "folder": folder, "items": len(fs), "version": files.Sequence(deviceID), }) runner.IndexUpdated() } func (m *Model) folderSharedWith(folder string, deviceID protocol.DeviceID) bool { m.fmut.RLock() defer m.fmut.RUnlock() return m.folderSharedWithLocked(folder, deviceID) } func (m *Model) folderSharedWithLocked(folder string, deviceID protocol.DeviceID) bool { for _, nfolder := range m.deviceFolders[deviceID] { if nfolder == folder { return true } } return false } func (m *Model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterConfig) { // Check the peer device's announced folders against our own. Emits events // for folders that we don't expect (unknown or not shared). // Also, collect a list of folders we do share, and if he's interested in // temporary indexes, subscribe the connection. tempIndexFolders := make([]string, 0, len(cm.Folders)) m.pmut.RLock() conn, ok := m.conn[deviceID] m.pmut.RUnlock() if !ok { panic("bug: ClusterConfig called on closed or nonexistent connection") } dbLocation := filepath.Dir(m.db.Location()) m.fmut.Lock() for _, folder := range cm.Folders { if !m.folderSharedWithLocked(folder.ID, deviceID) { events.Default.Log(events.FolderRejected, map[string]string{ "folder": folder.ID, "folderLabel": folder.Label, "device": deviceID.String(), }) l.Infof("Unexpected folder ID %q sent from device %q; ensure that the folder exists and that this device is selected under \"Share With\" in the folder configuration.", folder.ID, deviceID) continue } if !folder.DisableTempIndexes { tempIndexFolders = append(tempIndexFolders, folder.ID) } fs := m.folderFiles[folder.ID] myIndexID := fs.IndexID(protocol.LocalDeviceID) mySequence := fs.Sequence(protocol.LocalDeviceID) var startSequence int64 for _, dev := range folder.Devices { if bytes.Equal(dev.ID, m.id[:]) { // This is the other side's description of what it knows // about us. Lets check to see if we can start sending index // updates directly or need to send the index from start... if dev.IndexID == myIndexID { // They say they've seen our index ID before, so we can // send a delta update only. if dev.MaxSequence > mySequence { // Safety check. They claim to have more or newer // index data than we have - either we have lost // index data, or reset the index without resetting // the IndexID, or something else weird has // happened. We send a full index to reset the // situation. l.Infof("Device %v folder %q is delta index compatible, but seems out of sync with reality", deviceID, folder.ID) startSequence = 0 continue } l.Debugf("Device %v folder %q is delta index compatible (mlv=%d)", deviceID, folder.ID, dev.MaxSequence) startSequence = dev.MaxSequence } else if dev.IndexID != 0 { // They say they've seen an index ID from us, but it's // not the right one. Either they are confused or we // must have reset our database since last talking to // them. We'll start with a full index transfer. l.Infof("Device %v folder %q has mismatching index ID for us (%v != %v)", deviceID, folder.ID, dev.IndexID, myIndexID) startSequence = 0 } } else if bytes.Equal(dev.ID, deviceID[:]) && dev.IndexID != 0 { // This is the other side's description of themselves. We // check to see that it matches the IndexID we have on file, // otherwise we drop our old index data and expect to get a // completely new set. theirIndexID := fs.IndexID(deviceID) if dev.IndexID == 0 { // They're not announcing an index ID. This means they // do not support delta indexes and we should clear any // information we have from them before accepting their // index, which will presumably be a full index. fs.Replace(deviceID, nil) } else if dev.IndexID != theirIndexID { // The index ID we have on file is not what they're // announcing. They must have reset their database and // will probably send us a full index. We drop any // information we have and remember this new index ID // instead. l.Infof("Device %v folder %q has a new index ID (%v)", deviceID, folder.ID, dev.IndexID) fs.Replace(deviceID, nil) fs.SetIndexID(deviceID, dev.IndexID) } else { // They're sending a recognized index ID and will most // likely use delta indexes. We might already have files // that we need to pull so let the folder runner know // that it should recheck the index data. if runner := m.folderRunners[folder.ID]; runner != nil { defer runner.IndexUpdated() } } } } go sendIndexes(conn, folder.ID, fs, m.folderIgnores[folder.ID], startSequence, dbLocation) } m.fmut.Unlock() // This breaks if we send multiple CM messages during the same connection. if len(tempIndexFolders) > 0 { m.pmut.RLock() conn, ok := m.conn[deviceID] m.pmut.RUnlock() // In case we've got ClusterConfig, and the connection disappeared // from infront of our nose. if ok { m.progressEmitter.temporaryIndexSubscribe(conn, tempIndexFolders) } } var changed bool if m.cfg.Devices()[deviceID].Introducer { // This device is an introducer. Go through the announced lists of folders // and devices and add what we are missing. for _, folder := range cm.Folders { if _, ok := m.folderDevices[folder.ID]; !ok { continue } nextDevice: for _, device := range folder.Devices { var id protocol.DeviceID copy(id[:], device.ID) if _, ok := m.cfg.Devices()[id]; !ok { // The device is currently unknown. Add it to the config. addresses := []string{"dynamic"} for _, addr := range device.Addresses { if addr != "dynamic" { addresses = append(addresses, addr) } } l.Infof("Adding device %v to config (vouched for by introducer %v)", id, deviceID) newDeviceCfg := config.DeviceConfiguration{ DeviceID: id, Name: device.Name, Compression: m.cfg.Devices()[deviceID].Compression, Addresses: addresses, CertName: device.CertName, } // The introducers' introducers are also our introducers. if device.Introducer { l.Infof("Device %v is now also an introducer", id) newDeviceCfg.Introducer = true } m.cfg.SetDevice(newDeviceCfg) changed = true } for _, er := range m.deviceFolders[id] { if er == folder.ID { // We already share the folder with this device, so // nothing to do. continue nextDevice } } // We don't yet share this folder with this device. Add the device // to sharing list of the folder. l.Infof("Adding device %v to share %q (vouched for by introducer %v)", id, folder.ID, deviceID) m.deviceFolders[id] = append(m.deviceFolders[id], folder.ID) m.folderDevices[folder.ID] = append(m.folderDevices[folder.ID], id) folderCfg := m.cfg.Folders()[folder.ID] folderCfg.Devices = append(folderCfg.Devices, config.FolderDeviceConfiguration{ DeviceID: id, }) m.cfg.SetFolder(folderCfg) changed = true } } } if changed { m.cfg.Save() } } // Closed is called when a connection has been closed func (m *Model) Closed(conn protocol.Connection, err error) { device := conn.ID() m.pmut.Lock() conn, ok := m.conn[device] if ok { m.progressEmitter.temporaryIndexUnsubscribe(conn) } delete(m.conn, device) delete(m.helloMessages, device) delete(m.deviceDownloads, device) closed := m.closed[device] delete(m.closed, device) m.pmut.Unlock() l.Infof("Connection to %s closed: %v", device, err) events.Default.Log(events.DeviceDisconnected, map[string]string{ "id": device.String(), "error": err.Error(), }) close(closed) } // close will close the underlying connection for a given device func (m *Model) close(device protocol.DeviceID) { m.pmut.Lock() conn, ok := m.conn[device] m.pmut.Unlock() if !ok { // There is no connection to close return } closeRawConn(conn) } // Request returns the specified data segment by reading it from local disk. // Implements the protocol.Model interface. func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset int64, hash []byte, fromTemporary bool, buf []byte) error { if offset < 0 { return protocol.ErrInvalid } if !m.folderSharedWith(folder, deviceID) { l.Warnf("Request from %s for file %s in unshared folder %q", deviceID, name, folder) return protocol.ErrNoSuchFile } if deviceID != protocol.LocalDeviceID { l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d t=%v", m, deviceID, folder, name, offset, len(buf), fromTemporary) } m.fmut.RLock() folderCfg := m.folderCfgs[folder] folderPath := folderCfg.Path() folderIgnores := m.folderIgnores[folder] m.fmut.RUnlock() // filepath.Join() returns a filepath.Clean()ed path, which (quoting the // docs for clarity here): // // Clean returns the shortest path name equivalent to path by purely lexical // processing. It applies the following rules iteratively until no further // processing can be done: // // 1. Replace multiple Separator elements with a single one. // 2. Eliminate each . path name element (the current directory). // 3. Eliminate each inner .. path name element (the parent directory) // along with the non-.. element that precedes it. // 4. Eliminate .. elements that begin a rooted path: // that is, replace "/.." by "/" at the beginning of a path, // assuming Separator is '/'. fn := filepath.Join(folderPath, name) if !strings.HasPrefix(fn, folderPath) { // Request tries to escape! l.Debugf("%v Invalid REQ(in) tries to escape: %s: %q / %q o=%d s=%d", m, deviceID, folder, name, offset, len(buf)) return protocol.ErrInvalid } if folderIgnores != nil { // "rn" becomes the relative name of the file within the folder. This is // different than the original "name" parameter in that it's been // cleaned from any possible funny business. if rn, err := filepath.Rel(folderPath, fn); err != nil { return err } else if folderIgnores.Match(rn).IsIgnored() { l.Debugf("%v REQ(in) for ignored file: %s: %q / %q o=%d s=%d", m, deviceID, folder, name, offset, len(buf)) return protocol.ErrNoSuchFile } } if info, err := osutil.Lstat(fn); err == nil && info.Mode()&os.ModeSymlink != 0 { target, _, err := symlinks.Read(fn) if err != nil { l.Debugln("symlinks.Read:", err) if os.IsNotExist(err) { return protocol.ErrNoSuchFile } return protocol.ErrGeneric } if _, err := strings.NewReader(target).ReadAt(buf, offset); err != nil { l.Debugln("symlink.Reader.ReadAt", err) return protocol.ErrGeneric } return nil } // Only check temp files if the flag is set, and if we are set to advertise // the temp indexes. if fromTemporary && !folderCfg.DisableTempIndexes { tempFn := filepath.Join(folderPath, defTempNamer.TempName(name)) if err := readOffsetIntoBuf(tempFn, offset, buf); err == nil { return nil } // Fall through to reading from a non-temp file, just incase the temp // file has finished downloading. } err := readOffsetIntoBuf(fn, offset, buf) if os.IsNotExist(err) { return protocol.ErrNoSuchFile } else if err != nil { return protocol.ErrGeneric } return nil } func (m *Model) CurrentFolderFile(folder string, file string) (protocol.FileInfo, bool) { m.fmut.RLock() fs, ok := m.folderFiles[folder] m.fmut.RUnlock() if !ok { return protocol.FileInfo{}, false } f, ok := fs.Get(protocol.LocalDeviceID, file) return f, ok } func (m *Model) CurrentGlobalFile(folder string, file string) (protocol.FileInfo, bool) { m.fmut.RLock() fs, ok := m.folderFiles[folder] m.fmut.RUnlock() if !ok { return protocol.FileInfo{}, false } f, ok := fs.GetGlobal(file) return f, ok } type cFiler struct { m *Model r string } // Implements scanner.CurrentFiler func (cf cFiler) CurrentFile(file string) (protocol.FileInfo, bool) { return cf.m.CurrentFolderFile(cf.r, file) } // ConnectedTo returns true if we are connected to the named device. func (m *Model) ConnectedTo(deviceID protocol.DeviceID) bool { m.pmut.RLock() _, ok := m.conn[deviceID] m.pmut.RUnlock() if ok { m.deviceWasSeen(deviceID) } return ok } func (m *Model) GetIgnores(folder string) ([]string, []string, error) { var lines []string m.fmut.RLock() cfg, ok := m.folderCfgs[folder] m.fmut.RUnlock() if !ok { return lines, nil, fmt.Errorf("Folder %s does not exist", folder) } if !cfg.HasMarker() { return lines, nil, fmt.Errorf("Folder %s stopped", folder) } fd, err := os.Open(filepath.Join(cfg.Path(), ".stignore")) if err != nil { if os.IsNotExist(err) { return lines, nil, nil } l.Warnln("Loading .stignore:", err) return lines, nil, err } defer fd.Close() scanner := bufio.NewScanner(fd) for scanner.Scan() { lines = append(lines, strings.TrimSpace(scanner.Text())) } m.fmut.RLock() patterns := m.folderIgnores[folder].Patterns() m.fmut.RUnlock() return lines, patterns, nil } func (m *Model) SetIgnores(folder string, content []string) error { cfg, ok := m.folderCfgs[folder] if !ok { return fmt.Errorf("Folder %s does not exist", folder) } path := filepath.Join(cfg.Path(), ".stignore") fd, err := osutil.CreateAtomic(path, 0644) if err != nil { l.Warnln("Saving .stignore:", err) return err } for _, line := range content { fmt.Fprintln(fd, line) } if err := fd.Close(); err != nil { l.Warnln("Saving .stignore:", err) return err } osutil.HideFile(path) return m.ScanFolder(folder) } // OnHello is called when an device connects to us. // This allows us to extract some information from the Hello message // and add it to a list of known devices ahead of any checks. func (m *Model) OnHello(remoteID protocol.DeviceID, addr net.Addr, hello protocol.HelloResult) error { if m.IsPaused(remoteID) { return errDevicePaused } if m.cfg.IgnoredDevice(remoteID) { return errDeviceIgnored } if _, ok := m.cfg.Device(remoteID); ok { // The device exists return nil } events.Default.Log(events.DeviceRejected, map[string]string{ "name": hello.DeviceName, "device": remoteID.String(), "address": addr.String(), }) return errDeviceUnknown } // GetHello is called when we are about to connect to some remote device. func (m *Model) GetHello(protocol.DeviceID) protocol.HelloIntf { return &protocol.Hello{ DeviceName: m.deviceName, ClientName: m.clientName, ClientVersion: m.clientVersion, } } // AddConnection adds a new peer connection to the model. An initial index will // be sent to the connected peer, thereafter index updates whenever the local // folder changes. func (m *Model) AddConnection(conn connections.Connection, hello protocol.HelloResult) { deviceID := conn.ID() m.pmut.Lock() if oldConn, ok := m.conn[deviceID]; ok { l.Infoln("Replacing old connection", oldConn, "with", conn, "for", deviceID) // There is an existing connection to this device that we are // replacing. We must close the existing connection and wait for the // close to complete before adding the new connection. We do the // actual close without holding pmut as the connection will call // back into Closed() for the cleanup. closed := m.closed[deviceID] m.pmut.Unlock() closeRawConn(oldConn) <-closed m.pmut.Lock() } m.conn[deviceID] = conn m.closed[deviceID] = make(chan struct{}) m.deviceDownloads[deviceID] = newDeviceDownloadState() m.helloMessages[deviceID] = hello event := map[string]string{ "id": deviceID.String(), "deviceName": hello.DeviceName, "clientName": hello.ClientName, "clientVersion": hello.ClientVersion, "type": conn.Type, } addr := conn.RemoteAddr() if addr != nil { event["addr"] = addr.String() } events.Default.Log(events.DeviceConnected, event) l.Infof(`Device %s client is "%s %s" named "%s"`, deviceID, hello.ClientName, hello.ClientVersion, hello.DeviceName) conn.Start() cm := m.generateClusterConfig(deviceID) conn.ClusterConfig(cm) m.pmut.Unlock() device, ok := m.cfg.Devices()[deviceID] if ok && (device.Name == "" || m.cfg.Options().OverwriteRemoteDevNames) { device.Name = hello.DeviceName m.cfg.SetDevice(device) m.cfg.Save() } m.deviceWasSeen(deviceID) } func (m *Model) PauseDevice(device protocol.DeviceID) { m.pmut.Lock() m.devicePaused[device] = true conn, ok := m.conn[device] m.pmut.Unlock() if ok { closeRawConn(conn) } events.Default.Log(events.DevicePaused, map[string]string{"device": device.String()}) } func (m *Model) DownloadProgress(device protocol.DeviceID, folder string, updates []protocol.FileDownloadProgressUpdate) { if !m.folderSharedWith(folder, device) { return } m.fmut.RLock() cfg, ok := m.folderCfgs[folder] m.fmut.RUnlock() if !ok || cfg.Type == config.FolderTypeReadOnly || cfg.DisableTempIndexes { return } m.pmut.RLock() m.deviceDownloads[device].Update(folder, updates) state := m.deviceDownloads[device].GetBlockCounts(folder) m.pmut.RUnlock() events.Default.Log(events.RemoteDownloadProgress, map[string]interface{}{ "device": device.String(), "folder": folder, "state": state, }) } func (m *Model) ResumeDevice(device protocol.DeviceID) { m.pmut.Lock() m.devicePaused[device] = false m.pmut.Unlock() events.Default.Log(events.DeviceResumed, map[string]string{"device": device.String()}) } func (m *Model) IsPaused(device protocol.DeviceID) bool { m.pmut.Lock() paused := m.devicePaused[device] m.pmut.Unlock() return paused } func (m *Model) deviceStatRef(deviceID protocol.DeviceID) *stats.DeviceStatisticsReference { m.fmut.Lock() defer m.fmut.Unlock() if sr, ok := m.deviceStatRefs[deviceID]; ok { return sr } sr := stats.NewDeviceStatisticsReference(m.db, deviceID.String()) m.deviceStatRefs[deviceID] = sr return sr } func (m *Model) deviceWasSeen(deviceID protocol.DeviceID) { m.deviceStatRef(deviceID).WasSeen() } func (m *Model) folderStatRef(folder string) *stats.FolderStatisticsReference { m.fmut.Lock() defer m.fmut.Unlock() sr, ok := m.folderStatRefs[folder] if !ok { sr = stats.NewFolderStatisticsReference(m.db, folder) m.folderStatRefs[folder] = sr } return sr } func (m *Model) receivedFile(folder string, file protocol.FileInfo) { m.folderStatRef(folder).ReceivedFile(file.Name, file.IsDeleted()) } func sendIndexes(conn protocol.Connection, folder string, fs *db.FileSet, ignores *ignore.Matcher, startSequence int64, dbLocation string) { deviceID := conn.ID() name := conn.Name() var err error l.Debugf("sendIndexes for %s-%s/%q starting (slv=%d)", deviceID, name, folder, startSequence) defer l.Debugf("sendIndexes for %s-%s/%q exiting: %v", deviceID, name, folder, err) minSequence, err := sendIndexTo(startSequence, conn, folder, fs, ignores, dbLocation) // Subscribe to LocalIndexUpdated (we have new information to send) and // DeviceDisconnected (it might be us who disconnected, so we should // exit). sub := events.Default.Subscribe(events.LocalIndexUpdated | events.DeviceDisconnected) defer events.Default.Unsubscribe(sub) for err == nil { if conn.Closed() { // Our work is done. return } // While we have sent a sequence at least equal to the one // currently in the database, wait for the local index to update. The // local index may update for other folders than the one we are // sending for. if fs.Sequence(protocol.LocalDeviceID) <= minSequence { sub.Poll(time.Minute) continue } minSequence, err = sendIndexTo(minSequence, conn, folder, fs, ignores, dbLocation) // Wait a short amount of time before entering the next loop. If there // are continuous changes happening to the local index, this gives us // time to batch them up a little. time.Sleep(250 * time.Millisecond) } } func sendIndexTo(minSequence int64, conn protocol.Connection, folder string, fs *db.FileSet, ignores *ignore.Matcher, dbLocation string) (int64, error) { deviceID := conn.ID() name := conn.Name() batch := make([]protocol.FileInfo, 0, indexBatchSize) currentBatchSize := 0 initial := minSequence == 0 maxSequence := minSequence var err error sorter := NewIndexSorter(dbLocation) defer sorter.Close() fs.WithHave(protocol.LocalDeviceID, func(fi db.FileIntf) bool { f := fi.(protocol.FileInfo) if f.Sequence <= minSequence { return true } if f.Sequence > maxSequence { maxSequence = f.Sequence } sorter.Append(f) return true }) sorter.Sorted(func(f protocol.FileInfo) bool { if len(batch) == indexBatchSize || currentBatchSize > indexTargetSize { if initial { if err = conn.Index(folder, batch); err != nil { return false } l.Debugf("sendIndexes for %s-%s/%q: %d files (<%d bytes) (initial index)", deviceID, name, folder, len(batch), currentBatchSize) initial = false } else { if err = conn.IndexUpdate(folder, batch); err != nil { return false } l.Debugf("sendIndexes for %s-%s/%q: %d files (<%d bytes) (batched update)", deviceID, name, folder, len(batch), currentBatchSize) } batch = make([]protocol.FileInfo, 0, indexBatchSize) currentBatchSize = 0 } batch = append(batch, f) currentBatchSize += f.ProtoSize() return true }) if initial && err == nil { err = conn.Index(folder, batch) if err == nil { l.Debugf("sendIndexes for %s-%s/%q: %d files (small initial index)", deviceID, name, folder, len(batch)) } } else if len(batch) > 0 && err == nil { err = conn.IndexUpdate(folder, batch) if err == nil { l.Debugf("sendIndexes for %s-%s/%q: %d files (last batch)", deviceID, name, folder, len(batch)) } } return maxSequence, err } func (m *Model) updateLocalsFromScanning(folder string, fs []protocol.FileInfo) { m.updateLocals(folder, fs) m.fmut.RLock() folderCfg := m.folderCfgs[folder] m.fmut.RUnlock() // Fire the LocalChangeDetected event to notify listeners about local updates. m.localChangeDetected(folderCfg, fs) } func (m *Model) updateLocalsFromPulling(folder string, fs []protocol.FileInfo) { m.updateLocals(folder, fs) } func (m *Model) updateLocals(folder string, fs []protocol.FileInfo) { m.fmut.RLock() files := m.folderFiles[folder] m.fmut.RUnlock() if files == nil { // The folder doesn't exist. return } files.Update(protocol.LocalDeviceID, fs) filenames := make([]string, len(fs)) for i, file := range fs { filenames[i] = file.Name } events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{ "folder": folder, "items": len(fs), "filenames": filenames, "version": files.Sequence(protocol.LocalDeviceID), }) } func (m *Model) localChangeDetected(folderCfg config.FolderConfiguration, files []protocol.FileInfo) { path := strings.Replace(folderCfg.Path(), `\\?\`, "", 1) for _, file := range files { objType := "file" action := "modified" // If our local vector is version 1 AND it is the only version // vector so far seen for this file then it is a new file. Else if // it is > 1 it's not new, and if it is 1 but another shortId // version vector exists then it is new for us but created elsewhere // so the file is still not new but modified by us. Only if it is // truly new do we change this to 'added', else we leave it as // 'modified'. if len(file.Version.Counters) == 1 && file.Version.Counters[0].Value == 1 { action = "added" } if file.IsDirectory() { objType = "dir" } if file.IsDeleted() { action = "deleted" } // The full file path, adjusted to the local path separator character. Also // for windows paths, strip unwanted chars from the front. path := filepath.Join(path, filepath.FromSlash(file.Name)) events.Default.Log(events.LocalChangeDetected, map[string]string{ "folderID": folderCfg.ID, "label": folderCfg.Label, "action": action, "type": objType, "path": path, }) } } func (m *Model) requestGlobal(deviceID protocol.DeviceID, folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) { m.pmut.RLock() nc, ok := m.conn[deviceID] m.pmut.RUnlock() if !ok { return nil, fmt.Errorf("requestGlobal: no such device: %s", deviceID) } l.Debugf("%v REQ(out): %s: %q / %q o=%d s=%d h=%x ft=%t", m, deviceID, folder, name, offset, size, hash, fromTemporary) return nc.Request(folder, name, offset, size, hash, fromTemporary) } func (m *Model) ScanFolders() map[string]error { m.fmut.RLock() folders := make([]string, 0, len(m.folderCfgs)) for folder := range m.folderCfgs { folders = append(folders, folder) } m.fmut.RUnlock() errors := make(map[string]error, len(m.folderCfgs)) errorsMut := sync.NewMutex() wg := sync.NewWaitGroup() wg.Add(len(folders)) for _, folder := range folders { folder := folder go func() { err := m.ScanFolder(folder) if err != nil { errorsMut.Lock() errors[folder] = err errorsMut.Unlock() // Potentially sets the error twice, once in the scanner just // by doing a check, and once here, if the error returned is // the same one as returned by CheckFolderHealth, though // duplicate set is handled by setError. m.fmut.RLock() srv := m.folderRunners[folder] m.fmut.RUnlock() srv.setError(err) } wg.Done() }() } wg.Wait() return errors } func (m *Model) ScanFolder(folder string) error { return m.ScanFolderSubdirs(folder, nil) } func (m *Model) ScanFolderSubdirs(folder string, subs []string) error { m.fmut.Lock() runner, ok := m.folderRunners[folder] m.fmut.Unlock() // Folders are added to folderRunners only when they are started. We can't // scan them before they have started, so that's what we need to check for // here. if !ok { return errors.New("no such folder") } return runner.Scan(subs) } func (m *Model) internalScanFolderSubdirs(folder string, subDirs []string) error { for i, sub := range subDirs { sub = osutil.NativeFilename(sub) if p := filepath.Clean(filepath.Join(folder, sub)); !strings.HasPrefix(p, folder) { return errors.New("invalid subpath") } subDirs[i] = sub } m.fmut.Lock() fs := m.folderFiles[folder] folderCfg := m.folderCfgs[folder] ignores := m.folderIgnores[folder] runner, ok := m.folderRunners[folder] m.fmut.Unlock() mtimefs := fs.MtimeFS() // Check if the ignore patterns changed as part of scanning this folder. // If they did we should schedule a pull of the folder so that we // request things we might have suddenly become unignored and so on. oldHash := ignores.Hash() defer func() { if ignores.Hash() != oldHash { l.Debugln("Folder", folder, "ignore patterns changed; triggering puller") runner.IndexUpdated() } }() // Folders are added to folderRunners only when they are started. We can't // scan them before they have started, so that's what we need to check for // here. if !ok { return errors.New("no such folder") } if err := m.CheckFolderHealth(folder); err != nil { runner.setError(err) l.Infof("Stopping folder %s due to error: %s", folder, err) return err } if err := ignores.Load(filepath.Join(folderCfg.Path(), ".stignore")); err != nil && !os.IsNotExist(err) { err = fmt.Errorf("loading ignores: %v", err) runner.setError(err) l.Infof("Stopping folder %s due to error: %s", folder, err) return err } // Clean the list of subitems to ensure that we start at a known // directory, and don't scan subdirectories of things we've already // scanned. subDirs = unifySubs(subDirs, func(f string) bool { _, ok := fs.Get(protocol.LocalDeviceID, f) return ok }) // The cancel channel is closed whenever we return (such as from an error), // to signal the potentially still running walker to stop. cancel := make(chan struct{}) defer close(cancel) runner.setState(FolderScanning) fchan, err := scanner.Walk(scanner.Config{ Folder: folderCfg.ID, Dir: folderCfg.Path(), Subs: subDirs, Matcher: ignores, BlockSize: protocol.BlockSize, TempNamer: defTempNamer, TempLifetime: time.Duration(m.cfg.Options().KeepTemporariesH) * time.Hour, CurrentFiler: cFiler{m, folder}, Lstater: mtimefs, IgnorePerms: folderCfg.IgnorePerms, AutoNormalize: folderCfg.AutoNormalize, Hashers: m.numHashers(folder), ShortID: m.shortID, ProgressTickIntervalS: folderCfg.ScanProgressIntervalS, Cancel: cancel, }) if err != nil { // The error we get here is likely an OS level error, which might not be // as readable as our health check errors. Check if we can get a health // check error first, and use that if it's available. if ferr := m.CheckFolderHealth(folder); ferr != nil { err = ferr } runner.setError(err) return err } batchSizeFiles := 100 batchSizeBlocks := 2048 // about 256 MB batch := make([]protocol.FileInfo, 0, batchSizeFiles) blocksHandled := 0 for f := range fchan { if len(batch) == batchSizeFiles || blocksHandled > batchSizeBlocks { if err := m.CheckFolderHealth(folder); err != nil { l.Infof("Stopping folder %s mid-scan due to folder error: %s", folder, err) return err } m.updateLocalsFromScanning(folder, batch) batch = batch[:0] blocksHandled = 0 } batch = append(batch, f) blocksHandled += len(f.Blocks) } if err := m.CheckFolderHealth(folder); err != nil { l.Infof("Stopping folder %s mid-scan due to folder error: %s", folder, err) return err } else if len(batch) > 0 { m.updateLocalsFromScanning(folder, batch) } if len(subDirs) == 0 { // If we have no specific subdirectories to traverse, set it to one // empty prefix so we traverse the entire folder contents once. subDirs = []string{""} } // Do a scan of the database for each prefix, to check for deleted and // ignored files. batch = batch[:0] for _, sub := range subDirs { var iterError error fs.WithPrefixedHaveTruncated(protocol.LocalDeviceID, sub, func(fi db.FileIntf) bool { f := fi.(db.FileInfoTruncated) if len(batch) == batchSizeFiles { if err := m.CheckFolderHealth(folder); err != nil { iterError = err return false } m.updateLocalsFromScanning(folder, batch) batch = batch[:0] } switch { case !f.IsInvalid() && (ignores.Match(f.Name).IsIgnored() || symlinkInvalid(folder, f)): // File was valid at last pass but has been ignored or is an // unsupported symlink. Set invalid bit. l.Debugln("setting invalid bit on ignored", f) nf := protocol.FileInfo{ Name: f.Name, Type: f.Type, Size: f.Size, ModifiedS: f.ModifiedS, ModifiedNs: f.ModifiedNs, Permissions: f.Permissions, NoPermissions: f.NoPermissions, Invalid: true, Version: f.Version, // The file is still the same, so don't bump version } batch = append(batch, nf) case !f.IsInvalid() && !f.IsDeleted(): // The file is valid and not deleted. Lets check if it's // still here. if _, err := mtimefs.Lstat(filepath.Join(folderCfg.Path(), f.Name)); err != nil { // We don't specifically verify that the error is // os.IsNotExist because there is a corner case when a // directory is suddenly transformed into a file. When that // happens, files that were in the directory (that is now a // file) are deleted but will return a confusing error ("not a // directory") when we try to Lstat() them. nf := protocol.FileInfo{ Name: f.Name, Type: f.Type, Size: 0, ModifiedS: f.ModifiedS, ModifiedNs: f.ModifiedNs, Deleted: true, Version: f.Version.Update(m.shortID), } batch = append(batch, nf) } } return true }) if iterError != nil { l.Infof("Stopping folder %s mid-scan due to folder error: %s", folder, iterError) return iterError } } if err := m.CheckFolderHealth(folder); err != nil { l.Infof("Stopping folder %s mid-scan due to folder error: %s", folder, err) return err } else if len(batch) > 0 { m.updateLocalsFromScanning(folder, batch) } m.folderStatRef(folder).ScanCompleted() runner.setState(FolderIdle) return nil } func (m *Model) DelayScan(folder string, next time.Duration) { m.fmut.Lock() runner, ok := m.folderRunners[folder] m.fmut.Unlock() if !ok { return } runner.DelayScan(next) } // numHashers returns the number of hasher routines to use for a given folder, // taking into account configuration and available CPU cores. func (m *Model) numHashers(folder string) int { m.fmut.Lock() folderCfg := m.folderCfgs[folder] numFolders := len(m.folderCfgs) m.fmut.Unlock() if folderCfg.Hashers > 0 { // Specific value set in the config, use that. return folderCfg.Hashers } if runtime.GOOS == "windows" || runtime.GOOS == "darwin" { // Interactive operating systems; don't load the system too heavily by // default. return 1 } // For other operating systems and architectures, lets try to get some // work done... Divide the available CPU cores among the configured // folders. if perFolder := runtime.GOMAXPROCS(-1) / numFolders; perFolder > 0 { return perFolder } return 1 } // generateClusterConfig returns a ClusterConfigMessage that is correct for // the given peer device func (m *Model) generateClusterConfig(device protocol.DeviceID) protocol.ClusterConfig { var message protocol.ClusterConfig m.fmut.RLock() for _, folder := range m.deviceFolders[device] { folderCfg := m.cfg.Folders()[folder] fs := m.folderFiles[folder] protocolFolder := protocol.Folder{ ID: folder, Label: folderCfg.Label, ReadOnly: folderCfg.Type == config.FolderTypeReadOnly, IgnorePermissions: folderCfg.IgnorePerms, IgnoreDelete: folderCfg.IgnoreDelete, DisableTempIndexes: folderCfg.DisableTempIndexes, } for _, device := range m.folderDevices[folder] { // DeviceID is a value type, but with an underlying array. Copy it // so we don't grab aliases to the same array later on in device[:] device := device // TODO: Set read only bit when relevant, and when we have per device // access controls. deviceCfg := m.cfg.Devices()[device] var indexID protocol.IndexID var maxSequence int64 if device == m.id { indexID = fs.IndexID(protocol.LocalDeviceID) maxSequence = fs.Sequence(protocol.LocalDeviceID) } else { indexID = fs.IndexID(device) maxSequence = fs.Sequence(device) } protocolDevice := protocol.Device{ ID: device[:], Name: deviceCfg.Name, Addresses: deviceCfg.Addresses, Compression: deviceCfg.Compression, CertName: deviceCfg.CertName, Introducer: deviceCfg.Introducer, IndexID: indexID, MaxSequence: maxSequence, } protocolFolder.Devices = append(protocolFolder.Devices, protocolDevice) } message.Folders = append(message.Folders, protocolFolder) } m.fmut.RUnlock() return message } func (m *Model) State(folder string) (string, time.Time, error) { m.fmut.RLock() runner, ok := m.folderRunners[folder] m.fmut.RUnlock() if !ok { // The returned error should be an actual folder error, so returning // errors.New("does not exist") or similar here would be // inappropriate. return "", time.Time{}, nil } state, changed, err := runner.getState() return state.String(), changed, err } func (m *Model) Override(folder string) { m.fmut.RLock() fs, ok := m.folderFiles[folder] runner := m.folderRunners[folder] m.fmut.RUnlock() if !ok { return } runner.setState(FolderScanning) batch := make([]protocol.FileInfo, 0, indexBatchSize) fs.WithNeed(protocol.LocalDeviceID, func(fi db.FileIntf) bool { need := fi.(protocol.FileInfo) if len(batch) == indexBatchSize { m.updateLocalsFromScanning(folder, batch) batch = batch[:0] } have, ok := fs.Get(protocol.LocalDeviceID, need.Name) if !ok || have.Name != need.Name { // We are missing the file need.Deleted = true need.Blocks = nil need.Version = need.Version.Update(m.shortID) need.Size = 0 } else { // We have the file, replace with our version have.Version = have.Version.Merge(need.Version).Update(m.shortID) need = have } need.Sequence = 0 batch = append(batch, need) return true }) if len(batch) > 0 { m.updateLocalsFromScanning(folder, batch) } runner.setState(FolderIdle) } // CurrentSequence returns the change version for the given folder. // This is guaranteed to increment if the contents of the local folder has // changed. func (m *Model) CurrentSequence(folder string) (int64, bool) { m.fmut.RLock() fs, ok := m.folderFiles[folder] m.fmut.RUnlock() if !ok { // The folder might not exist, since this can be called with a user // specified folder name from the REST interface. return 0, false } return fs.Sequence(protocol.LocalDeviceID), true } // RemoteSequence returns the change version for the given folder, as // sent by remote peers. This is guaranteed to increment if the contents of // the remote or global folder has changed. func (m *Model) RemoteSequence(folder string) (int64, bool) { m.fmut.RLock() defer m.fmut.RUnlock() fs, ok := m.folderFiles[folder] if !ok { // The folder might not exist, since this can be called with a user // specified folder name from the REST interface. return 0, false } var ver int64 for _, n := range m.folderDevices[folder] { ver += fs.Sequence(n) } return ver, true } func (m *Model) GlobalDirectoryTree(folder, prefix string, levels int, dirsonly bool) map[string]interface{} { m.fmut.RLock() files, ok := m.folderFiles[folder] m.fmut.RUnlock() if !ok { return nil } output := make(map[string]interface{}) sep := string(filepath.Separator) prefix = osutil.NativeFilename(prefix) if prefix != "" && !strings.HasSuffix(prefix, sep) { prefix = prefix + sep } files.WithPrefixedGlobalTruncated(prefix, func(fi db.FileIntf) bool { f := fi.(db.FileInfoTruncated) if f.IsInvalid() || f.IsDeleted() || f.Name == prefix { return true } f.Name = strings.Replace(f.Name, prefix, "", 1) var dir, base string if f.IsDirectory() && !f.IsSymlink() { dir = f.Name } else { dir = filepath.Dir(f.Name) base = filepath.Base(f.Name) } if levels > -1 && strings.Count(f.Name, sep) > levels { return true } last := output if dir != "." { for _, path := range strings.Split(dir, sep) { directory, ok := last[path] if !ok { newdir := make(map[string]interface{}) last[path] = newdir last = newdir } else { last = directory.(map[string]interface{}) } } } if !dirsonly && base != "" { last[base] = []interface{}{ f.ModTime(), f.FileSize(), } } return true }) return output } func (m *Model) Availability(folder, file string, version protocol.Vector, block protocol.BlockInfo) []Availability { // Acquire this lock first, as the value returned from foldersFiles can // get heavily modified on Close() m.pmut.RLock() defer m.pmut.RUnlock() m.fmut.RLock() fs, ok := m.folderFiles[folder] devices := m.folderDevices[folder] m.fmut.RUnlock() if !ok { return nil } var availabilities []Availability for _, device := range fs.Availability(file) { _, ok := m.conn[device] if ok { availabilities = append(availabilities, Availability{ID: device, FromTemporary: false}) } } for _, device := range devices { if m.deviceDownloads[device].Has(folder, file, version, int32(block.Offset/protocol.BlockSize)) { availabilities = append(availabilities, Availability{ID: device, FromTemporary: true}) } } return availabilities } // BringToFront bumps the given files priority in the job queue. func (m *Model) BringToFront(folder, file string) { m.pmut.RLock() defer m.pmut.RUnlock() runner, ok := m.folderRunners[folder] if ok { runner.BringToFront(file) } } // CheckFolderHealth checks the folder for common errors and returns the // current folder error, or nil if the folder is healthy. func (m *Model) CheckFolderHealth(id string) error { folder, ok := m.cfg.Folders()[id] if !ok { return errors.New("folder does not exist") } // Check for folder errors, with the most serious and specific first and // generic ones like out of space on the home disk later. Note the // inverted error flow (err==nil checks) here. err := m.checkFolderPath(folder) if err == nil { err = m.checkFolderFreeSpace(folder) } if err == nil { err = m.checkHomeDiskFree() } // Set or clear the error on the runner, which also does logging and // generates events and stuff. m.runnerExchangeError(folder, err) return err } // checkFolderPath returns nil if the folder path exists and has the marker file. func (m *Model) checkFolderPath(folder config.FolderConfiguration) error { if folder.Path() == "" { return errFolderPathEmpty } if fi, err := os.Stat(folder.Path()); err != nil || !fi.IsDir() { return errFolderPathMissing } if !folder.HasMarker() { return errFolderMarkerMissing } return nil } // checkFolderFreeSpace returns nil if the folder has the required amount of // free space, or if folder free space checking is disabled. func (m *Model) checkFolderFreeSpace(folder config.FolderConfiguration) error { if folder.MinDiskFreePct <= 0 { return nil } free, err := osutil.DiskFreePercentage(folder.Path()) if err == nil && free < folder.MinDiskFreePct { return errFolderNoSpace } return nil } // checkHomeDiskFree returns nil if the home disk has the required amount of // free space, or if home disk free space checking is disabled. func (m *Model) checkHomeDiskFree() error { minFree := m.cfg.Options().MinHomeDiskFreePct if minFree <= 0 { return nil } free, err := osutil.DiskFreePercentage(m.cfg.ConfigPath()) if err == nil && free < minFree { return errHomeDiskNoSpace } return nil } // runnerExchangeError sets the given error (which way be nil) on the folder // runner. If the error differs from any previous error, logging and events // happen. func (m *Model) runnerExchangeError(folder config.FolderConfiguration, err error) { m.fmut.RLock() runner, runnerExists := m.folderRunners[folder.ID] m.fmut.RUnlock() var oldErr error if runnerExists { _, _, oldErr = runner.getState() } if err != nil { if oldErr != nil && oldErr.Error() != err.Error() { l.Infof("Folder %q error changed: %q -> %q", folder.ID, oldErr, err) } else if oldErr == nil { l.Warnf("Stopping folder %q - %v", folder.ID, err) } if runnerExists { runner.setError(err) } } else if oldErr != nil { l.Infof("Folder %q error is cleared, restarting", folder.ID) if runnerExists { runner.clearError() } } } func (m *Model) ResetFolder(folder string) { l.Infof("Cleaning data for folder %q", folder) db.DropFolder(m.db, folder) } func (m *Model) String() string { return fmt.Sprintf("model@%p", m) } func (m *Model) VerifyConfiguration(from, to config.Configuration) error { return nil } func (m *Model) CommitConfiguration(from, to config.Configuration) bool { // TODO: This should not use reflect, and should take more care to try to handle stuff without restart. // Go through the folder configs and figure out if we need to restart or not. fromFolders := mapFolders(from.Folders) toFolders := mapFolders(to.Folders) for folderID, cfg := range toFolders { if _, ok := fromFolders[folderID]; !ok { // A folder was added. l.Debugln(m, "adding folder", folderID) m.AddFolder(cfg) m.StartFolder(folderID) // Drop connections to all devices that can now share the new // folder. m.pmut.Lock() for _, dev := range cfg.DeviceIDs() { if conn, ok := m.conn[dev]; ok { closeRawConn(conn) } } m.pmut.Unlock() } } for folderID, fromCfg := range fromFolders { toCfg, ok := toFolders[folderID] if !ok { // The folder was removed. m.RemoveFolder(folderID) continue } // This folder exists on both sides. Settings might have changed. // Check if anything differs, apart from the label. toCfgCopy := toCfg fromCfgCopy := fromCfg fromCfgCopy.Label = "" toCfgCopy.Label = "" if !reflect.DeepEqual(fromCfgCopy, toCfgCopy) { m.RestartFolder(toCfg) } } // Removing a device. We actually don't need to do anything. // Because folder config has changed (since the device lists do not match) // Folders for that had device got "restarted", which involves killing // connections to all devices that we were sharing the folder with. // At some point model.Close() will get called for that device which will // clean residue device state that is not part of any folder. // Some options don't require restart as those components handle it fine // by themselves. from.Options.URAccepted = to.Options.URAccepted from.Options.URUniqueID = to.Options.URUniqueID from.Options.ListenAddresses = to.Options.ListenAddresses from.Options.RelaysEnabled = to.Options.RelaysEnabled from.Options.UnackedNotificationIDs = to.Options.UnackedNotificationIDs // All of the other generic options require restart. Or at least they may; // removing this check requires going through those options carefully and // making sure there are individual services that handle them correctly. // This code is the "original" requires-restart check and protects other // components that haven't yet been converted to VerifyConfig/CommitConfig // handling. if !reflect.DeepEqual(from.Options, to.Options) { l.Debugln(m, "requires restart, options differ") return false } return true } // mapFolders returns a map of folder ID to folder configuration for the given // slice of folder configurations. func mapFolders(folders []config.FolderConfiguration) map[string]config.FolderConfiguration { m := make(map[string]config.FolderConfiguration, len(folders)) for _, cfg := range folders { m[cfg.ID] = cfg } return m } // mapDevices returns a map of device ID to nothing for the given slice of // device IDs. func mapDevices(devices []protocol.DeviceID) map[protocol.DeviceID]struct{} { m := make(map[protocol.DeviceID]struct{}, len(devices)) for _, dev := range devices { m[dev] = struct{}{} } return m } func symlinkInvalid(folder string, fi db.FileIntf) bool { if !symlinks.Supported && fi.IsSymlink() && !fi.IsInvalid() && !fi.IsDeleted() { symlinkWarning.Do(func() { l.Warnln("Symlinks are disabled, unsupported or require Administrator privileges. This might cause your folder to appear out of sync.") }) // Need to type switch for the concrete type to be able to access fields... var name string switch fi := fi.(type) { case protocol.FileInfo: name = fi.Name case db.FileInfoTruncated: name = fi.Name } l.Infoln("Unsupported symlink", name, "in folder", folder) return true } return false } // Skips `skip` elements and retrieves up to `get` elements from a given slice. // Returns the resulting slice, plus how much elements are left to skip or // copy to satisfy the values which were provided, given the slice is not // big enough. func getChunk(data []string, skip, get int) ([]string, int, int) { l := len(data) if l <= skip { return []string{}, skip - l, get } else if l < skip+get { return data[skip:l], 0, get - (l - skip) } return data[skip : skip+get], 0, 0 } func closeRawConn(conn io.Closer) error { if conn, ok := conn.(*tls.Conn); ok { // If the underlying connection is a *tls.Conn, Close() does more // than it says on the tin. Specifically, it sends a TLS alert // message, which might block forever if the connection is dead // and we don't have a deadline set. conn.SetWriteDeadline(time.Now().Add(250 * time.Millisecond)) } return conn.Close() } func stringSliceWithout(ss []string, s string) []string { for i := range ss { if ss[i] == s { copy(ss[i:], ss[i+1:]) ss = ss[:len(ss)-1] return ss } } return ss } func readOffsetIntoBuf(file string, offset int64, buf []byte) error { fd, err := os.Open(file) if err != nil { l.Debugln("readOffsetIntoBuf.Open", file, err) return err } defer fd.Close() _, err = fd.ReadAt(buf, offset) if err != nil { l.Debugln("readOffsetIntoBuf.ReadAt", file, err) } return err } // The exists function is expected to return true for all known paths // (excluding "" and ".") func unifySubs(dirs []string, exists func(dir string) bool) []string { subs := trimUntilParentKnown(dirs, exists) sort.Strings(subs) return simplifySortedPaths(subs) } func trimUntilParentKnown(dirs []string, exists func(dir string) bool) []string { var subs []string for _, sub := range dirs { for sub != "" && sub != ".stfolder" && sub != ".stignore" { sub = filepath.Clean(sub) parent := filepath.Dir(sub) if parent == "." || exists(parent) { break } sub = parent if sub == "." || sub == string(filepath.Separator) { // Shortcut. We are going to scan the full folder, so we can // just return an empty list of subs at this point. return nil } } if sub == "" { return nil } subs = append(subs, sub) } return subs } func simplifySortedPaths(subs []string) []string { var cleaned []string next: for _, sub := range subs { for _, existing := range cleaned { if sub == existing || strings.HasPrefix(sub, existing+string(os.PathSeparator)) { continue next } } cleaned = append(cleaned, sub) } return cleaned } // makeForgetUpdate takes an index update and constructs a download progress update // causing to forget any progress for files which we've just been sent. func makeForgetUpdate(files []protocol.FileInfo) []protocol.FileDownloadProgressUpdate { updates := make([]protocol.FileDownloadProgressUpdate, 0, len(files)) for _, file := range files { if file.IsSymlink() || file.IsDirectory() || file.IsDeleted() { continue } updates = append(updates, protocol.FileDownloadProgressUpdate{ Name: file.Name, Version: file.Version, UpdateType: protocol.UpdateTypeForget, }) } return updates } // shouldIgnore returns true when a file should be excluded from processing func shouldIgnore(file db.FileIntf, matcher *ignore.Matcher, ignoreDelete bool) bool { // We check things in a certain order here... switch { case ignoreDelete && file.IsDeleted(): // ignoreDelete first because it's a very cheap test so a win if it // succeeds, and we might in the long run accumulate quite a few // deleted files. return true case matcher.Match(file.FileName()).IsIgnored(): // ignore patterns second because ignoring them is a valid way to // silence warnings about them being invalid and so on. return true } return false }