lib/fs, lib/model, lib/scanner: Make scans cancellable (fixes #3965)

The folder already knew how to stop properly, but fs.Walk() didn't, and
it can potentially take a very long time. This adds context support to
Walk and the underlying scanning code, and passes in an appropriate
context from above. The stop channel in model.folder is replaced with a
context for this purpose.
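
In essence this is the standard Go context cancellation pattern. A
minimal, self-contained sketch of that pattern follows; the names
(folder, Serve, Stop) are simplified stand-ins for the real lib/model
types, not the actual code:

package main

import (
	"context"
	"fmt"
	"time"
)

type folder struct {
	ctx    context.Context
	cancel context.CancelFunc
}

func newFolder() *folder {
	// The context/cancel pair replaces the old stop channel.
	ctx, cancel := context.WithCancel(context.Background())
	return &folder{ctx: ctx, cancel: cancel}
}

func (f *folder) Serve() {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-f.ctx.Done():
			// Stop() was called; the same ctx is also handed down to the
			// scanner, so an in-progress walk notices and returns early.
			return
		case <-ticker.C:
			fmt.Println("a periodic scan would run here, passing f.ctx along")
		}
	}
}

func (f *folder) Stop() { f.cancel() }

func main() {
	f := newFolder()
	go f.Serve()
	time.Sleep(250 * time.Millisecond)
	f.Stop()
	time.Sleep(50 * time.Millisecond)
}

Inside the scanner, the long-running loops (hashing blocks, walking the
tree) do a non-blocking select on ctx.Done() each iteration and return
ctx.Err() when cancelled, as the diff below shows.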

To test this I added an infiniteFS that represents a large amount of data
(not actually infinite, but close) and verified that walking it is
properly stopped. For that to be implemented smoothly I moved the Walk
function out into its own type, as the implementer of a new filesystem
type will typically not need or want to reimplement Walk.
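
To illustrate the wrapper idea, here is a minimal, self-contained sketch
with trimmed-down stand-ins for the lib/fs interfaces (the real types
have more methods and SkipDir handling): NewWalkFilesystem wraps any
Filesystem and implements Walk on top of the Lstat/DirNames primitives,
which is why a fake like infiniteFS only has to provide those primitives.

package main

import (
	"fmt"
	"path/filepath"
)

// Cut-down stand-ins for the real lib/fs types, just to show the shape.
type FileInfo struct {
	Name  string
	IsDir bool
}

type Filesystem interface {
	Lstat(name string) (FileInfo, error)
	DirNames(name string) ([]string, error)
}

type WalkFunc func(path string, info FileInfo, err error) error

// WalkFilesystem embeds any Filesystem and adds Walk on top of the
// primitives, so a new filesystem implementation need not reimplement it.
type WalkFilesystem struct {
	Filesystem
}

func NewWalkFilesystem(next Filesystem) *WalkFilesystem {
	return &WalkFilesystem{next}
}

func (f *WalkFilesystem) Walk(root string, walkFn WalkFunc) error {
	info, err := f.Lstat(root)
	if err != nil {
		return walkFn(root, FileInfo{}, err)
	}
	if err := walkFn(root, info, nil); err != nil {
		return err
	}
	if !info.IsDir {
		return nil
	}
	names, err := f.DirNames(root)
	if err != nil {
		return walkFn(root, info, err)
	}
	for _, name := range names {
		if err := f.Walk(filepath.Join(root, name), walkFn); err != nil {
			return err
		}
	}
	return nil
}

// flatFS is a tiny fake filesystem: a single directory with two files.
type flatFS struct{}

func (flatFS) Lstat(name string) (FileInfo, error) {
	return FileInfo{Name: filepath.Base(name), IsDir: filepath.Ext(name) == ""}, nil
}

func (flatFS) DirNames(name string) ([]string, error) {
	if name != "root" {
		return nil, nil
	}
	return []string{"a.txt", "b.txt"}, nil
}

func main() {
	wfs := NewWalkFilesystem(flatFS{})
	_ = wfs.Walk("root", func(path string, info FileInfo, err error) error {
		fmt.Println(path)
		return nil
	})
}

Running it prints root, root/a.txt and root/b.txt without touching the
disk, which is the same trick the cancellation test uses.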

It's somewhat tricky to test that this actually works properly on the
real sendReceiveFolder and so on, as those are started from inside the
model and the filesystem isn't easily pluggable, etc. Instead I've tested
that part manually, by adding a huge folder and verifying from the debug
output that pause, resume and reconfig do the right things.

GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/4117
Jakob Borg 2017-04-26 00:15:23 +00:00 committed by Audrius Butkevicius
parent bdaef44765
commit d6fbfc3545
17 changed files with 282 additions and 74 deletions

View File

@ -7,6 +7,7 @@
package main
import (
"context"
"flag"
"log"
"os"
@ -70,7 +71,7 @@ func main() {
if *standardBlocks || blockSize < protocol.BlockSize {
blockSize = protocol.BlockSize
}
bs, err := scanner.Blocks(fd, blockSize, fi.Size(), nil, true)
bs, err := scanner.Blocks(context.TODO(), fd, blockSize, fi.Size(), nil, true)
if err != nil {
log.Fatal(err)
}

View File

@ -8,6 +8,7 @@ package main
import (
"bytes"
"context"
"crypto/rand"
"crypto/tls"
"encoding/json"
@ -309,7 +310,7 @@ func cpuBenchOnce(duration time.Duration, useWeakHash bool, bs []byte) float64 {
b := 0
for time.Since(t0) < duration {
r := bytes.NewReader(bs)
blocksResult, _ = scanner.Blocks(r, protocol.BlockSize, int64(len(bs)), nil, useWeakHash)
blocksResult, _ = scanner.Blocks(context.TODO(), r, protocol.BlockSize, int64(len(bs)), nil, useWeakHash)
b += len(bs)
}
d := time.Since(t0)

View File

@ -7,6 +7,7 @@
package fs
import (
"errors"
"os"
"time"
)
@ -86,6 +87,11 @@ func (f *BasicFilesystem) Create(name string) (File, error) {
return fsFile{fd}, err
}
func (f *BasicFilesystem) Walk(root string, walkFn WalkFunc) error {
// implemented in WalkFilesystem
return errors.New("not implemented")
}
// fsFile implements the fs.File interface on top of an os.File
type fsFile struct {
*os.File

View File

@ -64,7 +64,7 @@ const ModePerm = FileMode(os.ModePerm)
// DefaultFilesystem is the fallback to use when nothing explicitly has
// been passed.
var DefaultFilesystem Filesystem = NewBasicFilesystem()
var DefaultFilesystem Filesystem = NewWalkFilesystem(NewBasicFilesystem())
// SkipDir is used as a return value from WalkFuncs to indicate that
// the directory named in the call is to be skipped. It is not returned

View File

@ -28,8 +28,16 @@ import "path/filepath"
// Walk skips the remaining files in the containing directory.
type WalkFunc func(path string, info FileInfo, err error) error
type WalkFilesystem struct {
Filesystem
}
func NewWalkFilesystem(next Filesystem) *WalkFilesystem {
return &WalkFilesystem{next}
}
// walk recursively descends path, calling walkFn.
func (f *BasicFilesystem) walk(path string, info FileInfo, walkFn WalkFunc) error {
func (f *WalkFilesystem) walk(path string, info FileInfo, walkFn WalkFunc) error {
err := walkFn(path, info, nil)
if err != nil {
if info.IsDir() && err == SkipDir {
@ -72,7 +80,7 @@ func (f *BasicFilesystem) walk(path string, info FileInfo, walkFn WalkFunc) erro
// order, which makes the output deterministic but means that for very
// large directories Walk can be inefficient.
// Walk does not follow symbolic links.
func (f *BasicFilesystem) Walk(root string, walkFn WalkFunc) error {
func (f *WalkFilesystem) Walk(root string, walkFn WalkFunc) error {
info, err := f.Lstat(root)
if err != nil {
return walkFn(root, nil, err)

View File

@ -6,14 +6,18 @@
package model
import "time"
import (
"context"
"time"
)
type folder struct {
stateTracker
scan folderScanner
model *Model
stop chan struct{}
ctx context.Context
cancel context.CancelFunc
initialScanFinished chan struct{}
}
@ -28,8 +32,9 @@ func (f *folder) Scan(subdirs []string) error {
<-f.initialScanFinished
return f.scan.Scan(subdirs)
}
func (f *folder) Stop() {
close(f.stop)
f.cancel()
}
func (f *folder) Jobs() ([]string, []string) {
@ -39,7 +44,7 @@ func (f *folder) Jobs() ([]string, []string) {
func (f *folder) BringToFront(string) {}
func (f *folder) scanSubdirs(subDirs []string) error {
if err := f.model.internalScanFolderSubdirs(f.folderID, subDirs); err != nil {
if err := f.model.internalScanFolderSubdirs(f.ctx, f.folderID, subDirs); err != nil {
// Potentially sets the error twice, once in the scanner just
// by doing a check, and once here, if the error returned is
// the same one as returned by CheckFolderHealth, though

View File

@ -7,6 +7,7 @@
package model
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
@ -1715,7 +1716,7 @@ func (m *Model) ScanFolderSubdirs(folder string, subs []string) error {
return runner.Scan(subs)
}
func (m *Model) internalScanFolderSubdirs(folder string, subDirs []string) error {
func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, subDirs []string) error {
for i := 0; i < len(subDirs); i++ {
sub := osutil.NativeFilename(subDirs[i])
@ -1785,14 +1786,9 @@ func (m *Model) internalScanFolderSubdirs(folder string, subDirs []string) error
return ok
})
// The cancel channel is closed whenever we return (such as from an error),
// to signal the potentially still running walker to stop.
cancel := make(chan struct{})
defer close(cancel)
runner.setState(FolderScanning)
fchan, err := scanner.Walk(scanner.Config{
fchan, err := scanner.Walk(ctx, scanner.Config{
Folder: folderCfg.ID,
Dir: folderCfg.Path(),
Subs: subDirs,
@ -1806,7 +1802,6 @@ func (m *Model) internalScanFolderSubdirs(folder string, subDirs []string) error
Hashers: m.numHashers(folder),
ShortID: m.shortID,
ProgressTickIntervalS: folderCfg.ScanProgressIntervalS,
Cancel: cancel,
UseWeakHashes: weakhash.Enabled,
})

View File

@ -8,6 +8,7 @@ package model
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
@ -317,7 +318,7 @@ func (f *fakeConnection) addFile(name string, flags uint32, ftype protocol.FileI
f.mut.Lock()
defer f.mut.Unlock()
blocks, _ := scanner.Blocks(bytes.NewReader(data), protocol.BlockSize, int64(len(data)), nil, true)
blocks, _ := scanner.Blocks(context.TODO(), bytes.NewReader(data), protocol.BlockSize, int64(len(data)), nil, true)
var version protocol.Vector
version = version.Update(f.id.Short())

View File

@ -7,6 +7,7 @@
package model
import (
"context"
"fmt"
"github.com/syncthing/syncthing/lib/config"
@ -24,11 +25,14 @@ type sendOnlyFolder struct {
}
func newSendOnlyFolder(model *Model, cfg config.FolderConfiguration, _ versioner.Versioner, _ *fs.MtimeFS) service {
ctx, cancel := context.WithCancel(context.Background())
return &sendOnlyFolder{
folder: folder{
stateTracker: newStateTracker(cfg.ID),
scan: newFolderScanner(cfg),
stop: make(chan struct{}),
ctx: ctx,
cancel: cancel,
model: model,
initialScanFinished: make(chan struct{}),
},
@ -46,7 +50,7 @@ func (f *sendOnlyFolder) Serve() {
for {
select {
case <-f.stop:
case <-f.ctx.Done():
return
case <-f.scan.timer.C:

View File

@ -7,6 +7,7 @@
package model
import (
"context"
"errors"
"fmt"
"math/rand"
@ -99,11 +100,14 @@ type sendReceiveFolder struct {
}
func newSendReceiveFolder(model *Model, cfg config.FolderConfiguration, ver versioner.Versioner, mtimeFS *fs.MtimeFS) service {
ctx, cancel := context.WithCancel(context.Background())
f := &sendReceiveFolder{
folder: folder{
stateTracker: newStateTracker(cfg.ID),
scan: newFolderScanner(cfg),
stop: make(chan struct{}),
ctx: ctx,
cancel: cancel,
model: model,
initialScanFinished: make(chan struct{}),
},
@ -171,7 +175,7 @@ func (f *sendReceiveFolder) Serve() {
for {
select {
case <-f.stop:
case <-f.ctx.Done():
return
case <-f.remoteIndex:
@ -492,7 +496,7 @@ func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher) int {
nextFile:
for {
select {
case <-f.stop:
case <-f.ctx.Done():
// Stop processing files if the puller has been told to stop.
break nextFile
default:
@ -1076,7 +1080,7 @@ func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, copyChan chan<- c
// Check for an old temporary file which might have some blocks we could
// reuse.
tempBlocks, err := scanner.HashFile(fs.DefaultFilesystem, tempName, protocol.BlockSize, nil, false)
tempBlocks, err := scanner.HashFile(f.ctx, fs.DefaultFilesystem, tempName, protocol.BlockSize, nil, false)
if err == nil {
// Check for any reusable blocks in the temp file
tempCopyBlocks, _ := scanner.BlockDiff(tempBlocks, file.Blocks)

View File

@ -7,6 +7,7 @@
package model
import (
"context"
"crypto/rand"
"io"
"os"
@ -83,6 +84,7 @@ func setUpSendReceiveFolder(model *Model) *sendReceiveFolder {
stateTracker: newStateTracker("default"),
model: model,
initialScanFinished: make(chan struct{}),
ctx: context.TODO(),
},
mtimeFS: fs.NewMtimeFS(fs.DefaultFilesystem, db.NewNamespacedKV(model.db, "mtime")),
@ -244,7 +246,7 @@ func TestCopierFinder(t *testing.T) {
}
// Verify that the fetched blocks have actually been written to the temp file
blks, err := scanner.HashFile(fs.DefaultFilesystem, tempFile, protocol.BlockSize, nil, false)
blks, err := scanner.HashFile(context.TODO(), fs.DefaultFilesystem, tempFile, protocol.BlockSize, nil, false)
if err != nil {
t.Log(err)
}
@ -297,7 +299,7 @@ func TestWeakHash(t *testing.T) {
// File 1: abcdefgh
// File 2: xyabcdef
f.Seek(0, os.SEEK_SET)
existing, err := scanner.Blocks(f, protocol.BlockSize, size, nil, true)
existing, err := scanner.Blocks(context.TODO(), f, protocol.BlockSize, size, nil, true)
if err != nil {
t.Error(err)
}
@ -306,7 +308,7 @@ func TestWeakHash(t *testing.T) {
remainder := io.LimitReader(f, size-shift)
prefix := io.LimitReader(rand.Reader, shift)
nf := io.MultiReader(prefix, remainder)
desired, err := scanner.Blocks(nf, protocol.BlockSize, size, nil, true)
desired, err := scanner.Blocks(context.TODO(), nf, protocol.BlockSize, size, nil, true)
if err != nil {
t.Error(err)
}

View File

@ -7,6 +7,7 @@
package scanner
import (
"context"
"errors"
"path/filepath"
@ -16,7 +17,7 @@ import (
)
// HashFile hashes the files and returns a list of blocks representing the file.
func HashFile(fs fs.Filesystem, path string, blockSize int, counter Counter, useWeakHashes bool) ([]protocol.BlockInfo, error) {
func HashFile(ctx context.Context, fs fs.Filesystem, path string, blockSize int, counter Counter, useWeakHashes bool) ([]protocol.BlockInfo, error) {
fd, err := fs.Open(path)
if err != nil {
l.Debugln("open:", err)
@ -36,7 +37,7 @@ func HashFile(fs fs.Filesystem, path string, blockSize int, counter Counter, use
// Hash the file. This may take a while for large files.
blocks, err := Blocks(fd, blockSize, size, counter, useWeakHashes)
blocks, err := Blocks(ctx, fd, blockSize, size, counter, useWeakHashes)
if err != nil {
l.Debugln("blocks:", err)
return nil, err
@ -70,12 +71,11 @@ type parallelHasher struct {
inbox <-chan protocol.FileInfo
counter Counter
done chan<- struct{}
cancel <-chan struct{}
useWeakHashes bool
wg sync.WaitGroup
}
func newParallelHasher(fs fs.Filesystem, dir string, blockSize, workers int, outbox chan<- protocol.FileInfo, inbox <-chan protocol.FileInfo, counter Counter, done chan<- struct{}, cancel <-chan struct{}, useWeakHashes bool) {
func newParallelHasher(ctx context.Context, fs fs.Filesystem, dir string, blockSize, workers int, outbox chan<- protocol.FileInfo, inbox <-chan protocol.FileInfo, counter Counter, done chan<- struct{}, useWeakHashes bool) {
ph := &parallelHasher{
fs: fs,
dir: dir,
@ -85,20 +85,19 @@ func newParallelHasher(fs fs.Filesystem, dir string, blockSize, workers int, out
inbox: inbox,
counter: counter,
done: done,
cancel: cancel,
useWeakHashes: useWeakHashes,
wg: sync.NewWaitGroup(),
}
for i := 0; i < workers; i++ {
ph.wg.Add(1)
go ph.hashFiles()
go ph.hashFiles(ctx)
}
go ph.closeWhenDone()
}
func (ph *parallelHasher) hashFiles() {
func (ph *parallelHasher) hashFiles(ctx context.Context) {
defer ph.wg.Done()
for {
@ -112,7 +111,7 @@ func (ph *parallelHasher) hashFiles() {
panic("Bug. Asked to hash a directory or a deleted file.")
}
blocks, err := HashFile(ph.fs, filepath.Join(ph.dir, f.Name), ph.blockSize, ph.counter, ph.useWeakHashes)
blocks, err := HashFile(ctx, ph.fs, filepath.Join(ph.dir, f.Name), ph.blockSize, ph.counter, ph.useWeakHashes)
if err != nil {
l.Debugln("hash error:", f.Name, err)
continue
@ -131,11 +130,11 @@ func (ph *parallelHasher) hashFiles() {
select {
case ph.outbox <- f:
case <-ph.cancel:
case <-ctx.Done():
return
}
case <-ph.cancel:
case <-ctx.Done():
return
}
}

View File

@ -8,6 +8,7 @@ package scanner
import (
"bytes"
"context"
"fmt"
"hash"
"io"
@ -24,7 +25,7 @@ type Counter interface {
}
// Blocks returns the blockwise hash of the reader.
func Blocks(r io.Reader, blocksize int, sizehint int64, counter Counter, useWeakHashes bool) ([]protocol.BlockInfo, error) {
func Blocks(ctx context.Context, r io.Reader, blocksize int, sizehint int64, counter Counter, useWeakHashes bool) ([]protocol.BlockInfo, error) {
hf := sha256.New()
hashLength := hf.Size()
@ -57,6 +58,12 @@ func Blocks(r io.Reader, blocksize int, sizehint int64, counter Counter, useWeak
var offset int64
lr := io.LimitReader(r, int64(blocksize)).(*io.LimitedReader)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
lr.N = int64(blocksize)
n, err := io.CopyBuffer(mhf, lr, buf)
if err != nil {

View File

@ -8,6 +8,7 @@ package scanner
import (
"bytes"
"context"
"crypto/rand"
"fmt"
origAdler32 "hash/adler32"
@ -68,7 +69,7 @@ var blocksTestData = []struct {
func TestBlocks(t *testing.T) {
for testNo, test := range blocksTestData {
buf := bytes.NewBuffer(test.data)
blocks, err := Blocks(buf, test.blocksize, -1, nil, true)
blocks, err := Blocks(context.TODO(), buf, test.blocksize, -1, nil, true)
if err != nil {
t.Fatal(err)
@ -125,8 +126,8 @@ var diffTestData = []struct {
func TestDiff(t *testing.T) {
for i, test := range diffTestData {
a, _ := Blocks(bytes.NewBufferString(test.a), test.s, -1, nil, false)
b, _ := Blocks(bytes.NewBufferString(test.b), test.s, -1, nil, false)
a, _ := Blocks(context.TODO(), bytes.NewBufferString(test.a), test.s, -1, nil, false)
b, _ := Blocks(context.TODO(), bytes.NewBufferString(test.b), test.s, -1, nil, false)
_, d := BlockDiff(a, b)
if len(d) != len(test.d) {
t.Fatalf("Incorrect length for diff %d; %d != %d", i, len(d), len(test.d))

View File

@ -0,0 +1,103 @@
// Copyright (C) 2017 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package scanner
import (
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
"github.com/syncthing/syncthing/lib/fs"
)
type infiniteFS struct {
width int // number of files and directories per level
depth int // number of tree levels to simulate
filesize int64 // size of each file in bytes
}
var errNotSupp = errors.New("not supported")
func (i infiniteFS) Lstat(name string) (fs.FileInfo, error) {
return fakeInfo{name, i.filesize}, nil
}
func (i infiniteFS) DirNames(name string) ([]string, error) {
// Returns a list of fake files and directories. Names are such that
// files appear before directories - this makes it so the scanner will
// actually see a few files without having to reach the max depth.
var names []string
for j := 0; j < i.width; j++ {
names = append(names, fmt.Sprintf("aa-file-%d", j))
}
if len(strings.Split(name, string(os.PathSeparator))) < i.depth {
for j := 0; j < i.width; j++ {
names = append(names, fmt.Sprintf("zz-dir-%d", j))
}
}
return names, nil
}
func (i infiniteFS) Open(name string) (fs.File, error) {
return &fakeFile{name, i.filesize, 0}, nil
}
func (infiniteFS) Chmod(name string, mode fs.FileMode) error { return errNotSupp }
func (infiniteFS) Chtimes(name string, atime time.Time, mtime time.Time) error { return errNotSupp }
func (infiniteFS) Create(name string) (fs.File, error) { return nil, errNotSupp }
func (infiniteFS) CreateSymlink(name, target string) error { return errNotSupp }
func (infiniteFS) Mkdir(name string, perm fs.FileMode) error { return errNotSupp }
func (infiniteFS) ReadSymlink(name string) (string, error) { return "", errNotSupp }
func (infiniteFS) Remove(name string) error { return errNotSupp }
func (infiniteFS) Rename(oldname, newname string) error { return errNotSupp }
func (infiniteFS) Stat(name string) (fs.FileInfo, error) { return nil, errNotSupp }
func (infiniteFS) SymlinksSupported() bool { return false }
func (infiniteFS) Walk(root string, walkFn fs.WalkFunc) error { return errNotSupp }
type fakeInfo struct {
name string
size int64
}
func (f fakeInfo) Name() string { return f.name }
func (f fakeInfo) Mode() fs.FileMode { return 0755 }
func (f fakeInfo) Size() int64 { return f.size }
func (f fakeInfo) ModTime() time.Time { return time.Unix(1234567890, 0) }
func (f fakeInfo) IsDir() bool { return strings.Contains(filepath.Base(f.name), "dir") }
func (f fakeInfo) IsRegular() bool { return !f.IsDir() }
func (f fakeInfo) IsSymlink() bool { return false }
type fakeFile struct {
name string
size int64
readOffset int64
}
func (f *fakeFile) Read(bs []byte) (int, error) {
remaining := f.size - f.readOffset
if remaining == 0 {
return 0, io.EOF
}
if remaining < int64(len(bs)) {
f.readOffset = f.size
return int(remaining), nil
}
f.readOffset += int64(len(bs))
return len(bs), nil
}
func (f *fakeFile) Stat() (fs.FileInfo, error) {
return fakeInfo{f.name, f.size}, nil
}
func (f *fakeFile) WriteAt(bs []byte, offs int64) (int, error) { return 0, errNotSupp }
func (f *fakeFile) Close() error { return nil }
func (f *fakeFile) Truncate(size int64) error { return errNotSupp }

View File

@ -7,6 +7,7 @@
package scanner
import (
"context"
"errors"
"path/filepath"
"runtime"
@ -69,8 +70,6 @@ type Config struct {
// Optional progress tick interval which defines how often FolderScanProgress
// events are emitted. Negative number means disabled.
ProgressTickIntervalS int
// Signals cancel from the outside - when closed, we should stop walking.
Cancel chan struct{}
// Whether or not we should also compute weak hashes
UseWeakHashes bool
}
@ -80,7 +79,7 @@ type CurrentFiler interface {
CurrentFile(name string) (protocol.FileInfo, bool)
}
func Walk(cfg Config) (chan protocol.FileInfo, error) {
func Walk(ctx context.Context, cfg Config) (chan protocol.FileInfo, error) {
w := walker{cfg}
if w.CurrentFiler == nil {
@ -90,7 +89,7 @@ func Walk(cfg Config) (chan protocol.FileInfo, error) {
w.Filesystem = fs.DefaultFilesystem
}
return w.walk()
return w.walk(ctx)
}
type walker struct {
@ -99,7 +98,7 @@ type walker struct {
// Walk returns the list of files found in the local folder by scanning the
// file system. Files are blockwise hashed.
func (w *walker) walk() (chan protocol.FileInfo, error) {
func (w *walker) walk(ctx context.Context) (chan protocol.FileInfo, error) {
l.Debugln("Walk", w.Dir, w.Subs, w.BlockSize, w.Matcher)
if err := w.checkDir(); err != nil {
@ -112,7 +111,7 @@ func (w *walker) walk() (chan protocol.FileInfo, error) {
// A routine which walks the filesystem tree, and sends files which have
// been modified to the counter routine.
go func() {
hashFiles := w.walkAndHashFiles(toHashChan, finishedChan)
hashFiles := w.walkAndHashFiles(ctx, toHashChan, finishedChan)
if len(w.Subs) == 0 {
w.Filesystem.Walk(w.Dir, hashFiles)
} else {
@ -126,7 +125,7 @@ func (w *walker) walk() (chan protocol.FileInfo, error) {
// We're not required to emit scan progress events, just kick off hashers,
// and feed inputs directly from the walker.
if w.ProgressTickIntervalS < 0 {
newParallelHasher(w.Filesystem, w.Dir, w.BlockSize, w.Hashers, finishedChan, toHashChan, nil, nil, w.Cancel, w.UseWeakHashes)
newParallelHasher(ctx, w.Filesystem, w.Dir, w.BlockSize, w.Hashers, finishedChan, toHashChan, nil, nil, w.UseWeakHashes)
return finishedChan, nil
}
@ -157,7 +156,7 @@ func (w *walker) walk() (chan protocol.FileInfo, error) {
done := make(chan struct{})
progress := newByteCounter()
newParallelHasher(w.Filesystem, w.Dir, w.BlockSize, w.Hashers, finishedChan, realToHashChan, progress, done, w.Cancel, w.UseWeakHashes)
newParallelHasher(ctx, w.Filesystem, w.Dir, w.BlockSize, w.Hashers, finishedChan, realToHashChan, progress, done, w.UseWeakHashes)
// A routine which actually emits the FolderScanProgress events
// every w.ProgressTicker ticks, until the hasher routines terminate.
@ -180,7 +179,7 @@ func (w *walker) walk() (chan protocol.FileInfo, error) {
"total": total,
"rate": rate, // bytes per second
})
case <-w.Cancel:
case <-ctx.Done():
ticker.Stop()
return
}
@ -192,7 +191,7 @@ func (w *walker) walk() (chan protocol.FileInfo, error) {
l.Debugln("real to hash:", file.Name)
select {
case realToHashChan <- file:
case <-w.Cancel:
case <-ctx.Done():
break loop
}
}
@ -202,9 +201,15 @@ func (w *walker) walk() (chan protocol.FileInfo, error) {
return finishedChan, nil
}
func (w *walker) walkAndHashFiles(fchan, dchan chan protocol.FileInfo) fs.WalkFunc {
func (w *walker) walkAndHashFiles(ctx context.Context, fchan, dchan chan protocol.FileInfo) fs.WalkFunc {
now := time.Now()
return func(absPath string, info fs.FileInfo, err error) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Return value used when we are returning early and don't want to
// process the item. For directories, this means do-not-descend.
var skip error // nil
@ -265,7 +270,7 @@ func (w *walker) walkAndHashFiles(fchan, dchan chan protocol.FileInfo) fs.WalkFu
switch {
case info.IsSymlink():
if err := w.walkSymlink(absPath, relPath, dchan); err != nil {
if err := w.walkSymlink(ctx, absPath, relPath, dchan); err != nil {
return err
}
if info.IsDir() {
@ -275,17 +280,17 @@ func (w *walker) walkAndHashFiles(fchan, dchan chan protocol.FileInfo) fs.WalkFu
return nil
case info.IsDir():
err = w.walkDir(relPath, info, dchan)
err = w.walkDir(ctx, relPath, info, dchan)
case info.IsRegular():
err = w.walkRegular(relPath, info, fchan)
err = w.walkRegular(ctx, relPath, info, fchan)
}
return err
}
}
func (w *walker) walkRegular(relPath string, info fs.FileInfo, fchan chan protocol.FileInfo) error {
func (w *walker) walkRegular(ctx context.Context, relPath string, info fs.FileInfo, fchan chan protocol.FileInfo) error {
curMode := uint32(info.Mode())
if runtime.GOOS == "windows" && osutil.IsWindowsExecutable(relPath) {
curMode |= 0111
@ -326,14 +331,14 @@ func (w *walker) walkRegular(relPath string, info fs.FileInfo, fchan chan protoc
select {
case fchan <- f:
case <-w.Cancel:
return errors.New("cancelled")
case <-ctx.Done():
return ctx.Err()
}
return nil
}
func (w *walker) walkDir(relPath string, info fs.FileInfo, dchan chan protocol.FileInfo) error {
func (w *walker) walkDir(ctx context.Context, relPath string, info fs.FileInfo, dchan chan protocol.FileInfo) error {
// A directory is "unchanged", if it
// - exists
// - has the same permissions as previously, unless we are ignoring permissions
@ -361,8 +366,8 @@ func (w *walker) walkDir(relPath string, info fs.FileInfo, dchan chan protocol.F
select {
case dchan <- f:
case <-w.Cancel:
return errors.New("cancelled")
case <-ctx.Done():
return ctx.Err()
}
return nil
@ -370,7 +375,7 @@ func (w *walker) walkDir(relPath string, info fs.FileInfo, dchan chan protocol.F
// walkSymlink returns nil or an error, if the error is of the nature that
// it should stop the entire walk.
func (w *walker) walkSymlink(absPath, relPath string, dchan chan protocol.FileInfo) error {
func (w *walker) walkSymlink(ctx context.Context, absPath, relPath string, dchan chan protocol.FileInfo) error {
// Symlinks are not supported on Windows. We ignore instead of returning
// an error.
if runtime.GOOS == "windows" {
@ -412,8 +417,8 @@ func (w *walker) walkSymlink(absPath, relPath string, dchan chan protocol.FileIn
select {
case dchan <- f:
case <-w.Cancel:
return errors.New("cancelled")
case <-ctx.Done():
return ctx.Err()
}
return nil

View File

@ -8,6 +8,7 @@ package scanner
import (
"bytes"
"context"
"crypto/rand"
"fmt"
"io"
@ -59,7 +60,7 @@ func TestWalkSub(t *testing.T) {
t.Fatal(err)
}
fchan, err := Walk(Config{
fchan, err := Walk(context.TODO(), Config{
Dir: "testdata",
Subs: []string{"dir2"},
BlockSize: 128 * 1024,
@ -96,7 +97,7 @@ func TestWalk(t *testing.T) {
}
t.Log(ignores)
fchan, err := Walk(Config{
fchan, err := Walk(context.TODO(), Config{
Dir: "testdata",
BlockSize: 128 * 1024,
Matcher: ignores,
@ -120,7 +121,7 @@ func TestWalk(t *testing.T) {
}
func TestWalkError(t *testing.T) {
_, err := Walk(Config{
_, err := Walk(context.TODO(), Config{
Dir: "testdata-missing",
BlockSize: 128 * 1024,
Hashers: 2,
@ -130,7 +131,7 @@ func TestWalkError(t *testing.T) {
t.Error("no error from missing directory")
}
_, err = Walk(Config{
_, err = Walk(context.TODO(), Config{
Dir: "testdata/bar",
BlockSize: 128 * 1024,
})
@ -148,7 +149,7 @@ func TestVerify(t *testing.T) {
progress := newByteCounter()
defer progress.Close()
blocks, err := Blocks(buf, blocksize, -1, progress, false)
blocks, err := Blocks(context.TODO(), buf, blocksize, -1, progress, false)
if err != nil {
t.Fatal(err)
}
@ -276,7 +277,7 @@ func TestNormalization(t *testing.T) {
func TestIssue1507(t *testing.T) {
w := &walker{}
c := make(chan protocol.FileInfo, 100)
fn := w.walkAndHashFiles(c, c)
fn := w.walkAndHashFiles(context.TODO(), c, c)
fn("", nil, protocol.ErrClosed)
}
@ -297,7 +298,7 @@ func TestWalkSymlinkUnix(t *testing.T) {
// Scan it
fchan, err := Walk(Config{
fchan, err := Walk(context.TODO(), Config{
Dir: "_symlinks",
BlockSize: 128 * 1024,
})
@ -342,7 +343,7 @@ func TestWalkSymlinkWindows(t *testing.T) {
// Scan it
fchan, err := Walk(Config{
fchan, err := Walk(context.TODO(), Config{
Dir: "_symlinks",
BlockSize: 128 * 1024,
})
@ -364,7 +365,7 @@ func TestWalkSymlinkWindows(t *testing.T) {
}
func walkDir(dir string) ([]protocol.FileInfo, error) {
fchan, err := Walk(Config{
fchan, err := Walk(context.TODO(), Config{
Dir: dir,
BlockSize: 128 * 1024,
AutoNormalize: true,
@ -434,7 +435,7 @@ func BenchmarkHashFile(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
if _, err := HashFile(fs.DefaultFilesystem, testdataName, protocol.BlockSize, nil, true); err != nil {
if _, err := HashFile(context.TODO(), fs.DefaultFilesystem, testdataName, protocol.BlockSize, nil, true); err != nil {
b.Fatal(err)
}
}
@ -458,3 +459,68 @@ func initTestFile() {
panic(err)
}
}
func TestStopWalk(t *testing.T) {
// Create tree that is 100 levels deep, with each level containing 100
// files (each 1 MB) and 100 directories (in turn containing 100 files
// and 100 directories, etc). That is, in total > 100^100 files and as
// many directories. It'll take a while to scan, giving us time to
// cancel it and make sure the scan stops.
fs := fs.NewWalkFilesystem(&infiniteFS{100, 100, 1e6})
const numHashers = 4
ctx, cancel := context.WithCancel(context.Background())
fchan, err := Walk(ctx, Config{
Dir: "testdir",
BlockSize: 128 * 1024,
Hashers: numHashers,
Filesystem: fs,
ProgressTickIntervalS: -1, // Don't attempt to build the full list of files before starting to scan...
})
if err != nil {
t.Fatal(err)
}
// Receive a few entries to make sure the walker is up and running,
// scanning both files and dirs. Do some quick sanity tests on the
// returned file entries to make sure we are not just reading crap from
// a closed channel or something.
dirs := 0
files := 0
for {
f := <-fchan
t.Log("Scanned", f)
if f.IsDirectory() {
if len(f.Name) == 0 || f.Permissions == 0 {
t.Error("Bad directory entry", f)
}
dirs++
} else {
if len(f.Name) == 0 || len(f.Blocks) == 0 || f.Permissions == 0 {
t.Error("Bad file entry", f)
}
files++
}
if dirs > 5 && files > 5 {
break
}
}
// Cancel the walker.
cancel()
// Empty out any waiting entries and wait for the channel to close.
// Count them, they should be zero or very few - essentially, each
// hasher has the choice of returning a fully handled entry or
// cancelling, but they should not start on another item.
extra := 0
for range fchan {
extra++
}
t.Log("Extra entries:", extra)
if extra > numHashers {
t.Error("unexpected extra entries received after cancel")
}
}