lib/connections: Fix local address priority

GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/4534
LGTM: imsodin, calmh
This commit is contained in:
Audrius Butkevicius 2017-11-22 07:05:49 +00:00 committed by Jakob Borg
parent b99e92bad7
commit 4922b46fbd
9 changed files with 57 additions and 80 deletions

View File

@ -87,13 +87,6 @@ const (
maxSystemLog = 250
)
// The discovery results are sorted by their source priority.
const (
ipv6LocalDiscoveryPriority = iota
ipv4LocalDiscoveryPriority
globalDiscoveryPriority
)
func init() {
if Version != "unknown-dev" {
// If not a generic dev build, version string should come from git describe
@ -824,7 +817,7 @@ func syncthingMain(runtimeOptions RuntimeOptions) {
// Each global discovery server gets its results cached for five
// minutes, and is not asked again for a minute when it's returned
// unsuccessfully.
cachedDiscovery.Add(gd, 5*time.Minute, time.Minute, globalDiscoveryPriority)
cachedDiscovery.Add(gd, 5*time.Minute, time.Minute)
}
}
@ -834,14 +827,14 @@ func syncthingMain(runtimeOptions RuntimeOptions) {
if err != nil {
l.Warnln("IPv4 local discovery:", err)
} else {
cachedDiscovery.Add(bcd, 0, 0, ipv4LocalDiscoveryPriority)
cachedDiscovery.Add(bcd, 0, 0)
}
// v6 multicasts
mcd, err := discover.NewLocal(myID, cfg.Options().LocalAnnMCAddr, connectionsService)
if err != nil {
l.Warnln("IPv6 local discovery:", err)
} else {
cachedDiscovery.Add(mcd, 0, 0, ipv6LocalDiscoveryPriority)
cachedDiscovery.Add(mcd, 0, 0)
}
}

View File

@ -44,7 +44,7 @@ func (m *mockedCachingMux) Cache() map[protocol.DeviceID]discover.CacheEntry {
// from events.CachingMux
func (m *mockedCachingMux) Add(finder discover.Finder, cacheTime, negCacheTime time.Duration, priority int) {
func (m *mockedCachingMux) Add(finder discover.Finder, cacheTime, negCacheTime time.Duration) {
}
func (m *mockedCachingMux) ChildErrors() map[string]error {

View File

@ -100,6 +100,10 @@ func (kcpDialerFactory) Priority() int {
return kcpPriority
}
func (kcpDialerFactory) AlwaysWAN() bool {
return false
}
func (kcpDialerFactory) Enabled(cfg config.Configuration) bool {
return true
}

View File

@ -64,10 +64,6 @@ func (d *relayDialer) Dial(id protocol.DeviceID, uri *url.URL) (internalConn, er
return internalConn{tc, connTypeRelayClient, relayPriority}, nil
}
func (relayDialer) Priority() int {
return relayPriority
}
func (d *relayDialer) RedialFrequency() time.Duration {
return time.Duration(d.cfg.Options().RelayReconnectIntervalM) * time.Minute
}
@ -85,6 +81,10 @@ func (relayDialerFactory) Priority() int {
return relayPriority
}
func (relayDialerFactory) AlwaysWAN() bool {
return true
}
func (relayDialerFactory) Enabled(cfg config.Configuration) bool {
return cfg.Options.RelaysEnabled
}

View File

@ -336,6 +336,8 @@ func (s *Service) connect() {
}
}
addrs = util.UniqueStrings(addrs)
l.Debugln("Reconnect loop for", deviceID, addrs)
seen = append(seen, addrs...)
@ -375,9 +377,9 @@ func (s *Service) connect() {
continue
}
prio := dialerFactory.Priority()
priority := dialerFactory.Priority()
if connected && prio >= ct.Priority() {
if connected && priority >= ct.Priority() {
l.Debugf("Not dialing using %s as priority is less than current connection (%d >= %d)", dialerFactory, dialerFactory.Priority(), ct.Priority())
continue
}
@ -385,9 +387,18 @@ func (s *Service) connect() {
dialer := dialerFactory.New(s.cfg, s.tlsCfg)
nextDial[addr] = now.Add(dialer.RedialFrequency())
// For LAN addresses, increase the priority so that we
// try these first.
switch {
case dialerFactory.AlwaysWAN():
// Do nothing.
case s.isLANHost(uri.Host):
priority -= 1
}
dialTargets = append(dialTargets, dialTarget{
dialer: dialer,
priority: prio,
priority: priority,
deviceID: deviceID,
uri: uri,
})
@ -412,6 +423,17 @@ func (s *Service) connect() {
}
}
func (s *Service) isLANHost(host string) bool {
if noPort, _, err := net.SplitHostPort(host); err == nil && noPort != "" {
host = noPort
}
addr, err := net.ResolveIPAddr("ip", host)
if err != nil {
return false
}
return s.isLAN(addr)
}
func (s *Service) isLAN(addr net.Addr) bool {
tcpaddr, ok := addr.(*net.TCPAddr)
if !ok {

View File

@ -120,6 +120,7 @@ func (c internalConn) String() string {
type dialerFactory interface {
New(*config.Wrapper, *tls.Config) genericDialer
Priority() int
AlwaysWAN() bool
Enabled(config.Configuration) bool
String() string
}

View File

@ -73,6 +73,10 @@ func (tcpDialerFactory) Priority() int {
return tcpPriority
}
func (tcpDialerFactory) AlwaysWAN() bool {
return false
}
func (tcpDialerFactory) Enabled(cfg config.Configuration) bool {
return true
}

View File

@ -7,7 +7,6 @@
package discover
import (
"sort"
stdsync "sync"
"time"
@ -25,7 +24,7 @@ import (
// or negative).
type CachingMux interface {
FinderService
Add(finder Finder, cacheTime, negCacheTime time.Duration, priority int)
Add(finder Finder, cacheTime, negCacheTime time.Duration)
ChildErrors() map[string]error
}
@ -41,14 +40,6 @@ type cachedFinder struct {
Finder
cacheTime time.Duration
negCacheTime time.Duration
priority int
}
// A prioritizedAddress is what we use to sort addresses returned from
// different sources with different priorities.
type prioritizedAddress struct {
priority int
addr string
}
// An error may implement cachedError, in which case it will be interrogated
@ -66,9 +57,9 @@ func NewCachingMux() CachingMux {
}
// Add registers a new Finder, with associated cache timeouts.
func (m *cachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration, priority int) {
func (m *cachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration) {
m.mut.Lock()
m.finders = append(m.finders, cachedFinder{finder, cacheTime, negCacheTime, priority})
m.finders = append(m.finders, cachedFinder{finder, cacheTime, negCacheTime})
m.caches = append(m.caches, newCache())
m.mut.Unlock()
@ -80,8 +71,6 @@ func (m *cachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration, p
// Lookup attempts to resolve the device ID using any of the added Finders,
// while obeying the cache settings.
func (m *cachingMux) Lookup(deviceID protocol.DeviceID) (addresses []string, err error) {
var paddresses []prioritizedAddress
m.mut.RLock()
for i, finder := range m.finders {
if cacheEntry, ok := m.caches[i].Get(deviceID); ok {
@ -91,9 +80,7 @@ func (m *cachingMux) Lookup(deviceID protocol.DeviceID) (addresses []string, err
// It's a positive, valid entry. Use it.
l.Debugln("cached discovery entry for", deviceID, "at", finder)
l.Debugln(" cache:", cacheEntry)
for _, addr := range cacheEntry.Addresses {
paddresses = append(paddresses, prioritizedAddress{finder.priority, addr})
}
addresses = append(addresses, cacheEntry.Addresses...)
continue
}
@ -112,9 +99,7 @@ func (m *cachingMux) Lookup(deviceID protocol.DeviceID) (addresses []string, err
if addrs, err := finder.Lookup(deviceID); err == nil {
l.Debugln("lookup for", deviceID, "at", finder)
l.Debugln(" addresses:", addrs)
for _, addr := range addrs {
paddresses = append(paddresses, prioritizedAddress{finder.priority, addr})
}
addresses = append(addresses, addrs...)
m.caches[i].Set(deviceID, CacheEntry{
Addresses: addrs,
when: time.Now(),
@ -134,10 +119,11 @@ func (m *cachingMux) Lookup(deviceID protocol.DeviceID) (addresses []string, err
}
m.mut.RUnlock()
addresses = uniqueSortedAddrs(paddresses)
l.Debugln("lookup results for", deviceID)
l.Debugln(" addresses: ", addresses)
addresses = util.UniqueStrings(addresses)
return addresses, nil
}
@ -239,35 +225,3 @@ func (c *cache) Cache() map[protocol.DeviceID]CacheEntry {
c.mut.Unlock()
return m
}
func uniqueSortedAddrs(ss []prioritizedAddress) []string {
// We sort the addresses by priority, then filter them based on seen
// (first time seen is the on kept, so we retain priority).
sort.Sort(prioritizedAddressList(ss))
filtered := make([]string, 0, len(ss))
seen := make(map[string]struct{}, len(ss))
for _, s := range ss {
if _, ok := seen[s.addr]; !ok {
filtered = append(filtered, s.addr)
seen[s.addr] = struct{}{}
}
}
return filtered
}
type prioritizedAddressList []prioritizedAddress
func (l prioritizedAddressList) Len() int {
return len(l)
}
func (l prioritizedAddressList) Swap(a, b int) {
l[a], l[b] = l[b], l[a]
}
func (l prioritizedAddressList) Less(a, b int) bool {
if l[a].priority != l[b].priority {
return l[a].priority < l[b].priority
}
return l[a].addr < l[b].addr
}

View File

@ -15,19 +15,18 @@ import (
)
func TestCacheUnique(t *testing.T) {
addresses0 := []string{"tcp://192.0.2.44:22000", "tcp://192.0.2.42:22000"} // prio 0
addresses1 := []string{"tcp://192.0.2.43:22000", "tcp://192.0.2.42:22000"} // prio 1
addresses0 := []string{"tcp://192.0.2.44:22000", "tcp://192.0.2.42:22000"}
addresses1 := []string{"tcp://192.0.2.43:22000", "tcp://192.0.2.42:22000"}
// what we expect from just addresses0
addresses0Sorted := []string{"tcp://192.0.2.42:22000", "tcp://192.0.2.44:22000"}
// what we expect from addresses0+addresses1
totalSorted := []string{
// first prio 0, sorted
"tcp://192.0.2.42:22000", "tcp://192.0.2.44:22000",
// then prio 1
"tcp://192.0.2.43:22000",
"tcp://192.0.2.42:22000",
// no duplicate .42
"tcp://192.0.2.43:22000",
"tcp://192.0.2.44:22000",
}
c := NewCachingMux()
@ -38,7 +37,7 @@ func TestCacheUnique(t *testing.T) {
// cache.
f1 := &fakeDiscovery{addresses0}
c.Add(f1, time.Minute, 0, 0)
c.Add(f1, time.Minute, 0)
addr, err := c.Lookup(protocol.LocalDeviceID)
if err != nil {
@ -52,7 +51,7 @@ func TestCacheUnique(t *testing.T) {
// duplicate or otherwise mess up the responses now.
f2 := &fakeDiscovery{addresses1}
c.Add(f2, time.Minute, 0, 1)
c.Add(f2, time.Minute, 0)
addr, err = c.Lookup(protocol.LocalDeviceID)
if err != nil {
@ -92,7 +91,7 @@ func TestCacheSlowLookup(t *testing.T) {
started := make(chan struct{})
f1 := &slowDiscovery{time.Second, started}
c.Add(f1, time.Minute, 0, 0)
c.Add(f1, time.Minute, 0)
// Start a lookup, which will take at least a second