all: Remove need to restart syncthing (#6883)

This commit is contained in:
Audrius Butkevicius 2020-08-18 08:26:33 +01:00 committed by GitHub
parent 8f5215878b
commit bf9ff17267
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 405 additions and 286 deletions

View File

@ -634,12 +634,12 @@ func syncthingMain(runtimeOptions RuntimeOptions) {
os.Exit(1)
}
// Check if auto-upgrades should be done and if yes, do an initial
// Check if auto-upgrades is possible, and if yes, and it's enabled do an initial
// upgrade immedately. The auto-upgrade routine can only be started
// later after App is initialised.
shouldAutoUpgrade := shouldUpgrade(cfg, runtimeOptions)
if shouldAutoUpgrade {
autoUpgradePossible := autoUpgradePossible(runtimeOptions)
if autoUpgradePossible && cfg.Options().AutoUpgradeEnabled() {
// try to do upgrade directly and log the error if relevant.
release, err := initialAutoUpgradeCheck(db.NewMiscDataNamespace(ldb))
if err == nil {
@ -680,7 +680,7 @@ func syncthingMain(runtimeOptions RuntimeOptions) {
app := syncthing.New(cfg, ldb, evLogger, cert, appOpts)
if shouldAutoUpgrade {
if autoUpgradePossible {
go autoUpgrade(cfg, app, evLogger)
}
@ -702,9 +702,7 @@ func syncthingMain(runtimeOptions RuntimeOptions) {
}
}
if opts := cfg.Options(); opts.RestartOnWakeup {
go standbyMonitor(app)
}
go standbyMonitor(app, cfg)
if err := app.Start(); err != nil {
os.Exit(syncthing.ExitError.AsInt())
@ -818,12 +816,12 @@ func ensureDir(dir string, mode fs.FileMode) error {
return nil
}
func standbyMonitor(app *syncthing.App) {
func standbyMonitor(app *syncthing.App, cfg config.Wrapper) {
restartDelay := 60 * time.Second
now := time.Now()
for {
time.Sleep(10 * time.Second)
if time.Since(now) > 2*time.Minute {
if time.Since(now) > 2*time.Minute && cfg.Options().RestartOnWakeup {
l.Infof("Paused state detected, possibly woke up from standby. Restarting in %v.", restartDelay)
// We most likely just woke from standby. If we restart
@ -838,13 +836,10 @@ func standbyMonitor(app *syncthing.App) {
}
}
func shouldUpgrade(cfg config.Wrapper, runtimeOptions RuntimeOptions) bool {
func autoUpgradePossible(runtimeOptions RuntimeOptions) bool {
if upgrade.DisabledByCompilation {
return false
}
if !cfg.Options().ShouldAutoUpgrade() {
return false
}
if runtimeOptions.NoUpgrade {
l.Infof("No automatic upgrades; STNOUPGRADE environment variable defined.")
return false
@ -862,18 +857,19 @@ func autoUpgrade(cfg config.Wrapper, app *syncthing.App, evLogger events.Logger)
if !ok || data["clientName"] != "syncthing" || upgrade.CompareVersions(data["clientVersion"], build.Version) != upgrade.Newer {
continue
}
l.Infof("Connected to device %s with a newer version (current %q < remote %q). Checking for upgrades.", data["id"], build.Version, data["clientVersion"])
if cfg.Options().AutoUpgradeEnabled() {
l.Infof("Connected to device %s with a newer version (current %q < remote %q). Checking for upgrades.", data["id"], build.Version, data["clientVersion"])
}
case <-timer.C:
}
opts := cfg.Options()
checkInterval := time.Duration(opts.AutoUpgradeIntervalH) * time.Hour
if checkInterval < time.Hour {
// We shouldn't be here if AutoUpgradeIntervalH < 1, but for
// safety's sake.
checkInterval = time.Hour
if !opts.AutoUpgradeEnabled() {
timer.Reset(upgradeCheckInterval)
continue
}
checkInterval := time.Duration(opts.AutoUpgradeIntervalH) * time.Hour
rel, err := upgrade.LatestRelease(opts.ReleasesURL, build.Version, opts.UpgradeToPreReleases)
if err == upgrade.ErrUpgradeUnsupported {
sub.Unsubscribe()

View File

@ -77,7 +77,7 @@ type service struct {
eventSubs map[events.EventType]events.BufferedSubscription
eventSubsMut sync.Mutex
evLogger events.Logger
discoverer discover.CachingMux
discoverer discover.Manager
connectionsService connections.Service
fss model.FolderSummaryService
urService *ur.Service
@ -107,7 +107,7 @@ type Service interface {
WaitForStart() error
}
func New(id protocol.DeviceID, cfg config.Wrapper, assetDir, tlsDefaultCommonName string, m model.Model, defaultSub, diskSub events.BufferedSubscription, evLogger events.Logger, discoverer discover.CachingMux, connectionsService connections.Service, urService *ur.Service, fss model.FolderSummaryService, errors, systemLog logger.Recorder, contr Controller, noUpgrade bool) Service {
func New(id protocol.DeviceID, cfg config.Wrapper, assetDir, tlsDefaultCommonName string, m model.Model, defaultSub, diskSub events.BufferedSubscription, evLogger events.Logger, discoverer discover.Manager, connectionsService connections.Service, urService *ur.Service, fss model.FolderSummaryService, errors, systemLog logger.Recorder, contr Controller, noUpgrade bool) Service {
s := &service{
id: id,
cfg: cfg,

View File

@ -8,7 +8,6 @@ package api
import (
"context"
"time"
"github.com/syncthing/syncthing/lib/discover"
"github.com/syncthing/syncthing/lib/protocol"
@ -43,10 +42,7 @@ func (m *mockedCachingMux) Cache() map[protocol.DeviceID]discover.CacheEntry {
return nil
}
// from events.CachingMux
func (m *mockedCachingMux) Add(finder discover.Finder, cacheTime, negCacheTime time.Duration) {
}
// from events.Manager
func (m *mockedCachingMux) ChildErrors() map[string]error {
return nil

View File

@ -215,7 +215,7 @@ func migrateToConfigV18(cfg *Configuration) {
// Do channel selection for existing users. Those who have auto upgrades
// and usage reporting on default to the candidate channel. Others get
// stable.
if cfg.Options.URAccepted > 0 && cfg.Options.AutoUpgradeIntervalH > 0 {
if cfg.Options.URAccepted > 0 && cfg.Options.AutoUpgradeEnabled() {
cfg.Options.UpgradeToPreReleases = true
}

View File

@ -17,11 +17,11 @@ import (
type OptionsConfiguration struct {
RawListenAddresses []string `xml:"listenAddress" json:"listenAddresses" default:"default"`
RawGlobalAnnServers []string `xml:"globalAnnounceServer" json:"globalAnnounceServers" default:"default" restart:"true"`
GlobalAnnEnabled bool `xml:"globalAnnounceEnabled" json:"globalAnnounceEnabled" default:"true" restart:"true"`
LocalAnnEnabled bool `xml:"localAnnounceEnabled" json:"localAnnounceEnabled" default:"true" restart:"true"`
LocalAnnPort int `xml:"localAnnouncePort" json:"localAnnouncePort" default:"21027" restart:"true"`
LocalAnnMCAddr string `xml:"localAnnounceMCAddr" json:"localAnnounceMCAddr" default:"[ff12::8384]:21027" restart:"true"`
RawGlobalAnnServers []string `xml:"globalAnnounceServer" json:"globalAnnounceServers" default:"default"`
GlobalAnnEnabled bool `xml:"globalAnnounceEnabled" json:"globalAnnounceEnabled" default:"true"`
LocalAnnEnabled bool `xml:"localAnnounceEnabled" json:"localAnnounceEnabled" default:"true"`
LocalAnnPort int `xml:"localAnnouncePort" json:"localAnnouncePort" default:"21027"`
LocalAnnMCAddr string `xml:"localAnnounceMCAddr" json:"localAnnounceMCAddr" default:"[ff12::8384]:21027"`
MaxSendKbps int `xml:"maxSendKbps" json:"maxSendKbps"`
MaxRecvKbps int `xml:"maxRecvKbps" json:"maxRecvKbps"`
ReconnectIntervalS int `xml:"reconnectionIntervalS" json:"reconnectionIntervalS" default:"60"`
@ -38,15 +38,15 @@ type OptionsConfiguration struct {
URURL string `xml:"urURL" json:"urURL" default:"https://data.syncthing.net/newdata"` // usage reporting URL
URPostInsecurely bool `xml:"urPostInsecurely" json:"urPostInsecurely" default:"false"` // For testing
URInitialDelayS int `xml:"urInitialDelayS" json:"urInitialDelayS" default:"1800"`
RestartOnWakeup bool `xml:"restartOnWakeup" json:"restartOnWakeup" default:"true" restart:"true"`
AutoUpgradeIntervalH int `xml:"autoUpgradeIntervalH" json:"autoUpgradeIntervalH" default:"12" restart:"true"` // 0 for off
UpgradeToPreReleases bool `xml:"upgradeToPreReleases" json:"upgradeToPreReleases" restart:"true"` // when auto upgrades are enabled
KeepTemporariesH int `xml:"keepTemporariesH" json:"keepTemporariesH" default:"24"` // 0 for off
CacheIgnoredFiles bool `xml:"cacheIgnoredFiles" json:"cacheIgnoredFiles" default:"false" restart:"true"`
RestartOnWakeup bool `xml:"restartOnWakeup" json:"restartOnWakeup" default:"true"`
AutoUpgradeIntervalH int `xml:"autoUpgradeIntervalH" json:"autoUpgradeIntervalH" default:"12"` // 0 for off
UpgradeToPreReleases bool `xml:"upgradeToPreReleases" json:"upgradeToPreReleases"` // when auto upgrades are enabled
KeepTemporariesH int `xml:"keepTemporariesH" json:"keepTemporariesH" default:"24"` // 0 for off
CacheIgnoredFiles bool `xml:"cacheIgnoredFiles" json:"cacheIgnoredFiles" default:"false"`
ProgressUpdateIntervalS int `xml:"progressUpdateIntervalS" json:"progressUpdateIntervalS" default:"5"`
LimitBandwidthInLan bool `xml:"limitBandwidthInLan" json:"limitBandwidthInLan" default:"false"`
MinHomeDiskFree Size `xml:"minHomeDiskFree" json:"minHomeDiskFree" default:"1 %"`
ReleasesURL string `xml:"releasesURL" json:"releasesURL" default:"https://upgrades.syncthing.net/meta.json" restart:"true"`
ReleasesURL string `xml:"releasesURL" json:"releasesURL" default:"https://upgrades.syncthing.net/meta.json"`
AlwaysLocalNets []string `xml:"alwaysLocalNet" json:"alwaysLocalNets"`
OverwriteRemoteDevNames bool `xml:"overwriteRemoteDeviceNamesOnConnect" json:"overwriteRemoteDeviceNamesOnConnect" default:"false"`
TempIndexMinBlocks int `xml:"tempIndexMinBlocks" json:"tempIndexMinBlocks" default:"10"`
@ -56,11 +56,11 @@ type OptionsConfiguration struct {
SetLowPriority bool `xml:"setLowPriority" json:"setLowPriority" default:"true"`
RawMaxFolderConcurrency int `xml:"maxFolderConcurrency" json:"maxFolderConcurrency"`
CRURL string `xml:"crashReportingURL" json:"crURL" default:"https://crash.syncthing.net/newcrash"` // crash reporting URL
CREnabled bool `xml:"crashReportingEnabled" json:"crashReportingEnabled" default:"true" restart:"true"`
StunKeepaliveStartS int `xml:"stunKeepaliveStartS" json:"stunKeepaliveStartS" default:"180"` // 0 for off
StunKeepaliveMinS int `xml:"stunKeepaliveMinS" json:"stunKeepaliveMinS" default:"20"` // 0 for off
CREnabled bool `xml:"crashReportingEnabled" json:"crashReportingEnabled" default:"true"` // Read in the monitor, but it's read before every attempt to report stuff, so does not require a restart.
StunKeepaliveStartS int `xml:"stunKeepaliveStartS" json:"stunKeepaliveStartS" default:"180"` // 0 for off
StunKeepaliveMinS int `xml:"stunKeepaliveMinS" json:"stunKeepaliveMinS" default:"20"` // 0 for off
RawStunServers []string `xml:"stunServer" json:"stunServers" default:"default"`
DatabaseTuning Tuning `xml:"databaseTuning" json:"databaseTuning" restart:"true"`
DatabaseTuning Tuning `xml:"databaseTuning" json:"databaseTuning" restart:"true"` // Can't be adjusted once the database has been opened
RawMaxCIRequestKiB int `xml:"maxConcurrentIncomingRequestKiB" json:"maxConcurrentIncomingRequestKiB"`
DeprecatedUPnPEnabled bool `xml:"upnpEnabled,omitempty" json:"-"`
@ -201,6 +201,6 @@ func (opts OptionsConfiguration) MaxConcurrentIncomingRequestKiB() int {
return opts.RawMaxCIRequestKiB
}
func (opts OptionsConfiguration) ShouldAutoUpgrade() bool {
func (opts OptionsConfiguration) AutoUpgradeEnabled() bool {
return opts.AutoUpgradeIntervalH > 0
}

View File

@ -7,41 +7,21 @@
package discover
import (
"context"
"sort"
stdsync "sync"
"time"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/util"
"github.com/thejerf/suture"
"github.com/syncthing/syncthing/lib/protocol"
)
// The CachingMux aggregates results from multiple Finders. Each Finder has
// an associated cache time and negative cache time. The cache time sets how
// long we cache and return successful lookup results, the negative cache
// time sets how long we refrain from asking about the same device ID after
// receiving a negative answer. The value of zero disables caching (positive
// or negative).
type CachingMux interface {
FinderService
Add(finder Finder, cacheTime, negCacheTime time.Duration)
ChildErrors() map[string]error
}
type cachingMux struct {
*suture.Supervisor
finders []cachedFinder
caches []*cache
mut sync.RWMutex
}
// A cachedFinder is a Finder with associated cache timeouts.
type cachedFinder struct {
Finder
cacheTime time.Duration
negCacheTime time.Duration
cache *cache
token *suture.ServiceToken
}
// An error may implement cachedError, in which case it will be interrogated
@ -51,150 +31,6 @@ type cachedError interface {
CacheFor() time.Duration
}
func NewCachingMux() CachingMux {
return &cachingMux{
Supervisor: suture.New("discover.cachingMux", suture.Spec{
PassThroughPanics: true,
}),
mut: sync.NewRWMutex(),
}
}
// Add registers a new Finder, with associated cache timeouts.
func (m *cachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration) {
m.mut.Lock()
m.finders = append(m.finders, cachedFinder{finder, cacheTime, negCacheTime})
m.caches = append(m.caches, newCache())
m.mut.Unlock()
if service, ok := finder.(suture.Service); ok {
m.Supervisor.Add(service)
}
}
// Lookup attempts to resolve the device ID using any of the added Finders,
// while obeying the cache settings.
func (m *cachingMux) Lookup(ctx context.Context, deviceID protocol.DeviceID) (addresses []string, err error) {
m.mut.RLock()
for i, finder := range m.finders {
if cacheEntry, ok := m.caches[i].Get(deviceID); ok {
// We have a cache entry. Lets see what it says.
if cacheEntry.found && time.Since(cacheEntry.when) < finder.cacheTime {
// It's a positive, valid entry. Use it.
l.Debugln("cached discovery entry for", deviceID, "at", finder)
l.Debugln(" cache:", cacheEntry)
addresses = append(addresses, cacheEntry.Addresses...)
continue
}
valid := time.Now().Before(cacheEntry.validUntil) || time.Since(cacheEntry.when) < finder.negCacheTime
if !cacheEntry.found && valid {
// It's a negative, valid entry. We should not make another
// attempt right now.
l.Debugln("negative cache entry for", deviceID, "at", finder, "valid until", cacheEntry.when.Add(finder.negCacheTime), "or", cacheEntry.validUntil)
continue
}
// It's expired. Ignore and continue.
}
// Perform the actual lookup and cache the result.
if addrs, err := finder.Lookup(ctx, deviceID); err == nil {
l.Debugln("lookup for", deviceID, "at", finder)
l.Debugln(" addresses:", addrs)
addresses = append(addresses, addrs...)
m.caches[i].Set(deviceID, CacheEntry{
Addresses: addrs,
when: time.Now(),
found: len(addrs) > 0,
})
} else {
// Lookup returned error, add a negative cache entry.
entry := CacheEntry{
when: time.Now(),
found: false,
}
if err, ok := err.(cachedError); ok {
entry.validUntil = time.Now().Add(err.CacheFor())
}
m.caches[i].Set(deviceID, entry)
}
}
m.mut.RUnlock()
addresses = util.UniqueTrimmedStrings(addresses)
sort.Strings(addresses)
l.Debugln("lookup results for", deviceID)
l.Debugln(" addresses: ", addresses)
return addresses, nil
}
func (m *cachingMux) String() string {
return "discovery cache"
}
func (m *cachingMux) Error() error {
return nil
}
func (m *cachingMux) ChildErrors() map[string]error {
children := make(map[string]error, len(m.finders))
m.mut.RLock()
for _, f := range m.finders {
children[f.String()] = f.Error()
}
m.mut.RUnlock()
return children
}
func (m *cachingMux) Cache() map[protocol.DeviceID]CacheEntry {
// Res will be the "total" cache, i.e. the union of our cache and all our
// children's caches.
res := make(map[protocol.DeviceID]CacheEntry)
m.mut.RLock()
for i := range m.finders {
// Each finder[i] has a corresponding cache at cache[i]. Go through
// it and populate the total, appending any addresses and keeping
// the newest "when" time. We skip any negative cache entries.
for k, v := range m.caches[i].Cache() {
if v.found {
cur := res[k]
if v.when.After(cur.when) {
cur.when = v.when
}
cur.Addresses = append(cur.Addresses, v.Addresses...)
res[k] = cur
}
}
// Then ask the finder itself for its cache and do the same. If this
// finder is a global discovery client, it will have no cache. If it's
// a local discovery client, this will be its current state.
for k, v := range m.finders[i].Cache() {
if v.found {
cur := res[k]
if v.when.After(cur.when) {
cur.when = v.when
}
cur.Addresses = append(cur.Addresses, v.Addresses...)
res[k] = cur
}
}
}
m.mut.RUnlock()
for k, v := range res {
v.Addresses = util.UniqueTrimmedStrings(v.Addresses)
res[k] = v
}
return res
}
// A cache can be embedded wherever useful
type cache struct {

View File

@ -8,10 +8,13 @@ package discover
import (
"context"
"crypto/tls"
"reflect"
"testing"
"time"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/protocol"
)
@ -30,15 +33,19 @@ func TestCacheUnique(t *testing.T) {
"tcp://192.0.2.44:22000",
}
c := NewCachingMux()
c.(*cachingMux).ServeBackground()
cfg := config.New(protocol.LocalDeviceID)
cfg.Options.LocalAnnEnabled = false
cfg.Options.GlobalAnnEnabled = false
c := NewManager(protocol.LocalDeviceID, config.Wrap("", cfg, events.NoopLogger), tls.Certificate{}, events.NoopLogger, nil).(*manager)
c.ServeBackground()
defer c.Stop()
// Add a fake discovery service and verify we get its answers through the
// cache.
f1 := &fakeDiscovery{addresses0}
c.Add(f1, time.Minute, 0)
c.addLocked("f1", f1, time.Minute, 0)
ctx := context.Background()
@ -54,7 +61,7 @@ func TestCacheUnique(t *testing.T) {
// duplicate or otherwise mess up the responses now.
f2 := &fakeDiscovery{addresses1}
c.Add(f2, time.Minute, 0)
c.addLocked("f2", f2, time.Minute, 0)
addr, err = c.Lookup(ctx, protocol.LocalDeviceID)
if err != nil {
@ -86,15 +93,19 @@ func (f *fakeDiscovery) Cache() map[protocol.DeviceID]CacheEntry {
}
func TestCacheSlowLookup(t *testing.T) {
c := NewCachingMux()
c.(*cachingMux).ServeBackground()
cfg := config.New(protocol.LocalDeviceID)
cfg.Options.LocalAnnEnabled = false
cfg.Options.GlobalAnnEnabled = false
c := NewManager(protocol.LocalDeviceID, config.Wrap("", cfg, events.NoopLogger), tls.Certificate{}, events.NoopLogger, nil).(*manager)
c.ServeBackground()
defer c.Stop()
// Add a slow discovery service.
started := make(chan struct{})
f1 := &slowDiscovery{time.Second, started}
c.Add(f1, time.Minute, 0)
c.addLocked("f1", f1, time.Minute, 0)
// Start a lookup, which will take at least a second

View File

@ -37,11 +37,6 @@ type FinderService interface {
suture.Service
}
type FinderMux interface {
Finder
ChildStatus() map[string]error
}
// The AddressLister answers questions about what addresses we are listening
// on.
type AddressLister interface {

View File

@ -12,6 +12,7 @@ import (
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
@ -448,3 +449,15 @@ func (c *contextClient) Post(ctx context.Context, url, ctype string, data io.Rea
req.Header.Set("Content-Type", ctype)
return c.Client.Do(req)
}
func globalDiscoveryIdentity(addr string) string {
return "global discovery server " + addr
}
func ipv4Identity(port int) string {
return fmt.Sprintf("IPv4 local broadcast discovery on port %d", port)
}
func ipv6Identity(addr string) string {
return fmt.Sprintf("IPv6 local multicast discovery on address %s", addr)
}

297
lib/discover/manager.go Normal file
View File

@ -0,0 +1,297 @@
// Copyright (C) 2020 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package discover
import (
"context"
"crypto/tls"
"fmt"
"sort"
"time"
"github.com/thejerf/suture"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/util"
)
// The Manager aggregates results from multiple Finders. Each Finder has
// an associated cache time and negative cache time. The cache time sets how
// long we cache and return successful lookup results, the negative cache
// time sets how long we refrain from asking about the same device ID after
// receiving a negative answer. The value of zero disables caching (positive
// or negative).
type Manager interface {
FinderService
ChildErrors() map[string]error
}
type manager struct {
*suture.Supervisor
myID protocol.DeviceID
cfg config.Wrapper
cert tls.Certificate
evLogger events.Logger
addressLister AddressLister
finders map[string]cachedFinder
mut sync.RWMutex
}
func NewManager(myID protocol.DeviceID, cfg config.Wrapper, cert tls.Certificate, evLogger events.Logger, lister AddressLister) Manager {
return &manager{
Supervisor: suture.New("discover.Manager", suture.Spec{
PassThroughPanics: true,
}),
myID: myID,
cfg: cfg,
cert: cert,
evLogger: evLogger,
addressLister: lister,
finders: make(map[string]cachedFinder),
mut: sync.NewRWMutex(),
}
}
func (m *manager) Serve() {
m.cfg.Subscribe(m)
defer m.cfg.Unsubscribe(m)
m.CommitConfiguration(config.Configuration{}, m.cfg.RawCopy())
m.Supervisor.Serve()
}
func (m *manager) addLocked(identity string, finder Finder, cacheTime, negCacheTime time.Duration) {
entry := cachedFinder{
Finder: finder,
cacheTime: cacheTime,
negCacheTime: negCacheTime,
cache: newCache(),
token: nil,
}
if service, ok := finder.(suture.Service); ok {
token := m.Supervisor.Add(service)
entry.token = &token
}
m.finders[identity] = entry
l.Infoln("Using discovery mechanism:", identity)
}
func (m *manager) removeLocked(identity string) {
entry, ok := m.finders[identity]
if !ok {
return
}
if entry.token != nil {
err := m.Supervisor.Remove(*entry.token)
if err != nil {
l.Warnf("removing discovery %s: %s", identity, err)
}
}
delete(m.finders, identity)
l.Infoln("Stopped using discovery mechanism: ", identity)
}
// Lookup attempts to resolve the device ID using any of the added Finders,
// while obeying the cache settings.
func (m *manager) Lookup(ctx context.Context, deviceID protocol.DeviceID) (addresses []string, err error) {
m.mut.RLock()
for _, finder := range m.finders {
if cacheEntry, ok := finder.cache.Get(deviceID); ok {
// We have a cache entry. Lets see what it says.
if cacheEntry.found && time.Since(cacheEntry.when) < finder.cacheTime {
// It's a positive, valid entry. Use it.
l.Debugln("cached discovery entry for", deviceID, "at", finder)
l.Debugln(" cache:", cacheEntry)
addresses = append(addresses, cacheEntry.Addresses...)
continue
}
valid := time.Now().Before(cacheEntry.validUntil) || time.Since(cacheEntry.when) < finder.negCacheTime
if !cacheEntry.found && valid {
// It's a negative, valid entry. We should not make another
// attempt right now.
l.Debugln("negative cache entry for", deviceID, "at", finder, "valid until", cacheEntry.when.Add(finder.negCacheTime), "or", cacheEntry.validUntil)
continue
}
// It's expired. Ignore and continue.
}
// Perform the actual lookup and cache the result.
if addrs, err := finder.Lookup(ctx, deviceID); err == nil {
l.Debugln("lookup for", deviceID, "at", finder)
l.Debugln(" addresses:", addrs)
addresses = append(addresses, addrs...)
finder.cache.Set(deviceID, CacheEntry{
Addresses: addrs,
when: time.Now(),
found: len(addrs) > 0,
})
} else {
// Lookup returned error, add a negative cache entry.
entry := CacheEntry{
when: time.Now(),
found: false,
}
if err, ok := err.(cachedError); ok {
entry.validUntil = time.Now().Add(err.CacheFor())
}
finder.cache.Set(deviceID, entry)
}
}
m.mut.RUnlock()
addresses = util.UniqueTrimmedStrings(addresses)
sort.Strings(addresses)
l.Debugln("lookup results for", deviceID)
l.Debugln(" addresses: ", addresses)
return addresses, nil
}
func (m *manager) String() string {
return "discovery cache"
}
func (m *manager) Error() error {
return nil
}
func (m *manager) ChildErrors() map[string]error {
children := make(map[string]error, len(m.finders))
m.mut.RLock()
for _, f := range m.finders {
children[f.String()] = f.Error()
}
m.mut.RUnlock()
return children
}
func (m *manager) Cache() map[protocol.DeviceID]CacheEntry {
// Res will be the "total" cache, i.e. the union of our cache and all our
// children's caches.
res := make(map[protocol.DeviceID]CacheEntry)
m.mut.RLock()
for _, finder := range m.finders {
// Each finder[i] has a corresponding cache. Go through
// it and populate the total, appending any addresses and keeping
// the newest "when" time. We skip any negative cache finders.
for k, v := range finder.cache.Cache() {
if v.found {
cur := res[k]
if v.when.After(cur.when) {
cur.when = v.when
}
cur.Addresses = append(cur.Addresses, v.Addresses...)
res[k] = cur
}
}
// Then ask the finder itself for its cache and do the same. If this
// finder is a global discovery client, it will have no cache. If it's
// a local discovery client, this will be its current state.
for k, v := range finder.Cache() {
if v.found {
cur := res[k]
if v.when.After(cur.when) {
cur.when = v.when
}
cur.Addresses = append(cur.Addresses, v.Addresses...)
res[k] = cur
}
}
}
m.mut.RUnlock()
for k, v := range res {
v.Addresses = util.UniqueTrimmedStrings(v.Addresses)
res[k] = v
}
return res
}
func (m *manager) VerifyConfiguration(_, _ config.Configuration) error {
return nil
}
func (m *manager) CommitConfiguration(_, to config.Configuration) (handled bool) {
m.mut.Lock()
defer m.mut.Unlock()
toIdentities := make(map[string]struct{})
if to.Options.GlobalAnnEnabled {
for _, srv := range to.Options.GlobalDiscoveryServers() {
toIdentities[globalDiscoveryIdentity(srv)] = struct{}{}
}
}
if to.Options.LocalAnnEnabled {
toIdentities[ipv4Identity(to.Options.LocalAnnPort)] = struct{}{}
toIdentities[ipv6Identity(to.Options.LocalAnnMCAddr)] = struct{}{}
}
// Remove things that we're not expected to have.
for identity := range m.finders {
if _, ok := toIdentities[identity]; !ok {
m.removeLocked(identity)
}
}
// Add things we don't have.
if to.Options.GlobalAnnEnabled {
for _, srv := range to.Options.GlobalDiscoveryServers() {
identity := globalDiscoveryIdentity(srv)
// Skip, if it's already running.
if _, ok := m.finders[identity]; ok {
continue
}
gd, err := NewGlobal(srv, m.cert, m.addressLister, m.evLogger)
if err != nil {
l.Warnln("Global discovery:", err)
continue
}
// Each global discovery server gets its results cached for five
// minutes, and is not asked again for a minute when it's returned
// unsuccessfully.
m.addLocked(identity, gd, 5*time.Minute, time.Minute)
}
}
if to.Options.LocalAnnEnabled {
// v4 broadcasts
v4Identity := ipv4Identity(to.Options.LocalAnnPort)
if _, ok := m.finders[v4Identity]; !ok {
bcd, err := NewLocal(m.myID, fmt.Sprintf(":%d", to.Options.LocalAnnPort), m.addressLister, m.evLogger)
if err != nil {
l.Warnln("IPv4 local discovery:", err)
} else {
m.addLocked(v4Identity, bcd, 0, 0)
}
}
// v6 multicasts
v6Identity := ipv6Identity(to.Options.LocalAnnMCAddr)
if _, ok := m.finders[v6Identity]; !ok {
mcd, err := NewLocal(m.myID, to.Options.LocalAnnMCAddr, m.addressLister, m.evLogger)
if err != nil {
l.Warnln("IPv6 local discovery:", err)
} else {
m.addLocked(v6Identity, mcd, 0, 0)
}
}
}
return true
}

View File

@ -121,10 +121,9 @@ type model struct {
evLogger events.Logger
// constant or concurrency safe fields
finder *db.BlockFinder
progressEmitter *ProgressEmitter
shortID protocol.ShortID
cacheIgnoredFiles bool
finder *db.BlockFinder
progressEmitter *ProgressEmitter
shortID protocol.ShortID
// globalRequestLimiter limits the amount of data in concurrent incoming
// requests
globalRequestLimiter *byteSemaphore
@ -202,7 +201,6 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio
finder: db.NewBlockFinder(ldb),
progressEmitter: NewProgressEmitter(cfg, evLogger),
shortID: id.Short(),
cacheIgnoredFiles: cfg.Options().CacheIgnoredFiles,
globalRequestLimiter: newByteSemaphore(1024 * cfg.Options().MaxConcurrentIncomingRequestKiB()),
folderIOLimiter: newByteSemaphore(cfg.Options().MaxFolderConcurrency()),
@ -245,12 +243,13 @@ func (m *model) ServeBackground() {
func (m *model) onServe() {
// Add and start folders
cacheIgnoredFiles := m.cfg.Options().CacheIgnoredFiles
for _, folderCfg := range m.cfg.Folders() {
if folderCfg.Paused {
folderCfg.CreateRoot()
continue
}
m.newFolder(folderCfg)
m.newFolder(folderCfg, cacheIgnoredFiles)
}
m.cfg.Subscribe(m)
}
@ -278,8 +277,8 @@ func (m *model) StartDeadlockDetector(timeout time.Duration) {
}
// Need to hold lock on m.fmut when calling this.
func (m *model) addAndStartFolderLocked(cfg config.FolderConfiguration, fset *db.FileSet) {
ignores := ignore.New(cfg.Filesystem(), ignore.WithCache(m.cacheIgnoredFiles))
func (m *model) addAndStartFolderLocked(cfg config.FolderConfiguration, fset *db.FileSet, cacheIgnoredFiles bool) {
ignores := ignore.New(cfg.Filesystem(), ignore.WithCache(cacheIgnoredFiles))
if err := ignores.Load(".stignore"); err != nil && !fs.IsNotExist(err) {
l.Warnln("Loading ignores:", err)
}
@ -445,7 +444,7 @@ func (m *model) cleanupFolderLocked(cfg config.FolderConfiguration) {
delete(m.folderVersioners, cfg.ID)
}
func (m *model) restartFolder(from, to config.FolderConfiguration) {
func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredFiles bool) {
if len(to.ID) == 0 {
panic("bug: cannot restart empty folder ID")
}
@ -495,12 +494,12 @@ func (m *model) restartFolder(from, to config.FolderConfiguration) {
m.cleanupFolderLocked(from)
if !to.Paused {
m.addAndStartFolderLocked(to, fset)
m.addAndStartFolderLocked(to, fset, cacheIgnoredFiles)
}
l.Infof("%v folder %v (%v)", infoMsg, to.Description(), to.Type)
}
func (m *model) newFolder(cfg config.FolderConfiguration) {
func (m *model) newFolder(cfg config.FolderConfiguration, cacheIgnoredFiles bool) {
// Creating the fileset can take a long time (metadata calculation) so
// we do it outside of the lock.
fset := db.NewFileSet(cfg.ID, cfg.Filesystem(), m.db)
@ -510,7 +509,7 @@ func (m *model) newFolder(cfg config.FolderConfiguration) {
m.fmut.Lock()
defer m.fmut.Unlock()
m.addAndStartFolderLocked(cfg, fset)
m.addAndStartFolderLocked(cfg, fset, cacheIgnoredFiles)
}
func (m *model) UsageReportingStats(report *contract.Report, version int, preview bool) {
@ -2471,7 +2470,7 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
l.Infoln("Paused folder", cfg.Description())
} else {
l.Infoln("Adding folder", cfg.Description())
m.newFolder(cfg)
m.newFolder(cfg, to.Options.CacheIgnoredFiles)
}
}
}
@ -2490,8 +2489,8 @@ func (m *model) CommitConfiguration(from, to config.Configuration) bool {
// This folder exists on both sides. Settings might have changed.
// Check if anything differs that requires a restart.
if !reflect.DeepEqual(fromCfg.RequiresRestartOnly(), toCfg.RequiresRestartOnly()) {
m.restartFolder(fromCfg, toCfg)
if !reflect.DeepEqual(fromCfg.RequiresRestartOnly(), toCfg.RequiresRestartOnly()) || from.Options.CacheIgnoredFiles != to.Options.CacheIgnoredFiles {
m.restartFolder(fromCfg, toCfg, to.Options.CacheIgnoredFiles)
}
// Emit the folder pause/resume event

View File

@ -1475,7 +1475,7 @@ func TestIgnores(t *testing.T) {
// Invalid path, marker should be missing, hence returns an error.
fcfg := config.FolderConfiguration{ID: "fresh", Path: "XXX"}
ignores := ignore.New(fcfg.Filesystem(), ignore.WithCache(m.cacheIgnoredFiles))
ignores := ignore.New(fcfg.Filesystem(), ignore.WithCache(m.cfg.Options().CacheIgnoredFiles))
m.fmut.Lock()
m.folderCfgs[fcfg.ID] = fcfg
m.folderIgnores[fcfg.ID] = ignores
@ -1490,7 +1490,7 @@ func TestIgnores(t *testing.T) {
pausedDefaultFolderConfig := defaultFolderConfig
pausedDefaultFolderConfig.Paused = true
m.restartFolder(defaultFolderConfig, pausedDefaultFolderConfig)
m.restartFolder(defaultFolderConfig, pausedDefaultFolderConfig, false)
// Here folder initialization is not an issue as a paused folder isn't
// added to the model and thus there is no initial scan happening.
@ -2290,7 +2290,7 @@ func TestIndexesForUnknownDevicesDropped(t *testing.T) {
}
m := newModel(defaultCfgWrapper, myID, "syncthing", "dev", dbi, nil)
m.newFolder(defaultFolderConfig)
m.newFolder(defaultFolderConfig, false)
defer cleanupModel(m)
// Remote sequence is cached, hence need to recreated.
@ -3343,7 +3343,7 @@ func TestConnCloseOnRestart(t *testing.T) {
newFcfg.Paused = true
done := make(chan struct{})
go func() {
m.restartFolder(fcfg, newFcfg)
m.restartFolder(fcfg, newFcfg, false)
close(done)
}()
select {

View File

@ -264,11 +264,6 @@ func (a *App) startup() error {
a.mainService.Add(m)
// Start discovery
cachedDiscovery := discover.NewCachingMux()
a.mainService.Add(cachedDiscovery)
// The TLS configuration is used for both the listening socket and outgoing
// connections.
@ -279,44 +274,21 @@ func (a *App) startup() error {
tlsCfg.SessionTicketsDisabled = true
tlsCfg.InsecureSkipVerify = true
// Start connection management
// Start discovery and connection management
connectionsService := connections.NewService(a.cfg, a.myID, m, tlsCfg, cachedDiscovery, bepProtocolName, tlsDefaultCommonName, a.evLogger)
// Chicken and egg, discovery manager depends on connection service to tell it what addresses it's listening on
// Connection service depends on discovery manager to get addresses to connect to.
// Create a wrapper that is then wired after they are both setup.
addrLister := &lateAddressLister{}
discoveryManager := discover.NewManager(a.myID, a.cfg, a.cert, a.evLogger, addrLister)
connectionsService := connections.NewService(a.cfg, a.myID, m, tlsCfg, discoveryManager, bepProtocolName, tlsDefaultCommonName, a.evLogger)
addrLister.AddressLister = connectionsService
a.mainService.Add(discoveryManager)
a.mainService.Add(connectionsService)
if a.cfg.Options().GlobalAnnEnabled {
for _, srv := range a.cfg.Options().GlobalDiscoveryServers() {
l.Infoln("Using discovery server", srv)
gd, err := discover.NewGlobal(srv, a.cert, connectionsService, a.evLogger)
if err != nil {
l.Warnln("Global discovery:", err)
continue
}
// Each global discovery server gets its results cached for five
// minutes, and is not asked again for a minute when it's returned
// unsuccessfully.
cachedDiscovery.Add(gd, 5*time.Minute, time.Minute)
}
}
if a.cfg.Options().LocalAnnEnabled {
// v4 broadcasts
bcd, err := discover.NewLocal(a.myID, fmt.Sprintf(":%d", a.cfg.Options().LocalAnnPort), connectionsService, a.evLogger)
if err != nil {
l.Warnln("IPv4 local discovery:", err)
} else {
cachedDiscovery.Add(bcd, 0, 0)
}
// v6 multicasts
mcd, err := discover.NewLocal(a.myID, a.cfg.Options().LocalAnnMCAddr, connectionsService, a.evLogger)
if err != nil {
l.Warnln("IPv6 local discovery:", err)
} else {
cachedDiscovery.Add(mcd, 0, 0)
}
}
// Candidate builds always run with usage reporting.
if opts := a.cfg.Options(); build.IsCandidate {
@ -341,7 +313,7 @@ func (a *App) startup() error {
// GUI
if err := a.setupGUI(m, defaultSub, diskSub, cachedDiscovery, connectionsService, usageReportingSvc, errors, systemLog); err != nil {
if err := a.setupGUI(m, defaultSub, diskSub, discoveryManager, connectionsService, usageReportingSvc, errors, systemLog); err != nil {
l.Warnln("Failed starting API:", err)
return err
}
@ -430,7 +402,7 @@ func (a *App) stopWithErr(stopReason ExitStatus, err error) ExitStatus {
return a.exitStatus
}
func (a *App) setupGUI(m model.Model, defaultSub, diskSub events.BufferedSubscription, discoverer discover.CachingMux, connectionsService connections.Service, urService *ur.Service, errors, systemLog logger.Recorder) error {
func (a *App) setupGUI(m model.Model, defaultSub, diskSub events.BufferedSubscription, discoverer discover.Manager, connectionsService connections.Service, urService *ur.Service, errors, systemLog logger.Recorder) error {
guiCfg := a.cfg.GUI()
if !guiCfg.Enabled {
@ -516,3 +488,7 @@ func printService(w io.Writer, svc interface{}, level int) {
}
}
}
type lateAddressLister struct {
discover.AddressLister
}

View File

@ -206,8 +206,8 @@ func (s *Service) reportData(ctx context.Context, urVersion int, preview bool) (
report.UsesRateLimit = opts.MaxRecvKbps > 0 || opts.MaxSendKbps > 0
report.UpgradeAllowedManual = !(upgrade.DisabledByCompilation || s.noUpgrade)
report.UpgradeAllowedAuto = !(upgrade.DisabledByCompilation || s.noUpgrade) && opts.AutoUpgradeIntervalH > 0
report.UpgradeAllowedPre = !(upgrade.DisabledByCompilation || s.noUpgrade) && opts.AutoUpgradeIntervalH > 0 && opts.UpgradeToPreReleases
report.UpgradeAllowedAuto = !(upgrade.DisabledByCompilation || s.noUpgrade) && opts.AutoUpgradeEnabled()
report.UpgradeAllowedPre = !(upgrade.DisabledByCompilation || s.noUpgrade) && opts.AutoUpgradeEnabled() && opts.UpgradeToPreReleases
// V3