syncthing/lib/relay/client/dynamic.go

194 lines
4.1 KiB
Go

// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
package client
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"sort"
"time"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/rand"
"github.com/syncthing/syncthing/lib/relay/protocol"
)
type dynamicClient struct {
commonClient
pooladdr *url.URL
certs []tls.Certificate
timeout time.Duration
client RelayClient
}
func newDynamicClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation, timeout time.Duration) RelayClient {
c := &dynamicClient{
pooladdr: uri,
certs: certs,
timeout: timeout,
}
c.commonClient = newCommonClient(invitations, c.serve, fmt.Sprintf("dynamicClient@%p", c))
return c
}
func (c *dynamicClient) serve(ctx context.Context) error {
uri := *c.pooladdr
// Trim off the `dynamic+` prefix
uri.Scheme = uri.Scheme[8:]
l.Debugln(c, "looking up dynamic relays")
req, err := http.NewRequest("GET", uri.String(), nil)
if err != nil {
l.Debugln(c, "failed to lookup dynamic relays", err)
return err
}
req.Cancel = ctx.Done()
data, err := http.DefaultClient.Do(req)
if err != nil {
l.Debugln(c, "failed to lookup dynamic relays", err)
return err
}
var ann dynamicAnnouncement
err = json.NewDecoder(data.Body).Decode(&ann)
data.Body.Close()
if err != nil {
l.Debugln(c, "failed to lookup dynamic relays", err)
return err
}
var addrs []string
for _, relayAnn := range ann.Relays {
ruri, err := url.Parse(relayAnn.URL)
if err != nil {
l.Debugln(c, "failed to parse dynamic relay address", relayAnn.URL, err)
continue
}
l.Debugln(c, "found", ruri)
addrs = append(addrs, ruri.String())
}
for _, addr := range relayAddressesOrder(ctx, addrs) {
select {
case <-ctx.Done():
l.Debugln(c, "stopping")
return nil
default:
ruri, err := url.Parse(addr)
if err != nil {
l.Debugln(c, "skipping relay", addr, err)
continue
}
client := newStaticClient(ruri, c.certs, c.invitations, c.timeout)
c.mut.Lock()
c.client = client
c.mut.Unlock()
c.client.Serve()
c.mut.Lock()
c.client = nil
c.mut.Unlock()
}
}
l.Debugln(c, "could not find a connectable relay")
return errors.New("could not find a connectable relay")
}
func (c *dynamicClient) Stop() {
c.mut.RLock()
if c.client != nil {
c.client.Stop()
}
c.mut.RUnlock()
c.commonClient.Stop()
}
func (c *dynamicClient) Error() error {
c.mut.RLock()
defer c.mut.RUnlock()
if c.client == nil {
return c.commonClient.Error()
}
return c.client.Error()
}
func (c *dynamicClient) Latency() time.Duration {
c.mut.RLock()
defer c.mut.RUnlock()
if c.client == nil {
return time.Hour
}
return c.client.Latency()
}
func (c *dynamicClient) String() string {
return fmt.Sprintf("DynamicClient:%p:%s@%s", c, c.URI(), c.pooladdr)
}
func (c *dynamicClient) URI() *url.URL {
c.mut.RLock()
defer c.mut.RUnlock()
if c.client == nil {
return nil
}
return c.client.URI()
}
// This is the announcement received from the relay server;
// {"relays": [{"url": "relay://10.20.30.40:5060"}, ...]}
type dynamicAnnouncement struct {
Relays []struct {
URL string
}
}
// relayAddressesOrder checks the latency to each relay, rounds latency down to
// the closest 50ms, and puts them in buckets of 50ms latency ranges. Then
// shuffles each bucket, and returns all addresses starting with the ones from
// the lowest latency bucket, ending with the highest latency buceket.
func relayAddressesOrder(ctx context.Context, input []string) []string {
buckets := make(map[int][]string)
for _, relay := range input {
latency, err := osutil.GetLatencyForURL(ctx, relay)
if err != nil {
latency = time.Hour
}
id := int(latency/time.Millisecond) / 50
buckets[id] = append(buckets[id], relay)
select {
case <-ctx.Done():
return nil
default:
}
}
var ids []int
for id, bucket := range buckets {
rand.Shuffle(bucket)
ids = append(ids, id)
}
sort.Ints(ids)
addresses := make([]string, 0, len(input))
for _, id := range ids {
addresses = append(addresses, buckets[id]...)
}
return addresses
}