lib/db, lib/model: Always use reasonable sized batches (fixes #2250, fixes #4112)

Harmonize how we use batches in the model, using ProtoSize() to judge
the actual weight of the entire batch instead of estimating. Use smaller
batches in the block map - I think we might have thought that batch.Len()
in the leveldb was the batch size in bytes, but it's actually number of
operations.

GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/4114
This commit is contained in:
Jakob Borg 2017-04-22 14:23:33 +00:00 committed by Audrius Butkevicius
parent 10894695c6
commit e9f05d138f
2 changed files with 26 additions and 22 deletions

View File

@ -19,7 +19,7 @@ import (
var blockFinder *BlockFinder
const maxBatchSize = 256 << 10
const maxBatchSize = 1000
type BlockMap struct {
db *Instance

View File

@ -40,8 +40,8 @@ import (
// How many files to send in each Index/IndexUpdate message.
const (
indexTargetSize = 250 * 1024 // Aim for making index messages no larger than 250 KiB (uncompressed)
indexBatchSize = 1000 // Either way, don't include more files than this
maxBatchSizeBytes = 250 * 1024 // Aim for making index messages no larger than 250 KiB (uncompressed)
maxBatchSizeFiles = 1000 // Either way, don't include more files than this
)
type service interface {
@ -1498,8 +1498,8 @@ func sendIndexes(conn protocol.Connection, folder string, fs *db.FileSet, ignore
func sendIndexTo(minSequence int64, conn protocol.Connection, folder string, fs *db.FileSet, ignores *ignore.Matcher, dbLocation string, dropSymlinks bool) (int64, error) {
deviceID := conn.ID()
name := conn.Name()
batch := make([]protocol.FileInfo, 0, indexBatchSize)
currentBatchSize := 0
batch := make([]protocol.FileInfo, 0, maxBatchSizeFiles)
batchSizeBytes := 0
initial := minSequence == 0
maxSequence := minSequence
var err error
@ -1530,26 +1530,26 @@ func sendIndexTo(minSequence int64, conn protocol.Connection, folder string, fs
})
sorter.Sorted(func(f protocol.FileInfo) bool {
if len(batch) == indexBatchSize || currentBatchSize > indexTargetSize {
if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes {
if initial {
if err = conn.Index(folder, batch); err != nil {
return false
}
l.Debugf("sendIndexes for %s-%s/%q: %d files (<%d bytes) (initial index)", deviceID, name, folder, len(batch), currentBatchSize)
l.Debugf("sendIndexes for %s-%s/%q: %d files (<%d bytes) (initial index)", deviceID, name, folder, len(batch), batchSizeBytes)
initial = false
} else {
if err = conn.IndexUpdate(folder, batch); err != nil {
return false
}
l.Debugf("sendIndexes for %s-%s/%q: %d files (<%d bytes) (batched update)", deviceID, name, folder, len(batch), currentBatchSize)
l.Debugf("sendIndexes for %s-%s/%q: %d files (<%d bytes) (batched update)", deviceID, name, folder, len(batch), batchSizeBytes)
}
batch = make([]protocol.FileInfo, 0, indexBatchSize)
currentBatchSize = 0
batch = make([]protocol.FileInfo, 0, maxBatchSizeFiles)
batchSizeBytes = 0
}
batch = append(batch, f)
currentBatchSize += f.ProtoSize()
batchSizeBytes += f.ProtoSize()
return true
})
@ -1830,24 +1830,21 @@ func (m *Model) internalScanFolderSubdirs(folder string, subDirs []string) error
return err
}
batchSizeFiles := 100
batchSizeBlocks := 2048 // about 256 MB
batch := make([]protocol.FileInfo, 0, batchSizeFiles)
blocksHandled := 0
batch := make([]protocol.FileInfo, 0, maxBatchSizeFiles)
batchSizeBytes := 0
for f := range fchan {
if len(batch) == batchSizeFiles || blocksHandled > batchSizeBlocks {
if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes {
if err := m.CheckFolderHealth(folder); err != nil {
l.Infof("Stopping folder %s mid-scan due to folder error: %s", folderCfg.Description(), err)
return err
}
m.updateLocalsFromScanning(folder, batch)
batch = batch[:0]
blocksHandled = 0
batchSizeBytes = 0
}
batch = append(batch, f)
blocksHandled += len(f.Blocks)
batchSizeBytes += f.ProtoSize()
}
if err := m.CheckFolderHealth(folder); err != nil {
@ -1866,18 +1863,20 @@ func (m *Model) internalScanFolderSubdirs(folder string, subDirs []string) error
// Do a scan of the database for each prefix, to check for deleted and
// ignored files.
batch = batch[:0]
batchSizeBytes = 0
for _, sub := range subDirs {
var iterError error
fs.WithPrefixedHaveTruncated(protocol.LocalDeviceID, sub, func(fi db.FileIntf) bool {
f := fi.(db.FileInfoTruncated)
if len(batch) == batchSizeFiles {
if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes {
if err := m.CheckFolderHealth(folder); err != nil {
iterError = err
return false
}
m.updateLocalsFromScanning(folder, batch)
batch = batch[:0]
batchSizeBytes = 0
}
switch {
@ -1897,6 +1896,7 @@ func (m *Model) internalScanFolderSubdirs(folder string, subDirs []string) error
Version: f.Version, // The file is still the same, so don't bump version
}
batch = append(batch, nf)
batchSizeBytes += nf.ProtoSize()
case !f.IsInvalid() && !f.IsDeleted():
// The file is valid and not deleted. Lets check if it's
@ -1922,6 +1922,7 @@ func (m *Model) internalScanFolderSubdirs(folder string, subDirs []string) error
}
batch = append(batch, nf)
batchSizeBytes += nf.ProtoSize()
}
}
return true
@ -2067,12 +2068,14 @@ func (m *Model) Override(folder string) {
}
runner.setState(FolderScanning)
batch := make([]protocol.FileInfo, 0, indexBatchSize)
batch := make([]protocol.FileInfo, 0, maxBatchSizeFiles)
batchSizeBytes := 0
fs.WithNeed(protocol.LocalDeviceID, func(fi db.FileIntf) bool {
need := fi.(protocol.FileInfo)
if len(batch) == indexBatchSize {
if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes {
m.updateLocalsFromScanning(folder, batch)
batch = batch[:0]
batchSizeBytes = 0
}
have, ok := fs.Get(protocol.LocalDeviceID, need.Name)
@ -2089,6 +2092,7 @@ func (m *Model) Override(folder string) {
}
need.Sequence = 0
batch = append(batch, need)
batchSizeBytes += need.ProtoSize()
return true
})
if len(batch) > 0 {