Merge pull request #1588 from calmh/dbcommitter

Use separate routine for database updates in puller
Audrius Butkevicius committed 2015-04-05 20:43:46 +01:00
commit 23dab30ca5
4 changed files with 88 additions and 28 deletions


@@ -1027,17 +1027,14 @@ func sendIndexTo(initial bool, minLocalVer int64, conn protocol.Connection, fold
return maxLocalVer, err
}
func (m *Model) updateLocal(folder string, f protocol.FileInfo) {
f.LocalVersion = 0
func (m *Model) updateLocals(folder string, fs []protocol.FileInfo) {
m.fmut.RLock()
m.folderFiles[folder].Update(protocol.LocalDeviceID, []protocol.FileInfo{f})
m.folderFiles[folder].Update(protocol.LocalDeviceID, fs)
m.fmut.RUnlock()
events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
"folder": folder,
"name": f.Name,
"modified": time.Unix(f.Modified, 0),
"flags": fmt.Sprintf("0%o", f.Flags),
"size": f.Size(),
"numFiles": len(fs),
})
}
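
updateLocals replaces the single-file updateLocal: the database write and the LocalIndexUpdated event now cover a whole batch, and the event payload reports numFiles instead of per-file details. Call sites that still have only one file wrap it in a one-element slice, as the updated tests further down do. A minimal sketch of such a wrapper, assuming it lives inside the model package (this helper is hypothetical, not part of the commit):

// updateLocalOne is a hypothetical convenience wrapper around the new
// slice-based API for callers that only have a single FileInfo.
func (m *Model) updateLocalOne(folder string, f protocol.FileInfo) {
	m.updateLocals(folder, []protocol.FileInfo{f})
}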


@@ -69,8 +69,9 @@ type rwFolder struct {
copiers int
pullers int
stop chan struct{}
queue *jobQueue
stop chan struct{}
queue *jobQueue
dbUpdates chan protocol.FileInfo
}
func newRWFolder(m *Model, cfg config.FolderConfiguration) *rwFolder {
@@ -276,6 +277,7 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
copyChan := make(chan copyBlocksState)
finisherChan := make(chan *sharedPullerState)
var updateWg sync.WaitGroup
var copyWg sync.WaitGroup
var pullWg sync.WaitGroup
var doneWg sync.WaitGroup
@@ -284,6 +286,14 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
l.Debugln(p, "c", p.copiers, "p", p.pullers)
}
p.dbUpdates = make(chan protocol.FileInfo)
updateWg.Add(1)
go func() {
// dbUpdaterRoutine finishes when p.dbUpdates is closed
p.dbUpdaterRoutine()
updateWg.Done()
}()
for i := 0; i < p.copiers; i++ {
copyWg.Add(1)
go func() {
@@ -453,6 +463,10 @@ nextFile:
p.deleteDir(dir)
}
// Wait for db updates to complete
close(p.dbUpdates)
updateWg.Wait()
return changed
}
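
pullerIteration now owns the updater's lifecycle: it creates p.dbUpdates, starts dbUpdaterRoutine in a goroutine tracked by updateWg, and once all copiers, pullers and finishers are done it closes the channel and waits, so every queued update is committed before the iteration returns. A self-contained sketch of that close-then-wait pattern, with a placeholder commitBatch standing in for the real database write (the names and types here are illustrative, not from the commit):

package main

import (
	"fmt"
	"sync"
)

// commitBatch stands in for the real database commit.
func commitBatch(batch []string) {
	fmt.Println("committing", len(batch), "items")
}

func main() {
	updates := make(chan string)
	var updateWg sync.WaitGroup

	updateWg.Add(1)
	go func() {
		defer updateWg.Done()
		var batch []string
		// Drain the channel until the producer closes it...
		for item := range updates {
			batch = append(batch, item)
		}
		// ...then flush whatever is left before exiting.
		commitBatch(batch)
	}()

	// Producers (copiers, pullers, the finisher) send results here.
	for i := 0; i < 5; i++ {
		updates <- fmt.Sprintf("file-%d", i)
	}

	// Closing the channel signals "no more work"; waiting guarantees the
	// final commit has happened before we return.
	close(updates)
	updateWg.Wait()
}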
@@ -510,7 +524,7 @@ func (p *rwFolder) handleDir(file protocol.FileInfo) {
}
if err = osutil.InWritableDir(mkdir, realName); err == nil {
p.model.updateLocal(p.folder, file)
p.dbUpdates <- file
} else {
l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
}
@@ -527,9 +541,9 @@ func (p *rwFolder) handleDir(file protocol.FileInfo) {
// It's OK to change mode bits on stuff within non-writable directories.
if p.ignorePerms {
p.model.updateLocal(p.folder, file)
p.dbUpdates <- file
} else if err := os.Chmod(realName, mode); err == nil {
p.model.updateLocal(p.folder, file)
p.dbUpdates <- file
} else {
l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
}
@@ -564,7 +578,7 @@ func (p *rwFolder) deleteDir(file protocol.FileInfo) {
}
err = osutil.InWritableDir(os.Remove, realName)
if err == nil || os.IsNotExist(err) {
p.model.updateLocal(p.folder, file)
p.dbUpdates <- file
} else {
l.Infof("Puller (folder %q, dir %q): delete: %v", p.folder, file.Name, err)
}
@@ -601,7 +615,7 @@ func (p *rwFolder) deleteFile(file protocol.FileInfo) {
if err != nil && !os.IsNotExist(err) {
l.Infof("Puller (folder %q, file %q): delete: %v", p.folder, file.Name, err)
} else {
p.model.updateLocal(p.folder, file)
p.dbUpdates <- file
}
}
@@ -653,7 +667,7 @@ func (p *rwFolder) renameFile(source, target protocol.FileInfo) {
// of the source and the creation of the target. Fix-up the metadata,
// and update the local index of the target file.
p.model.updateLocal(p.folder, source)
p.dbUpdates <- source
err = p.shortcutFile(target)
if err != nil {
@@ -671,7 +685,7 @@ func (p *rwFolder) renameFile(source, target protocol.FileInfo) {
return
}
p.model.updateLocal(p.folder, source)
p.dbUpdates <- source
}
}
@@ -802,7 +816,7 @@ func (p *rwFolder) shortcutFile(file protocol.FileInfo) (err error) {
}
}
p.model.updateLocal(p.folder, file)
p.dbUpdates <- file
return
}
@@ -810,7 +824,7 @@ func (p *rwFolder) shortcutFile(file protocol.FileInfo) (err error) {
func (p *rwFolder) shortcutSymlink(file protocol.FileInfo) (err error) {
err = symlinks.ChangeType(filepath.Join(p.dir, file.Name), file.Flags)
if err == nil {
p.model.updateLocal(p.folder, file)
p.dbUpdates <- file
} else {
l.Infof("Puller (folder %q, file %q): symlink shortcut: %v", p.folder, file.Name, err)
}
@@ -1048,7 +1062,7 @@ func (p *rwFolder) performFinish(state *sharedPullerState) {
}
// Record the updated file in the index
p.model.updateLocal(p.folder, state.file)
p.dbUpdates <- state.file
}
func (p *rwFolder) finisherRoutine(in <-chan *sharedPullerState) {
@@ -1089,6 +1103,47 @@ func (p *rwFolder) Jobs() ([]string, []string) {
return p.queue.Jobs()
}
// dbUpdaterRoutine aggregates db updates and commits them in batches no
// larger than 1000 items, and no more delayed than 2 seconds.
func (p *rwFolder) dbUpdaterRoutine() {
const (
maxBatchSize = 1000
maxBatchTime = 2 * time.Second
)
batch := make([]protocol.FileInfo, 0, maxBatchSize)
tick := time.NewTicker(maxBatchTime)
defer tick.Stop()
loop:
for {
select {
case file, ok := <-p.dbUpdates:
if !ok {
break loop
}
file.LocalVersion = 0
batch = append(batch, file)
if len(batch) == maxBatchSize {
p.model.updateLocals(p.folder, batch)
batch = batch[:0]
}
case <-tick.C:
if len(batch) > 0 {
p.model.updateLocals(p.folder, batch)
batch = batch[:0]
}
}
}
if len(batch) > 0 {
p.model.updateLocals(p.folder, batch)
}
}
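
dbUpdaterRoutine flushes on two triggers: the batch reaching maxBatchSize, and the two-second ticker firing while updates are pending; a final flush after the loop catches whatever remains once the channel is closed. The same size-or-time pattern, extracted into a self-contained sketch with a pluggable flush function (batcher, flush and the int items are illustrative names, not part of the commit):

package main

import (
	"fmt"
	"time"
)

// batcher collects items from in and hands them to flush in groups of at
// most maxSize, or whenever maxDelay has passed with items pending. A final
// flush runs after the channel is closed.
func batcher(in <-chan int, maxSize int, maxDelay time.Duration, flush func([]int)) {
	batch := make([]int, 0, maxSize)
	tick := time.NewTicker(maxDelay)
	defer tick.Stop()

loop:
	for {
		select {
		case item, ok := <-in:
			if !ok {
				break loop // channel closed; fall through to the final flush
			}
			batch = append(batch, item)
			if len(batch) == maxSize {
				flush(batch)
				batch = batch[:0]
			}
		case <-tick.C:
			if len(batch) > 0 {
				flush(batch)
				batch = batch[:0]
			}
		}
	}
	if len(batch) > 0 {
		flush(batch)
	}
}

func main() {
	in := make(chan int)
	done := make(chan struct{})
	go func() {
		batcher(in, 3, time.Second, func(b []int) { fmt.Println("flush", b) })
		close(done)
	}()
	for i := 0; i < 7; i++ {
		in <- i
	}
	close(in)
	<-done
}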
func invalidateFolder(cfg *config.Configuration, folderID string, err error) {
for i := range cfg.Folders {
folder := &cfg.Folders[i]


@@ -70,7 +70,7 @@ func TestHandleFile(t *testing.T) {
m := NewModel(defaultConfig, protocol.LocalDeviceID, "device", "syncthing", "dev", db)
m.AddFolder(defaultFolderConfig)
// Update index
m.updateLocal("default", existingFile)
m.updateLocals("default", []protocol.FileInfo{existingFile})
p := rwFolder{
folder: "default",
@@ -124,7 +124,7 @@ func TestHandleFileWithTemp(t *testing.T) {
m := NewModel(defaultConfig, protocol.LocalDeviceID, "device", "syncthing", "dev", db)
m.AddFolder(defaultFolderConfig)
// Update index
m.updateLocal("default", existingFile)
m.updateLocals("default", []protocol.FileInfo{existingFile})
p := rwFolder{
folder: "default",
@@ -184,7 +184,7 @@ func TestCopierFinder(t *testing.T) {
m := NewModel(defaultConfig, protocol.LocalDeviceID, "device", "syncthing", "dev", db)
m.AddFolder(defaultFolderConfig)
// Update index
m.updateLocal("default", existingFile)
m.updateLocals("default", []protocol.FileInfo{existingFile})
iterFn := func(folder, file string, index int32) bool {
return true
@@ -268,7 +268,7 @@ func TestCopierCleanup(t *testing.T) {
}
// Add file to index
m.updateLocal("default", file)
m.updateLocals("default", []protocol.FileInfo{file})
if !m.finder.Iterate(blocks[0].Hash, iterFn) {
t.Error("Expected block not found")
@@ -277,7 +277,7 @@ func TestCopierCleanup(t *testing.T) {
file.Blocks = []protocol.BlockInfo{blocks[1]}
file.Version = file.Version.Update(protocol.LocalDeviceID.Short())
// Update index (removing old blocks)
m.updateLocal("default", file)
m.updateLocals("default", []protocol.FileInfo{file})
if m.finder.Iterate(blocks[0].Hash, iterFn) {
t.Error("Unexpected block found")
@@ -290,7 +290,7 @@ func TestCopierCleanup(t *testing.T) {
file.Blocks = []protocol.BlockInfo{blocks[0]}
file.Version = file.Version.Update(protocol.LocalDeviceID.Short())
// Update index (removing old blocks)
m.updateLocal("default", file)
m.updateLocals("default", []protocol.FileInfo{file})
if !m.finder.Iterate(blocks[0].Hash, iterFn) {
t.Error("Unexpected block found")
@@ -316,7 +316,7 @@ func TestLastResortPulling(t *testing.T) {
Modified: 0,
Blocks: []protocol.BlockInfo{blocks[0]},
}
m.updateLocal("default", file)
m.updateLocals("default", []protocol.FileInfo{file})
// Pretend that we are handling a new file of the same content but
// with a different name (causing to copy that particular block)


@@ -14,7 +14,15 @@ import (
"time"
)
func TestBenchmarkTransfer(t *testing.T) {
func TestBenchmarkTransferManyFiles(t *testing.T) {
benchmarkTransfer(t, 50000, 15)
}
func TestBenchmarkTransferLargeFiles(t *testing.T) {
benchmarkTransfer(t, 200, 24)
}
func benchmarkTransfer(t *testing.T, files, sizeExp int) {
log.Println("Cleaning...")
err := removeAll("s1", "s2", "h1/index*", "h2/index*")
if err != nil {
@@ -22,7 +30,7 @@ func TestBenchmarkTransfer(t *testing.T) {
}
log.Println("Generating files...")
err = generateFiles("s1", 10000, 22, "../LICENSE")
err = generateFiles("s1", files, sizeExp, "../LICENSE")
if err != nil {
t.Fatal(err)
}
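
The benchmark is now parameterized by file count and a size parameter, so the two scenarios above are thin wrappers around benchmarkTransfer. Another scenario would follow the same shape; a hypothetical sketch (the test name and the chosen values are illustrative, and reading sizeExp as a power-of-two size exponent is an assumption, not stated in the commit):

// Hypothetical extra scenario; 5000 files and sizeExp 20 are illustrative
// values only, following the same pattern as the two tests above.
func TestBenchmarkTransferMediumFiles(t *testing.T) {
	benchmarkTransfer(t, 5000, 20)
}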