commit bfe935b5ab3518cac083c9b748715b6aabb50215 Author: Jakob Borg Date: Sun Dec 15 11:43:31 2013 +0100 REBASE! diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..e6f784269 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +syncthing diff --git a/LICENSE b/LICENSE new file mode 100644 index 000000000..fa5b4e205 --- /dev/null +++ b/LICENSE @@ -0,0 +1,19 @@ +Copyright (C) 2013 Jakob Borg + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is furnished to do +so, subject to the following conditions: + +- The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 000000000..6568a168a --- /dev/null +++ b/README.md @@ -0,0 +1,161 @@ +syncthing +========= + +This is `syncthing`, an open BitTorrent Sync alternative. It is +currently far from ready for mass consumption, but it is a usable proof +of concept and tech demo. The following are the project goals: + + 1. Define an open, secure, language neutral protocol usable for + efficient synchronization of a file repository between an arbitrary + number of nodes. This is the [Block Exchange + Protocol](https://github.com/calmh/syncthing/blob/master/protocol/PROTOCOL.md) + (BEP). + + 2. Provide the reference implementation to demonstrate the usability of + said protocol. This is the `syncthing` utility. + +The two are evolving together; the protocol is not to be considered +stable until syncthing 1.0 is released, at which point it is locked down +for incompatible changes. + +Syncthing does not use the BitTorrent protocol. The reasons for this are +1) we don't know if BitTorrent Sync does either, so there's nothing to +be compatible with, 2) BitTorrent includes a lot of functionality for +making sure large swarms of selfish agents behave and somehow work +towards a common goal. Here we have a much smaller swarm of cooperative +agents and a simpler approach will suffice. + +Features +-------- + +The following features are _currently implemented and working_: + + * The formation of a cluster of nodes, certificate authenticated and + communicating over TLS over TCP. + + * Synchronization of a single directory among the cluster nodes. + + * Change detection by periodic scanning of the local repository. + + * Static configuration of cluster nodes. + + * Automatic discovery of cluster nodes on the local network. See + [discover.go](https://github.com/calmh/syncthing/blob/master/discover/discover.go) + for the protocol specification. + + * Handling of deleted files. Deletes can be propagated or ignored per + client. 
+
+The following features are _not yet implemented but planned_:
+
+ * Syncing multiple directories from the same syncthing instance.
+
+ * Change detection by listening to file system notifications instead of
+   periodic scanning.
+
+ * HTTP GUI.
+
+The following features are _not implemented but may be implemented_ in
+the future:
+
+ * Automatic remote node discovery using a DHT. This is not technically
+   very difficult but requires one or more globally reachable root
+   nodes. This is open for discussion -- perhaps we can piggyback on an
+   existing DHT, or root nodes need to be established in some other
+   manner.
+
+ * Automatic NAT handling via UPNP. Required for the above, not very
+   useful without it.
+
+ * Conflict resolution. Currently whichever file has the newest
+   modification time "wins". The correct behavior in the face of
+   conflicts is open for discussion.
+
+Security
+--------
+
+Security is one of the primary project goals. This means that it should
+not be possible for an attacker to join a cluster uninvited, and it
+should not be possible to extract private information from intercepted
+traffic. Currently this is implemented as follows.
+
+All traffic is protected by TLS. To prevent uninvited nodes from joining
+a cluster, the certificate fingerprint of each node is compared to a
+preset list of acceptable nodes at connection establishment. The
+fingerprint is computed as the SHA-1 hash of the certificate and
+displayed in BASE32 encoding to form a compact yet convenient string.
+Currently SHA-1 is deemed secure against preimage attacks.
+
+Usage
+=====
+
+`go get github.com/calmh/syncthing`
+
+Check out the options:
+
+```
+$ syncthing --help
+Usage:
+  syncthing [options]
+
+...
+```
+
+Run syncthing to let it create its config directory and certificate:
+
+```
+$ syncthing
+11:34:13 tls.go:61: OK: wrote cert.pem
+11:34:13 tls.go:67: OK: wrote key.pem
+11:34:13 main.go:66: INFO: My ID: NCTBZAAHXR6ZZP3D7SL3DLYFFQERMW4Q
+11:34:13 main.go:90: FATAL: No config file
+```
+
+Take note of the "My ID: ..." line. Perform the same operation on
+another computer (or the same computer but with a different `--home` for
+testing) to create another node. Take note of that ID as well, and
+create a config file `~/.syncthing/syncthing.ini` looking something like
+this:
+
+```
+[repository]
+dir = /Users/jb/Synced
+
+[nodes]
+NCTBZAAHXR6ZZP3D7SL3DLYFFQERMW4Q = 172.16.32.1:22000 192.23.34.56:22000
+CUGAE43Y5N64CRJU26YFH6MTWPSBLSUL = dynamic
+```
+
+This assumes that the first node is reachable on either of the two
+addresses listed (perhaps one internal and one port-forwarded external)
+and that the other node is not normally reachable from the outside. Save
+this config file, identically, to both nodes. If both nodes are running
+on the same network, you can set all addresses to 'dynamic' and they
+will find each other by local node discovery.
+
+Start syncthing on both nodes. If you're running both on the same
+computer, one needs a different repository directory (in the config
+file) and listening port (set as a command line parameter). For the
+cautious, one side can be set to be read only.
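+
+As an aside, the node IDs used throughout the config are the certificate
+fingerprints described under Security. A minimal sketch of the derivation
+(assuming SHA-1 over the raw DER certificate bytes, BASE32 encoded; the
+`certId` helper in `main.go` is expected to do something equivalent):
+
+```
+package main
+
+import (
+	"crypto/sha1"
+	"encoding/base32"
+	"fmt"
+)
+
+// fingerprint derives a node ID from a DER encoded certificate. A SHA-1
+// digest is exactly 160 bits, so BASE32 encodes it as 32 characters with
+// no padding -- the same length as the IDs in the example config above.
+func fingerprint(der []byte) string {
+	sum := sha1.Sum(der)
+	return base32.StdEncoding.EncodeToString(sum[:])
+}
+
+func main() {
+	fmt.Println(fingerprint([]byte("not a real certificate")))
+}
+```
+
+Back to the walkthrough: starting the read only side produces output
+along these lines: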
+ +``` +$ syncthing --ro +13:30:55 main.go:102: INFO: My ID: NCTBZAAHXR6ZZP3D7SL3DLYFFQERMW4Q +13:30:55 main.go:149: INFO: Initial repository scan in progress +13:30:59 main.go:153: INFO: Listening for incoming connections +13:30:59 main.go:157: INFO: Attempting to connect to other nodes +13:30:59 main.go:247: INFO: Starting local discovery +13:30:59 main.go:165: OK: Ready to synchronize +13:31:04 discover.go:113: INFO: Discovered node CUGAE43Y5N64CRJU26YFH6MTWPSBLSUL at 172.16.32.24:23456 +13:31:14 main.go:296: OK: Connected to node CUGAE43Y5N64CRJU26YFH6MTWPSBLSUL +13:31:19 main.go:345: INFO: Transferred 139 KiB in (14 KiB/s), 139 KiB out (14 KiB/s) +... +``` +You should see the synchronization start and then finish a short while +later. Add nodes to taste. + +License +======= + +MIT + diff --git a/blocks.go b/blocks.go new file mode 100644 index 000000000..46504d2cb --- /dev/null +++ b/blocks.go @@ -0,0 +1,67 @@ +package main + +import ( + "bytes" + "crypto/sha256" + "io" +) + +type BlockList []Block + +type Block struct { + Offset uint64 + Length uint32 + Hash []byte +} + +// Blocks returns the blockwise hash of the reader. +func Blocks(r io.Reader, blocksize int) (BlockList, error) { + var blocks BlockList + var offset uint64 + for { + lr := &io.LimitedReader{r, int64(blocksize)} + hf := sha256.New() + n, err := io.Copy(hf, lr) + if err != nil { + return nil, err + } + + if n == 0 { + break + } + + b := Block{ + Offset: offset, + Length: uint32(n), + Hash: hf.Sum(nil), + } + blocks = append(blocks, b) + offset += uint64(n) + } + + return blocks, nil +} + +// To returns the list of blocks necessary to transform src into dst. +// Both block lists must have been created with the same block size. +func (src BlockList) To(tgt BlockList) (have, need BlockList) { + if len(tgt) == 0 && len(src) != 0 { + return nil, nil + } + + if len(tgt) != 0 && len(src) == 0 { + // Copy the entire file + return nil, tgt + } + + for i := range tgt { + if i >= len(src) || bytes.Compare(tgt[i].Hash, src[i].Hash) != 0 { + // Copy differing block + need = append(need, tgt[i]) + } else { + have = append(have, tgt[i]) + } + } + + return have, need +} diff --git a/blocks_test.go b/blocks_test.go new file mode 100644 index 000000000..256db278d --- /dev/null +++ b/blocks_test.go @@ -0,0 +1,115 @@ +package main + +import ( + "bytes" + "fmt" + "testing" +) + +var blocksTestData = []struct { + data []byte + blocksize int + hash []string +}{ + {[]byte(""), 1024, []string{}}, + {[]byte("contents"), 1024, []string{ + "d1b2a59fbea7e20077af9f91b27e95e865061b270be03ff539ab3b73587882e8"}}, + {[]byte("contents"), 9, []string{ + "d1b2a59fbea7e20077af9f91b27e95e865061b270be03ff539ab3b73587882e8"}}, + {[]byte("contents"), 8, []string{ + "d1b2a59fbea7e20077af9f91b27e95e865061b270be03ff539ab3b73587882e8"}}, + {[]byte("contents"), 7, []string{ + "ed7002b439e9ac845f22357d822bac1444730fbdb6016d3ec9432297b9ec9f73", + "043a718774c572bd8a25adbeb1bfcd5c0256ae11cecf9f9c3f925d0e52beaf89"}, + }, + {[]byte("contents"), 3, []string{ + "1143da2bc54c495c4be31d3868785d39ffdfd56df5668f0645d8f14d47647952", + "e4432baa90819aaef51d2a7f8e148bf7e679610f3173752fabb4dcb2d0f418d3", + "44ad63f60af0f6db6fdde6d5186ef78176367df261fa06be3079b6c80c8adba4"}, + }, + {[]byte("conconts"), 3, []string{ + "1143da2bc54c495c4be31d3868785d39ffdfd56df5668f0645d8f14d47647952", + "1143da2bc54c495c4be31d3868785d39ffdfd56df5668f0645d8f14d47647952", + "44ad63f60af0f6db6fdde6d5186ef78176367df261fa06be3079b6c80c8adba4"}, + }, + {[]byte("contenten"), 3, []string{ + 
"1143da2bc54c495c4be31d3868785d39ffdfd56df5668f0645d8f14d47647952", + "e4432baa90819aaef51d2a7f8e148bf7e679610f3173752fabb4dcb2d0f418d3", + "e4432baa90819aaef51d2a7f8e148bf7e679610f3173752fabb4dcb2d0f418d3"}, + }, +} + +func TestBlocks(t *testing.T) { + for _, test := range blocksTestData { + buf := bytes.NewBuffer(test.data) + blocks, err := Blocks(buf, test.blocksize) + + if err != nil { + t.Fatal(err) + } + + if l := len(blocks); l != len(test.hash) { + t.Fatalf("Incorrect number of blocks %d != %d", l, len(test.hash)) + } else { + i := 0 + for off := uint64(0); off < uint64(len(test.data)); off += uint64(test.blocksize) { + if blocks[i].Offset != off { + t.Errorf("Incorrect offset for block %d: %d != %d", i, blocks[i].Offset, off) + } + + bs := test.blocksize + if rem := len(test.data) - int(off); bs > rem { + bs = rem + } + if int(blocks[i].Length) != bs { + t.Errorf("Incorrect length for block %d: %d != %d", i, blocks[i].Length, bs) + } + if h := fmt.Sprintf("%x", blocks[i].Hash); h != test.hash[i] { + t.Errorf("Incorrect block hash %q != %q", h, test.hash[i]) + } + + i++ + } + } + } +} + +var diffTestData = []struct { + a string + b string + s int + d []Block +}{ + {"contents", "contents", 1024, []Block{}}, + {"", "", 1024, []Block{}}, + {"contents", "contents", 3, []Block{}}, + {"contents", "cantents", 3, []Block{{0, 3, nil}}}, + {"contents", "contants", 3, []Block{{3, 3, nil}}}, + {"contents", "cantants", 3, []Block{{0, 3, nil}, {3, 3, nil}}}, + {"contents", "", 3, nil}, + {"", "contents", 3, []Block{{0, 3, nil}, {3, 3, nil}, {6, 2, nil}}}, + {"con", "contents", 3, []Block{{3, 3, nil}, {6, 2, nil}}}, + {"contents", "con", 3, nil}, + {"contents", "cont", 3, []Block{{3, 1, nil}}}, + {"cont", "contents", 3, []Block{{3, 3, nil}, {6, 2, nil}}}, +} + +func TestDiff(t *testing.T) { + for i, test := range diffTestData { + a, _ := Blocks(bytes.NewBufferString(test.a), test.s) + b, _ := Blocks(bytes.NewBufferString(test.b), test.s) + _, d := a.To(b) + if len(d) != len(test.d) { + t.Fatalf("Incorrect length for diff %d; %d != %d", i, len(d), len(test.d)) + } else { + for j := range test.d { + if d[j].Offset != test.d[j].Offset { + t.Errorf("Incorrect offset for diff %d block %d; %d != %d", i, j, d[j].Offset, test.d[j].Offset) + } + if d[j].Length != test.d[j].Length { + t.Errorf("Incorrect length for diff %d block %d; %d != %d", i, j, d[j].Length, test.d[j].Length) + } + } + } + } +} diff --git a/buffers/buffers.go b/buffers/buffers.go new file mode 100644 index 000000000..0ed1e7826 --- /dev/null +++ b/buffers/buffers.go @@ -0,0 +1,26 @@ +package buffers + +var buffers = make(chan []byte, 32) + +func Get(size int) []byte { + var buf []byte + select { + case buf = <-buffers: + default: + } + if len(buf) < size { + return make([]byte, size) + } + return buf[:size] +} + +func Put(buf []byte) { + if cap(buf) == 0 { + return + } + buf = buf[:cap(buf)] + select { + case buffers <- buf: + default: + } +} diff --git a/discover/discover.go b/discover/discover.go new file mode 100644 index 000000000..e8e335e5e --- /dev/null +++ b/discover/discover.go @@ -0,0 +1,121 @@ +/* +This is the local node discovery protocol. In principle we might be better +served by something more standardized, such as mDNS / DNS-SD. In practice, this +was much easier and quicker to get up and running. + +The mode of operation is to periodically (currently once every 30 seconds) +transmit a broadcast UDP packet to the well known port number 21025. 
The packet +has the following format: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Magic Number | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Port Number | Length of NodeID | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ NodeID (variable length) \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +The sending node's address is not encoded -- it is taken to be the source +address of the announcement. Every time such a packet is received, a local +table that maps NodeID to Address is updated. When the local node wants to +connect to another node with the address specification 'dynamic', this table is +consulted. +*/ +package discover + +import ( + "encoding/binary" + "fmt" + "net" + "sync" + "time" +) + +type Discoverer struct { + MyID string + ListenPort int + BroadcastIntv time.Duration + + conn *net.UDPConn + registry map[string]string + registryLock sync.RWMutex +} + +func NewDiscoverer(id string, port int) (*Discoverer, error) { + local4 := &net.UDPAddr{IP: net.IP{0, 0, 0, 0}, Port: 21025} + conn, err := net.ListenUDP("udp4", local4) + if err != nil { + return nil, err + } + + disc := &Discoverer{ + MyID: id, + ListenPort: port, + BroadcastIntv: 30 * time.Second, + conn: conn, + registry: make(map[string]string), + } + + go disc.sendAnnouncements() + go disc.recvAnnouncements() + + return disc, nil +} + +func (d *Discoverer) sendAnnouncements() { + remote4 := &net.UDPAddr{IP: net.IP{255, 255, 255, 255}, Port: 21025} + + idbs := []byte(d.MyID) + buf := make([]byte, 4+4+4+len(idbs)) + + binary.BigEndian.PutUint32(buf, uint32(0x121025)) + binary.BigEndian.PutUint16(buf[4:], uint16(d.ListenPort)) + binary.BigEndian.PutUint16(buf[6:], uint16(len(idbs))) + copy(buf[8:], idbs) + + for { + _, _, err := d.conn.WriteMsgUDP(buf, nil, remote4) + if err != nil { + panic(err) + } + time.Sleep(d.BroadcastIntv) + } +} + +func (d *Discoverer) recvAnnouncements() { + var buf = make([]byte, 1024) + for { + _, addr, err := d.conn.ReadFromUDP(buf) + if err != nil { + panic(err) + } + magic := binary.BigEndian.Uint32(buf) + if magic != 0x121025 { + continue + } + port := binary.BigEndian.Uint16(buf[4:]) + l := binary.BigEndian.Uint16(buf[6:]) + idbs := buf[8 : l+8] + id := string(idbs) + + if id != d.MyID { + nodeAddr := fmt.Sprintf("%s:%d", addr.IP.String(), port) + d.registryLock.Lock() + if d.registry[id] != nodeAddr { + d.registry[id] = nodeAddr + } + d.registryLock.Unlock() + } + } +} + +func (d *Discoverer) Lookup(node string) (string, bool) { + d.registryLock.Lock() + defer d.registryLock.Unlock() + addr, ok := d.registry[node] + return addr, ok +} diff --git a/logger.go b/logger.go new file mode 100644 index 000000000..4e4eb532d --- /dev/null +++ b/logger.go @@ -0,0 +1,72 @@ +package main + +import ( + "fmt" + "log" + "os" +) + +var debugEnabled = true +var logger = log.New(os.Stderr, "", log.Lshortfile|log.Ltime) + +func debugln(vals ...interface{}) { + if debugEnabled { + s := fmt.Sprintln(vals...) + logger.Output(2, "DEBUG: "+s) + } +} + +func debugf(format string, vals ...interface{}) { + if debugEnabled { + s := fmt.Sprintf(format, vals...) + logger.Output(2, "DEBUG: "+s) + } +} + +func infoln(vals ...interface{}) { + s := fmt.Sprintln(vals...) + logger.Output(2, "INFO: "+s) +} + +func infof(format string, vals ...interface{}) { + s := fmt.Sprintf(format, vals...) 
+ logger.Output(2, "INFO: "+s) +} + +func okln(vals ...interface{}) { + s := fmt.Sprintln(vals...) + logger.Output(2, "OK: "+s) +} + +func okf(format string, vals ...interface{}) { + s := fmt.Sprintf(format, vals...) + logger.Output(2, "OK: "+s) +} + +func warnln(vals ...interface{}) { + s := fmt.Sprintln(vals...) + logger.Output(2, "WARNING: "+s) +} + +func warnf(format string, vals ...interface{}) { + s := fmt.Sprintf(format, vals...) + logger.Output(2, "WARNING: "+s) +} + +func fatalln(vals ...interface{}) { + s := fmt.Sprintln(vals...) + logger.Output(2, "FATAL: "+s) + os.Exit(3) +} + +func fatalf(format string, vals ...interface{}) { + s := fmt.Sprintf(format, vals...) + logger.Output(2, "FATAL: "+s) + os.Exit(3) +} + +func fatalErr(err error) { + if err != nil { + fatalf(err.Error()) + } +} diff --git a/main.go b/main.go new file mode 100644 index 000000000..c1c654446 --- /dev/null +++ b/main.go @@ -0,0 +1,352 @@ +package main + +import ( + "crypto/sha1" + "crypto/tls" + "fmt" + "log" + "net" + "net/http" + _ "net/http/pprof" + "os" + "path" + "strconv" + "strings" + "time" + + "github.com/calmh/ini" + "github.com/calmh/syncthing/discover" + "github.com/calmh/syncthing/protocol" + docopt "github.com/docopt/docopt.go" +) + +const ( + confDirName = ".syncthing" + confFileName = "syncthing.ini" + usage = `Usage: + syncthing [options] + +Options: + -l Listening address [default: :22000] + -p Enable HTTP profiler on addr + --home Home directory + --delete Delete files that were deleted on a peer node + --ro Local repository is read only + --scan-intv Repository scan interval, in seconds [default: 60] + --conn-intv Node reconnect interval, in seconds [default: 15] + --no-stats Don't print transfer statistics + +Help Options: + -h, --help Show this help + --version Show version + +Debug Options: + --trace-file Trace file operations + --trace-net Trace network operations + --trace-idx Trace sent indexes +` +) + +var ( + config ini.Config + nodeAddrs = make(map[string][]string) +) + +// Options +var ( + confDir = path.Join(getHomeDir(), confDirName) + addr string + prof string + readOnly bool + scanIntv int + connIntv int + traceNet bool + traceFile bool + traceIdx bool + printStats bool + doDelete bool +) + +func main() { + // Useful for debugging; to be adjusted. + log.SetFlags(log.Ltime | log.Lshortfile) + + arguments, _ := docopt.Parse(usage, nil, true, "syncthing 0.1", false) + + addr = arguments["-l"].(string) + prof, _ = arguments["-p"].(string) + readOnly, _ = arguments["--ro"].(bool) + + if arguments["--home"] != nil { + confDir, _ = arguments["--home"].(string) + } + + scanIntv, _ = strconv.Atoi(arguments["--scan-intv"].(string)) + if scanIntv == 0 { + fatalln("Invalid --scan-intv") + } + + connIntv, _ = strconv.Atoi(arguments["--conn-intv"].(string)) + if connIntv == 0 { + fatalln("Invalid --conn-intv") + } + + doDelete = arguments["--delete"].(bool) + traceFile = arguments["--trace-file"].(bool) + traceNet = arguments["--trace-net"].(bool) + traceIdx = arguments["--trace-idx"].(bool) + printStats = !arguments["--no-stats"].(bool) + + // Ensure that our home directory exists and that we have a certificate and key. 
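+	// The certificate doubles as the node's identity: the ID logged below
+	// comes from certId, presumably the BASE32 encoded SHA-1 certificate
+	// fingerprint described in the README.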
+
+	ensureDir(confDir)
+	cert, err := loadCert(confDir)
+	if err != nil {
+		newCertificate(confDir)
+		cert, err = loadCert(confDir)
+		fatalErr(err)
+	}
+
+	myID := string(certId(cert.Certificate[0]))
+	infoln("My ID:", myID)
+
+	if prof != "" {
+		okln("Profiler listening on", prof)
+		go func() {
+			http.ListenAndServe(prof, nil)
+		}()
+	}
+
+	// The TLS configuration is used for both the listening socket and outgoing
+	// connections.
+
+	cfg := &tls.Config{
+		ClientAuth:         tls.RequestClientCert,
+		ServerName:         "syncthing",
+		NextProtos:         []string{"bep/1.0"},
+		InsecureSkipVerify: true,
+		Certificates:       []tls.Certificate{cert},
+	}
+
+	// Load the configuration file, if it exists.
+
+	cf, err := os.Open(path.Join(confDir, confFileName))
+	if err != nil {
+		fatalln("No config file")
+	}
+	config = ini.Parse(cf)
+	cf.Close()
+
+	var dir = config.Get("repository", "dir")
+
+	// Create a map of desired node connections based on the configuration file
+	// directives.
+
+	for nodeID, addrs := range config.OptionMap("nodes") {
+		addrs := strings.Fields(addrs)
+		nodeAddrs[nodeID] = addrs
+	}
+
+	m := NewModel(dir)
+
+	// Walk the repository and update the local model before establishing any
+	// connections to other nodes.
+
+	infoln("Initial repository scan in progress")
+	loadIndex(m)
+	updateLocalModel(m)
+
+	// Routine to listen for incoming connections
+	infoln("Listening for incoming connections")
+	go listen(myID, addr, m, cfg)
+
+	// Routine to connect out to configured nodes
+	infoln("Attempting to connect to other nodes")
+	go connect(myID, addr, nodeAddrs, m, cfg)
+
+	// Routine to pull blocks from other nodes to synchronize the local
+	// repository. Does not run when we are in read only (publish only) mode.
+	if !readOnly {
+		infoln("Cleaning out incomplete synchronizations")
+		CleanTempFiles(dir)
+		okln("Ready to synchronize")
+		m.Start()
+	}
+
+	// Periodically scan the repository and update the local model.
+	// XXX: Should use some fsnotify mechanism.
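+	// Until then, a background goroutine rescans every scanIntv seconds.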
+ go func() { + for { + time.Sleep(time.Duration(scanIntv) * time.Second) + updateLocalModel(m) + } + }() + + select {} +} + +func listen(myID string, addr string, m *Model, cfg *tls.Config) { + l, err := tls.Listen("tcp", addr, cfg) + fatalErr(err) + +listen: + for { + conn, err := l.Accept() + if err != nil { + warnln(err) + continue + } + + if traceNet { + debugln("NET: Connect from", conn.RemoteAddr()) + } + + tc := conn.(*tls.Conn) + err = tc.Handshake() + if err != nil { + warnln(err) + tc.Close() + continue + } + + remoteID := certId(tc.ConnectionState().PeerCertificates[0].Raw) + + if remoteID == myID { + warnf("Connect from myself (%s) - should not happen", remoteID) + conn.Close() + continue + } + + if m.ConnectedTo(remoteID) { + warnf("Connect from connected node (%s)", remoteID) + } + + for nodeID := range nodeAddrs { + if nodeID == remoteID { + nc := protocol.NewConnection(remoteID, conn, conn, m) + m.AddNode(nc) + okln("Connected to nodeID", remoteID, "(in)") + continue listen + } + } + + warnln("Connect from unknown node", remoteID) + conn.Close() + } +} + +func connect(myID string, addr string, nodeAddrs map[string][]string, m *Model, cfg *tls.Config) { + _, portstr, err := net.SplitHostPort(addr) + fatalErr(err) + port, _ := strconv.Atoi(portstr) + + infoln("Starting local discovery") + disc, err := discover.NewDiscoverer(myID, port) + if err != nil { + warnln("No local discovery possible") + } + + for { + nextNode: + for nodeID, addrs := range nodeAddrs { + if nodeID == myID { + continue + } + if m.ConnectedTo(nodeID) { + continue + } + for _, addr := range addrs { + if addr == "dynamic" { + var ok bool + if disc != nil { + addr, ok = disc.Lookup(nodeID) + } + if !ok { + continue + } + } + + if traceNet { + debugln("NET: Dial", nodeID, addr) + } + conn, err := tls.Dial("tcp", addr, cfg) + if err != nil { + if traceNet { + debugln("NET:", err) + } + continue + } + + remoteID := certId(conn.ConnectionState().PeerCertificates[0].Raw) + if remoteID != nodeID { + warnln("Unexpected nodeID", remoteID, "!=", nodeID) + conn.Close() + continue + } + + nc := protocol.NewConnection(nodeID, conn, conn, m) + okln("Connected to node", remoteID, "(out)") + m.AddNode(nc) + if traceNet { + t0 := time.Now() + nc.Ping() + timing("NET: Ping reply", t0) + } + continue nextNode + } + } + + time.Sleep(time.Duration(connIntv) * time.Second) + } +} + +func updateLocalModel(m *Model) { + files := Walk(m.Dir(), m) + m.ReplaceLocal(files) + saveIndex(m) +} + +func saveIndex(m *Model) { + fname := fmt.Sprintf("%x.idx", sha1.Sum([]byte(m.Dir()))) + idxf, err := os.Create(path.Join(confDir, fname)) + if err != nil { + return + } + protocol.WriteIndex(idxf, m.ProtocolIndex()) + idxf.Close() +} + +func loadIndex(m *Model) { + fname := fmt.Sprintf("%x.idx", sha1.Sum([]byte(m.Dir()))) + idxf, err := os.Open(path.Join(confDir, fname)) + if err != nil { + return + } + defer idxf.Close() + + idx, err := protocol.ReadIndex(idxf) + if err != nil { + return + } + m.SeedIndex(idx) +} + +func ensureDir(dir string) { + fi, err := os.Stat(dir) + if os.IsNotExist(err) { + err := os.MkdirAll(dir, 0700) + fatalErr(err) + } else if fi.Mode()&0077 != 0 { + err := os.Chmod(dir, 0700) + fatalErr(err) + } +} + +func getHomeDir() string { + home := os.Getenv("HOME") + if home == "" { + fatalln("No home directory?") + } + return home +} diff --git a/model.go b/model.go new file mode 100644 index 000000000..59057a0c5 --- /dev/null +++ b/model.go @@ -0,0 +1,373 @@ +package main + +/* + +Locking +======= + +The model has read 
and write locks. These must be acquired as appropriate by +public methods. To prevent deadlock situations, private methods should never +acquire locks, but document what locks they require. + +TODO(jb): Keep global and per node transfer and performance statistics. + +*/ + +import ( + "os" + "path" + "sync" + "time" + + "github.com/calmh/syncthing/buffers" + "github.com/calmh/syncthing/protocol" +) + +type Model struct { + sync.RWMutex + dir string + updated int64 + global map[string]File // the latest version of each file as it exists in the cluster + local map[string]File // the files we currently have locally on disk + remote map[string]map[string]File + need map[string]bool // the files we need to update + nodes map[string]*protocol.Connection +} + +const ( + RemoteFetchers = 4 + FlagDeleted = 1 << 12 +) + +func NewModel(dir string) *Model { + m := &Model{ + dir: dir, + global: make(map[string]File), + local: make(map[string]File), + remote: make(map[string]map[string]File), + need: make(map[string]bool), + nodes: make(map[string]*protocol.Connection), + } + + return m +} + +func (m *Model) Start() { + go m.puller() +} + +func (m *Model) Index(nodeID string, fs []protocol.FileInfo) { + m.Lock() + defer m.Unlock() + + if traceNet { + debugf("NET IDX(in): %s: %d files", nodeID, len(fs)) + } + + m.remote[nodeID] = make(map[string]File) + for _, f := range fs { + if f.Flags&FlagDeleted != 0 && !doDelete { + // Files marked as deleted do not even enter the model + continue + } + mf := File{ + Name: f.Name, + Flags: f.Flags, + Modified: int64(f.Modified), + } + var offset uint64 + for _, b := range f.Blocks { + mf.Blocks = append(mf.Blocks, Block{ + Offset: offset, + Length: b.Length, + Hash: b.Hash, + }) + offset += uint64(b.Length) + } + m.remote[nodeID][f.Name] = mf + } + + m.recomputeGlobal() + m.recomputeNeed() +} + +func (m *Model) SeedIndex(fs []protocol.FileInfo) { + m.Lock() + defer m.Unlock() + + m.local = make(map[string]File) + for _, f := range fs { + mf := File{ + Name: f.Name, + Flags: f.Flags, + Modified: int64(f.Modified), + } + var offset uint64 + for _, b := range f.Blocks { + mf.Blocks = append(mf.Blocks, Block{ + Offset: offset, + Length: b.Length, + Hash: b.Hash, + }) + offset += uint64(b.Length) + } + m.local[f.Name] = mf + } + + m.recomputeGlobal() + m.recomputeNeed() +} + +func (m *Model) Close(node string) { + m.Lock() + defer m.Unlock() + + if traceNet { + debugf("NET CLOSE: %s", node) + } + + delete(m.remote, node) + delete(m.nodes, node) + + m.recomputeGlobal() + m.recomputeNeed() +} + +func (m *Model) Request(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) { + if traceNet && nodeID != "" { + debugf("NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash) + } + fn := path.Join(m.dir, name) + fd, err := os.Open(fn) // XXX: Inefficient, should cache fd? 
+ if err != nil { + return nil, err + } + defer fd.Close() + + buf := buffers.Get(int(size)) + _, err = fd.ReadAt(buf, int64(offset)) + if err != nil { + return nil, err + } + + return buf, nil +} + +func (m *Model) RequestGlobal(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) { + m.RLock() + nc := m.nodes[nodeID] + m.RUnlock() + + if traceNet { + debugf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash) + } + + return nc.Request(name, offset, size, hash) +} + +func (m *Model) ReplaceLocal(fs []File) { + m.Lock() + defer m.Unlock() + + var updated bool + var newLocal = make(map[string]File) + + for _, f := range fs { + newLocal[f.Name] = f + if ef := m.local[f.Name]; ef.Modified != f.Modified { + updated = true + } + } + + if m.markDeletedLocals(newLocal) { + updated = true + } + + if len(newLocal) != len(m.local) { + updated = true + } + + if updated { + m.local = newLocal + m.recomputeGlobal() + m.recomputeNeed() + m.updated = time.Now().Unix() + go m.broadcastIndex() + } +} + +// Must be called with the read lock held. +func (m *Model) broadcastIndex() { + idx := m.protocolIndex() + for _, node := range m.nodes { + if traceNet { + debugf("NET IDX(out): %s: %d files", node.ID, len(idx)) + } + node.Index(idx) + } +} + +// markDeletedLocals sets the deleted flag on files that have gone missing locally. +// Must be called with the write lock held. +func (m *Model) markDeletedLocals(newLocal map[string]File) bool { + // For every file in the existing local table, check if they are also + // present in the new local table. If they are not, check that we already + // had the newest version available according to the global table and if so + // note the file as having been deleted. + var updated bool + for n, f := range m.local { + if _, ok := newLocal[n]; !ok { + if gf := m.global[n]; gf.Modified <= f.Modified { + if f.Flags&FlagDeleted == 0 { + f.Flags = FlagDeleted + f.Modified = f.Modified + 1 + f.Blocks = nil + updated = true + } + newLocal[n] = f + } + } + } + return updated +} + +func (m *Model) UpdateLocal(f File) { + m.Lock() + defer m.Unlock() + + if ef, ok := m.local[f.Name]; !ok || ef.Modified != f.Modified { + m.local[f.Name] = f + m.recomputeGlobal() + m.recomputeNeed() + m.updated = time.Now().Unix() + go m.broadcastIndex() + } +} + +func (m *Model) Dir() string { + m.RLock() + defer m.RUnlock() + return m.dir +} + +func (m *Model) HaveFiles() []File { + m.RLock() + defer m.RUnlock() + var files []File + for _, file := range m.local { + files = append(files, file) + } + return files +} + +func (m *Model) LocalFile(name string) (File, bool) { + m.RLock() + defer m.RUnlock() + f, ok := m.local[name] + return f, ok +} + +func (m *Model) GlobalFile(name string) (File, bool) { + m.RLock() + defer m.RUnlock() + f, ok := m.global[name] + return f, ok +} + +// Must be called with the write lock held. +func (m *Model) recomputeGlobal() { + var newGlobal = make(map[string]File) + + for n, f := range m.local { + newGlobal[n] = f + } + + for _, fs := range m.remote { + for n, f := range fs { + if cf, ok := newGlobal[n]; !ok || cf.Modified < f.Modified { + newGlobal[n] = f + } + } + } + + m.global = newGlobal +} + +// Must be called with the write lock held. +func (m *Model) recomputeNeed() { + m.need = make(map[string]bool) + for n, f := range m.global { + hf, ok := m.local[n] + if !ok || f.Modified > hf.Modified { + m.need[n] = true + } + } +} + +// Must be called with the read lock held. 
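+// whoHas returns the IDs of the remote nodes that advertise the same
+// version (modification time) of the named file as the global model.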
+func (m *Model) whoHas(name string) []string {
+	var remote []string
+
+	gf := m.global[name]
+	for node, files := range m.remote {
+		if file, ok := files[name]; ok && file.Modified == gf.Modified {
+			remote = append(remote, node)
+		}
+	}
+
+	return remote
+}
+
+func (m *Model) ConnectedTo(nodeID string) bool {
+	m.RLock()
+	defer m.RUnlock()
+	_, ok := m.nodes[nodeID]
+	return ok
+}
+
+func (m *Model) ProtocolIndex() []protocol.FileInfo {
+	m.RLock()
+	defer m.RUnlock()
+	return m.protocolIndex()
+}
+
+// Must be called with the read lock held.
+func (m *Model) protocolIndex() []protocol.FileInfo {
+	var index []protocol.FileInfo
+	for _, f := range m.local {
+		mf := protocol.FileInfo{
+			Name:     f.Name,
+			Flags:    f.Flags,
+			Modified: int64(f.Modified),
+		}
+		for _, b := range f.Blocks {
+			mf.Blocks = append(mf.Blocks, protocol.BlockInfo{
+				Length: b.Length,
+				Hash:   b.Hash,
+			})
+		}
+		if traceIdx {
+			var flagComment string
+			if mf.Flags&FlagDeleted != 0 {
+				flagComment = " (deleted)"
+			}
+			debugf("IDX: %q m=%d f=%o%s (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, len(mf.Blocks))
+		}
+		index = append(index, mf)
+	}
+	return index
+}
+
+func (m *Model) AddNode(node *protocol.Connection) {
+	m.Lock()
+	m.nodes[node.ID] = node
+	m.Unlock()
+	m.RLock()
+	idx := m.protocolIndex()
+	m.RUnlock()
+
+	if traceNet {
+		debugf("NET IDX(out): %s: %d files", node.ID, len(idx))
+	}
+	node.Index(idx)
+}
diff --git a/model_puller.go b/model_puller.go
new file mode 100644
index 000000000..9b726080a
--- /dev/null
+++ b/model_puller.go
@@ -0,0 +1,212 @@
+package main
+
+/*
+
+Locking
+=======
+
+These methods are never called from the outside so they don't follow the
+locking policy in model.go. Instead, appropriate locks are acquired when
+needed and held for as short a time as possible.
+
+TODO(jb): Refactor this into smaller and cleaner pieces.
+
+TODO(jb): Some kind of coalescing / rate limiting of index sending, so we don't
+send hundreds of index updates in a short period of time when deleting files
+etc.
+
+*/
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"os"
+	"path"
+	"sync"
+	"time"
+
+	"github.com/calmh/syncthing/buffers"
+)
+
+func (m *Model) pullFile(name string) error {
+	m.RLock()
+	var localFile = m.local[name]
+	var globalFile = m.global[name]
+	m.RUnlock()
+
+	filename := path.Join(m.dir, name)
+	sdir := path.Dir(filename)
+
+	_, err := os.Stat(sdir)
+	if err != nil && os.IsNotExist(err) {
+		os.MkdirAll(sdir, 0777)
+	}
+
+	tmpFilename := tempName(filename, globalFile.Modified)
+	tmpFile, err := os.Create(tmpFilename)
+	if err != nil {
+		return err
+	}
+	defer tmpFile.Close()
+
+	contentChan := make(chan content, 32)
+	var applyDone sync.WaitGroup
+	applyDone.Add(1)
+	go func() {
+		applyContent(contentChan, tmpFile)
+		applyDone.Done()
+	}()
+
+	local, remote := localFile.Blocks.To(globalFile.Blocks)
+	var fetchDone sync.WaitGroup
+
+	// One local copy routine
+
+	fetchDone.Add(1)
+	go func() {
+		for _, block := range local {
+			data, err := m.Request("", name, block.Offset, block.Length, block.Hash)
+			if err != nil {
+				break
+			}
+			contentChan <- content{
+				offset: int64(block.Offset),
+				data:   data,
+			}
+		}
+		fetchDone.Done()
+	}()
+
+	// N remote copy routines
+
+	m.RLock()
+	var nodeIDs = m.whoHas(name)
+	m.RUnlock()
+	var remoteBlocksChan = make(chan Block)
+	go func() {
+		for _, block := range remote {
+			remoteBlocksChan <- block
+		}
+		close(remoteBlocksChan)
+	}()
+
+	// XXX: This should be rewritten into something nicer that takes differing
+	// peer performance into account.
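+	// For now, RemoteFetchers goroutines are started for each node that has
+	// the file; all of them drain the shared block channel, so faster peers
+	// simply end up serving more blocks.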
+ + for i := 0; i < RemoteFetchers; i++ { + for _, nodeID := range nodeIDs { + fetchDone.Add(1) + go func(nodeID string) { + for block := range remoteBlocksChan { + data, err := m.RequestGlobal(nodeID, name, block.Offset, block.Length, block.Hash) + if err != nil { + break + } + contentChan <- content{ + offset: int64(block.Offset), + data: data, + } + } + fetchDone.Done() + }(nodeID) + } + } + + fetchDone.Wait() + close(contentChan) + applyDone.Wait() + + rf, err := os.Open(tmpFilename) + if err != nil { + return err + } + defer rf.Close() + + writtenBlocks, err := Blocks(rf, BlockSize) + if err != nil { + return err + } + if len(writtenBlocks) != len(globalFile.Blocks) { + return fmt.Errorf("%s: incorrect number of blocks after sync", tmpFilename) + } + for i := range writtenBlocks { + if bytes.Compare(writtenBlocks[i].Hash, globalFile.Blocks[i].Hash) != 0 { + return fmt.Errorf("%s: hash mismatch after sync\n %v\n %v", tmpFilename, writtenBlocks[i], globalFile.Blocks[i]) + } + } + + err = os.Chtimes(tmpFilename, time.Unix(globalFile.Modified, 0), time.Unix(globalFile.Modified, 0)) + if err != nil { + return err + } + + err = os.Rename(tmpFilename, filename) + if err != nil { + return err + } + + return nil +} + +func (m *Model) puller() { + for { + for { + var n string + var f File + + m.RLock() + for n = range m.need { + break // just pick first name + } + if len(n) != 0 { + f = m.global[n] + } + m.RUnlock() + + if len(n) == 0 { + // we got nothing + break + } + + var err error + if f.Flags&FlagDeleted == 0 { + if traceFile { + debugf("FILE: Pull %q", n) + } + err = m.pullFile(n) + } else { + if traceFile { + debugf("FILE: Remove %q", n) + } + // Cheerfully ignore errors here + _ = os.Remove(path.Join(m.dir, n)) + } + if err == nil { + m.UpdateLocal(f) + } else { + warnln(err) + } + } + time.Sleep(time.Second) + } +} + +type content struct { + offset int64 + data []byte +} + +func applyContent(cc <-chan content, dst io.WriterAt) error { + var err error + + for c := range cc { + _, err = dst.WriteAt(c.data, c.offset) + if err != nil { + return err + } + buffers.Put(c.data) + } + + return nil +} diff --git a/model_test.go b/model_test.go new file mode 100644 index 000000000..fe2a5c38e --- /dev/null +++ b/model_test.go @@ -0,0 +1,275 @@ +package main + +import ( + "reflect" + "testing" + "time" + + "github.com/calmh/syncthing/protocol" +) + +func TestNewModel(t *testing.T) { + m := NewModel("foo") + + if m == nil { + t.Fatalf("NewModel returned nil") + } + + if len(m.need) > 0 { + t.Errorf("New model should have no Need") + } + + if len(m.local) > 0 { + t.Errorf("New model should have no Have") + } +} + +var testDataExpected = map[string]File{ + "foo": File{ + Name: "foo", + Flags: 0644, + Modified: 1384244572, + Blocks: []Block{{Offset: 0x0, Length: 0x7, Hash: []uint8{0xae, 0xc0, 0x70, 0x64, 0x5f, 0xe5, 0x3e, 0xe3, 0xb3, 0x76, 0x30, 0x59, 0x37, 0x61, 0x34, 0xf0, 0x58, 0xcc, 0x33, 0x72, 0x47, 0xc9, 0x78, 0xad, 0xd1, 0x78, 0xb6, 0xcc, 0xdf, 0xb0, 0x1, 0x9f}}}, + }, + "bar": File{ + Name: "bar", + Flags: 0644, + Modified: 1384244579, + Blocks: []Block{{Offset: 0x0, Length: 0xa, Hash: []uint8{0x2f, 0x72, 0xcc, 0x11, 0xa6, 0xfc, 0xd0, 0x27, 0x1e, 0xce, 0xf8, 0xc6, 0x10, 0x56, 0xee, 0x1e, 0xb1, 0x24, 0x3b, 0xe3, 0x80, 0x5b, 0xf9, 0xa9, 0xdf, 0x98, 0xf9, 0x2f, 0x76, 0x36, 0xb0, 0x5c}}}, + }, + "baz/quux": File{ + Name: "baz/quux", + Flags: 0644, + Modified: 1384244676, + Blocks: []Block{{Offset: 0x0, Length: 0x9, Hash: []uint8{0xc1, 0x54, 0xd9, 0x4e, 0x94, 0xba, 0x72, 0x98, 0xa6, 0xad, 0xb0, 
0x52, 0x3a, 0xfe, 0x34, 0xd1, 0xb6, 0xa5, 0x81, 0xd6, 0xb8, 0x93, 0xa7, 0x63, 0xd4, 0x5d, 0xdc, 0x5e, 0x20, 0x9d, 0xcb, 0x83}}}, + }, +} + +func TestUpdateLocal(t *testing.T) { + m := NewModel("foo") + fs := Walk("testdata", m) + m.ReplaceLocal(fs) + + if len(m.need) > 0 { + t.Fatalf("Model with only local data should have no need") + } + + if l1, l2 := len(m.local), len(testDataExpected); l1 != l2 { + t.Fatalf("Model len(local) incorrect, %d != %d", l1, l2) + } + if l1, l2 := len(m.global), len(testDataExpected); l1 != l2 { + t.Fatalf("Model len(global) incorrect, %d != %d", l1, l2) + } + for name, file := range testDataExpected { + if f, ok := m.local[name]; ok { + if !reflect.DeepEqual(f, file) { + t.Errorf("Incorrect local\n%v !=\n%v\nfor file %q", f, file, name) + } + } else { + t.Errorf("Missing file %q in local table", name) + } + if f, ok := m.global[name]; ok { + if !reflect.DeepEqual(f, file) { + t.Errorf("Incorrect global\n%v !=\n%v\nfor file %q", f, file, name) + } + } else { + t.Errorf("Missing file %q in global table", name) + } + } + + for _, f := range fs { + if hf, ok := m.local[f.Name]; !ok || hf.Modified != f.Modified { + t.Fatalf("Incorrect local for %q", f.Name) + } + if cf, ok := m.global[f.Name]; !ok || cf.Modified != f.Modified { + t.Fatalf("Incorrect global for %q", f.Name) + } + } +} + +func TestRemoteUpdateExisting(t *testing.T) { + m := NewModel("foo") + fs := Walk("testdata", m) + m.ReplaceLocal(fs) + + newFile := protocol.FileInfo{ + Name: "foo", + Modified: time.Now().Unix(), + Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}}, + } + m.Index(string("42"), []protocol.FileInfo{newFile}) + + if l := len(m.need); l != 1 { + t.Errorf("Model missing Need for one file (%d != 1)", l) + } +} + +func TestRemoteAddNew(t *testing.T) { + m := NewModel("foo") + fs := Walk("testdata", m) + m.ReplaceLocal(fs) + + newFile := protocol.FileInfo{ + Name: "a new file", + Modified: time.Now().Unix(), + Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}}, + } + m.Index(string("42"), []protocol.FileInfo{newFile}) + + if l1, l2 := len(m.need), 1; l1 != l2 { + t.Errorf("Model len(m.need) incorrect (%d != %d)", l1, l2) + } +} + +func TestRemoteUpdateOld(t *testing.T) { + m := NewModel("foo") + fs := Walk("testdata", m) + m.ReplaceLocal(fs) + + oldTimeStamp := int64(1234) + newFile := protocol.FileInfo{ + Name: "foo", + Modified: oldTimeStamp, + Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}}, + } + m.Index(string("42"), []protocol.FileInfo{newFile}) + + if l1, l2 := len(m.need), 0; l1 != l2 { + t.Errorf("Model len(need) incorrect (%d != %d)", l1, l2) + } +} + +func TestDelete(t *testing.T) { + m := NewModel("foo") + fs := Walk("testdata", m) + m.ReplaceLocal(fs) + + if l1, l2 := len(m.local), len(fs); l1 != l2 { + t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2) + } + if l1, l2 := len(m.global), len(fs); l1 != l2 { + t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2) + } + + ot := time.Now().Unix() + newFile := File{ + Name: "a new file", + Modified: ot, + Blocks: []Block{{0, 100, []byte("some hash bytes")}}, + } + m.UpdateLocal(newFile) + + if l1, l2 := len(m.local), len(fs)+1; l1 != l2 { + t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2) + } + if l1, l2 := len(m.global), len(fs)+1; l1 != l2 { + t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2) + } + + // The deleted file is kept in the local and global tables and marked as deleted. 
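+	// Rescanning the original testdata (which never contained "a new file")
+	// should mark that file deleted: flag set, blocks dropped, and the
+	// modification time bumped by one second.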
+ + m.ReplaceLocal(fs) + + if l1, l2 := len(m.local), len(fs)+1; l1 != l2 { + t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2) + } + if l1, l2 := len(m.global), len(fs)+1; l1 != l2 { + t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2) + } + + if m.local["a new file"].Flags&(1<<12) == 0 { + t.Error("Unexpected deleted flag = 0 in local table") + } + if len(m.local["a new file"].Blocks) != 0 { + t.Error("Unexpected non-zero blocks for deleted file in local") + } + if ft := m.local["a new file"].Modified; ft != ot+1 { + t.Errorf("Unexpected time %d != %d for deleted file in local", ft, ot+1) + } + + if m.global["a new file"].Flags&(1<<12) == 0 { + t.Error("Unexpected deleted flag = 0 in global table") + } + if len(m.global["a new file"].Blocks) != 0 { + t.Error("Unexpected non-zero blocks for deleted file in global") + } + if ft := m.local["a new file"].Modified; ft != ot+1 { + t.Errorf("Unexpected time %d != %d for deleted file in local", ft, ot+1) + } + + // Another update should change nothing + + m.ReplaceLocal(fs) + + if l1, l2 := len(m.local), len(fs)+1; l1 != l2 { + t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2) + } + if l1, l2 := len(m.global), len(fs)+1; l1 != l2 { + t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2) + } + + if m.local["a new file"].Flags&(1<<12) == 0 { + t.Error("Unexpected deleted flag = 0 in local table") + } + if len(m.local["a new file"].Blocks) != 0 { + t.Error("Unexpected non-zero blocks for deleted file in local") + } + if ft := m.local["a new file"].Modified; ft != ot+1 { + t.Errorf("Unexpected time %d != %d for deleted file in local", ft, ot+1) + } + + if m.global["a new file"].Flags&(1<<12) == 0 { + t.Error("Unexpected deleted flag = 0 in global table") + } + if len(m.global["a new file"].Blocks) != 0 { + t.Error("Unexpected non-zero blocks for deleted file in global") + } + if ft := m.local["a new file"].Modified; ft != ot+1 { + t.Errorf("Unexpected time %d != %d for deleted file in local", ft, ot+1) + } +} + +func TestForgetNode(t *testing.T) { + m := NewModel("foo") + fs := Walk("testdata", m) + m.ReplaceLocal(fs) + + if l1, l2 := len(m.local), len(fs); l1 != l2 { + t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2) + } + if l1, l2 := len(m.global), len(fs); l1 != l2 { + t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2) + } + if l1, l2 := len(m.need), 0; l1 != l2 { + t.Errorf("Model len(need) incorrect (%d != %d)", l1, l2) + } + + newFile := protocol.FileInfo{ + Name: "new file", + Modified: time.Now().Unix(), + Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}}, + } + m.Index(string("42"), []protocol.FileInfo{newFile}) + + if l1, l2 := len(m.local), len(fs); l1 != l2 { + t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2) + } + if l1, l2 := len(m.global), len(fs)+1; l1 != l2 { + t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2) + } + if l1, l2 := len(m.need), 1; l1 != l2 { + t.Errorf("Model len(need) incorrect (%d != %d)", l1, l2) + } + + m.Close(string("42")) + + if l1, l2 := len(m.local), len(fs); l1 != l2 { + t.Errorf("Model len(local) incorrect (%d != %d)", l1, l2) + } + if l1, l2 := len(m.global), len(fs); l1 != l2 { + t.Errorf("Model len(global) incorrect (%d != %d)", l1, l2) + } + if l1, l2 := len(m.need), 0; l1 != l2 { + t.Errorf("Model len(need) incorrect (%d != %d)", l1, l2) + } +} diff --git a/protocol/PROTOCOL.md b/protocol/PROTOCOL.md new file mode 100644 index 000000000..c8bc19198 --- /dev/null +++ b/protocol/PROTOCOL.md @@ -0,0 +1,208 @@ +Block Exchange 
Protocol v1.0
+============================
+
+Introduction and Definitions
+----------------------------
+
+The BEP is used between two or more _nodes_, thus forming a _cluster_.
+Each node has a _repository_ of files described by the _local model_,
+containing modification times and block hashes. The local model is sent
+to the other nodes in the cluster. The union of all files in the local
+models, with files selected for most recent modification time, forms the
+_global model_. Each node strives to get its repository in sync with
+the global model by requesting missing blocks from the other nodes.
+
+Transport and Authentication
+----------------------------
+
+The BEP itself does not provide retransmissions, compression, encryption
+or authentication. It is expected that this is performed at lower
+layers of the networking stack. A typical deployment stack should be
+similar to the following:
+
+    |-----------------------------|
+    |   Block Exchange Protocol   |
+    |-----------------------------|
+    |   Compression (RFC 1951)    |
+    |-----------------------------|
+    | Encryption & Auth (TLS 1.0) |
+    |-----------------------------|
+    |             TCP             |
+    |-----------------------------|
+    v                             v
+
+The exact nature of the authentication is up to the application.
+Possibilities include certificates signed by a common trusted CA,
+preshared certificates, preshared certificate fingerprints or
+certificate pinning combined with some out of band first verification.
+
+There is no required order or synchronization among BEP messages -- any
+message type may be sent at any time and the sender need not await a
+response to one message before sending another. Responses must however
+be sent in the same order as the requests are received.
+
+Compression is started directly after a successful TLS handshake,
+before the first message is sent. The compression is flushed at each
+message boundary.
+
+Messages
+--------
+
+Every message starts with one 32 bit word indicating the message version,
+ID and type. For BEP v1.0 the Version field is set to zero. Future versions
+with incompatible message formats will increment the Version field. The
+reserved bits must be set to zero.
+
+     0                   1                   2                   3
+     0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+    | Ver=0 |      Message ID       |      Type     |   Reserved    |
+    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+All data following the message header is in XDR (RFC 1014) encoding.
+The actual data types in use by BEP, in XDR naming convention, are:
+
+ - unsigned int   -- unsigned 32 bit integer
+ - hyper          -- signed 64 bit integer
+ - unsigned hyper -- unsigned 64 bit integer
+ - opaque<>       -- variable length opaque data
+ - string<>       -- variable length string
+
+The encoding of opaque<> and string<> is identical, the distinction is
+solely in interpretation. Opaque data should not be interpreted but can
+be compared bytewise to other opaque data. All strings use the UTF-8
+encoding.
+
+### Index (Type = 1)
+
+The Index message defines the contents of the sender's repository. An Index
+message is sent by each peer immediately upon connection and whenever the
+local repository contents change. However, if a peer has no data to
+advertise (the repository is empty, or it is set to only import data) it
+is allowed but not required to send an empty Index message (a file list of
+zero length). If the repository contents change from non-empty to empty,
+an empty Index message must be sent.
There is no response to the Index message.
+
+    struct IndexMessage {
+        FileInfo Files<>;
+    }
+
+    struct FileInfo {
+        string Name<>;
+        unsigned int Flags;
+        hyper Modified;
+        BlockInfo Blocks<>;
+    }
+
+    struct BlockInfo {
+        unsigned int Length;
+        opaque Hash<>;
+    }
+
+The file name is the path relative to the repository root. The
+modification time is expressed as the number of seconds since the Unix
+Epoch. The hash algorithm is implied by the hash length. Currently, the
+hash must be 32 bytes long and computed by SHA256.
+
+The flags field is made up of the following single bit flags:
+
+     0                   1                   2                   3
+     0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+    |                Reserved             |D| Unix Perm. & Mode     |
+    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+ - The lower 12 bits hold the common Unix permission and mode bits.
+
+ - Bit 19 ("D") is set when the file has been deleted. The block list
+   shall contain zero blocks and the modification time indicates the
+   time of deletion or, if the deletion time is not reliably
+   determinable, one second past the last known modification time.
+
+ - Bits 0 through 18 are reserved for future use and shall be set to
+   zero.
+
+### Request (Type = 2)
+
+The Request message expresses the desire to receive a data block
+corresponding to a part of a certain file in the peer's repository.
+
+The requested block must correspond exactly to one block seen in the
+peer's Index message. The hash field must be set to the expected value by
+the sender. The receiver may validate that this is actually the case
+before transmitting data. Each Request message must be met with a Response
+message.
+
+    struct RequestMessage {
+        string Name<>;
+        unsigned hyper Offset;
+        unsigned int Length;
+        opaque Hash<>;
+    }
+
+The hash algorithm is implied by the hash length. Currently, the hash
+must be 32 bytes long and computed by SHA256.
+
+The Message ID in the header must be set to a unique value to be able to
+correlate the request with the response message.
+
+### Response (Type = 3)
+
+The Response message is sent in response to a Request message. In case the
+requested data was not available (an outdated block was requested, or
+the file has been deleted), the Data field is empty.
+
+    struct ResponseMessage {
+        opaque Data<>;
+    }
+
+The Message ID in the header is used to correlate requests and
+responses.
+
+### Ping (Type = 4)
+
+The Ping message is used to determine that a connection is alive, and to
+keep connections alive through state tracking network elements such as
+firewalls and NAT gateways. The Ping message has no contents.
+
+    struct PingMessage {
+    }
+
+### Pong (Type = 5)
+
+The Pong message is sent in response to a Ping. The Pong message has no
+contents, but copies the Message ID from the Ping.
+
+    struct PongMessage {
+    }
+
+Example Exchange
+----------------
+
+          A            B
+     1. Index->      <-Index
+     2. Request->
+     3. Request->
+     4. Request->
+     5. Request->
+     6.              <-Response
+     7.              <-Response
+     8.              <-Response
+     9.              <-Response
+    10. Index->
+        ...
+    11. Ping->
+    12.              <-Pong
+
+The connection is established and at 1. both peers send Index records.
+The Index records are received and both peers recompute their knowledge
+of the data in the cluster. In this example, peer A has four missing or
+outdated blocks. At 2 through 5 peer A sends requests for these blocks.
+The requests are received by peer B, who retrieves the data from the
+repository and transmits Response records (6 through 9).
Node A updates +their repository contents and transmits an updated Index message (10). +Both peers enter idle state after 10. At some later time 11, peer A +determines that it has not seen data from B for some time and sends a +Ping request. A response is sent at 12. + diff --git a/protocol/marshal.go b/protocol/marshal.go new file mode 100644 index 000000000..decd2c136 --- /dev/null +++ b/protocol/marshal.go @@ -0,0 +1,119 @@ +package protocol + +import ( + "io" + + "github.com/calmh/syncthing/buffers" +) + +func pad(l int) int { + d := l % 4 + if d == 0 { + return 0 + } + return 4 - d +} + +var padBytes = []byte{0, 0, 0} + +type marshalWriter struct { + w io.Writer + tot int + err error +} + +func (w *marshalWriter) writeString(s string) { + w.writeBytes([]byte(s)) +} + +func (w *marshalWriter) writeBytes(bs []byte) { + if w.err != nil { + return + } + w.writeUint32(uint32(len(bs))) + if w.err != nil { + return + } + _, w.err = w.w.Write(bs) + if p := pad(len(bs)); p > 0 { + w.w.Write(padBytes[:p]) + } + w.tot += len(bs) + pad(len(bs)) +} + +func (w *marshalWriter) writeUint32(v uint32) { + if w.err != nil { + return + } + var b [4]byte + b[0] = byte(v >> 24) + b[1] = byte(v >> 16) + b[2] = byte(v >> 8) + b[3] = byte(v) + _, w.err = w.w.Write(b[:]) + w.tot += 4 +} + +func (w *marshalWriter) writeUint64(v uint64) { + if w.err != nil { + return + } + var b [8]byte + b[0] = byte(v >> 56) + b[1] = byte(v >> 48) + b[2] = byte(v >> 40) + b[3] = byte(v >> 32) + b[4] = byte(v >> 24) + b[5] = byte(v >> 16) + b[6] = byte(v >> 8) + b[7] = byte(v) + _, w.err = w.w.Write(b[:]) + w.tot += 8 +} + +type marshalReader struct { + r io.Reader + tot int + err error +} + +func (r *marshalReader) readString() string { + bs := r.readBytes() + defer buffers.Put(bs) + return string(bs) +} + +func (r *marshalReader) readBytes() []byte { + if r.err != nil { + return nil + } + l := int(r.readUint32()) + if r.err != nil { + return nil + } + b := buffers.Get(l + pad(l)) + _, r.err = io.ReadFull(r.r, b) + r.tot += int(l + pad(l)) + return b[:l] +} + +func (r *marshalReader) readUint32() uint32 { + if r.err != nil { + return 0 + } + var b [4]byte + _, r.err = io.ReadFull(r.r, b[:]) + r.tot += 4 + return uint32(b[3]) | uint32(b[2])<<8 | uint32(b[1])<<16 | uint32(b[0])<<24 +} + +func (r *marshalReader) readUint64() uint64 { + if r.err != nil { + return 0 + } + var b [8]byte + _, r.err = io.ReadFull(r.r, b[:]) + r.tot += 8 + return uint64(b[7]) | uint64(b[6])<<8 | uint64(b[5])<<16 | uint64(b[4])<<24 | + uint64(b[3])<<32 | uint64(b[2])<<40 | uint64(b[1])<<48 | uint64(b[0])<<56 +} diff --git a/protocol/messages.go b/protocol/messages.go new file mode 100644 index 000000000..591f7f4de --- /dev/null +++ b/protocol/messages.go @@ -0,0 +1,106 @@ +package protocol + +import "io" + +type request struct { + name string + offset uint64 + size uint32 + hash []byte +} + +type header struct { + version int + msgID int + msgType int +} + +func encodeHeader(h header) uint32 { + return uint32(h.version&0xf)<<28 + + uint32(h.msgID&0xfff)<<16 + + uint32(h.msgType&0xff)<<8 +} + +func decodeHeader(u uint32) header { + return header{ + version: int(u>>28) & 0xf, + msgID: int(u>>16) & 0xfff, + msgType: int(u>>8) & 0xff, + } +} + +func (w *marshalWriter) writeHeader(h header) { + w.writeUint32(encodeHeader(h)) +} + +func (w *marshalWriter) writeIndex(idx []FileInfo) { + w.writeUint32(uint32(len(idx))) + for _, f := range idx { + w.writeString(f.Name) + w.writeUint32(f.Flags) + w.writeUint64(uint64(f.Modified)) + 
+        w.writeUint32(uint32(len(f.Blocks)))
+        for _, b := range f.Blocks {
+            w.writeUint32(b.Length)
+            w.writeBytes(b.Hash)
+        }
+    }
+}
+
+func WriteIndex(w io.Writer, idx []FileInfo) (int, error) {
+    mw := marshalWriter{w, 0, nil}
+    mw.writeIndex(idx)
+    return mw.tot, mw.err
+}
+
+func (w *marshalWriter) writeRequest(r request) {
+    w.writeString(r.name)
+    w.writeUint64(r.offset)
+    w.writeUint32(r.size)
+    w.writeBytes(r.hash)
+}
+
+func (w *marshalWriter) writeResponse(data []byte) {
+    w.writeBytes(data)
+}
+
+func (r *marshalReader) readHeader() header {
+    return decodeHeader(r.readUint32())
+}
+
+func (r *marshalReader) readIndex() []FileInfo {
+    nfiles := r.readUint32()
+    files := make([]FileInfo, nfiles)
+    for i := range files {
+        files[i].Name = r.readString()
+        files[i].Flags = r.readUint32()
+        files[i].Modified = int64(r.readUint64())
+        nblocks := r.readUint32()
+        blocks := make([]BlockInfo, nblocks)
+        for j := range blocks {
+            blocks[j].Length = r.readUint32()
+            blocks[j].Hash = r.readBytes()
+        }
+        files[i].Blocks = blocks
+    }
+    return files
+}
+
+func ReadIndex(r io.Reader) ([]FileInfo, error) {
+    mr := marshalReader{r, 0, nil}
+    idx := mr.readIndex()
+    return idx, mr.err
+}
+
+func (r *marshalReader) readRequest() request {
+    var req request
+    req.name = r.readString()
+    req.offset = r.readUint64()
+    req.size = r.readUint32()
+    req.hash = r.readBytes()
+    return req
+}
+
+func (r *marshalReader) readResponse() []byte {
+    return r.readBytes()
+}
diff --git a/protocol/messages_test.go b/protocol/messages_test.go
new file mode 100644
index 000000000..ebd291f4b
--- /dev/null
+++ b/protocol/messages_test.go
@@ -0,0 +1,115 @@
+package protocol
+
+import (
+    "bytes"
+    "io/ioutil"
+    "reflect"
+    "testing"
+    "testing/quick"
+)
+
+func TestIndex(t *testing.T) {
+    idx := []FileInfo{
+        {
+            "Foo",
+            0755,
+            1234567890,
+            []BlockInfo{
+                {12345678, []byte("hash hash hash")},
+                {23456781, []byte("ash hash hashh")},
+                {34567812, []byte("sh hash hashha")},
+            },
+        }, {
+            "Quux/Quux",
+            0644,
+            2345678901,
+            []BlockInfo{
+                {45678123, []byte("4321 hash hash hash")},
+                {56781234, []byte("3214 ash hash hashh")},
+                {67812345, []byte("2143 sh hash hashha")},
+            },
+        },
+    }
+
+    var buf = new(bytes.Buffer)
+    var wr = marshalWriter{buf, 0, nil}
+    wr.writeIndex(idx)
+
+    var rd = marshalReader{buf, 0, nil}
+    var idx2 = rd.readIndex()
+
+    if !reflect.DeepEqual(idx, idx2) {
+        t.Errorf("Index marshal error:\n%#v\n%#v\n", idx, idx2)
+    }
+}
+
+func TestRequest(t *testing.T) {
+    f := func(name string, offset uint64, size uint32, hash []byte) bool {
+        var buf = new(bytes.Buffer)
+        var req = request{name, offset, size, hash}
+        var wr = marshalWriter{buf, 0, nil}
+        wr.writeRequest(req)
+        var rd = marshalReader{buf, 0, nil}
+        var req2 = rd.readRequest()
+        return req.name == req2.name &&
+            req.offset == req2.offset &&
+            req.size == req2.size &&
+            bytes.Equal(req.hash, req2.hash)
+    }
+    if err := quick.Check(f, nil); err != nil {
+        t.Error(err)
+    }
+}
+
+func TestResponse(t *testing.T) {
+    f := func(data []byte) bool {
+        var buf = new(bytes.Buffer)
+        var wr = marshalWriter{buf, 0, nil}
+        wr.writeResponse(data)
+        var rd = marshalReader{buf, 0, nil}
+        var read = rd.readResponse()
+        return bytes.Equal(read, data)
+    }
+    if err := quick.Check(f, nil); err != nil {
+        t.Error(err)
+    }
+}
+
+func BenchmarkWriteIndex(b *testing.B) {
+    idx := []FileInfo{
+        {
+            "Foo",
+            0777,
+            1234567890,
+            []BlockInfo{
+                {12345678, []byte("hash hash hash")},
+                {23456781, []byte("ash hash hashh")},
+                {34567812, []byte("sh hash hashha")},
+            },
}, { + "Quux/Quux", + 0644, + 2345678901, + []BlockInfo{ + {45678123, []byte("4321 hash hash hash")}, + {56781234, []byte("3214 ash hash hashh")}, + {67812345, []byte("2143 sh hash hashha")}, + }, + }, + } + + var wr = marshalWriter{ioutil.Discard, 0, nil} + + for i := 0; i < b.N; i++ { + wr.writeIndex(idx) + } +} + +func BenchmarkWriteRequest(b *testing.B) { + var req = request{"blah blah", 1231323, 13123123, []byte("hash hash hash")} + var wr = marshalWriter{ioutil.Discard, 0, nil} + + for i := 0; i < b.N; i++ { + wr.writeRequest(req) + } +} diff --git a/protocol/protocol.go b/protocol/protocol.go new file mode 100644 index 000000000..7eca8428a --- /dev/null +++ b/protocol/protocol.go @@ -0,0 +1,239 @@ +package protocol + +import ( + "compress/flate" + "errors" + "io" + "sync" + + "github.com/calmh/syncthing/buffers" +) + +const ( + messageTypeReserved = iota + messageTypeIndex + messageTypeRequest + messageTypeResponse + messageTypePing + messageTypePong +) + +var ErrClosed = errors.New("Connection closed") + +type FileInfo struct { + Name string + Flags uint32 + Modified int64 + Blocks []BlockInfo +} + +type BlockInfo struct { + Length uint32 + Hash []byte +} + +type Model interface { + // An index was received from the peer node + Index(nodeID string, files []FileInfo) + // A request was made by the peer node + Request(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) + // The peer node closed the connection + Close(nodeID string) +} + +type Connection struct { + receiver Model + reader io.Reader + mreader *marshalReader + writer io.Writer + mwriter *marshalWriter + wLock sync.RWMutex + closed bool + closedLock sync.RWMutex + awaiting map[int]chan interface{} + nextId int + ID string +} + +func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model) *Connection { + flrd := flate.NewReader(reader) + flwr, err := flate.NewWriter(writer, flate.BestSpeed) + if err != nil { + panic(err) + } + + c := Connection{ + receiver: receiver, + reader: flrd, + mreader: &marshalReader{flrd, 0, nil}, + writer: flwr, + mwriter: &marshalWriter{flwr, 0, nil}, + awaiting: make(map[int]chan interface{}), + ID: nodeID, + } + + go c.readerLoop() + + return &c +} + +// Index writes the list of file information to the connected peer node +func (c *Connection) Index(idx []FileInfo) { + c.wLock.Lock() + defer c.wLock.Unlock() + + c.mwriter.writeHeader(header{0, c.nextId, messageTypeIndex}) + c.nextId = (c.nextId + 1) & 0xfff + c.mwriter.writeIndex(idx) + c.flush() +} + +// Request returns the bytes for the specified block after fetching them from the connected peer. +func (c *Connection) Request(name string, offset uint64, size uint32, hash []byte) ([]byte, error) { + c.wLock.Lock() + rc := make(chan interface{}) + c.awaiting[c.nextId] = rc + c.mwriter.writeHeader(header{0, c.nextId, messageTypeRequest}) + c.mwriter.writeRequest(request{name, offset, size, hash}) + c.flush() + c.nextId = (c.nextId + 1) & 0xfff + c.wLock.Unlock() + + // Reading something that might be nil from a possibly closed channel... 
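+	// The reader loop sends the response data followed by the read error
+	// on rc and then closes it. If the connection is torn down first, rc
+	// is closed without any sends and both receives below yield zero
+	// values, i.e. (nil, nil).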
+
+    var data []byte
+    i, ok := <-rc
+    if ok {
+        if d, ok := i.([]byte); ok {
+            data = d
+        }
+    }
+
+    var err error
+    i, ok = <-rc
+    if ok {
+        if e, ok := i.(error); ok {
+            err = e
+        }
+    }
+    return data, err
+}
+
+func (c *Connection) Ping() bool {
+    c.wLock.Lock()
+    rc := make(chan interface{})
+    c.awaiting[c.nextId] = rc
+    c.mwriter.writeHeader(header{0, c.nextId, messageTypePing})
+    c.flush()
+    c.nextId = (c.nextId + 1) & 0xfff
+    c.wLock.Unlock()
+
+    _, ok := <-rc
+    return ok
+}
+
+func (c *Connection) Stop() {
+}
+
+type flusher interface {
+    Flush() error
+}
+
+func (c *Connection) flush() {
+    if f, ok := c.writer.(flusher); ok {
+        f.Flush()
+    }
+}
+
+func (c *Connection) close() {
+    c.closedLock.Lock()
+    c.closed = true
+    c.closedLock.Unlock()
+    c.wLock.Lock()
+    for _, ch := range c.awaiting {
+        close(ch)
+    }
+    c.awaiting = nil
+    c.wLock.Unlock()
+    c.receiver.Close(c.ID)
+}
+
+func (c *Connection) isClosed() bool {
+    c.closedLock.RLock()
+    defer c.closedLock.RUnlock()
+    return c.closed
+}
+
+func (c *Connection) readerLoop() {
+    for !c.isClosed() {
+        hdr := c.mreader.readHeader()
+        if c.mreader.err != nil {
+            c.close()
+            break
+        }
+
+        switch hdr.msgType {
+        case messageTypeIndex:
+            files := c.mreader.readIndex()
+            if c.mreader.err != nil {
+                c.close()
+            } else {
+                c.receiver.Index(c.ID, files)
+            }
+
+        case messageTypeRequest:
+            c.processRequest(hdr.msgID)
+
+        case messageTypeResponse:
+            data := c.mreader.readResponse()
+
+            if c.mreader.err != nil {
+                c.close()
+            } else {
+                // Look up and remove the channel atomically under the
+                // write lock, so a concurrent close() cannot close it
+                // between the lookup and the sends below.
+                c.wLock.Lock()
+                rc, ok := c.awaiting[hdr.msgID]
+                delete(c.awaiting, hdr.msgID)
+                c.wLock.Unlock()
+
+                if ok {
+                    rc <- data
+                    rc <- c.mreader.err
+                    close(rc)
+                }
+            }
+
+        case messageTypePing:
+            c.wLock.Lock()
+            c.mwriter.writeUint32(encodeHeader(header{0, hdr.msgID, messageTypePong}))
+            c.flush()
+            c.wLock.Unlock()
+
+        case messageTypePong:
+            c.wLock.Lock()
+            if rc, ok := c.awaiting[hdr.msgID]; ok {
+                rc <- true
+                close(rc)
+                delete(c.awaiting, hdr.msgID)
+            }
+            c.wLock.Unlock()
+        }
+    }
+}
+
+func (c *Connection) processRequest(msgID int) {
+    req := c.mreader.readRequest()
+    if c.mreader.err != nil {
+        c.close()
+    } else {
+        go func() {
+            data, _ := c.receiver.Request(c.ID, req.name, req.offset, req.size, req.hash)
+            c.wLock.Lock()
+            c.mwriter.writeUint32(encodeHeader(header{0, msgID, messageTypeResponse}))
+            c.mwriter.writeResponse(data)
+            buffers.Put(data)
+            c.flush()
+            c.wLock.Unlock()
+        }()
+    }
+}
diff --git a/protocol/protocol_test.go b/protocol/protocol_test.go
new file mode 100644
index 000000000..99d2618c2
--- /dev/null
+++ b/protocol/protocol_test.go
@@ -0,0 +1,37 @@
+package protocol
+
+import (
+    "testing"
+    "testing/quick"
+)
+
+func TestHeaderFunctions(t *testing.T) {
+    f := func(ver, id, typ int) bool {
+        ver = int(uint(ver) % 16)
+        id = int(uint(id) % 4096)
+        typ = int(uint(typ) % 256)
+        h0 := header{ver, id, typ}
+        h1 := decodeHeader(encodeHeader(h0))
+        return h0 == h1
+    }
+    if err := quick.Check(f, nil); err != nil {
+        t.Error(err)
+    }
+}
+
+func TestPad(t *testing.T) {
+    tests := [][]int{
+        {0, 0},
+        {1, 3},
+        {2, 2},
+        {3, 1},
+        {4, 0},
+        {32, 0},
+        {33, 3},
+    }
+    for _, tc := range tests {
+        if p := pad(tc[0]); p != tc[1] {
+            t.Errorf("Incorrect padding for %d bytes, %d != %d", tc[0], p, tc[1])
+        }
+    }
+}
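As an illustration of how little plumbing the Connection type needs, the
following sketch (hypothetical, not part of this commit; dummyModel and
the test values are invented) wires two Connection instances back to
back over in-memory pipes and performs a round-trip Request:

```
package protocol

import (
	"bytes"
	"io"
	"testing"
)

// dummyModel is a stand-in receiver that answers every Request with a
// fixed payload and ignores everything else.
type dummyModel struct{}

func (dummyModel) Index(nodeID string, files []FileInfo) {}
func (dummyModel) Request(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
	return []byte("hello"), nil
}
func (dummyModel) Close(nodeID string) {}

func TestPipeExchange(t *testing.T) {
	abR, abW := io.Pipe() // traffic from A to B
	baR, baW := io.Pipe() // traffic from B to A

	a := NewConnection("A", baR, abW, dummyModel{})
	_ = NewConnection("B", abR, baW, dummyModel{})

	// B's readerLoop serves the request in a goroutine; A blocks until
	// the response arrives. The hash is not validated by dummyModel.
	data, err := a.Request("foo", 0, 5, []byte("hash"))
	if err != nil || !bytes.Equal(data, []byte("hello")) {
		t.Errorf("unexpected response %q (%v)", data, err)
	}
}
```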
diff --git a/syncthing.ini b/syncthing.ini
new file mode 100644
index 000000000..d7db21ef7
--- /dev/null
+++ b/syncthing.ini
@@ -0,0 +1,11 @@
+[repository]
+dir = /Users/jb/Synced
+
+# The nodes section lists the nodes that make up the cluster. The format is
+#   <node ID> = <address> [<address> ...]
+# The special address "dynamic" means that outbound connections will not be
+# attempted, but inbound connections are accepted.
+
+[nodes]
+ITZXTZ7A32DWV3NLNR5W4M3CHVBW56NA = 172.16.32.1:22000 192.23.34.56:22000
+CUGAE43Y5N64CRJU26YFH6MTWPSBLSUL = dynamic
diff --git a/testdata/bar b/testdata/bar
new file mode 100644
index 000000000..b33c13891
--- /dev/null
+++ b/testdata/bar
@@ -0,0 +1 @@
+foobarbaz
diff --git a/testdata/baz/quux b/testdata/baz/quux
new file mode 100644
index 000000000..55976ea06
--- /dev/null
+++ b/testdata/baz/quux
@@ -0,0 +1 @@
+baazquux
diff --git a/testdata/foo b/testdata/foo
new file mode 100644
index 000000000..323fae03f
--- /dev/null
+++ b/testdata/foo
@@ -0,0 +1 @@
+foobar
diff --git a/tls.go b/tls.go
new file mode 100644
index 000000000..9cf47c90d
--- /dev/null
+++ b/tls.go
@@ -0,0 +1,68 @@
+package main
+
+import (
+    "crypto/rand"
+    "crypto/rsa"
+    "crypto/sha1"
+    "crypto/tls"
+    "crypto/x509"
+    "crypto/x509/pkix"
+    "encoding/base32"
+    "encoding/pem"
+    "math/big"
+    "os"
+    "path"
+    "time"
+)
+
+const (
+    tlsRSABits = 2048
+    tlsName    = "syncthing"
+)
+
+func loadCert(dir string) (tls.Certificate, error) {
+    return tls.LoadX509KeyPair(path.Join(dir, "cert.pem"), path.Join(dir, "key.pem"))
+}
+
+func certId(bs []byte) string {
+    hf := sha1.New()
+    hf.Write(bs)
+    id := hf.Sum(nil)
+    return base32.StdEncoding.EncodeToString(id)
+}
+
+func newCertificate(dir string) {
+    priv, err := rsa.GenerateKey(rand.Reader, tlsRSABits)
+    fatalErr(err)
+
+    notBefore := time.Now()
+    notAfter := time.Date(2049, 12, 31, 23, 59, 59, 0, time.UTC)
+
+    template := x509.Certificate{
+        SerialNumber: new(big.Int).SetInt64(0),
+        Subject: pkix.Name{
+            CommonName: tlsName,
+        },
+        NotBefore: notBefore,
+        NotAfter:  notAfter,
+
+        KeyUsage:              x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
+        ExtKeyUsage:           []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
+        BasicConstraintsValid: true,
+    }
+
+    derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &priv.PublicKey, priv)
+    fatalErr(err)
+
+    certOut, err := os.Create(path.Join(dir, "cert.pem"))
+    fatalErr(err)
+    pem.Encode(certOut, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes})
+    certOut.Close()
+    okln("wrote cert.pem")
+
+    keyOut, err := os.OpenFile(path.Join(dir, "key.pem"), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
+    fatalErr(err)
+    pem.Encode(keyOut, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(priv)})
+    keyOut.Close()
+    okln("wrote key.pem")
+}
diff --git a/util.go b/util.go
new file mode 100644
index 000000000..f8346f5d7
--- /dev/null
+++ b/util.go
@@ -0,0 +1,7 @@
+package main
+
+import "time"
+
+func timing(name string, t0 time.Time) {
+    debugf("%s: %.02f ms", name, time.Since(t0).Seconds()*1000)
+}
diff --git a/walk.go b/walk.go
new file mode 100644
index 000000000..4e50b4973
--- /dev/null
+++ b/walk.go
@@ -0,0 +1,117 @@
+package main
+
+import (
+    "fmt"
+    "os"
+    "path"
+    "path/filepath"
+    "strings"
+)
+
+const BlockSize = 128 * 1024
+
+type File struct {
+    Name     string
+    Flags    uint32
+    Modified int64
+    Blocks   BlockList
+}
+
+func (f File) Dump() {
+    fmt.Printf("%s\n", f.Name)
+    for _, b := range f.Blocks {
+        fmt.Printf(" %dB @ %d: %x\n", b.Length, b.Offset, b.Hash)
+    }
+    fmt.Println()
+}
+
+func isTempName(name string) bool {
+    return strings.HasPrefix(path.Base(name), ".syncthing.")
+}
+
+func tempName(name string, modified int64) string {
+    tdir := path.Dir(name)
+    tname := fmt.Sprintf(".syncthing.%s.%d", path.Base(name), modified)
+    return path.Join(tdir, tname)
+}
+
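+// genWalker returns a filepath.WalkFunc that hashes the regular files
+// under base and appends them to res, reusing the known block list for
+// a file when its modification time is unchanged in model.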
+func genWalker(base string, res *[]File, model *Model) filepath.WalkFunc { + return func(p string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + if isTempName(p) { + return nil + } + + if info.Mode()&os.ModeType == 0 { + rn, err := filepath.Rel(base, p) + if err != nil { + return err + } + + fi, err := os.Stat(p) + if err != nil { + return err + } + modified := fi.ModTime().Unix() + + hf, ok := model.LocalFile(rn) + if ok && hf.Modified == modified { + // No change + *res = append(*res, hf) + } else { + if traceFile { + debugf("FILE: Hash %q", p) + } + fd, err := os.Open(p) + if err != nil { + return err + } + defer fd.Close() + + blocks, err := Blocks(fd, BlockSize) + if err != nil { + return err + } + f := File{ + Name: rn, + Flags: uint32(info.Mode()), + Modified: modified, + Blocks: blocks, + } + *res = append(*res, f) + } + } + + return nil + } +} + +func Walk(dir string, model *Model) []File { + var files []File + fn := genWalker(dir, &files, model) + err := filepath.Walk(dir, fn) + if err != nil { + warnln(err) + } + return files +} + +func cleanTempFile(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.Mode()&os.ModeType == 0 && isTempName(path) { + if traceFile { + debugf("FILE: Remove %q", path) + } + os.Remove(path) + } + return nil +} + +func CleanTempFiles(dir string) { + filepath.Walk(dir, cleanTempFile) +} diff --git a/walk_test.go b/walk_test.go new file mode 100644 index 000000000..4f41daedc --- /dev/null +++ b/walk_test.go @@ -0,0 +1,42 @@ +package main + +import ( + "fmt" + "testing" + "time" +) + +var testdata = []struct { + name string + size int + hash string +}{ + {"bar", 10, "2f72cc11a6fcd0271ecef8c61056ee1eb1243be3805bf9a9df98f92f7636b05c"}, + {"baz/quux", 9, "c154d94e94ba7298a6adb0523afe34d1b6a581d6b893a763d45ddc5e209dcb83"}, + {"foo", 7, "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f"}, +} + +func TestWalk(t *testing.T) { + m := new(Model) + files := Walk("testdata", m) + + if l1, l2 := len(files), len(testdata); l1 != l2 { + t.Fatalf("Incorrect number of walked files %d != %d", l1, l2) + } + + for i := range testdata { + if n1, n2 := testdata[i].name, files[i].Name; n1 != n2 { + t.Errorf("Incorrect file name %q != %q for case #%d", n1, n2, i) + } + + if h1, h2 := fmt.Sprintf("%x", files[i].Blocks[0].Hash), testdata[i].hash; h1 != h2 { + t.Errorf("Incorrect hash %q != %q for case #%d", h1, h2, i) + } + + t0 := time.Date(2010, 1, 1, 0, 0, 0, 0, time.UTC).Unix() + t1 := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix() + if mt := files[i].Modified; mt < t0 || mt > t1 { + t.Errorf("Unrealistic modtime %d for test %d", mt, i) + } + } +}
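The Blocks helper called from genWalker is defined elsewhere in the tree
and is not part of this diff, but its contract follows from the
protocol: split the input into BlockSize chunks and hash each chunk with
SHA-256. A minimal sketch of that idea, under those assumptions and with
names invented for illustration, not the actual implementation:

```
package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
	"io"
)

type chunk struct {
	offset int64
	length uint32
	hash   []byte
}

// hashBlocks reads r in blockSize pieces and returns one SHA-256 hash
// per piece; the final piece may be shorter than blockSize.
func hashBlocks(r io.Reader, blockSize int) ([]chunk, error) {
	var res []chunk
	var offset int64
	buf := make([]byte, blockSize)
	for {
		n, err := io.ReadFull(r, buf)
		if n > 0 {
			h := sha256.Sum256(buf[:n])
			res = append(res, chunk{offset, uint32(n), h[:]})
			offset += int64(n)
		}
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			return res, nil
		}
		if err != nil {
			return nil, err
		}
	}
}

func main() {
	data := bytes.Repeat([]byte("x"), 300*1024) // two full blocks plus a remainder
	blocks, _ := hashBlocks(bytes.NewReader(data), 128*1024)
	for _, b := range blocks {
		fmt.Printf("%6d bytes @ %7d: %x...\n", b.length, b.offset, b.hash[:8])
	}
}
```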