Add Local Version field to files, send index in segments.

Jakob Borg 2014-07-15 13:04:37 +02:00
parent fccdd85cc1
commit 8b349945de
16 changed files with 291 additions and 288 deletions
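
The change replaces the periodic whole-index broadcast with per-connection senders: each file now carries a LocalVersion stamped by a node-local clock, and a sender streams the index in batches, then keeps sending only files whose LocalVersion exceeds the high-water mark it last flushed. A minimal sketch of that batching idea, using simplified stand-in FileInfo and Connection types rather than the real protocol package:

package sketch

// Stand-in types for illustration; the real definitions live in the
// protocol package.
type FileInfo struct {
	Name         string
	LocalVersion uint64
}

type Connection interface {
	Index(repo string, files []FileInfo) error       // full index
	IndexUpdate(repo string, files []FileInfo) error // incremental update
}

const indexBatchSize = 1000

// sendSince sends every file with a LocalVersion above minLocalVer in
// batches of at most indexBatchSize and returns the highest LocalVersion
// seen, which becomes minLocalVer for the next round.
func sendSince(conn Connection, repo string, files []FileInfo, minLocalVer uint64, initial bool) (uint64, error) {
	batch := make([]FileInfo, 0, indexBatchSize)
	maxSeen := minLocalVer
	flush := func() error {
		if initial {
			initial = false
			return conn.Index(repo, batch) // first batch is the initial Index
		}
		return conn.IndexUpdate(repo, batch) // everything after is an update
	}
	for _, f := range files {
		if f.LocalVersion <= minLocalVer {
			continue // peer already has this one
		}
		if f.LocalVersion > maxSeen {
			maxSeen = f.LocalVersion
		}
		if len(batch) == indexBatchSize {
			if err := flush(); err != nil {
				return maxSeen, err
			}
			batch = batch[:0]
		}
		batch = append(batch, f)
	}
	if initial || len(batch) > 0 {
		if err := flush(); err != nil {
			return maxSeen, err
		}
	}
	return maxSeen, nil
}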

View File

@ -180,7 +180,7 @@ func restGetModelVersion(m *model.Model, w http.ResponseWriter, r *http.Request)
var repo = qs.Get("repo")
var res = make(map[string]interface{})
res["version"] = m.Version(repo)
res["version"] = m.LocalVersion(repo)
w.Header().Set("Content-Type", "application/json; charset=utf-8")
json.NewEncoder(w).Encode(res)
@ -210,7 +210,7 @@ func restGetModel(m *model.Model, w http.ResponseWriter, r *http.Request) {
res["inSyncFiles"], res["inSyncBytes"] = globalFiles-needFiles, globalBytes-needBytes
res["state"] = m.State(repo)
res["version"] = m.Version(repo)
res["version"] = m.LocalVersion(repo)
w.Header().Set("Content-Type", "application/json; charset=utf-8")
json.NewEncoder(w).Encode(res)

View File

@ -290,11 +290,6 @@ func main() {
rateBucket = ratelimit.NewBucketWithRate(float64(1000*cfg.Options.MaxSendKbps), int64(5*1000*cfg.Options.MaxSendKbps))
}
havePersistentIndex := false
if fi, err := os.Stat(filepath.Join(confDir, "index")); err == nil && fi.IsDir() {
havePersistentIndex = true
}
db, err := leveldb.OpenFile(filepath.Join(confDir, "index"), nil)
if err != nil {
l.Fatalln("leveldb.OpenFile():", err)
@ -364,11 +359,6 @@ nextRepo:
// Walk the repository and update the local model before establishing any
// connections to other nodes.
if !havePersistentIndex {
// There's no new style index, load old ones
l.Infoln("Loading legacy index files")
m.LoadIndexes(confDir)
}
m.CleanRepos()
l.Infoln("Performing initial repository scan")
m.ScanRepos()
@ -645,9 +635,10 @@ next:
if rateBucket != nil {
wr = &limitedWriter{conn, rateBucket}
}
protoConn := protocol.NewConnection(remoteID, conn, wr, m)
name := fmt.Sprintf("%s-%s", conn.LocalAddr(), conn.RemoteAddr())
protoConn := protocol.NewConnection(remoteID, conn, wr, m, name)
l.Infof("Established secure connection to %s at %v", remoteID, conn.RemoteAddr())
l.Infof("Established secure connection to %s at %s", remoteID, name)
if debugNet {
l.Debugf("cipher suite %04X", conn.ConnectionState().CipherSuite)
}

View File

@ -3,6 +3,7 @@ package files
import (
"bytes"
"sort"
"sync"
"github.com/calmh/syncthing/lamport"
"github.com/calmh/syncthing/protocol"
@ -12,6 +13,22 @@ import (
"github.com/syndtr/goleveldb/leveldb/util"
)
var (
clockTick uint64
clockMut sync.Mutex
)
func clock(v uint64) uint64 {
clockMut.Lock()
defer clockMut.Unlock()
if v > clockTick {
clockTick = v + 1
} else {
clockTick++
}
return clockTick
}
const (
keyTypeNode = iota
keyTypeGlobal
@ -91,11 +108,11 @@ func globalKeyName(key []byte) []byte {
return key[1+64:]
}
type deletionHandler func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) bool
type deletionHandler func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) uint64
type fileIterator func(f protocol.FileInfo) bool
func ldbGenericReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo, deleteFn deletionHandler) bool {
func ldbGenericReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo, deleteFn deletionHandler) uint64 {
sort.Sort(fileList(fs)) // sort list on name, same as on disk
start := nodeKey(repo, node, nil) // before all repo/node files
@ -112,7 +129,7 @@ func ldbGenericReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo
moreDb := dbi.Next()
fsi := 0
changed := false
var maxLocalVer uint64
for {
var newName, oldName []byte
@ -144,9 +161,10 @@ func ldbGenericReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo
switch {
case moreFs && (!moreDb || cmp == -1):
changed = true
// Disk is missing this file. Insert it.
ldbInsert(batch, repo, node, newName, fs[fsi])
if lv := ldbInsert(batch, repo, node, newName, fs[fsi]); lv > maxLocalVer {
maxLocalVer = lv
}
ldbUpdateGlobal(snap, batch, repo, node, newName, fs[fsi].Version)
fsi++
@ -155,9 +173,10 @@ func ldbGenericReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo
var ef protocol.FileInfo
ef.UnmarshalXDR(dbi.Value())
if fs[fsi].Version > ef.Version {
ldbInsert(batch, repo, node, newName, fs[fsi])
if lv := ldbInsert(batch, repo, node, newName, fs[fsi]); lv > maxLocalVer {
maxLocalVer = lv
}
ldbUpdateGlobal(snap, batch, repo, node, newName, fs[fsi].Version)
changed = true
}
// Iterate both sides.
fsi++
@ -165,8 +184,8 @@ func ldbGenericReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo
case moreDb && (!moreFs || cmp == 1):
if deleteFn != nil {
if deleteFn(snap, batch, repo, node, oldName, dbi) {
changed = true
if lv := deleteFn(snap, batch, repo, node, oldName, dbi); lv > maxLocalVer {
maxLocalVer = lv
}
}
moreDb = dbi.Next()
@ -178,23 +197,24 @@ func ldbGenericReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo
panic(err)
}
return changed
return maxLocalVer
}
func ldbReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) bool {
return ldbGenericReplace(db, repo, node, fs, func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) bool {
func ldbReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) uint64 {
// TODO: Return the remaining maxLocalVer?
return ldbGenericReplace(db, repo, node, fs, func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) uint64 {
// Disk has files that we are missing. Remove it.
if debug {
l.Debugf("delete; repo=%q node=%x name=%q", repo, node, name)
}
batch.Delete(dbi.Key())
ldbRemoveFromGlobal(db, batch, repo, node, name)
return true
return 0
})
}
func ldbReplaceWithDelete(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) bool {
return ldbGenericReplace(db, repo, node, fs, func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) bool {
func ldbReplaceWithDelete(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) uint64 {
return ldbGenericReplace(db, repo, node, fs, func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) uint64 {
var f protocol.FileInfo
err := f.UnmarshalXDR(dbi.Value())
if err != nil {
@ -204,18 +224,20 @@ func ldbReplaceWithDelete(db *leveldb.DB, repo, node []byte, fs []protocol.FileI
if debug {
l.Debugf("mark deleted; repo=%q node=%x name=%q", repo, node, name)
}
ts := clock(f.LocalVersion)
f.Blocks = nil
f.Version = lamport.Default.Tick(f.Version)
f.Flags |= protocol.FlagDeleted
f.LocalVersion = ts
batch.Put(dbi.Key(), f.MarshalXDR())
ldbUpdateGlobal(db, batch, repo, node, nodeKeyName(dbi.Key()), f.Version)
return true
return ts
}
return false
return 0
})
}
func ldbUpdate(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) bool {
func ldbUpdate(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) uint64 {
batch := new(leveldb.Batch)
snap, err := db.GetSnapshot()
if err != nil {
@ -223,12 +245,15 @@ func ldbUpdate(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) bool {
}
defer snap.Release()
var maxLocalVer uint64
for _, f := range fs {
name := []byte(f.Name)
fk := nodeKey(repo, node, name)
bs, err := snap.Get(fk, nil)
if err == leveldb.ErrNotFound {
ldbInsert(batch, repo, node, name, f)
if lv := ldbInsert(batch, repo, node, name, f); lv > maxLocalVer {
maxLocalVer = lv
}
ldbUpdateGlobal(snap, batch, repo, node, name, f.Version)
continue
}
@ -239,7 +264,9 @@ func ldbUpdate(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) bool {
panic(err)
}
if ef.Version != f.Version {
ldbInsert(batch, repo, node, name, f)
if lv := ldbInsert(batch, repo, node, name, f); lv > maxLocalVer {
maxLocalVer = lv
}
ldbUpdateGlobal(snap, batch, repo, node, name, f.Version)
}
}
@ -249,16 +276,22 @@ func ldbUpdate(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) bool {
panic(err)
}
return true
return maxLocalVer
}
func ldbInsert(batch dbWriter, repo, node, name []byte, file protocol.FileInfo) {
func ldbInsert(batch dbWriter, repo, node, name []byte, file protocol.FileInfo) uint64 {
if debug {
l.Debugf("insert; repo=%q node=%x %v", repo, node, file)
}
if file.LocalVersion == 0 {
file.LocalVersion = clock(0)
}
nk := nodeKey(repo, node, name)
batch.Put(nk, file.MarshalXDR())
return file.LocalVersion
}
// ldbUpdateGlobal adds this node+version to the version list for the given

View File

@ -21,18 +21,29 @@ type fileRecord struct {
type bitset uint64
type Set struct {
changes map[protocol.NodeID]uint64
mutex sync.Mutex
repo string
db *leveldb.DB
localVersion map[protocol.NodeID]uint64
mutex sync.Mutex
repo string
db *leveldb.DB
}
func NewSet(repo string, db *leveldb.DB) *Set {
var s = Set{
changes: make(map[protocol.NodeID]uint64),
repo: repo,
db: db,
localVersion: make(map[protocol.NodeID]uint64),
repo: repo,
db: db,
}
var lv uint64
ldbWithHave(db, []byte(repo), protocol.LocalNodeID[:], func(f protocol.FileInfo) bool {
if f.LocalVersion > lv {
lv = f.LocalVersion
}
return true
})
s.localVersion[protocol.LocalNodeID] = lv
clock(lv)
return &s
}
@ -42,8 +53,8 @@ func (s *Set) Replace(node protocol.NodeID, fs []protocol.FileInfo) {
}
s.mutex.Lock()
defer s.mutex.Unlock()
if ldbReplace(s.db, []byte(s.repo), node[:], fs) {
s.changes[node]++
if lv := ldbReplace(s.db, []byte(s.repo), node[:], fs); lv > s.localVersion[node] {
s.localVersion[node] = lv
}
}
@ -53,8 +64,8 @@ func (s *Set) ReplaceWithDelete(node protocol.NodeID, fs []protocol.FileInfo) {
}
s.mutex.Lock()
defer s.mutex.Unlock()
if ldbReplaceWithDelete(s.db, []byte(s.repo), node[:], fs) {
s.changes[node]++
if lv := ldbReplaceWithDelete(s.db, []byte(s.repo), node[:], fs); lv > s.localVersion[node] {
s.localVersion[node] = lv
}
}
@ -64,8 +75,8 @@ func (s *Set) Update(node protocol.NodeID, fs []protocol.FileInfo) {
}
s.mutex.Lock()
defer s.mutex.Unlock()
if ldbUpdate(s.db, []byte(s.repo), node[:], fs) {
s.changes[node]++
if lv := ldbUpdate(s.db, []byte(s.repo), node[:], fs); lv > s.localVersion[node] {
s.localVersion[node] = lv
}
}
@ -102,8 +113,8 @@ func (s *Set) Availability(file string) []protocol.NodeID {
return ldbAvailability(s.db, []byte(s.repo), []byte(file))
}
func (s *Set) Changes(node protocol.NodeID) uint64 {
func (s *Set) LocalVersion(node protocol.NodeID) uint64 {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.changes[node]
return s.localVersion[node]
}

View File

@ -554,7 +554,7 @@ func TestNeed(t *testing.T) {
}
}
func TestChanges(t *testing.T) {
func TestLocalVersion(t *testing.T) {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
if err != nil {
t.Fatal(err)
@ -578,17 +578,17 @@ func TestChanges(t *testing.T) {
}
m.ReplaceWithDelete(protocol.LocalNodeID, local1)
c0 := m.Changes(protocol.LocalNodeID)
c0 := m.LocalVersion(protocol.LocalNodeID)
m.ReplaceWithDelete(protocol.LocalNodeID, local2)
c1 := m.Changes(protocol.LocalNodeID)
c1 := m.LocalVersion(protocol.LocalNodeID)
if !(c1 > c0) {
t.Fatal("Change number should have incremented")
t.Fatal("Local version number should have incremented")
}
m.ReplaceWithDelete(protocol.LocalNodeID, local2)
c2 := m.Changes(protocol.LocalNodeID)
c2 := m.LocalVersion(protocol.LocalNodeID)
if c2 != c1 {
t.Fatal("Change number should be unchanged")
t.Fatal("Local version number should be unchanged")
}
}

View File

@ -1,8 +1,8 @@
package files
import (
"sort"
"github.com/calmh/syncthing/protocol"
"sort"
)
type SortBy func(p protocol.FileInfo) int

View File

@ -112,6 +112,9 @@ alterFiles() {
done
pkill -CONT syncthing
echo "Restarting instance 2"
curl -HX-API-Key:abc123 -X POST "http://localhost:8082/rest/restart"
}
rm -rf h?/*.idx.gz h?/index

View File

@ -17,8 +17,8 @@ var (
// Generate returns a check digit for the string s, which should be composed
// of characters from the Alphabet a.
func (a Alphabet) Generate(s string) (rune, error) {
if err:=a.check();err!=nil{
return 0,err
if err := a.check(); err != nil {
return 0, err
}
factor := 1

View File

@ -5,8 +5,6 @@
package model
import (
"compress/gzip"
"crypto/sha1"
"errors"
"fmt"
"io"
@ -21,7 +19,6 @@ import (
"github.com/calmh/syncthing/events"
"github.com/calmh/syncthing/files"
"github.com/calmh/syncthing/lamport"
"github.com/calmh/syncthing/osutil"
"github.com/calmh/syncthing/protocol"
"github.com/calmh/syncthing/scanner"
"github.com/syndtr/goleveldb/leveldb"
@ -42,6 +39,9 @@ const (
// transfer to bring the systems into synchronization.
const zeroEntrySize = 128
// How many files to send in each Index/IndexUpdate message.
const indexBatchSize = 1000
type Model struct {
indexDir string
cfg *config.Configuration
@ -65,6 +65,9 @@ type Model struct {
nodeVer map[protocol.NodeID]string
pmut sync.RWMutex // protects protoConn and rawConn
sentLocalVer map[protocol.NodeID]map[string]uint64
slMut sync.Mutex
sup suppressor
addedRepo bool
@ -95,6 +98,7 @@ func NewModel(indexDir string, cfg *config.Configuration, clientName, clientVers
protoConn: make(map[protocol.NodeID]protocol.Connection),
rawConn: make(map[protocol.NodeID]io.Closer),
nodeVer: make(map[protocol.NodeID]string),
sentLocalVer: make(map[protocol.NodeID]map[string]uint64),
sup: suppressor{threshold: int64(cfg.Options.MaxChangeKbps)},
}
@ -108,7 +112,6 @@ func NewModel(indexDir string, cfg *config.Configuration, clientName, clientVers
deadlockDetect(&m.rmut, time.Duration(timeout)*time.Second)
deadlockDetect(&m.smut, time.Duration(timeout)*time.Second)
deadlockDetect(&m.pmut, time.Duration(timeout)*time.Second)
go m.broadcastIndexLoop()
return m
}
@ -392,13 +395,13 @@ func (m *Model) Close(node protocol.NodeID, err error) {
"error": err.Error(),
})
m.pmut.Lock()
m.rmut.RLock()
for _, repo := range m.nodeRepos[node] {
m.repoFiles[repo].Replace(node, nil)
}
m.rmut.RUnlock()
m.pmut.Lock()
conn, ok := m.rawConn[node]
if ok {
conn.Close()
@ -502,6 +505,7 @@ func (m *Model) ConnectedTo(nodeID protocol.NodeID) bool {
// repository changes.
func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection) {
nodeID := protoConn.ID()
m.pmut.Lock()
if _, ok := m.protoConn[nodeID]; ok {
panic("add existing node")
@ -511,48 +515,99 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection)
panic("add existing node")
}
m.rawConn[nodeID] = rawConn
m.pmut.Unlock()
cm := m.clusterConfig(nodeID)
protoConn.ClusterConfig(cm)
var idxToSend = make(map[string][]protocol.FileInfo)
m.rmut.RLock()
for _, repo := range m.nodeRepos[nodeID] {
idxToSend[repo] = m.protocolIndex(repo)
fs := m.repoFiles[repo]
go sendIndexes(protoConn, repo, fs)
}
m.rmut.RUnlock()
go func() {
for repo, idx := range idxToSend {
if debug {
l.Debugf("IDX(out/initial): %s: %q: %d files", nodeID, repo, len(idx))
}
const batchSize = 1000
for i := 0; i < len(idx); i += batchSize {
if len(idx[i:]) < batchSize {
protoConn.Index(repo, idx[i:])
} else {
protoConn.Index(repo, idx[i:i+batchSize])
}
}
}
}()
m.pmut.Unlock()
}
// protocolIndex returns the current local index in protocol data types.
func (m *Model) protocolIndex(repo string) []protocol.FileInfo {
var fs []protocol.FileInfo
m.repoFiles[repo].WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
fs = append(fs, f)
return true
})
func sendIndexes(conn protocol.Connection, repo string, fs *files.Set) {
nodeID := conn.ID()
name := conn.Name()
return fs
if debug {
l.Debugf("sendIndexes for %s-%s@/%q starting", nodeID, name, repo)
}
initial := true
minLocalVer := uint64(0)
var err error
defer func() {
if debug {
l.Debugf("sendIndexes for %s-%s@/%q exiting: %v", nodeID, name, repo, err)
}
}()
for err == nil {
if !initial && fs.LocalVersion(protocol.LocalNodeID) <= minLocalVer {
time.Sleep(1 * time.Second)
continue
}
batch := make([]protocol.FileInfo, 0, indexBatchSize)
maxLocalVer := uint64(0)
fs.WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
if f.LocalVersion <= minLocalVer {
return true
}
if f.LocalVersion > maxLocalVer {
maxLocalVer = f.LocalVersion
}
if len(batch) == indexBatchSize {
if initial {
if err = conn.Index(repo, batch); err != nil {
return false
}
if debug {
l.Debugf("sendIndexes for %s-%s/%q: %d files (initial index)", nodeID, name, repo, len(batch))
}
initial = false
} else {
if err = conn.IndexUpdate(repo, batch); err != nil {
return false
}
if debug {
l.Debugf("sendIndexes for %s-%s/%q: %d files (batched update)", nodeID, name, repo, len(batch))
}
}
batch = make([]protocol.FileInfo, 0, indexBatchSize)
}
batch = append(batch, f)
return true
})
if initial {
err = conn.Index(repo, batch)
if debug && err == nil {
l.Debugf("sendIndexes for %s-%s/%q: %d files (small initial index)", nodeID, name, repo, len(batch))
}
initial = false
} else if len(batch) > 0 {
err = conn.IndexUpdate(repo, batch)
if debug && err == nil {
l.Debugf("sendIndexes for %s-%s/%q: %d files (last batch)", nodeID, name, repo, len(batch))
}
}
minLocalVer = maxLocalVer
}
}
func (m *Model) updateLocal(repo string, f protocol.FileInfo) {
f.LocalVersion = 0
m.rmut.RLock()
m.repoFiles[repo].Update(protocol.LocalNodeID, []protocol.FileInfo{f})
m.rmut.RUnlock()
@ -575,49 +630,6 @@ func (m *Model) requestGlobal(nodeID protocol.NodeID, repo, name string, offset
return nc.Request(repo, name, offset, size)
}
func (m *Model) broadcastIndexLoop() {
// TODO: Rewrite to send index in segments
var lastChange = map[string]uint64{}
for {
time.Sleep(5 * time.Second)
m.pmut.RLock()
m.rmut.RLock()
var indexWg sync.WaitGroup
for repo, fs := range m.repoFiles {
repo := repo
c := fs.Changes(protocol.LocalNodeID)
if c == lastChange[repo] {
continue
}
lastChange[repo] = c
idx := m.protocolIndex(repo)
for _, nodeID := range m.repoNodes[repo] {
nodeID := nodeID
if conn, ok := m.protoConn[nodeID]; ok {
indexWg.Add(1)
if debug {
l.Debugf("IDX(out/loop): %s: %d files", nodeID, len(idx))
}
go func() {
conn.Index(repo, idx)
indexWg.Done()
}()
}
}
}
m.rmut.RUnlock()
m.pmut.RUnlock()
indexWg.Wait()
}
}
func (m *Model) AddRepo(cfg config.RepositoryConfiguration) {
if m.started {
panic("cannot add repo to started model")
@ -709,88 +721,6 @@ func (m *Model) ScanRepo(repo string) error {
return nil
}
func (m *Model) LoadIndexes(dir string) {
m.rmut.RLock()
for repo := range m.repoCfgs {
fs := m.loadIndex(repo, dir)
var sfs = make([]protocol.FileInfo, len(fs))
for i := 0; i < len(fs); i++ {
lamport.Default.Tick(fs[i].Version)
fs[i].Flags &= ^uint32(protocol.FlagInvalid) // we might have saved an index with files that were suppressed; they should not be on startup
}
m.repoFiles[repo].Replace(protocol.LocalNodeID, sfs)
}
m.rmut.RUnlock()
}
func (m *Model) saveIndex(repo string, dir string, fs []protocol.FileInfo) error {
id := fmt.Sprintf("%x", sha1.Sum([]byte(m.repoCfgs[repo].Directory)))
name := id + ".idx.gz"
name = filepath.Join(dir, name)
tmp := fmt.Sprintf("%s.tmp.%d", name, time.Now().UnixNano())
idxf, err := os.OpenFile(tmp, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0644)
if err != nil {
return err
}
defer os.Remove(tmp)
gzw := gzip.NewWriter(idxf)
n, err := protocol.IndexMessage{
Repository: repo,
Files: fs,
}.EncodeXDR(gzw)
if err != nil {
gzw.Close()
idxf.Close()
return err
}
err = gzw.Close()
if err != nil {
return err
}
err = idxf.Close()
if err != nil {
return err
}
if debug {
l.Debugln("wrote index,", n, "bytes uncompressed")
}
return osutil.Rename(tmp, name)
}
func (m *Model) loadIndex(repo string, dir string) []protocol.FileInfo {
id := fmt.Sprintf("%x", sha1.Sum([]byte(m.repoCfgs[repo].Directory)))
name := id + ".idx.gz"
name = filepath.Join(dir, name)
idxf, err := os.Open(name)
if err != nil {
return nil
}
defer idxf.Close()
gzr, err := gzip.NewReader(idxf)
if err != nil {
return nil
}
defer gzr.Close()
var im protocol.IndexMessage
err = im.DecodeXDR(gzr)
if err != nil || im.Repository != repo {
return nil
}
return im.Files
}
// clusterConfig returns a ClusterConfigMessage that is correct for the given peer node
func (m *Model) clusterConfig(node protocol.NodeID) protocol.ClusterConfigMessage {
cm := protocol.ClusterConfigMessage{
@ -868,12 +798,12 @@ func (m *Model) Override(repo string) {
// Version returns the change version for the given repository. This is
// guaranteed to increment if the contents of the local or global repository
// have changed.
func (m *Model) Version(repo string) uint64 {
func (m *Model) LocalVersion(repo string) uint64 {
var ver uint64
m.rmut.Lock()
for _, n := range m.repoNodes[repo] {
ver += m.repoFiles[repo].Changes(n)
ver += m.repoFiles[repo].LocalVersion(n)
}
m.rmut.Unlock()

View File

@ -193,7 +193,7 @@ func (p *puller) run() {
default:
}
if v := p.model.Version(p.repoCfg.ID); v > prevVer {
if v := p.model.LocalVersion(p.repoCfg.ID); v > prevVer {
// Queue more blocks to fetch, if any
p.queueNeededBlocks()
prevVer = v
@ -335,15 +335,21 @@ func (p *puller) handleRequestResult(res requestResult) {
return
}
_, of.err = of.file.WriteAt(res.data, res.offset)
if res.err != nil {
of.err = res.err
if debug {
l.Debugf("pull: not writing %q / %q offset %d: %v", p.repoCfg.ID, f.Name, res.offset, res.err)
}
} else {
_, of.err = of.file.WriteAt(res.data, res.offset)
if debug {
l.Debugf("pull: wrote %q / %q offset %d len %d outstanding %d done %v", p.repoCfg.ID, f.Name, res.offset, len(res.data), of.outstanding, of.done)
}
}
of.outstanding--
p.openFiles[f.Name] = of
if debug {
l.Debugf("pull: wrote %q / %q offset %d outstanding %d done %v", p.repoCfg.ID, f.Name, res.offset, of.outstanding, of.done)
}
if of.done && of.outstanding == 0 {
p.closeFile(f)
}
@ -526,7 +532,7 @@ func (p *puller) handleRequestBlock(b bqBlock) bool {
}
node := p.oustandingPerNode.leastBusyNode(of.availability)
if len(node) == 0 {
if node == (protocol.NodeID{}) {
of.err = errNoNode
if of.file != nil {
of.file.Close()
@ -662,7 +668,7 @@ func (p *puller) closeFile(f protocol.FileInfo) {
for i := range hb {
if bytes.Compare(hb[i].Hash, f.Blocks[i].Hash) != 0 {
l.Debugf("pull: %q / %q: block %d hash mismatch", p.repoCfg.ID, f.Name, i)
l.Debugf("pull: %q / %q: block %d hash mismatch\n\thave: %x\n\twant: %x", p.repoCfg.ID, f.Name, i, hb[i].Hash, f.Blocks[i].Hash)
return
}
}

View File

@ -182,7 +182,7 @@ Cluster Config messages MUST NOT be sent after the initial exchange.
| Flags |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| |
+ Max Version (64 bits) +
+ Max Local Version (64 bits) +
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
@ -255,13 +255,13 @@ The Node Flags field contains the following single bit flags:
Exactly one of the T, R or S bits MUST be set.
The Node Max Version field contains the highest file version number of
the files already known to be in the index sent by this node. If nothing
is known about the index of a given node, this field MUST be set to
zero. When receiving a Cluster Config message with a non-zero Max
The Node Max Local Version field contains the highest local file version
number of the files already known to be in the index sent by this node.
If nothing is known about the index of a given node, this field MUST be
set to zero. When receiving a Cluster Config message with a non-zero Max
Version for the local node ID, a node MAY elect to send an Index Update
message containing only files with higher version numbers in place of
the initial Index message.
message containing only files with higher local version numbers in place
of the initial Index message.
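
Put differently, Max Local Version is a resumption point. A hedged sketch of the choice the paragraph above allows a sender to make, with illustrative stand-in types rather than anything defined by this spec:

package sketch

// Stand-ins for illustration only.
type FileInfo struct {
	Name         string
	LocalVersion uint64
}

type IndexSender interface {
	Index(repo string, files []FileInfo) error
	IndexUpdate(repo string, files []FileInfo) error
}

// resumeOrRestart: with a non-zero Max Local Version reported for our own
// node ID, send only files the peer has not seen yet as an Index Update;
// with zero, the peer knows nothing about us, so send the full Index.
func resumeOrRestart(conn IndexSender, repo string, local []FileInfo, peerMaxLocalVer uint64) error {
	if peerMaxLocalVer == 0 {
		return conn.Index(repo, local)
	}
	newer := make([]FileInfo, 0, len(local))
	for _, f := range local {
		if f.LocalVersion > peerMaxLocalVer {
			newer = append(newer, f)
		}
	}
	return conn.IndexUpdate(repo, newer)
}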
The Options field contains option values to be used in an implementation
specific manner. The options list is conceptually a map of Key => Value
@ -292,7 +292,7 @@ peers acting in a specific manner as a result of sent options.
struct Node {
string ID<>;
unsigned int Flags;
unsigned hyper MaxVersion;
unsigned hyper MaxLocalVersion;
}
struct Option {
@ -359,6 +359,10 @@ Index message MUST be sent. There is no response to the Index message.
+ Version (64 bits) +
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| |
+ Local Version (64 bits) +
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Number of Blocks |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
@ -400,6 +404,10 @@ detected and received change. The combination of Repository, Name and
Version uniquely identifies the contents of a file at a given point in
time.
The Local Version field is the value of a node local monotonic clock at
the time of last local database update to a file. The clock ticks on
every local database update.
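
This is the clock added in the files package diff earlier in this commit: it never moves backwards, and feeding it a LocalVersion read back from the database fast-forwards it past that value. Restated compactly, with a worked example in the trailing comment:

package sketch

import "sync"

var (
	clockTick uint64
	clockMut  sync.Mutex
)

// clock returns a value strictly greater than both its previous return
// value and the argument, which keeps LocalVersion monotonic per node.
func clock(v uint64) uint64 {
	clockMut.Lock()
	defer clockMut.Unlock()
	if v > clockTick {
		clockTick = v + 1
	} else {
		clockTick++
	}
	return clockTick
}

// Example (values chosen for illustration): seeded with the highest
// LocalVersion found in the database at startup, clock(41) returns 42;
// two later database updates calling clock(0) then get 43 and 44.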
The Flags field is made up of the following single bit flags:
0 1 2 3
@ -458,6 +466,7 @@ block which may represent a smaller amount of data.
unsigned int Flags;
hyper Modified;
unsigned hyper Version;
unsigned hyper LocalVer;
BlockInfo Blocks<>;
}

View File

@ -12,11 +12,12 @@ type IndexMessage struct {
}
type FileInfo struct {
Name string // max:1024
Flags uint32
Modified int64
Version uint64
Blocks []BlockInfo // max:1000000
Name string // max:1024
Flags uint32
Modified int64
Version uint64
LocalVersion uint64
Blocks []BlockInfo // max:1000000
}
func (f FileInfo) String() string {
@ -69,9 +70,9 @@ type Repository struct {
}
type Node struct {
ID []byte // max:32
Flags uint32
MaxVersion uint64
ID []byte // max:32
Flags uint32
MaxLocalVersion uint64
}
type Option struct {

View File

@ -121,6 +121,10 @@ FileInfo Structure:
+ Version (64 bits) +
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| |
+ Local Version (64 bits) +
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Number of Blocks |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
@ -134,6 +138,7 @@ struct FileInfo {
unsigned int Flags;
hyper Modified;
unsigned hyper Version;
unsigned hyper LocalVersion;
BlockInfo Blocks<1000000>;
}
@ -163,6 +168,7 @@ func (o FileInfo) encodeXDR(xw *xdr.Writer) (int, error) {
xw.WriteUint32(o.Flags)
xw.WriteUint64(uint64(o.Modified))
xw.WriteUint64(o.Version)
xw.WriteUint64(o.LocalVersion)
if len(o.Blocks) > 1000000 {
return xw.Tot(), xdr.ErrElementSizeExceeded
}
@ -189,6 +195,7 @@ func (o *FileInfo) decodeXDR(xr *xdr.Reader) error {
o.Flags = xr.ReadUint32()
o.Modified = int64(xr.ReadUint64())
o.Version = xr.ReadUint64()
o.LocalVersion = xr.ReadUint64()
_BlocksSize := int(xr.ReadUint32())
if _BlocksSize > 1000000 {
return xdr.ErrElementSizeExceeded
@ -567,7 +574,7 @@ Node Structure:
| Flags |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| |
+ Max Version (64 bits) +
+ Max Local Version (64 bits) +
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
@ -575,7 +582,7 @@ Node Structure:
struct Node {
opaque ID<32>;
unsigned int Flags;
unsigned hyper MaxVersion;
unsigned hyper MaxLocalVersion;
}
*/
@ -602,7 +609,7 @@ func (o Node) encodeXDR(xw *xdr.Writer) (int, error) {
}
xw.WriteBytes(o.ID)
xw.WriteUint32(o.Flags)
xw.WriteUint64(o.MaxVersion)
xw.WriteUint64(o.MaxLocalVersion)
return xw.Tot(), xw.Error()
}
@ -620,7 +627,7 @@ func (o *Node) UnmarshalXDR(bs []byte) error {
func (o *Node) decodeXDR(xr *xdr.Reader) error {
o.ID = xr.ReadBytesMax(32)
o.Flags = xr.ReadUint32()
o.MaxVersion = xr.ReadUint64()
o.MaxLocalVersion = xr.ReadUint64()
return xr.Error()
}

View File

@ -66,7 +66,9 @@ type Model interface {
type Connection interface {
ID() NodeID
Index(repo string, files []FileInfo)
Name() string
Index(repo string, files []FileInfo) error
IndexUpdate(repo string, files []FileInfo) error
Request(repo string, name string, offset int64, size int) ([]byte, error)
ClusterConfig(config ClusterConfigMessage)
Statistics() Statistics
@ -74,6 +76,7 @@ type Connection interface {
type rawConnection struct {
id NodeID
name string
receiver Model
state int
@ -87,8 +90,7 @@ type rawConnection struct {
awaiting []chan asyncResult
awaitingMut sync.Mutex
idxSent map[string]map[string]uint64
idxMut sync.Mutex // ensures serialization of Index calls
idxMut sync.Mutex // ensures serialization of Index calls
nextID chan int
outbox chan []encodable
@ -106,15 +108,16 @@ const (
pingIdleTime = 60 * time.Second
)
func NewConnection(nodeID NodeID, reader io.Reader, writer io.Writer, receiver Model) Connection {
func NewConnection(nodeID NodeID, reader io.Reader, writer io.Writer, receiver Model, name string) Connection {
cr := &countingReader{Reader: reader}
cw := &countingWriter{Writer: writer}
rb := bufio.NewReader(cr)
wb := bufio.NewWriter(cw)
wb := bufio.NewWriterSize(cw, 65536)
c := rawConnection{
id: nodeID,
name: name,
receiver: nativeModel{receiver},
state: stateInitial,
cr: cr,
@ -123,7 +126,6 @@ func NewConnection(nodeID NodeID, reader io.Reader, writer io.Writer, receiver M
wb: wb,
xw: xdr.NewWriter(wb),
awaiting: make([]chan asyncResult, 0x1000),
idxSent: make(map[string]map[string]uint64),
outbox: make(chan []encodable),
nextID: make(chan int),
closed: make(chan struct{}),
@ -142,36 +144,34 @@ func (c *rawConnection) ID() NodeID {
return c.id
}
func (c *rawConnection) Name() string {
return c.name
}
// Index writes the list of file information to the connected peer node
func (c *rawConnection) Index(repo string, idx []FileInfo) {
func (c *rawConnection) Index(repo string, idx []FileInfo) error {
select {
case <-c.closed:
return ErrClosed
default:
}
c.idxMut.Lock()
defer c.idxMut.Unlock()
c.send(header{0, -1, messageTypeIndex}, IndexMessage{repo, idx})
c.idxMut.Unlock()
return nil
}
var msgType int
if c.idxSent[repo] == nil {
// This is the first time we send an index.
msgType = messageTypeIndex
c.idxSent[repo] = make(map[string]uint64)
for _, f := range idx {
c.idxSent[repo][f.Name] = f.Version
}
} else {
// We have sent one full index. Only send updates now.
msgType = messageTypeIndexUpdate
var diff []FileInfo
for _, f := range idx {
if vs, ok := c.idxSent[repo][f.Name]; !ok || f.Version != vs {
diff = append(diff, f)
c.idxSent[repo][f.Name] = f.Version
}
}
idx = diff
}
if msgType == messageTypeIndex || len(idx) > 0 {
c.send(header{0, -1, msgType}, IndexMessage{repo, idx})
// IndexUpdate writes the list of file information to the connected peer node as an update
func (c *rawConnection) IndexUpdate(repo string, idx []FileInfo) error {
select {
case <-c.closed:
return ErrClosed
default:
}
c.idxMut.Lock()
c.send(header{0, -1, messageTypeIndexUpdate}, IndexMessage{repo, idx})
c.idxMut.Unlock()
return nil
}
// Request returns the bytes for the specified block after fetching them from the connected peer.
@ -445,15 +445,13 @@ func (c *rawConnection) send(h header, es ...encodable) bool {
}
func (c *rawConnection) writerLoop() {
var err error
for {
select {
case es := <-c.outbox:
for _, e := range es {
e.encodeXDR(c.xw)
}
if err = c.flush(); err != nil {
if err := c.flush(); err != nil {
c.close(err)
return
}
@ -471,11 +469,9 @@ func (c *rawConnection) flush() error {
if err := c.xw.Error(); err != nil {
return err
}
if err := c.wb.Flush(); err != nil {
return err
}
return nil
}

View File

@ -18,7 +18,11 @@ func (c wireFormatConnection) ID() NodeID {
return c.next.ID()
}
func (c wireFormatConnection) Index(repo string, fs []FileInfo) {
func (c wireFormatConnection) Name() string {
return c.next.Name()
}
func (c wireFormatConnection) Index(repo string, fs []FileInfo) error {
var myFs = make([]FileInfo, len(fs))
copy(myFs, fs)
@ -26,7 +30,18 @@ func (c wireFormatConnection) Index(repo string, fs []FileInfo) {
myFs[i].Name = norm.NFC.String(filepath.ToSlash(myFs[i].Name))
}
c.next.Index(repo, myFs)
return c.next.Index(repo, myFs)
}
func (c wireFormatConnection) IndexUpdate(repo string, fs []FileInfo) error {
var myFs = make([]FileInfo, len(fs))
copy(myFs, fs)
for i := range fs {
myFs[i].Name = norm.NFC.String(filepath.ToSlash(myFs[i].Name))
}
return c.next.IndexUpdate(repo, myFs)
}
func (c wireFormatConnection) Request(repo, name string, offset int64, size int) ([]byte, error) {

View File

@ -6,6 +6,7 @@ package scanner
import (
"bytes"
"code.google.com/p/go.text/unicode/norm"
"errors"
"fmt"
"io/ioutil"
@ -14,7 +15,6 @@ import (
"runtime"
"strings"
"time"
"code.google.com/p/go.text/unicode/norm"
"github.com/calmh/syncthing/lamport"
"github.com/calmh/syncthing/protocol"
@ -216,6 +216,7 @@ func (w *Walker) walkAndHashFiles(res *[]protocol.FileInfo, ign map[string][]str
l.Infof("Changes to %q are being temporarily suppressed because it changes too frequently.", p)
cf.Flags |= protocol.FlagInvalid
cf.Version = lamport.Default.Tick(cf.Version)
cf.LocalVersion = 0
if debug {
l.Debugln("suppressed:", cf)
}