all: Simultaneously walk fs and db on scan (fixes #2571, fixes #4573) (#4584)

When scanner.Walk detects a change, it now returns the new file info as well as the old file info. It also finds deleted and ignored files while scanning.
Also directory deletions are now always committed to db after their children to prevent temporary failure on remote due to non-empty directory.
This commit is contained in:
Simon Frei 2018-02-10 16:56:53 +01:00 committed by GitHub
parent b97d5bcca8
commit 6d3f9d5154
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 685 additions and 262 deletions

View File

@ -194,9 +194,9 @@ func (s *FileSet) WithHaveTruncated(device protocol.DeviceID, fn Iterator) {
s.db.withHave([]byte(s.folder), device[:], nil, true, nativeFileIterator(fn))
}
func (s *FileSet) WithPrefixedHaveTruncated(device protocol.DeviceID, prefix string, fn Iterator) {
l.Debugf("%s WithPrefixedHaveTruncated(%v)", s.folder, device)
s.db.withHave([]byte(s.folder), device[:], []byte(osutil.NormalizedFilename(prefix)), true, nativeFileIterator(fn))
func (s *FileSet) WithPrefixedHave(device protocol.DeviceID, prefix string, fn Iterator) {
l.Debugf("%s WithPrefixedHave(%v)", s.folder, device)
s.db.withHave([]byte(s.folder), device[:], []byte(osutil.NormalizedFilename(prefix)), false, nativeFileIterator(fn))
}
func (s *FileSet) WithGlobal(fn Iterator) {
l.Debugf("%s WithGlobal()", s.folder)

View File

@ -10,7 +10,10 @@
package fs
import "path/filepath"
import (
"path/filepath"
"sort"
)
// WalkFunc is the type of the function called for each file or directory
// visited by Walk. The path argument contains the argument to Walk as a
@ -54,6 +57,7 @@ func (f *walkFilesystem) walk(path string, info FileInfo, walkFn WalkFunc) error
if err != nil {
return walkFn(path, info, err)
}
sort.Strings(names)
for _, name := range names {
filename := filepath.Join(path, name)

View File

@ -1395,14 +1395,21 @@ func (m *Model) CurrentGlobalFile(folder string, file string) (protocol.FileInfo
return fs.GetGlobal(file)
}
type cFiler struct {
m *Model
r string
type haveWalker struct {
fset *db.FileSet
}
// Implements scanner.CurrentFiler
func (cf cFiler) CurrentFile(file string) (protocol.FileInfo, bool) {
return cf.m.CurrentFolderFile(cf.r, file)
func (h haveWalker) Walk(prefix string, ctx context.Context, out chan<- protocol.FileInfo) {
ctxChan := ctx.Done()
h.fset.WithPrefixedHave(protocol.LocalDeviceID, prefix, func(fi db.FileIntf) bool {
f := fi.(protocol.FileInfo)
select {
case out <- f:
case <-ctxChan:
return false
}
return true
})
}
// Connection returns the current connection for device, and a boolean wether a connection was found.
@ -1955,13 +1962,14 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su
runner.setState(FolderScanning)
fchan := scanner.Walk(ctx, scanner.Config{
haveWalker := haveWalker{fset}
rchan := scanner.Walk(ctx, scanner.Config{
Folder: folderCfg.ID,
Subs: subDirs,
Matcher: ignores,
BlockSize: protocol.BlockSize,
TempLifetime: time.Duration(m.cfg.Options().KeepTemporariesH) * time.Hour,
CurrentFiler: cFiler{m, folder},
Have: haveWalker,
Filesystem: mtimefs,
IgnorePerms: folderCfg.IgnorePerms,
AutoNormalize: folderCfg.AutoNormalize,
@ -1978,6 +1986,17 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su
batch := make([]protocol.FileInfo, 0, maxBatchSizeFiles)
batchSizeBytes := 0
changes := 0
checkBatch := func() error {
if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes {
if err := runner.CheckHealth(); err != nil {
return err
}
m.updateLocalsFromScanning(folder, batch)
batch = batch[:0]
batchSizeBytes = 0
}
return nil
}
// Schedule a pull after scanning, but only if we actually detected any
// changes.
@ -1987,98 +2006,49 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su
}
}()
for f := range fchan {
if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes {
if err := runner.CheckHealth(); err != nil {
var delDirStack []protocol.FileInfo
for r := range rchan {
if err := checkBatch(); err != nil {
l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), err)
return err
}
// Append deleted dirs from stack if the current file isn't a child,
// which means all children were already processed.
for len(delDirStack) != 0 && !strings.HasPrefix(r.New.Name, delDirStack[len(delDirStack)-1].Name+string(fs.PathSeparator)) {
lastDelDir := delDirStack[len(delDirStack)-1]
batch = append(batch, lastDelDir)
batchSizeBytes += lastDelDir.ProtoSize()
changes++
if err := checkBatch(); err != nil {
l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), err)
return err
}
m.updateLocalsFromScanning(folder, batch)
batch = batch[:0]
batchSizeBytes = 0
delDirStack = delDirStack[:len(delDirStack)-1]
}
batch = append(batch, f)
batchSizeBytes += f.ProtoSize()
// Delay appending deleted dirs until all its children are processed
if r.Old.IsDirectory() && (r.New.Deleted || !r.New.IsDirectory()) {
delDirStack = append(delDirStack, r.New)
continue
}
l.Debugln("Appending", r)
batch = append(batch, r.New)
batchSizeBytes += r.New.ProtoSize()
changes++
}
if err := runner.CheckHealth(); err != nil {
l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), err)
return err
} else if len(batch) > 0 {
m.updateLocalsFromScanning(folder, batch)
}
if len(subDirs) == 0 {
// If we have no specific subdirectories to traverse, set it to one
// empty prefix so we traverse the entire folder contents once.
subDirs = []string{""}
}
// Do a scan of the database for each prefix, to check for deleted and
// ignored files.
batch = batch[:0]
batchSizeBytes = 0
for _, sub := range subDirs {
var iterError error
fset.WithPrefixedHaveTruncated(protocol.LocalDeviceID, sub, func(fi db.FileIntf) bool {
f := fi.(db.FileInfoTruncated)
if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes {
if err := runner.CheckHealth(); err != nil {
iterError = err
return false
}
m.updateLocalsFromScanning(folder, batch)
batch = batch[:0]
batchSizeBytes = 0
}
switch {
case !f.IsInvalid() && ignores.Match(f.Name).IsIgnored():
// File was valid at last pass but has been ignored. Set invalid bit.
l.Debugln("setting invalid bit on ignored", f)
nf := f.ConvertToInvalidFileInfo(m.id.Short())
batch = append(batch, nf)
batchSizeBytes += nf.ProtoSize()
changes++
case !f.IsInvalid() && !f.IsDeleted():
// The file is valid and not deleted. Lets check if it's
// still here.
if _, err := mtimefs.Lstat(f.Name); err != nil {
// We don't specifically verify that the error is
// fs.IsNotExist because there is a corner case when a
// directory is suddenly transformed into a file. When that
// happens, files that were in the directory (that is now a
// file) are deleted but will return a confusing error ("not a
// directory") when we try to Lstat() them.
nf := protocol.FileInfo{
Name: f.Name,
Type: f.Type,
Size: 0,
ModifiedS: f.ModifiedS,
ModifiedNs: f.ModifiedNs,
ModifiedBy: m.id.Short(),
Deleted: true,
Version: f.Version.Update(m.shortID),
}
batch = append(batch, nf)
batchSizeBytes += nf.ProtoSize()
changes++
}
}
return true
})
if iterError != nil {
l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), iterError)
return iterError
// Append remaining deleted dirs.
for i := len(delDirStack) - 1; i >= 0; i-- {
if err := checkBatch(); err != nil {
l.Debugln("Stopping scan of folder %s due to: %s", folderCfg.Description(), err)
return err
}
batch = append(batch, delDirStack[i])
batchSizeBytes += delDirStack[i].ProtoSize()
changes++
}
if err := runner.CheckHealth(); err != nil {

View File

@ -2808,6 +2808,185 @@ func TestNoRequestsFromPausedDevices(t *testing.T) {
}
}
// TestIssue2571 tests replacing a directory with content with a symlink
func TestIssue2571(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("Symlinks aren't supported by fs and scanner on windows")
}
err := defaultFs.MkdirAll("replaceDir", 0755)
if err != nil {
t.Fatal(err)
}
defer func() {
defaultFs.RemoveAll("replaceDir")
}()
testFs := fs.NewFilesystem(fs.FilesystemTypeBasic, filepath.Join(defaultFs.URI(), "replaceDir"))
for _, dir := range []string{"toLink", "linkTarget"} {
err := testFs.MkdirAll(dir, 0775)
if err != nil {
t.Fatal(err)
}
fd, err := testFs.Create(filepath.Join(dir, "a"))
if err != nil {
t.Fatal(err)
}
fd.Close()
}
dbi := db.OpenMemory()
m := NewModel(defaultConfig, protocol.LocalDeviceID, "syncthing", "dev", dbi, nil)
m.AddFolder(defaultFolderConfig)
m.StartFolder("default")
m.ServeBackground()
defer m.Stop()
m.ScanFolder("default")
if err = testFs.RemoveAll("toLink"); err != nil {
t.Fatal(err)
}
if err := osutil.DebugSymlinkForTestsOnly(filepath.Join(testFs.URI(), "linkTarget"), filepath.Join(testFs.URI(), "toLink")); err != nil {
t.Fatal(err)
}
m.ScanFolder("default")
if dir, ok := m.CurrentFolderFile("default", filepath.Join("replaceDir", "toLink")); !ok {
t.Fatalf("Dir missing in db")
} else if !dir.IsSymlink() {
t.Errorf("Dir wasn't changed to symlink")
}
if file, ok := m.CurrentFolderFile("default", filepath.Join("replaceDir", "toLink", "a")); !ok {
t.Fatalf("File missing in db")
} else if !file.Deleted {
t.Errorf("File below symlink has not been marked as deleted")
}
}
// TestIssue4573 tests that contents of an unavailable dir aren't marked deleted
func TestIssue4573(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("Can't make the dir inaccessible on windows")
}
err := defaultFs.MkdirAll("inaccessible", 0755)
if err != nil {
t.Fatal(err)
}
defer func() {
defaultFs.Chmod("inaccessible", 0777)
defaultFs.RemoveAll("inaccessible")
}()
file := filepath.Join("inaccessible", "a")
fd, err := defaultFs.Create(file)
if err != nil {
t.Fatal(err)
}
fd.Close()
dbi := db.OpenMemory()
m := NewModel(defaultConfig, protocol.LocalDeviceID, "syncthing", "dev", dbi, nil)
m.AddFolder(defaultFolderConfig)
m.StartFolder("default")
m.ServeBackground()
defer m.Stop()
m.ScanFolder("default")
err = defaultFs.Chmod("inaccessible", 0000)
if err != nil {
t.Fatal(err)
}
m.ScanFolder("default")
if file, ok := m.CurrentFolderFile("default", file); !ok {
t.Fatalf("File missing in db")
} else if file.Deleted {
t.Errorf("Inaccessible file has been marked as deleted.")
}
}
// TestInternalScan checks whether various fs operations are correctly represented
// in the db after scanning.
func TestInternalScan(t *testing.T) {
err := defaultFs.MkdirAll("internalScan", 0755)
if err != nil {
t.Fatal(err)
}
defer func() {
defaultFs.RemoveAll("internalScan")
}()
testFs := fs.NewFilesystem(fs.FilesystemTypeBasic, filepath.Join(defaultFs.URI(), "internalScan"))
testCases := map[string]func(protocol.FileInfo) bool{
"removeDir": func(f protocol.FileInfo) bool {
return !f.Deleted
},
"dirToFile": func(f protocol.FileInfo) bool {
return f.Deleted || f.IsDirectory()
},
}
baseDirs := []string{"dirToFile", "removeDir"}
for _, dir := range baseDirs {
sub := filepath.Join(dir, "subDir")
for _, dir := range []string{dir, sub} {
err := testFs.MkdirAll(dir, 0775)
if err != nil {
t.Fatalf("%v: %v", dir, err)
}
}
testCases[sub] = func(f protocol.FileInfo) bool {
return !f.Deleted
}
for _, dir := range []string{dir, sub} {
file := filepath.Join(dir, "a")
fd, err := testFs.Create(file)
if err != nil {
t.Fatal(err)
}
fd.Close()
testCases[file] = func(f protocol.FileInfo) bool {
return !f.Deleted
}
}
}
dbi := db.OpenMemory()
m := NewModel(defaultConfig, protocol.LocalDeviceID, "syncthing", "dev", dbi, nil)
m.AddFolder(defaultFolderConfig)
m.StartFolder("default")
m.ServeBackground()
defer m.Stop()
m.ScanFolder("default")
for _, dir := range baseDirs {
if err = testFs.RemoveAll(dir); err != nil {
t.Fatal(err)
}
}
fd, err := testFs.Create("dirToFile")
if err != nil {
t.Fatal(err)
}
fd.Close()
m.ScanFolder("default")
for path, cond := range testCases {
if f, ok := m.CurrentFolderFile("default", filepath.Join("internalScan", path)); !ok {
t.Fatalf("%v missing in db", path)
} else if cond(f) {
t.Errorf("Incorrect db entry for %v", path)
}
}
}
func TestCustomMarkerName(t *testing.T) {
ldb := db.OpenMemory()
set := db.NewFileSet("default", defaultFs, ldb)

View File

@ -117,6 +117,10 @@ func (f FileInfo) WinsConflict(other FileInfo) bool {
return f.Version.Compare(other.Version) == ConcurrentGreater
}
func (f FileInfo) IsEmpty() bool {
return f.Version.Counters == nil
}
func (f *FileInfo) Invalidate(invalidatedBy ShortID) {
f.Invalid = true
f.ModifiedBy = invalidatedBy
@ -124,6 +128,23 @@ func (f *FileInfo) Invalidate(invalidatedBy ShortID) {
f.Sequence = 0
}
func (f *FileInfo) InvalidatedCopy(invalidatedBy ShortID) FileInfo {
copy := *f
copy.Invalidate(invalidatedBy)
return copy
}
func (f *FileInfo) DeletedCopy(deletedBy ShortID) FileInfo {
copy := *f
copy.Size = 0
copy.ModifiedBy = deletedBy
copy.Deleted = true
copy.Version = f.Version.Update(deletedBy)
copy.Sequence = 0
copy.Blocks = nil
return copy
}
func (b BlockInfo) String() string {
return fmt.Sprintf("Block{%d/%d/%d/%x}", b.Offset, b.Size, b.WeakHash, b.Hash)
}

View File

@ -24,6 +24,10 @@ var (
quickCfg = &quick.Config{}
)
const (
fileSize = 1 << 40
)
func TestPing(t *testing.T) {
ar, aw := io.Pipe()
br, bw := io.Pipe()
@ -243,12 +247,6 @@ func TestMarshalledIndexMessageSize(t *testing.T) {
return
}
const (
maxMessageSize = MaxMessageLen
fileSize = 1 << 40
blockSize = BlockSize
)
f := FileInfo{
Name: "a normal length file name withoout any weird stuff.txt",
Type: FileInfoTypeFile,
@ -256,12 +254,12 @@ func TestMarshalledIndexMessageSize(t *testing.T) {
Permissions: 0666,
ModifiedS: time.Now().Unix(),
Version: Vector{Counters: []Counter{{ID: 1 << 60, Value: 1}, {ID: 2 << 60, Value: 1}}},
Blocks: make([]BlockInfo, fileSize/blockSize),
Blocks: make([]BlockInfo, fileSize/BlockSize),
}
for i := 0; i < fileSize/blockSize; i++ {
f.Blocks[i].Offset = int64(i) * blockSize
f.Blocks[i].Size = blockSize
for i := 0; i < fileSize/BlockSize; i++ {
f.Blocks[i].Offset = int64(i) * BlockSize
f.Blocks[i].Size = BlockSize
f.Blocks[i].Hash = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 20, 1, 2, 3, 4, 5, 6, 7, 8, 9, 30, 1, 2}
}
@ -271,8 +269,8 @@ func TestMarshalledIndexMessageSize(t *testing.T) {
}
msgSize := idx.ProtoSize()
if msgSize > maxMessageSize {
t.Errorf("Message size %d bytes is larger than max %d", msgSize, maxMessageSize)
if msgSize > MaxMessageLen {
t.Errorf("Message size %d bytes is larger than max %d", msgSize, MaxMessageLen)
}
}
@ -400,3 +398,31 @@ func TestCheckConsistency(t *testing.T) {
}
}
}
func TestCopyFileInfo(t *testing.T) {
f := FileInfo{
Name: "a normal length file name withoout any weird stuff.txt",
Type: FileInfoTypeFile,
Size: fileSize,
Permissions: 0666,
ModifiedS: time.Now().Unix(),
Version: Vector{Counters: []Counter{{ID: 1 << 60, Value: 1}, {ID: 2 << 60, Value: 1}}},
Blocks: make([]BlockInfo, fileSize/BlockSize),
}
del := f.DeletedCopy(LocalDeviceID.Short())
if f.Deleted {
t.Errorf("Source file info was deleted on copy")
}
if !del.Deleted {
t.Errorf("Returned file info was not deleted on copy")
}
inv := f.InvalidatedCopy(LocalDeviceID.Short())
if f.Invalid {
t.Errorf("Source file info was invalid on copy")
}
if !inv.Invalid {
t.Errorf("Returned file info was not invalid on copy")
}
}

View File

@ -65,15 +65,15 @@ type parallelHasher struct {
fs fs.Filesystem
blockSize int
workers int
outbox chan<- protocol.FileInfo
inbox <-chan protocol.FileInfo
outbox chan<- ScanResult
inbox <-chan ScanResult
counter Counter
done chan<- struct{}
useWeakHashes bool
wg sync.WaitGroup
}
func newParallelHasher(ctx context.Context, fs fs.Filesystem, blockSize, workers int, outbox chan<- protocol.FileInfo, inbox <-chan protocol.FileInfo, counter Counter, done chan<- struct{}, useWeakHashes bool) {
func newParallelHasher(ctx context.Context, fs fs.Filesystem, blockSize, workers int, outbox chan<- ScanResult, inbox <-chan ScanResult, counter Counter, done chan<- struct{}, useWeakHashes bool) {
ph := &parallelHasher{
fs: fs,
blockSize: blockSize,
@ -104,25 +104,25 @@ func (ph *parallelHasher) hashFiles(ctx context.Context) {
return
}
if f.IsDirectory() || f.IsDeleted() {
if f.New.IsDirectory() || f.New.IsDeleted() {
panic("Bug. Asked to hash a directory or a deleted file.")
}
blocks, err := HashFile(ctx, ph.fs, f.Name, ph.blockSize, ph.counter, ph.useWeakHashes)
blocks, err := HashFile(ctx, ph.fs, f.New.Name, ph.blockSize, ph.counter, ph.useWeakHashes)
if err != nil {
l.Debugln("hash error:", f.Name, err)
l.Debugln("hash error:", f.New.Name, err)
continue
}
f.Blocks = blocks
f.New.Blocks = blocks
// The size we saw when initially deciding to hash the file
// might not have been the size it actually had when we hashed
// it. Update the size from the block list.
f.Size = 0
f.New.Size = 0
for _, b := range blocks {
f.Size += int64(b.Size)
f.New.Size += int64(b.Size)
}
select {

View File

@ -8,7 +8,9 @@ package scanner
import (
"context"
"errors"
"runtime"
"strings"
"sync/atomic"
"time"
"unicode/utf8"
@ -48,8 +50,8 @@ type Config struct {
Matcher *ignore.Matcher
// Number of hours to keep temporary files for
TempLifetime time.Duration
// If CurrentFiler is not nil, it is queried for the current file before rescanning.
CurrentFiler CurrentFiler
// Walks over file infos as present in the db before the scan alphabetically.
Have haveWalker
// The Filesystem provides an abstraction on top of the actual filesystem.
Filesystem fs.Filesystem
// If IgnorePerms is true, changes to permission bits will not be
@ -70,16 +72,28 @@ type Config struct {
UseWeakHashes bool
}
type CurrentFiler interface {
// CurrentFile returns the file as seen at last scan.
CurrentFile(name string) (protocol.FileInfo, bool)
type haveWalker interface {
// Walk passes all local file infos from the db which start with prefix
// to out and aborts early if ctx is cancelled.
Walk(prefix string, ctx context.Context, out chan<- protocol.FileInfo)
}
func Walk(ctx context.Context, cfg Config) chan protocol.FileInfo {
type fsWalkResult struct {
path string
info fs.FileInfo
err error
}
type ScanResult struct {
New protocol.FileInfo
Old protocol.FileInfo
}
func Walk(ctx context.Context, cfg Config) chan ScanResult {
w := walker{cfg}
if w.CurrentFiler == nil {
w.CurrentFiler = noCurrentFiler{}
if w.Have == nil {
w.Have = noHaveWalker{}
}
if w.Filesystem == nil {
panic("no filesystem specified")
@ -97,25 +111,19 @@ type walker struct {
// Walk returns the list of files found in the local folder by scanning the
// file system. Files are blockwise hashed.
func (w *walker) walk(ctx context.Context) chan protocol.FileInfo {
func (w *walker) walk(ctx context.Context) chan ScanResult {
l.Debugln("Walk", w.Subs, w.BlockSize, w.Matcher)
toHashChan := make(chan protocol.FileInfo)
finishedChan := make(chan protocol.FileInfo)
haveChan := make(chan protocol.FileInfo)
haveCtx, haveCancel := context.WithCancel(ctx)
go w.dbWalkerRoutine(haveCtx, haveChan)
// A routine which walks the filesystem tree, and sends files which have
// been modified to the counter routine.
go func() {
hashFiles := w.walkAndHashFiles(ctx, toHashChan, finishedChan)
if len(w.Subs) == 0 {
w.Filesystem.Walk(".", hashFiles)
} else {
for _, sub := range w.Subs {
w.Filesystem.Walk(sub, hashFiles)
}
}
close(toHashChan)
}()
fsChan := make(chan fsWalkResult)
go w.fsWalkerRoutine(ctx, fsChan, haveCancel)
toHashChan := make(chan ScanResult)
finishedChan := make(chan ScanResult)
go w.processWalkResults(ctx, fsChan, haveChan, toHashChan, finishedChan)
// We're not required to emit scan progress events, just kick off hashers,
// and feed inputs directly from the walker.
@ -139,15 +147,15 @@ func (w *walker) walk(ctx context.Context) chan protocol.FileInfo {
// Parallel hasher is stopped by this routine when we close the channel over
// which it receives the files we ask it to hash.
go func() {
var filesToHash []protocol.FileInfo
var filesToHash []ScanResult
var total int64 = 1
for file := range toHashChan {
filesToHash = append(filesToHash, file)
total += file.Size
total += file.New.Size
}
realToHashChan := make(chan protocol.FileInfo)
realToHashChan := make(chan ScanResult)
done := make(chan struct{})
progress := newByteCounter()
@ -183,7 +191,7 @@ func (w *walker) walk(ctx context.Context) chan protocol.FileInfo {
loop:
for _, file := range filesToHash {
l.Debugln("real to hash:", file.Name)
l.Debugln("real to hash:", file.New.Name)
select {
case realToHashChan <- file:
case <-ctx.Done():
@ -196,15 +204,49 @@ func (w *walker) walk(ctx context.Context) chan protocol.FileInfo {
return finishedChan
}
func (w *walker) walkAndHashFiles(ctx context.Context, fchan, dchan chan protocol.FileInfo) fs.WalkFunc {
now := time.Now()
return func(path string, info fs.FileInfo, err error) error {
// dbWalkerRoutine walks the db and sends back file infos to be compared to scan results.
func (w *walker) dbWalkerRoutine(ctx context.Context, haveChan chan<- protocol.FileInfo) {
defer close(haveChan)
if len(w.Subs) == 0 {
w.Have.Walk("", ctx, haveChan)
return
}
for _, sub := range w.Subs {
select {
case <-ctx.Done():
return ctx.Err()
return
default:
}
w.Have.Walk(sub, ctx, haveChan)
}
}
// fsWalkerRoutine walks the filesystem tree and sends back file infos and potential
// errors at paths that need to be processed.
func (w *walker) fsWalkerRoutine(ctx context.Context, fsChan chan<- fsWalkResult, haveCancel context.CancelFunc) {
defer close(fsChan)
walkFn := w.createFSWalkFn(ctx, fsChan)
if len(w.Subs) == 0 {
if err := w.Filesystem.Walk(".", walkFn); err != nil {
haveCancel()
}
return
}
for _, sub := range w.Subs {
if err := w.Filesystem.Walk(sub, walkFn); err != nil {
haveCancel()
break
}
}
}
func (w *walker) createFSWalkFn(ctx context.Context, fsChan chan<- fsWalkResult) fs.WalkFunc {
now := time.Now()
return func(path string, info fs.FileInfo, err error) error {
// Return value used when we are returning early and don't want to
// process the item. For directories, this means do-not-descend.
var skip error // nil
@ -213,21 +255,14 @@ func (w *walker) walkAndHashFiles(ctx context.Context, fchan, dchan chan protoco
skip = fs.SkipDir
}
if err != nil {
l.Debugln("error:", path, info, err)
return skip
}
if path == "." {
if err != nil {
fsWalkError(ctx, fsChan, path, err)
return skip
}
return nil
}
info, err = w.Filesystem.Lstat(path)
// An error here would be weird as we've already gotten to this point, but act on it nonetheless
if err != nil {
return skip
}
if fs.IsTemporary(path) {
l.Debugln("temporary:", path)
if info.IsRegular() && info.ModTime().Add(w.TempLifetime).Before(now) {
@ -238,48 +273,177 @@ func (w *walker) walkAndHashFiles(ctx context.Context, fchan, dchan chan protoco
}
if fs.IsInternal(path) {
l.Debugln("ignored (internal):", path)
l.Debugln("skip walking (internal):", path)
return skip
}
if w.Matcher.Match(path).IsIgnored() {
l.Debugln("ignored (patterns):", path)
l.Debugln("skip walking (patterns):", path)
return skip
}
if err != nil {
if sendErr := fsWalkError(ctx, fsChan, path, err); sendErr != nil {
return sendErr
}
return skip
}
if !utf8.ValidString(path) {
if err := fsWalkError(ctx, fsChan, path, errors.New("path isn't a valid utf8 string")); err != nil {
return err
}
l.Warnf("File name %q is not in UTF8 encoding; skipping.", path)
return skip
}
path, shouldSkip := w.normalizePath(path, info)
if shouldSkip {
if err := fsWalkError(ctx, fsChan, path, errors.New("failed to normalize path")); err != nil {
return err
}
return skip
}
switch {
case info.IsSymlink():
if err := w.walkSymlink(ctx, path, dchan); err != nil {
return err
}
if info.IsDir() {
// under no circumstances shall we descend into a symlink
return fs.SkipDir
}
return nil
select {
case fsChan <- fsWalkResult{
path: path,
info: info,
err: nil,
}:
case <-ctx.Done():
return ctx.Err()
}
case info.IsDir():
err = w.walkDir(ctx, path, info, dchan)
case info.IsRegular():
err = w.walkRegular(ctx, path, info, fchan)
// under no circumstances shall we descend into a symlink
if info.IsSymlink() && info.IsDir() {
l.Debugln("skip walking (symlinked directory):", path)
return skip
}
return err
}
}
func (w *walker) walkRegular(ctx context.Context, relPath string, info fs.FileInfo, fchan chan protocol.FileInfo) error {
func fsWalkError(ctx context.Context, dst chan<- fsWalkResult, path string, err error) error {
select {
case dst <- fsWalkResult{
path: path,
info: nil,
err: err,
}:
case <-ctx.Done():
return ctx.Err()
}
return nil
}
func (w *walker) processWalkResults(ctx context.Context, fsChan <-chan fsWalkResult, haveChan <-chan protocol.FileInfo, toHashChan, finishedChan chan<- ScanResult) {
ctxChan := ctx.Done()
fsRes, fsChanOpen := <-fsChan
currDBFile, haveChanOpen := <-haveChan
for fsChanOpen {
if haveChanOpen {
// File infos below an error walking the filesystem tree
// may be marked as ignored but should not be deleted.
if fsRes.err != nil && (strings.HasPrefix(currDBFile.Name, fsRes.path+string(fs.PathSeparator)) || fsRes.path == ".") {
w.checkIgnoredAndInvalidate(currDBFile, finishedChan, ctxChan)
currDBFile, haveChanOpen = <-haveChan
continue
}
// Delete file infos that were not encountered when
// walking the filesystem tree, except on error (see
// above) or if they are ignored.
if currDBFile.Name < fsRes.path {
w.checkIgnoredAndDelete(currDBFile, finishedChan, ctxChan)
currDBFile, haveChanOpen = <-haveChan
continue
}
}
var oldFile protocol.FileInfo
if haveChanOpen && currDBFile.Name == fsRes.path {
oldFile = currDBFile
currDBFile, haveChanOpen = <-haveChan
}
if fsRes.err != nil {
if fs.IsNotExist(fsRes.err) && !oldFile.IsEmpty() && !oldFile.Deleted {
select {
case finishedChan <- ScanResult{
New: oldFile.DeletedCopy(w.ShortID),
Old: oldFile,
}:
case <-ctx.Done():
return
}
}
fsRes, fsChanOpen = <-fsChan
continue
}
switch {
case fsRes.info.IsDir():
w.walkDir(ctx, fsRes.path, fsRes.info, oldFile, finishedChan)
case fsRes.info.IsSymlink():
w.walkSymlink(ctx, fsRes.path, oldFile, finishedChan)
case fsRes.info.IsRegular():
w.walkRegular(ctx, fsRes.path, fsRes.info, oldFile, toHashChan)
}
fsRes, fsChanOpen = <-fsChan
}
// Filesystem tree walking finished, if there is anything left in the
// db, mark it as deleted, except when it's ignored.
if haveChanOpen {
w.checkIgnoredAndDelete(currDBFile, finishedChan, ctxChan)
for currDBFile = range haveChan {
w.checkIgnoredAndDelete(currDBFile, finishedChan, ctxChan)
}
}
close(toHashChan)
}
func (w *walker) checkIgnoredAndDelete(f protocol.FileInfo, finishedChan chan<- ScanResult, done <-chan struct{}) {
if w.checkIgnoredAndInvalidate(f, finishedChan, done) {
return
}
if !f.Invalid && !f.Deleted {
select {
case finishedChan <- ScanResult{
New: f.DeletedCopy(w.ShortID),
Old: f,
}:
case <-done:
}
}
}
func (w *walker) checkIgnoredAndInvalidate(f protocol.FileInfo, finishedChan chan<- ScanResult, done <-chan struct{}) bool {
if !w.Matcher.Match(f.Name).IsIgnored() {
return false
}
if !f.Invalid {
select {
case finishedChan <- ScanResult{
New: f.InvalidatedCopy(w.ShortID),
Old: f,
}:
case <-done:
}
}
return true
}
func (w *walker) walkRegular(ctx context.Context, relPath string, info fs.FileInfo, cf protocol.FileInfo, toHashChan chan<- ScanResult) {
curMode := uint32(info.Mode())
if runtime.GOOS == "windows" && osutil.IsWindowsExecutable(relPath) {
curMode |= 0111
@ -294,40 +458,38 @@ func (w *walker) walkRegular(ctx context.Context, relPath string, info fs.FileIn
// - was not a symlink (since it's a file now)
// - was not invalid (since it looks valid now)
// - has the same size as previously
cf, ok := w.CurrentFiler.CurrentFile(relPath)
permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Permissions, curMode)
if ok && permUnchanged && !cf.IsDeleted() && cf.ModTime().Equal(info.ModTime()) && !cf.IsDirectory() &&
!cf.IsSymlink() && !cf.IsInvalid() && cf.Size == info.Size() {
return nil
}
if ok {
if !cf.IsEmpty() {
permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Permissions, curMode)
if permUnchanged && !cf.IsDeleted() && cf.ModTime().Equal(info.ModTime()) && !cf.IsDirectory() &&
!cf.IsSymlink() && !cf.IsInvalid() && cf.Size == info.Size() {
return
}
l.Debugln("rescan:", cf, info.ModTime().Unix(), info.Mode()&fs.ModePerm)
}
f := protocol.FileInfo{
Name: relPath,
Type: protocol.FileInfoTypeFile,
Version: cf.Version.Update(w.ShortID),
Permissions: curMode & uint32(maskModePerm),
NoPermissions: w.IgnorePerms,
ModifiedS: info.ModTime().Unix(),
ModifiedNs: int32(info.ModTime().Nanosecond()),
ModifiedBy: w.ShortID,
Size: info.Size(),
f := ScanResult{
New: protocol.FileInfo{
Name: relPath,
Type: protocol.FileInfoTypeFile,
Version: cf.Version.Update(w.ShortID),
Permissions: curMode & uint32(maskModePerm),
NoPermissions: w.IgnorePerms,
ModifiedS: info.ModTime().Unix(),
ModifiedNs: int32(info.ModTime().Nanosecond()),
ModifiedBy: w.ShortID,
Size: info.Size(),
},
Old: cf,
}
l.Debugln("to hash:", relPath, f)
select {
case fchan <- f:
case toHashChan <- f:
case <-ctx.Done():
return ctx.Err()
}
return nil
}
func (w *walker) walkDir(ctx context.Context, relPath string, info fs.FileInfo, dchan chan protocol.FileInfo) error {
func (w *walker) walkDir(ctx context.Context, relPath string, info fs.FileInfo, cf protocol.FileInfo, finishedChan chan<- ScanResult) {
// A directory is "unchanged", if it
// - exists
// - has the same permissions as previously, unless we are ignoring permissions
@ -335,40 +497,41 @@ func (w *walker) walkDir(ctx context.Context, relPath string, info fs.FileInfo,
// - was a directory previously (not a file or something else)
// - was not a symlink (since it's a directory now)
// - was not invalid (since it looks valid now)
cf, ok := w.CurrentFiler.CurrentFile(relPath)
permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Permissions, uint32(info.Mode()))
if ok && permUnchanged && !cf.IsDeleted() && cf.IsDirectory() && !cf.IsSymlink() && !cf.IsInvalid() {
return nil
if !cf.IsEmpty() {
permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Permissions, uint32(info.Mode()))
if permUnchanged && !cf.IsDeleted() && cf.IsDirectory() && !cf.IsSymlink() && !cf.IsInvalid() {
return
}
}
f := protocol.FileInfo{
Name: relPath,
Type: protocol.FileInfoTypeDirectory,
Version: cf.Version.Update(w.ShortID),
Permissions: uint32(info.Mode() & maskModePerm),
NoPermissions: w.IgnorePerms,
ModifiedS: info.ModTime().Unix(),
ModifiedNs: int32(info.ModTime().Nanosecond()),
ModifiedBy: w.ShortID,
f := ScanResult{
New: protocol.FileInfo{
Name: relPath,
Type: protocol.FileInfoTypeDirectory,
Version: cf.Version.Update(w.ShortID),
Permissions: uint32(info.Mode() & maskModePerm),
NoPermissions: w.IgnorePerms,
ModifiedS: info.ModTime().Unix(),
ModifiedNs: int32(info.ModTime().Nanosecond()),
ModifiedBy: w.ShortID,
},
Old: cf,
}
l.Debugln("dir:", relPath, f)
select {
case dchan <- f:
case finishedChan <- f:
case <-ctx.Done():
return ctx.Err()
}
return nil
}
// walkSymlink returns nil or an error, if the error is of the nature that
// it should stop the entire walk.
func (w *walker) walkSymlink(ctx context.Context, relPath string, dchan chan protocol.FileInfo) error {
func (w *walker) walkSymlink(ctx context.Context, relPath string, cf protocol.FileInfo, finishedChan chan<- ScanResult) {
// Symlinks are not supported on Windows. We ignore instead of returning
// an error.
if runtime.GOOS == "windows" {
return nil
return
}
// We always rehash symlinks as they have no modtime or
@ -379,7 +542,7 @@ func (w *walker) walkSymlink(ctx context.Context, relPath string, dchan chan pro
target, err := w.Filesystem.ReadSymlink(relPath)
if err != nil {
l.Debugln("readlink error:", relPath, err)
return nil
return
}
// A symlink is "unchanged", if
@ -388,28 +551,27 @@ func (w *walker) walkSymlink(ctx context.Context, relPath string, dchan chan pro
// - it was a symlink
// - it wasn't invalid
// - the target was the same
cf, ok := w.CurrentFiler.CurrentFile(relPath)
if ok && !cf.IsDeleted() && cf.IsSymlink() && !cf.IsInvalid() && cf.SymlinkTarget == target {
return nil
if !cf.IsEmpty() && !cf.IsDeleted() && cf.IsSymlink() && !cf.IsInvalid() && cf.SymlinkTarget == target {
return
}
f := protocol.FileInfo{
Name: relPath,
Type: protocol.FileInfoTypeSymlink,
Version: cf.Version.Update(w.ShortID),
NoPermissions: true, // Symlinks don't have permissions of their own
SymlinkTarget: target,
f := ScanResult{
New: protocol.FileInfo{
Name: relPath,
Type: protocol.FileInfoTypeSymlink,
Version: cf.Version.Update(w.ShortID),
NoPermissions: true, // Symlinks don't have permissions of their own
SymlinkTarget: target,
},
Old: cf,
}
l.Debugln("symlink changedb:", relPath, f)
select {
case dchan <- f:
case finishedChan <- f:
case <-ctx.Done():
return ctx.Err()
}
return nil
}
// normalizePath returns the normalized relative path (possibly after fixing
@ -532,10 +694,6 @@ func (c *byteCounter) Close() {
close(c.stop)
}
// A no-op CurrentFiler
type noHaveWalker struct{}
type noCurrentFiler struct{}
func (noCurrentFiler) CurrentFile(name string) (protocol.FileInfo, bool) {
return protocol.FileInfo{}, false
}
func (noHaveWalker) Walk(prefix string, ctx context.Context, out chan<- protocol.FileInfo) {}

View File

@ -69,7 +69,7 @@ func TestWalkSub(t *testing.T) {
Matcher: ignores,
Hashers: 2,
})
var files []protocol.FileInfo
var files []ScanResult
for f := range fchan {
files = append(files, f)
}
@ -80,10 +80,10 @@ func TestWalkSub(t *testing.T) {
if len(files) != 2 {
t.Fatalf("Incorrect length %d != 2", len(files))
}
if files[0].Name != "dir2" {
if files[0].New.Name != "dir2" {
t.Errorf("Incorrect file %v != dir2", files[0])
}
if files[1].Name != filepath.Join("dir2", "cfile") {
if files[1].New.Name != filepath.Join("dir2", "cfile") {
t.Errorf("Incorrect file %v != dir2/cfile", files[1])
}
}
@ -103,7 +103,7 @@ func TestWalk(t *testing.T) {
Hashers: 2,
})
var tmp []protocol.FileInfo
var tmp []ScanResult
for f := range fchan {
tmp = append(tmp, f)
}
@ -251,9 +251,9 @@ func TestNormalization(t *testing.T) {
}
func TestIssue1507(t *testing.T) {
w := &walker{}
c := make(chan protocol.FileInfo, 100)
fn := w.walkAndHashFiles(context.TODO(), c, c)
w := &walker{Config{Matcher: ignore.New(fs.NewFilesystem(fs.FilesystemTypeBasic, "."))}}
c := make(chan fsWalkResult, 100)
fn := w.createFSWalkFn(context.TODO(), c)
fn("", nil, protocol.ErrClosed)
}
@ -274,15 +274,14 @@ func TestWalkSymlinkUnix(t *testing.T) {
// Scan it
files, _ := walkDir(fs.NewFilesystem(fs.FilesystemTypeBasic, "_symlinks"), path)
// Verify that we got one symlink and with the correct attributes
if len(files) != 1 {
t.Errorf("expected 1 symlink, not %d", len(files))
}
if len(files[0].Blocks) != 0 {
t.Errorf("expected zero blocks for symlink, not %d", len(files[0].Blocks))
if len(files[0].New.Blocks) != 0 {
t.Errorf("expected zero blocks for symlink, not %d", len(files[0].New.Blocks))
}
if files[0].SymlinkTarget != "../testdata" {
t.Errorf("expected symlink to have target destination, not %q", files[0].SymlinkTarget)
if files[0].New.SymlinkTarget != "../testdata" {
t.Errorf("expected symlink to have target destination, not %q", files[0].New.SymlinkTarget)
}
}
}
@ -342,7 +341,7 @@ func TestWalkRootSymlink(t *testing.T) {
}
}
func walkDir(fs fs.Filesystem, dir string) ([]protocol.FileInfo, error) {
func walkDir(fs fs.Filesystem, dir string) ([]ScanResult, error) {
fchan := Walk(context.TODO(), Config{
Filesystem: fs,
Subs: []string{dir},
@ -351,7 +350,7 @@ func walkDir(fs fs.Filesystem, dir string) ([]protocol.FileInfo, error) {
Hashers: 2,
})
var tmp []protocol.FileInfo
var tmp []ScanResult
for f := range fchan {
tmp = append(tmp, f)
}
@ -360,14 +359,14 @@ func walkDir(fs fs.Filesystem, dir string) ([]protocol.FileInfo, error) {
return tmp, nil
}
type fileList []protocol.FileInfo
type fileList []ScanResult
func (l fileList) Len() int {
return len(l)
}
func (l fileList) Less(a, b int) bool {
return l[a].Name < l[b].Name
return l[a].New.Name < l[b].New.Name
}
func (l fileList) Swap(a, b int) {
@ -377,12 +376,12 @@ func (l fileList) Swap(a, b int) {
func (l fileList) testfiles() testfileList {
testfiles := make(testfileList, len(l))
for i, f := range l {
if len(f.Blocks) > 1 {
if len(f.New.Blocks) > 1 {
panic("simple test case stuff only supports a single block per file")
}
testfiles[i] = testfile{name: f.Name, length: f.FileSize()}
if len(f.Blocks) == 1 {
testfiles[i].hash = fmt.Sprintf("%x", f.Blocks[0].Hash)
testfiles[i] = testfile{name: f.New.Name, length: f.New.FileSize()}
if len(f.New.Blocks) == 1 {
testfiles[i].hash = fmt.Sprintf("%x", f.New.Blocks[0].Hash)
}
}
return testfiles
@ -465,13 +464,13 @@ func TestStopWalk(t *testing.T) {
for {
f := <-fchan
t.Log("Scanned", f)
if f.IsDirectory() {
if len(f.Name) == 0 || f.Permissions == 0 {
if f.New.IsDirectory() {
if len(f.New.Name) == 0 || f.New.Permissions == 0 {
t.Error("Bad directory entry", f)
}
dirs++
} else {
if len(f.Name) == 0 || len(f.Blocks) == 0 || f.Permissions == 0 {
if len(f.New.Name) == 0 || len(f.New.Blocks) == 0 || f.New.Permissions == 0 {
t.Error("Bad file entry", f)
}
files++
@ -529,3 +528,69 @@ func verify(r io.Reader, blocksize int, blocks []protocol.BlockInfo) error {
return nil
}
// The following (randomish) scenario produced an error uncovered by integration tests
func TestWalkIntegration(t *testing.T) {
tmpDir, err := ioutil.TempDir(".", "_request-")
if err != nil {
panic("Failed to create temporary testing dir")
}
defer os.RemoveAll(tmpDir)
fs := fs.NewFilesystem(fs.FilesystemTypeBasic, tmpDir)
fs.Mkdir("a", 0777)
toDel := filepath.Join("a", "b")
for _, f := range []string{"b", toDel} {
fi, err := fs.Create(f)
if err != nil {
panic(err)
}
fi.Close()
}
conf := Config{
Filesystem: fs,
BlockSize: 128 * 1024,
Hashers: 2,
}
rchan := Walk(context.TODO(), conf)
var res []ScanResult
for r := range rchan {
res = append(res, r)
}
sort.Sort(fileList(res))
thw := make([]protocol.FileInfo, 0, len(res))
for _, r := range res {
thw = append(thw, r.New)
}
conf.Have = testHaveWalker(thw)
if err = fs.Remove(toDel); err != nil {
panic(err)
}
rchan = Walk(context.TODO(), conf)
for r := range rchan {
if r.New.Name != toDel {
t.Fatalf("Received unexpected result %v", r)
}
}
}
type testHaveWalker []protocol.FileInfo
func (thw testHaveWalker) Walk(prefix string, ctx context.Context, out chan<- protocol.FileInfo) {
if prefix != "" {
panic("cannot walk with prefix")
}
for _, f := range thw {
select {
case out <- f:
case <-ctx.Done():
return
}
}
}