Configuration version 2 (nodes separate from repos)

This commit is contained in:
Jakob Borg 2014-04-08 13:45:18 +02:00
parent ee0ee0e39d
commit 3b3c0c5950
4 changed files with 166 additions and 54 deletions

View File

@ -11,8 +11,9 @@ import (
)
type Configuration struct {
Version int `xml:"version,attr" default:"1"`
Version int `xml:"version,attr" default:"2"`
Repositories []RepositoryConfiguration `xml:"repository"`
Nodes []NodeConfiguration `xml:"node"`
Options OptionsConfiguration `xml:"options"`
XMLName xml.Name `xml:"configuration" json:"-"`
}
@ -21,27 +22,38 @@ type RepositoryConfiguration struct {
ID string `xml:"id,attr"`
Directory string `xml:"directory,attr"`
Nodes []NodeConfiguration `xml:"node"`
ReadOnly bool `xml:"ro,attr"`
nodeIDs []string
}
func (r *RepositoryConfiguration) NodeIDs() []string {
if r.nodeIDs == nil {
for _, n := range r.Nodes {
r.nodeIDs = append(r.nodeIDs, n.NodeID)
}
}
return r.nodeIDs
}
type NodeConfiguration struct {
NodeID string `xml:"id,attr"`
Name string `xml:"name,attr"`
Addresses []string `xml:"address"`
Name string `xml:"name,attr,omitempty"`
Addresses []string `xml:"address,omitempty"`
}
type OptionsConfiguration struct {
ListenAddress []string `xml:"listenAddress" default:":22000" ini:"listen-address"`
ReadOnly bool `xml:"readOnly" ini:"read-only"`
GUIEnabled bool `xml:"guiEnabled" default:"true" ini:"gui-enabled"`
GUIAddress string `xml:"guiAddress" default:"127.0.0.1:8080" ini:"gui-address"`
GlobalAnnServer string `xml:"globalAnnounceServer" default:"announce.syncthing.net:22025" ini:"global-announce-server"`
GlobalAnnEnabled bool `xml:"globalAnnounceEnabled" default:"true" ini:"global-announce-enabled"`
LocalAnnEnabled bool `xml:"localAnnounceEnabled" default:"true" ini:"local-announce-enabled"`
ParallelRequests int `xml:"parallelRequests" default:"16" ini:"parallel-requests"`
MaxSendKbps int `xml:"maxSendKbps" ini:"max-send-kbps"`
RescanIntervalS int `xml:"rescanIntervalS" default:"60" ini:"rescan-interval"`
ReconnectIntervalS int `xml:"reconnectionIntervalS" default:"60" ini:"reconnection-interval"`
MaxChangeKbps int `xml:"maxChangeKbps" default:"1000" ini:"max-change-bw"`
ListenAddress []string `xml:"listenAddress" default:":22000"`
ReadOnly bool `xml:"readOnly,omitempty"`
GUIEnabled bool `xml:"guiEnabled" default:"true"`
GUIAddress string `xml:"guiAddress" default:"127.0.0.1:8080"`
GlobalAnnServer string `xml:"globalAnnounceServer" default:"announce.syncthing.net:22025"`
GlobalAnnEnabled bool `xml:"globalAnnounceEnabled" default:"true"`
LocalAnnEnabled bool `xml:"localAnnounceEnabled" default:"true"`
ParallelRequests int `xml:"parallelRequests" default:"16"`
MaxSendKbps int `xml:"maxSendKbps"`
RescanIntervalS int `xml:"rescanIntervalS" default:"60"`
ReconnectIntervalS int `xml:"reconnectionIntervalS" default:"60"`
MaxChangeKbps int `xml:"maxChangeKbps" default:"1000"`
StartBrowser bool `xml:"startBrowser" default:"true"`
}
@ -159,9 +171,38 @@ func readConfigXML(rd io.Reader) (Configuration, error) {
seenRepos[id] = true
}
if cfg.Version == 1 {
convertV1V2(&cfg)
}
return cfg, err
}
func convertV1V2(cfg *Configuration) {
// Collect the list of nodes.
// Replace node configs inside repositories with only a reference to the nide ID.
// Set all repositories to read only if the global read only flag is set.
var nodes = map[string]NodeConfiguration{}
for i, repo := range cfg.Repositories {
cfg.Repositories[i].ReadOnly = cfg.Options.ReadOnly
for j, node := range repo.Nodes {
if _, ok := nodes[node.NodeID]; !ok {
nodes[node.NodeID] = node
}
cfg.Repositories[i].Nodes[j] = NodeConfiguration{NodeID: node.NodeID}
}
}
// Set and sort the list of nodes.
for _, node := range nodes {
cfg.Nodes = append(cfg.Nodes, node)
}
sort.Sort(NodeConfigurationList(cfg.Nodes))
cfg.Options.ReadOnly = false
cfg.Version = 2
}
type NodeConfigurationList []NodeConfiguration
func (l NodeConfigurationList) Less(a, b int) bool {

View File

@ -10,7 +10,6 @@ import (
func TestDefaultValues(t *testing.T) {
expected := OptionsConfiguration{
ListenAddress: []string{":22000"},
ReadOnly: false,
GUIEnabled: true,
GUIAddress: "127.0.0.1:8080",
GlobalAnnServer: "announce.syncthing.net:22025",
@ -34,6 +33,81 @@ func TestDefaultValues(t *testing.T) {
}
}
func TestNodeConfig(t *testing.T) {
v1data := []byte(`
<configuration version="1">
<repository id="test" directory="~/Sync">
<node id="node1" name="node one">
<address>a</address>
</node>
<node id="node2" name="node two">
<address>b</address>
</node>
</repository>
<options>
<readOnly>true</readOnly>
</options>
</configuration>
`)
v2data := []byte(`
<configuration version="2">
<repository id="test" directory="~/Sync" ro="true">
<node id="node1"/>
<node id="node2"/>
</repository>
<node id="node1" name="node one">
<address>a</address>
</node>
<node id="node2" name="node two">
<address>b</address>
</node>
</configuration>
`)
for i, data := range [][]byte{v1data, v2data} {
cfg, err := readConfigXML(bytes.NewReader(data))
if err != nil {
t.Error(err)
}
expectedRepos := []RepositoryConfiguration{
{
ID: "test",
Directory: "~/Sync",
Nodes: []NodeConfiguration{{NodeID: "node1"}, {NodeID: "node2"}},
ReadOnly: true,
},
}
expectedNodes := []NodeConfiguration{
{
NodeID: "node1",
Name: "node one",
Addresses: []string{"a"},
},
{
NodeID: "node2",
Name: "node two",
Addresses: []string{"b"},
},
}
expectedNodeIDs := []string{"node1", "node2"}
if cfg.Version != 2 {
t.Errorf("%d: Incorrect version %d != 2", i, cfg.Version)
}
if !reflect.DeepEqual(cfg.Repositories, expectedRepos) {
t.Errorf("%d: Incorrect Repositories\n A: %#v\n E: %#v", i, cfg.Repositories, expectedRepos)
}
if !reflect.DeepEqual(cfg.Nodes, expectedNodes) {
t.Errorf("%d: Incorrect Nodes\n A: %#v\n E: %#v", i, cfg.Nodes, expectedNodes)
}
if !reflect.DeepEqual(cfg.Repositories[0].NodeIDs(), expectedNodeIDs) {
t.Errorf("%d: Incorrect NodeIDs\n A: %#v\n E: %#v", i, cfg.Repositories[0].NodeIDs(), expectedNodeIDs)
}
}
}
func TestNoListenAddress(t *testing.T) {
data := []byte(`<configuration version="1">
<repository directory="~/Sync">
@ -59,7 +133,7 @@ func TestNoListenAddress(t *testing.T) {
}
func TestOverriddenValues(t *testing.T) {
data := []byte(`<configuration version="1">
data := []byte(`<configuration version="2">
<repository directory="~/Sync">
<node id="..." name="...">
<address>dynamic</address>
@ -67,7 +141,6 @@ func TestOverriddenValues(t *testing.T) {
</repository>
<options>
<listenAddress>:23000</listenAddress>
<readOnly>true</readOnly>
<allowDelete>false</allowDelete>
<guiEnabled>false</guiEnabled>
<guiAddress>125.2.2.2:8080</guiAddress>
@ -86,7 +159,6 @@ func TestOverriddenValues(t *testing.T) {
expected := OptionsConfiguration{
ListenAddress: []string{":23000"},
ReadOnly: true,
GUIEnabled: false,
GUIAddress: "125.2.2.2:8080",
GlobalAnnServer: "syncthing.nym.se:22025",

View File

@ -16,6 +16,7 @@ import (
"runtime/debug"
"strings"
"time"
"github.com/calmh/syncthing/discover"
"github.com/calmh/syncthing/protocol"
"github.com/juju/ratelimit"
@ -128,11 +129,12 @@ func main() {
{
ID: "default",
Directory: filepath.Join(getHomeDir(), "Sync"),
Nodes: []NodeConfiguration{
{NodeID: myID, Addresses: []string{"dynamic"}},
},
Nodes: []NodeConfiguration{{NodeID: myID}},
},
}
cfg.Nodes = []NodeConfiguration{
{NodeID: myID, Addresses: []string{"dynamic"}},
}
saveConfig()
infof("Edit %s to taste or use the GUI\n", cfgFile)
@ -227,14 +229,16 @@ func main() {
disc := discovery()
go listenConnect(myID, disc, m, tlsCfg, connOpts)
// Routine to pull blocks from other nodes to synchronize the local
// repository. Does not run when we are in read only (publish only) mode.
if cfg.Options.ReadOnly {
okln("Ready to synchronize (read only; no external updates accepted)")
m.StartRO()
} else {
okln("Ready to synchronize (read-write)")
m.StartRW(cfg.Options.ParallelRequests)
for _, repo := range cfg.Repositories {
// Routine to pull blocks from other nodes to synchronize the local
// repository. Does not run when we are in read only (publish only) mode.
if repo.ReadOnly {
okf("Ready to synchronize %s (read only; no external updates accepted)", repo.ID)
m.StartRepoRO(repo.ID)
} else {
okf("Ready to synchronize %s (read-write)", repo.ID)
m.StartRepoRW(repo.ID, cfg.Options.ParallelRequests)
}
}
select {}
@ -362,13 +366,15 @@ func listenConnect(myID string, disc *discover.Discoverer, m *Model, tlsCfg *tls
go func() {
for {
nextNode:
for _, nodeCfg := range cfg.Repositories[0].Nodes {
for _, nodeCfg := range cfg.Nodes {
if nodeCfg.NodeID == myID {
continue
}
if m.ConnectedTo(nodeCfg.NodeID) {
continue
}
var addrs []string
for _, addr := range nodeCfg.Addresses {
if addr == "dynamic" {
if disc != nil {
@ -376,10 +382,14 @@ func listenConnect(myID string, disc *discover.Discoverer, m *Model, tlsCfg *tls
if len(t) == 0 {
continue
}
addr = t[0] //XXX: Handle all of them
addrs = append(addrs, t...)
}
} else {
addrs = append(addrs, addr)
}
}
for _, addr := range addrs {
if debugNet {
dlog.Println("dial", nodeCfg.NodeID, addr)
}

View File

@ -66,15 +66,13 @@ func NewModel(maxChangeBw int) *Model {
// StartRW starts read/write processing on the current model. When in
// read/write mode the model will attempt to keep in sync with the cluster by
// pulling needed files from peer nodes.
func (m *Model) StartRW(threads int) {
func (m *Model) StartRepoRW(repo string, threads int) {
m.rmut.Lock()
defer m.rmut.Unlock()
if !m.addedRepo {
if dir, ok := m.repoDirs[repo]; !ok {
panic("cannot start without repo")
}
m.started = true
for repo, dir := range m.repoDirs {
} else {
newPuller(repo, dir, m, threads)
}
}
@ -82,17 +80,8 @@ func (m *Model) StartRW(threads int) {
// StartRO starts read only processing on the current model. When in
// read only mode the model will announce files to the cluster but not
// pull in any external changes.
func (m *Model) StartRO() {
m.rmut.Lock()
defer m.rmut.Unlock()
if !m.addedRepo {
panic("cannot start without repo")
}
m.started = true
for repo, dir := range m.repoDirs {
newPuller(repo, dir, m, 0) // zero threads => read only
}
func (m *Model) StartRepoRO(repo string) {
m.StartRepoRW(repo, 0) // zero threads => read only
}
type ConnectionInfo struct {
@ -555,12 +544,12 @@ func (m *Model) ScanRepos() {
func (m *Model) ScanRepo(repo string) {
sup := &suppressor{threshold: int64(cfg.Options.MaxChangeKbps)}
w := &scanner.Walker{
Dir: m.repoDirs[repo],
IgnoreFile: ".stignore",
BlockSize: BlockSize,
TempNamer: defTempNamer,
Suppressor: sup,
CurrentFiler: cFiler{m, repo},
Dir: m.repoDirs[repo],
IgnoreFile: ".stignore",
BlockSize: BlockSize,
TempNamer: defTempNamer,
Suppressor: sup,
CurrentFiler: cFiler{m, repo},
}
fs, _ := w.Walk()
m.ReplaceLocal(repo, fs)