syncthing/model.go

528 lines
11 KiB
Go
Raw Normal View History

2013-12-15 11:43:31 +01:00
package main
/*
Locking
=======
The model has read and write locks. These must be acquired as appropriate by
public methods. To prevent deadlock situations, private methods should never
acquire locks, but document what locks they require.
*/
import (
2013-12-23 18:12:44 +01:00
"fmt"
"io"
2013-12-15 11:43:31 +01:00
"os"
"path"
"sync"
"time"
"github.com/calmh/syncthing/buffers"
"github.com/calmh/syncthing/protocol"
)
type Model struct {
sync.RWMutex
2013-12-30 15:30:29 +01:00
dir string
global map[string]File // the latest version of each file as it exists in the cluster
local map[string]File // the files we currently have locally on disk
remote map[string]map[string]File
need map[string]bool // the files we need to update
nodes map[string]*protocol.Connection
rawConn map[string]io.ReadWriteCloser
2013-12-30 15:30:29 +01:00
updatedLocal int64 // timestamp of last update to local
updateGlobal int64 // timestamp of last update to remote
2013-12-24 21:21:03 +01:00
lastIdxBcast time.Time
lastIdxBcastRequest time.Time
2013-12-15 11:43:31 +01:00
}
const (
	// FlagDeleted is the bit in a file's Flags that marks it as deleted.
	FlagDeleted = 1 << 12

	// idxBcastHoldtime is how long to wait after the last index modification
	// before broadcasting the index.
	idxBcastHoldtime = 15 * time.Second
	// idxBcastMaxDelay overrides the hold time: once this long has passed
	// since the previous broadcast, a pending broadcast is sent regardless.
	idxBcastMaxDelay = 120 * time.Second
)
func NewModel(dir string) *Model {
m := &Model{
dir: dir,
global: make(map[string]File),
local: make(map[string]File),
remote: make(map[string]map[string]File),
need: make(map[string]bool),
nodes: make(map[string]*protocol.Connection),
rawConn: make(map[string]io.ReadWriteCloser),
lastIdxBcast: time.Now(),
2013-12-15 11:43:31 +01:00
}
2013-12-30 15:30:29 +01:00
go m.printStatsLoop()
2013-12-24 21:21:03 +01:00
go m.broadcastIndexLoop()
2013-12-15 11:43:31 +01:00
return m
}
// Start launches the puller in its own goroutine and returns immediately.
func (m *Model) Start() {
	go func() {
		m.puller()
	}()
}
2013-12-30 15:30:29 +01:00
func (m *Model) printStatsLoop() {
var lastUpdated int64
2013-12-23 18:12:44 +01:00
for {
2013-12-24 17:45:16 +01:00
time.Sleep(60 * time.Second)
2013-12-23 18:12:44 +01:00
m.RLock()
2013-12-30 15:30:29 +01:00
m.printConnectionStats()
if m.updatedLocal+m.updateGlobal > lastUpdated {
m.printModelStats()
lastUpdated = m.updatedLocal + m.updateGlobal
2013-12-23 18:12:44 +01:00
}
m.RUnlock()
}
}
2013-12-30 15:30:29 +01:00
func (m *Model) printConnectionStats() {
for node, conn := range m.nodes {
stats := conn.Statistics()
2013-12-31 02:52:36 +01:00
if stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0 {
2013-12-30 15:30:29 +01:00
infof("%s: %sB/s in, %sB/s out", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec))
}
}
}
// printModelStats logs the file count and total size of the global table.
// When there are files left to fetch it also logs the same figures for the
// local table and for the needed set.
// Must be called with the read lock held.
func (m *Model) printModelStats() {
	var clusterBytes int
	for _, gf := range m.global {
		clusterBytes += gf.Size()
	}
	infof("%6d files, %8sB in cluster", len(m.global), toSI(clusterBytes))

	// Nothing more to report when we are fully in sync.
	if len(m.need) == 0 {
		return
	}

	var localBytes int
	for _, lf := range m.local {
		localBytes += lf.Size()
	}
	infof("%6d files, %8sB in local repo", len(m.local), toSI(localBytes))

	var neededBytes int
	for name := range m.need {
		neededBytes += m.global[name].Size()
	}
	infof("%6d files, %8sB to synchronize", len(m.need), toSI(neededBytes))
}
2013-12-23 18:12:44 +01:00
// toSI formats n using binary (1024-based) prefixes: two decimals for G and
// M, one decimal for K, and the plain integer below 1024. The result ends
// with the prefix letter (or a space when there is none) so that callers can
// append a unit such as "B" directly.
func toSI(n int) string {
	switch {
	// Thresholds are inclusive so that exact powers render with the larger
	// prefix, e.g. 1024 as "1.0 K" rather than "1024 ".
	case n >= 1<<30:
		return fmt.Sprintf("%.02f G", float64(n)/(1<<30))
	case n >= 1<<20:
		return fmt.Sprintf("%.02f M", float64(n)/(1<<20))
	case n >= 1<<10:
		return fmt.Sprintf("%.01f K", float64(n)/(1<<10))
	default:
		return fmt.Sprintf("%d ", n)
	}
}
2013-12-30 15:30:29 +01:00
// Index is called when a new node is connected and we receive their full index.
2013-12-15 11:43:31 +01:00
func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
m.Lock()
defer m.Unlock()
2013-12-18 19:36:28 +01:00
if opts.Debug.TraceNet {
2013-12-15 11:43:31 +01:00
debugf("NET IDX(in): %s: %d files", nodeID, len(fs))
}
m.remote[nodeID] = make(map[string]File)
for _, f := range fs {
2013-12-18 19:36:28 +01:00
if f.Flags&FlagDeleted != 0 && !opts.Delete {
2013-12-15 11:43:31 +01:00
// Files marked as deleted do not even enter the model
continue
}
2013-12-30 02:33:57 +01:00
m.remote[nodeID][f.Name] = fileFromFileInfo(f)
2013-12-28 14:10:36 +01:00
}
m.recomputeGlobal()
m.recomputeNeed()
2013-12-30 15:30:29 +01:00
m.printModelStats()
2013-12-28 14:10:36 +01:00
}
2013-12-30 15:30:29 +01:00
// IndexUpdate is called for incremental updates to connected nodes' indexes.
2013-12-28 14:10:36 +01:00
func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
m.Lock()
defer m.Unlock()
if opts.Debug.TraceNet {
debugf("NET IDXUP(in): %s: %d files", nodeID, len(fs))
}
repo, ok := m.remote[nodeID]
if !ok {
return
}
for _, f := range fs {
if f.Flags&FlagDeleted != 0 && !opts.Delete {
// Files marked as deleted do not even enter the model
continue
2013-12-15 11:43:31 +01:00
}
2013-12-30 02:33:57 +01:00
repo[f.Name] = fileFromFileInfo(f)
2013-12-15 11:43:31 +01:00
}
m.recomputeGlobal()
m.recomputeNeed()
}
2013-12-30 15:30:29 +01:00
// SeedIndex is called when our previously cached index is loaded from disk at startup.
2013-12-15 11:43:31 +01:00
func (m *Model) SeedIndex(fs []protocol.FileInfo) {
m.Lock()
defer m.Unlock()
m.local = make(map[string]File)
for _, f := range fs {
2013-12-30 02:33:57 +01:00
m.local[f.Name] = fileFromFileInfo(f)
2013-12-15 11:43:31 +01:00
}
m.recomputeGlobal()
m.recomputeNeed()
2013-12-30 15:30:29 +01:00
m.printModelStats()
2013-12-15 11:43:31 +01:00
}
func (m *Model) Close(node string, err error) {
2013-12-15 11:43:31 +01:00
m.Lock()
defer m.Unlock()
conn, ok := m.rawConn[node]
if ok {
conn.Close()
} else {
warnln("Close on unknown connection for node", node)
}
if err != nil {
warnf("Disconnected from node %s: %v", node, err)
} else {
infoln("Disconnected from node", node)
}
2013-12-15 11:43:31 +01:00
delete(m.remote, node)
delete(m.nodes, node)
delete(m.rawConn, node)
2013-12-15 11:43:31 +01:00
m.recomputeGlobal()
m.recomputeNeed()
}
func (m *Model) Request(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
2013-12-18 19:36:28 +01:00
if opts.Debug.TraceNet && nodeID != "<local>" {
2013-12-15 11:43:31 +01:00
debugf("NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
}
fn := path.Join(m.dir, name)
fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
if err != nil {
return nil, err
}
defer fd.Close()
buf := buffers.Get(int(size))
_, err = fd.ReadAt(buf, int64(offset))
if err != nil {
return nil, err
}
return buf, nil
}
func (m *Model) RequestGlobal(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
m.RLock()
2013-12-28 14:10:36 +01:00
nc, ok := m.nodes[nodeID]
2013-12-15 11:43:31 +01:00
m.RUnlock()
2013-12-28 14:10:36 +01:00
if !ok {
2013-12-31 02:55:33 +01:00
return nil, fmt.Errorf("RequestGlobal: no such node: %s", nodeID)
2013-12-28 14:10:36 +01:00
}
2013-12-15 11:43:31 +01:00
2013-12-18 19:36:28 +01:00
if opts.Debug.TraceNet {
2013-12-15 11:43:31 +01:00
debugf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
}
return nc.Request(name, offset, size, hash)
}
func (m *Model) ReplaceLocal(fs []File) {
m.Lock()
defer m.Unlock()
var updated bool
var newLocal = make(map[string]File)
for _, f := range fs {
newLocal[f.Name] = f
if ef := m.local[f.Name]; ef.Modified != f.Modified {
updated = true
}
}
if m.markDeletedLocals(newLocal) {
updated = true
}
if len(newLocal) != len(m.local) {
updated = true
}
if updated {
m.local = newLocal
m.recomputeGlobal()
m.recomputeNeed()
2013-12-30 15:30:29 +01:00
m.updatedLocal = time.Now().Unix()
2013-12-25 02:31:25 +01:00
m.lastIdxBcastRequest = time.Now()
2013-12-15 11:43:31 +01:00
}
}
2013-12-24 21:21:03 +01:00
func (m *Model) broadcastIndexLoop() {
for {
m.RLock()
bcastRequested := m.lastIdxBcastRequest.After(m.lastIdxBcast)
holdtimeExceeded := time.Since(m.lastIdxBcastRequest) > idxBcastHoldtime
2013-12-28 14:10:36 +01:00
m.RUnlock()
2013-12-24 21:21:03 +01:00
maxDelayExceeded := time.Since(m.lastIdxBcast) > idxBcastMaxDelay
if bcastRequested && (holdtimeExceeded || maxDelayExceeded) {
2013-12-28 14:10:36 +01:00
m.Lock()
var indexWg sync.WaitGroup
indexWg.Add(len(m.nodes))
2013-12-24 21:21:03 +01:00
idx := m.protocolIndex()
2013-12-28 14:10:36 +01:00
m.lastIdxBcast = time.Now()
2013-12-24 21:21:03 +01:00
for _, node := range m.nodes {
node := node
if opts.Debug.TraceNet {
2013-12-30 15:30:29 +01:00
debugf("NET IDX(out/loop): %s: %d files", node.ID, len(idx))
2013-12-24 21:21:03 +01:00
}
2013-12-28 14:10:36 +01:00
go func() {
node.Index(idx)
indexWg.Done()
}()
2013-12-24 21:21:03 +01:00
}
2013-12-28 14:10:36 +01:00
m.Unlock()
indexWg.Wait()
2013-12-15 11:43:31 +01:00
}
2013-12-28 14:10:36 +01:00
time.Sleep(idxBcastHoldtime)
2013-12-15 11:43:31 +01:00
}
}
// markDeletedLocals carries files that have gone missing locally over into
// newLocal, flagged as deleted. It reports whether any file was newly
// flagged.
// Must be called with the write lock held.
func (m *Model) markDeletedLocals(newLocal map[string]File) bool {
	changed := false
	for name, old := range m.local {
		if _, present := newLocal[name]; present {
			continue
		}
		// Only treat the disappearance as a deletion if we already had the
		// newest version according to the global table.
		if m.global[name].Modified > old.Modified {
			continue
		}
		if old.Flags&FlagDeleted == 0 {
			// Newly missing: mark it deleted and bump Modified by one.
			old.Flags = FlagDeleted
			old.Modified++
			old.Blocks = nil
			changed = true
		}
		newLocal[name] = old
	}
	return changed
}
func (m *Model) UpdateLocal(f File) {
m.Lock()
defer m.Unlock()
if ef, ok := m.local[f.Name]; !ok || ef.Modified != f.Modified {
m.local[f.Name] = f
m.recomputeGlobal()
m.recomputeNeed()
2013-12-30 15:30:29 +01:00
m.updatedLocal = time.Now().Unix()
2013-12-25 02:31:25 +01:00
m.lastIdxBcastRequest = time.Now()
2013-12-15 11:43:31 +01:00
}
}
// Dir returns the repository directory the model was created with.
func (m *Model) Dir() string {
	m.RLock()
	dir := m.dir
	m.RUnlock()
	return dir
}
// HaveFiles returns a snapshot of every entry in the local table, in no
// particular order. The result is nil when the table is empty.
func (m *Model) HaveFiles() []File {
	m.RLock()
	defer m.RUnlock()

	var have []File
	for name := range m.local {
		have = append(have, m.local[name])
	}
	return have
}
// LocalFile looks up name in the local table. The boolean reports whether an
// entry exists.
func (m *Model) LocalFile(name string) (File, bool) {
	m.RLock()
	file, found := m.local[name]
	m.RUnlock()
	return file, found
}
// GlobalFile looks up name in the global table. The boolean reports whether
// an entry exists.
func (m *Model) GlobalFile(name string) (File, bool) {
	m.RLock()
	file, found := m.global[name]
	m.RUnlock()
	return file, found
}
// Must be called with the write lock held.
func (m *Model) recomputeGlobal() {
var newGlobal = make(map[string]File)
for n, f := range m.local {
newGlobal[n] = f
}
for _, fs := range m.remote {
for n, f := range fs {
if cf, ok := newGlobal[n]; !ok || cf.Modified < f.Modified {
newGlobal[n] = f
}
}
}
2013-12-30 15:30:29 +01:00
// Figure out if anything actually changed
var updated bool
if len(newGlobal) != len(m.global) {
updated = true
} else {
for n, f0 := range newGlobal {
if f1, ok := m.global[n]; !ok || f0.Modified != f1.Modified {
updated = true
break
}
}
}
if updated {
m.updateGlobal = time.Now().Unix()
m.global = newGlobal
}
2013-12-15 11:43:31 +01:00
}
// recomputeNeed rebuilds the need set: every name in the global table that
// we either lack locally or hold in an older version.
// Must be called with the write lock held.
func (m *Model) recomputeNeed() {
	needed := make(map[string]bool)
	for name, gf := range m.global {
		if lf, have := m.local[name]; have && gf.Modified <= lf.Modified {
			// Our copy is at least as new as the global one.
			continue
		}
		needed[name] = true
	}
	m.need = needed
}
// whoHas returns the IDs of the nodes whose announced entry for name has the
// same Modified value as the global entry.
// Must be called with the read lock held.
func (m *Model) whoHas(name string) []string {
	wanted := m.global[name].Modified

	var holders []string
	for nodeID, announced := range m.remote {
		candidate, present := announced[name]
		if !present || candidate.Modified != wanted {
			continue
		}
		holders = append(holders, nodeID)
	}
	return holders
}
// ConnectedTo reports whether there is a registered connection to nodeID.
func (m *Model) ConnectedTo(nodeID string) bool {
	m.RLock()
	_, connected := m.nodes[nodeID]
	m.RUnlock()
	return connected
}
// ProtocolIndex returns the local table converted to protocol form. It is
// the locking wrapper around protocolIndex.
func (m *Model) ProtocolIndex() []protocol.FileInfo {
	m.RLock()
	defer m.RUnlock()

	idx := m.protocolIndex()
	return idx
}
// Must be called with the read lock held.
func (m *Model) protocolIndex() []protocol.FileInfo {
var index []protocol.FileInfo
for _, f := range m.local {
2013-12-30 02:33:57 +01:00
mf := fileInfoFromFile(f)
2013-12-18 19:36:28 +01:00
if opts.Debug.TraceIdx {
2013-12-15 11:43:31 +01:00
var flagComment string
if mf.Flags&FlagDeleted != 0 {
flagComment = " (deleted)"
}
debugf("IDX: %q m=%d f=%o%s (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, len(mf.Blocks))
}
index = append(index, mf)
}
return index
}
func (m *Model) AddConnection(conn io.ReadWriteCloser, nodeID string) {
node := protocol.NewConnection(nodeID, conn, conn, m)
2013-12-15 11:43:31 +01:00
m.Lock()
m.nodes[nodeID] = node
m.rawConn[nodeID] = conn
2013-12-15 11:43:31 +01:00
m.Unlock()
infoln("Connected to node", nodeID)
2013-12-15 11:43:31 +01:00
m.RLock()
idx := m.protocolIndex()
m.RUnlock()
go func() {
node.Index(idx)
infoln("Sent initial index to node", nodeID)
}()
2013-12-15 11:43:31 +01:00
}
2013-12-28 14:10:36 +01:00
2013-12-30 02:33:57 +01:00
func fileFromFileInfo(f protocol.FileInfo) File {
2013-12-30 01:49:40 +01:00
var blocks []Block
2013-12-28 14:10:36 +01:00
var offset uint64
for _, b := range f.Blocks {
2013-12-30 01:49:40 +01:00
blocks = append(blocks, Block{
2013-12-28 14:10:36 +01:00
Offset: offset,
Length: b.Length,
Hash: b.Hash,
})
offset += uint64(b.Length)
}
2013-12-30 01:49:40 +01:00
return File{
Name: f.Name,
Flags: f.Flags,
Modified: int64(f.Modified),
Blocks: blocks,
}
2013-12-28 14:10:36 +01:00
}
2013-12-30 02:33:57 +01:00
// fileInfoFromFile converts a model File to its protocol form. Only the
// length and hash of each block are carried over.
func fileInfoFromFile(f File) protocol.FileInfo {
	var infos []protocol.BlockInfo
	for i := range f.Blocks {
		blk := f.Blocks[i]
		infos = append(infos, protocol.BlockInfo{Length: blk.Length, Hash: blk.Hash})
	}

	info := protocol.FileInfo{
		Name:     f.Name,
		Flags:    f.Flags,
		Modified: int64(f.Modified),
		Blocks:   infos,
	}
	return info
}