From 5837277f8d070847f6bca215abfd8072a072c677 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Thu, 20 Feb 2014 17:40:15 +0100 Subject: [PATCH] Rework XDR encoding --- LICENSE | 2 +- README.md | 24 ++- build.sh | 2 +- discover/PROTOCOL.md | 115 ++++++++++ discover/cmd/discosrv/main.go | 200 ++++++++++++++--- discover/discover.go | 201 ++++++++--------- discover/encoding.go | 160 -------------- discover/encoding_test.go | 138 ------------ discover/packets.go | 39 ++++ discover/packets_xdr.go | 220 +++++++++++++++++++ gui.go | 4 +- main.go | 21 +- model.go | 43 ++-- protocol/PROTOCOL.md | 352 +++++++++++++++++++++--------- protocol/header.go | 34 +++ protocol/message_types.go | 35 +++ protocol/message_xdr.go | 286 +++++++++++++++++++++++++ protocol/messages.go | 186 ---------------- protocol/messages_test.go | 143 ------------- protocol/protocol.go | 157 +++++++------- protocol/protocol_test.go | 19 +- util.go | 4 +- walk.go | 9 +- xdr/cmd/coder/main.go | 393 ++++++++++++++++++++++++++++++++++ xdr/reader.go | 50 ++++- xdr/writer.go | 27 ++- xdr/xdr_test.go | 8 +- 27 files changed, 1843 insertions(+), 1029 deletions(-) create mode 100644 discover/PROTOCOL.md delete mode 100644 discover/encoding.go delete mode 100644 discover/encoding_test.go create mode 100644 discover/packets.go create mode 100644 discover/packets_xdr.go create mode 100644 protocol/header.go create mode 100644 protocol/message_types.go create mode 100644 protocol/message_xdr.go delete mode 100644 protocol/messages.go delete mode 100644 protocol/messages_test.go create mode 100644 xdr/cmd/coder/main.go diff --git a/LICENSE b/LICENSE index fa5b4e205..f94dde919 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright (C) 2013 Jakob Borg +Copyright (C) 2013-2014 Jakob Borg Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/README.md b/README.md index 5ebebc3a5..bca960dae 100644 --- a/README.md +++ b/README.md @@ -1,18 +1,18 @@ syncthing ========= -This is `syncthing`, an open BitTorrent Sync alternative. It is -currently far from ready for mass consumption, but it is a usable proof -of concept and tech demo. The following are the project goals: +This is the `syncthing` project. The following are the project goals: - 1. Define an open, secure, language neutral protocol usable for - efficient synchronization of a file repository between an arbitrary - number of nodes. This is the [Block Exchange - Protocol](https://github.com/calmh/syncthing/blob/master/protocol/PROTOCOL.md) - (BEP). + 1. Define a protocol for synchronization of a file repository between a + number of collaborating nodes. The protocol should be well defined, + unambigous, easily understood, free to use, efficient, secure and + languange neutral. This is the [Block Exchange + Protocol](https://github.com/calmh/syncthing/blob/master/protocol/PROTOCOL.md). 2. Provide the reference implementation to demonstrate the usability of - said protocol. This is the `syncthing` utility. + said protocol. This is the `syncthing` utility. It is the hope that + alternative, compatible implementations of the protocol will come to + exist. The two are evolving together; the protocol is not to be considered stable until syncthing 1.0 is released, at which point it is locked down @@ -34,5 +34,9 @@ The syncthing documentation is kept on the License ======= -MIT +All documentation and protocol specifications are licensed +under the [Creative Commons Attribution 4.0 International +License](http://creativecommons.org/licenses/by/4.0/). +All code is licensed under the [MIT +License](https://github.com/calmh/syncthing/blob/master/LICENSE). diff --git a/build.sh b/build.sh index f11232112..4e84528c8 100755 --- a/build.sh +++ b/build.sh @@ -26,7 +26,7 @@ elif [[ $1 == "all" ]] ; then mkdir -p "$buildDir" || exit 1 export GOARM=7 - for os in darwin-amd64 linux-386 linux-amd64 linux-arm freebsd-386 freebsd-amd64 windows-386 windows-amd64 ; do + for os in darwin-amd64 linux-amd64 linux-arm freebsd-amd64 windows-amd64 ; do echo "$os" export name="syncthing-$os" export GOOS=${os%-*} diff --git a/discover/PROTOCOL.md b/discover/PROTOCOL.md new file mode 100644 index 000000000..399d089ba --- /dev/null +++ b/discover/PROTOCOL.md @@ -0,0 +1,115 @@ +Node Discovery Protocol v2 +========================== + +Mode of Operation +----------------- + +There are two distinct modes: "local discovery", performed on a LAN +segment (broadcast domain) and "global discovery" performed over the +Internet in general with the support of a well known server. + +Local discovery does not use Query packets. Instead Announcement packets +are sent periodically and each participating node keeps a table of the +announcements it has seen. On multihomed hosts the announcement packets +should be sent on each interface that syncthing will accept connections. + +It is recommended that local discovery Announcement packets are sent on +a 30 to 60 second interval, possibly with forced transmissions when a +previously unknown node is discovered. + +Global discovery is made possible by periodically updating a global server +using Announcement packets indentical to those transmitted for local +discovery. The node performing discovery will transmit a Query packet to +the global server and expect an Announcement packet in response. In case +the global server has no knowledge of the queried node ID, there will be +no response. A timeout is to be used to determine lookup failure. + +There is no message to unregister from the global server; instead +registrations are forgotten after 60 minutes. It is recommended to +send Announcement packets to the global server on a 30 minute interval. + +Packet Formats +-------------- + +The Announcement packet has the following structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Magic Number (0x029E4C77) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Length of Node ID | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ Node ID (variable length) \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Number of Addresses | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ Zero or more Address Structures \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + Address Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Length of IP | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ IP (variable length) \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Port Number | 0x0000 | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +This is the XDR encoding of: + + struct Announcement { + unsigned int MagicNumber; + string NodeID<>; + Address Addresses<>; + } + + struct Address { + opaque IP<>; + unsigned short PortNumber; + } + +NodeID is padded to a multiple of 32 bits and all fields are in sent in +network (big endian) byte order. In the Address structure, the IP field +can be of three differnt kinds; + + - A zero length indicates that the IP address should be taken from the + source address of the announcement packet, be it IPv4 or IPv6. The + source address must be a valid unicast address. + + - A four byte length indicates that the address is an IPv4 unicast + address. + + - A sixteen byte length indicates that the address is an IPv6 unicast + address. + +The Query packet has the following structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Magic Number (0x23D63A9A) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Length of Node ID | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ Node ID (variable length) \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +This is the XDR encoding of: + + struct Announcement { + unsigned int MagicNumber; + string NodeID<>; + } + diff --git a/discover/cmd/discosrv/main.go b/discover/cmd/discosrv/main.go index b1d3492c1..1ac8ae170 100644 --- a/discover/cmd/discosrv/main.go +++ b/discover/cmd/discosrv/main.go @@ -1,8 +1,12 @@ package main import ( + "encoding/binary" + "encoding/hex" + "flag" "log" "net" + "os" "sync" "time" @@ -10,19 +14,38 @@ import ( ) type Node struct { - IP []byte - Port uint16 - Updated time.Time + Addresses []Address + Updated time.Time +} + +type Address struct { + IP []byte + Port uint16 } var ( - nodes = make(map[string]Node) - lock sync.Mutex - queries = 0 + nodes = make(map[string]Node) + lock sync.Mutex + queries = 0 + answered = 0 ) func main() { - addr, _ := net.ResolveUDPAddr("udp", ":22025") + var debug bool + var listen string + var timestamp bool + + flag.StringVar(&listen, "listen", ":22025", "Listen address") + flag.BoolVar(&debug, "debug", false, "Enable debug output") + flag.BoolVar(×tamp, "timestamp", true, "Timestamp the log output") + flag.Parse() + + log.SetOutput(os.Stdout) + if !timestamp { + log.SetFlags(0) + } + + addr, _ := net.ResolveUDPAddr("udp", listen) conn, err := net.ListenUDP("udp", addr) if err != nil { panic(err) @@ -41,8 +64,9 @@ func main() { deleted++ } } - log.Printf("Expired %d nodes; %d nodes in registry; %d queries", deleted, len(nodes), queries) + log.Printf("Expired %d nodes; %d nodes in registry; %d queries (%d answered)", deleted, len(nodes), queries, answered) queries = 0 + answered = 0 lock.Unlock() } @@ -50,49 +74,163 @@ func main() { var buf = make([]byte, 1024) for { + buf = buf[:cap(buf)] n, addr, err := conn.ReadFromUDP(buf) if err != nil { panic(err) } - pkt, err := discover.DecodePacket(buf[:n]) - if err != nil { - log.Println("Warning:", err) + if n < 4 { + log.Printf("Received short packet (%d bytes)", n) continue } - switch pkt.Magic { - case 0x20121025: - // Announcement - lock.Lock() + buf = buf[:n] + magic := binary.BigEndian.Uint32(buf) + + switch magic { + case discover.AnnouncementMagicV1: + var pkt discover.AnnounceV1 + err := pkt.UnmarshalXDR(buf) + if err != nil { + log.Println("AnnounceV1 Unmarshal:", err) + log.Println(hex.Dump(buf)) + continue + } + if debug { + log.Printf("<- %v %#v", addr, pkt) + } + ip := addr.IP.To4() if ip == nil { ip = addr.IP.To16() } node := Node{ - IP: ip, - Port: uint16(pkt.Port), + Addresses: []Address{{ + IP: ip, + Port: pkt.Port, + }}, Updated: time.Now(), } - //log.Println("<-", pkt.ID, node) - nodes[pkt.ID] = node - lock.Unlock() - case 0x19760309: - // Query + lock.Lock() - node, ok := nodes[pkt.ID] + nodes[pkt.NodeID] = node + lock.Unlock() + + case discover.QueryMagicV1: + var pkt discover.QueryV1 + err := pkt.UnmarshalXDR(buf) + if err != nil { + log.Println("QueryV1 Unmarshal:", err) + log.Println(hex.Dump(buf)) + continue + } + if debug { + log.Printf("<- %v %#v", addr, pkt) + } + + lock.Lock() + node, ok := nodes[pkt.NodeID] queries++ lock.Unlock() - if ok { - pkt := discover.Packet{ - Magic: 0x20121025, - ID: pkt.ID, - Port: node.Port, - IP: node.IP, + + if ok && len(node.Addresses) > 0 { + pkt := discover.AnnounceV1{ + Magic: discover.AnnouncementMagicV1, + NodeID: pkt.NodeID, + Port: node.Addresses[0].Port, + IP: node.Addresses[0].IP, } - _, _, err = conn.WriteMsgUDP(discover.EncodePacket(pkt), nil, addr) + if debug { + log.Printf("-> %v %#v", addr, pkt) + } + + tb := pkt.MarshalXDR() + _, _, err = conn.WriteMsgUDP(tb, nil, addr) if err != nil { - log.Println("Warning:", err) + log.Println("QueryV1 response write:", err) } + + lock.Lock() + answered++ + lock.Unlock() + } + + case discover.AnnouncementMagicV2: + var pkt discover.AnnounceV2 + err := pkt.UnmarshalXDR(buf) + if err != nil { + log.Println("AnnounceV2 Unmarshal:", err) + log.Println(hex.Dump(buf)) + continue + } + if debug { + log.Printf("<- %v %#v", addr, pkt) + } + + ip := addr.IP.To4() + if ip == nil { + ip = addr.IP.To16() + } + + var addrs []Address + for _, addr := range pkt.Addresses { + tip := addr.IP + if len(tip) == 0 { + tip = ip + } + addrs = append(addrs, Address{ + IP: tip, + Port: addr.Port, + }) + } + + node := Node{ + Addresses: addrs, + Updated: time.Now(), + } + + lock.Lock() + nodes[pkt.NodeID] = node + lock.Unlock() + + case discover.QueryMagicV2: + var pkt discover.QueryV2 + err := pkt.UnmarshalXDR(buf) + if err != nil { + log.Println("QueryV2 Unmarshal:", err) + log.Println(hex.Dump(buf)) + continue + } + if debug { + log.Printf("<- %v %#v", addr, pkt) + } + + lock.Lock() + node, ok := nodes[pkt.NodeID] + queries++ + lock.Unlock() + + if ok && len(node.Addresses) > 0 { + pkt := discover.AnnounceV2{ + Magic: discover.AnnouncementMagicV2, + NodeID: pkt.NodeID, + } + for _, addr := range node.Addresses { + pkt.Addresses = append(pkt.Addresses, discover.Address{IP: addr.IP, Port: addr.Port}) + } + if debug { + log.Printf("-> %v %#v", addr, pkt) + } + + tb := pkt.MarshalXDR() + _, _, err = conn.WriteMsgUDP(tb, nil, addr) + if err != nil { + log.Println("QueryV2 response write:", err) + } + + lock.Lock() + answered++ + lock.Unlock() } } } diff --git a/discover/discover.go b/discover/discover.go index d914494d1..6556ad87c 100644 --- a/discover/discover.go +++ b/discover/discover.go @@ -1,89 +1,12 @@ -/* -This is the local node discovery protocol. In principle we might be better -served by something more standardized, such as mDNS / DNS-SD. In practice, this -was much easier and quicker to get up and running. - -The mode of operation is to periodically (currently once every 30 seconds) -broadcast an Announcement packet to UDP port 21025. The packet has the -following format: - - 0 1 2 3 - 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | Magic Number (0x20121025) | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | Port Number | Reserved | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | Length of NodeID | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - / / - \ NodeID (variable length) \ - / / - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | Length of IP | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - / / - \ IP (variable length) \ - / / - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - -This is the XDR encoding of: - -struct Announcement { - unsigned int Magic; - unsigned short Port; - string NodeID<>; -} - -(Hence NodeID is padded to a multiple of 32 bits) - -The sending node's address is not encoded in local announcement -- the Length -of IP field is set to zero and the address is taken to be the source address of -the announcement. In announcement packets sent by a discovery server in -response to a query, the IP is present and the length is either 4 (IPv4) or 16 -(IPv6). - -Every time such a packet is received, a local table that maps NodeID to Address -is updated. When the local node wants to connect to another node with the -address specification 'dynamic', this table is consulted. - -For external discovery, an identical packet is sent every 30 minutes to the -external discovery server. The server keeps information for up to 60 minutes. -To query the server, and UDP packet with the format below is sent. - - 0 1 2 3 - 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | Magic Number (0x19760309) | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | Length of NodeID | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - / / - \ NodeID (variable length) \ - / / - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - -This is the XDR encoding of: - -struct Announcement { - unsigned int Magic; - string NodeID<>; -} - -(Hence NodeID is padded to a multiple of 32 bits) - -It is answered with an announcement packet for the queried node ID if the -information is available. There is no answer for queries about unknown nodes. A -reasonable timeout is recommended instead. (This, combined with server side -rate limits for packets per source IP and queries per node ID, prevents the -server from being used as an amplifier in a DDoS attack.) -*/ package discover import ( + "encoding/hex" + "errors" "fmt" "log" "net" + "strings" "sync" "time" @@ -91,9 +14,8 @@ import ( ) const ( - AnnouncementPort = 21025 - AnnouncementMagic = 0x20121025 - QueryMagic = 0x19760309 + AnnouncementPort = 21025 + Debug = false ) type Discoverer struct { @@ -103,11 +25,15 @@ type Discoverer struct { ExtBroadcastIntv time.Duration conn *net.UDPConn - registry map[string]string + registry map[string][]string registryLock sync.RWMutex extServer string } +var ( + ErrIncorrectMagic = errors.New("Incorrect magic number") +) + // We tolerate a certain amount of errors because we might be running on // laptops that sleep and wake, have intermittent network connectivity, etc. // When we hit this many errors in succession, we stop. @@ -127,7 +53,7 @@ func NewDiscoverer(id string, port int, extServer string) (*Discoverer, error) { ExtBroadcastIntv: 1800 * time.Second, conn: conn, - registry: make(map[string]string), + registry: make(map[string][]string), extServer: extServer, } @@ -146,7 +72,8 @@ func NewDiscoverer(id string, port int, extServer string) (*Discoverer, error) { func (d *Discoverer) sendAnnouncements() { remote4 := &net.UDPAddr{IP: net.IP{255, 255, 255, 255}, Port: AnnouncementPort} - buf := EncodePacket(Packet{AnnouncementMagic, uint16(d.ListenPort), d.MyID, nil}) + pkt := AnnounceV2{AnnouncementMagicV2, d.MyID, []Address{{nil, 22000}}} + buf := pkt.MarshalXDR() go d.writeAnnouncements(buf, remote4, d.BroadcastIntv) } @@ -157,7 +84,8 @@ func (d *Discoverer) sendExtAnnouncements() { return } - buf := EncodePacket(Packet{AnnouncementMagic, uint16(22000), d.MyID, nil}) + pkt := AnnounceV2{AnnouncementMagicV2, d.MyID, []Address{{nil, 22000}}} + buf := pkt.MarshalXDR() go d.writeAnnouncements(buf, extIP, d.ExtBroadcastIntv) } @@ -189,90 +117,137 @@ func (d *Discoverer) recvAnnouncements() { continue } - pkt, err := DecodePacket(buf[:n]) - if err != nil || pkt.Magic != AnnouncementMagic { + if Debug { + log.Printf("read announcement:\n%s", hex.Dump(buf[:n])) + } + + var pkt AnnounceV2 + err = pkt.UnmarshalXDR(buf[:n]) + if err != nil { errCounter++ time.Sleep(time.Second) continue } + if Debug { + log.Printf("read announcement: %#v", pkt) + } + errCounter = 0 - if pkt.ID != d.MyID { - nodeAddr := fmt.Sprintf("%s:%d", addr.IP.String(), pkt.Port) - d.registryLock.Lock() - if d.registry[pkt.ID] != nodeAddr { - d.registry[pkt.ID] = nodeAddr + if pkt.NodeID != d.MyID { + var addrs []string + for _, a := range pkt.Addresses { + var nodeAddr string + if len(a.IP) > 0 { + nodeAddr = fmt.Sprintf("%s:%d", ipStr(a.IP), a.Port) + } else { + nodeAddr = fmt.Sprintf("%s:%d", addr.IP.String(), a.Port) + } + addrs = append(addrs, nodeAddr) } + if Debug { + log.Printf("register: %#v", addrs) + } + d.registryLock.Lock() + d.registry[pkt.NodeID] = addrs d.registryLock.Unlock() } } log.Println("discover/read: stopping due to too many errors:", err) } -func (d *Discoverer) externalLookup(node string) (string, bool) { +func (d *Discoverer) externalLookup(node string) []string { extIP, err := net.ResolveUDPAddr("udp", d.extServer) if err != nil { log.Printf("discover/external: %v; no external lookup", err) - return "", false + return nil } conn, err := net.DialUDP("udp", nil, extIP) if err != nil { log.Printf("discover/external: %v; no external lookup", err) - return "", false + return nil } defer conn.Close() err = conn.SetDeadline(time.Now().Add(5 * time.Second)) if err != nil { log.Printf("discover/external: %v; no external lookup", err) - return "", false + return nil } - _, err = conn.Write(EncodePacket(Packet{QueryMagic, 0, node, nil})) + buf := QueryV2{QueryMagicV2, node}.MarshalXDR() + _, err = conn.Write(buf) if err != nil { log.Printf("discover/external: %v; no external lookup", err) - return "", false + return nil } + buffers.Put(buf) - var buf = buffers.Get(256) + buf = buffers.Get(256) defer buffers.Put(buf) n, err := conn.Read(buf) if err != nil { if err, ok := err.(net.Error); ok && err.Timeout() { // Expected if the server doesn't know about requested node ID - return "", false + return nil } log.Printf("discover/external/read: %v; no external lookup", err) - return "", false + return nil } - pkt, err := DecodePacket(buf[:n]) + if Debug { + log.Printf("read external:\n%s", hex.Dump(buf[:n])) + } + + var pkt AnnounceV2 + err = pkt.UnmarshalXDR(buf[:n]) if err != nil { - log.Printf("discover/external/read: %v; no external lookup", err) - return "", false + log.Println("discover/external/decode:", err) + return nil } - if pkt.Magic != AnnouncementMagic { - log.Printf("discover/external/read: bad magic; no external lookup", err) - return "", false + if Debug { + log.Printf("read external: %#v", pkt) } - return fmt.Sprintf("%s:%d", ipStr(pkt.IP), pkt.Port), true + var addrs []string + for _, a := range pkt.Addresses { + var nodeAddr string + if len(a.IP) > 0 { + nodeAddr = fmt.Sprintf("%s:%d", ipStr(a.IP), a.Port) + } + addrs = append(addrs, nodeAddr) + } + return addrs } -func (d *Discoverer) Lookup(node string) (string, bool) { +func (d *Discoverer) Lookup(node string) []string { d.registryLock.Lock() addr, ok := d.registry[node] d.registryLock.Unlock() if ok { - return addr, true + return addr } else if len(d.extServer) != 0 { // We might want to cache this, but not permanently so it needs some intelligence return d.externalLookup(node) } - return "", false + return nil +} + +func ipStr(ip []byte) string { + var f = "%d" + var s = "." + if len(ip) > 4 { + f = "%x" + s = ":" + } + var ss = make([]string, len(ip)) + for i := range ip { + ss[i] = fmt.Sprintf(f, ip[i]) + } + return strings.Join(ss, s) } diff --git a/discover/encoding.go b/discover/encoding.go deleted file mode 100644 index bdf7338b2..000000000 --- a/discover/encoding.go +++ /dev/null @@ -1,160 +0,0 @@ -package discover - -import ( - "encoding/binary" - "errors" - "fmt" -) - -type Packet struct { - Magic uint32 // AnnouncementMagic or QueryMagic - Port uint16 // unset if magic == QueryMagic - ID string - IP []byte // zero length in local announcements -} - -var ( - errBadMagic = errors.New("bad magic") - errFormat = errors.New("incorrect packet format") -) - -func EncodePacket(pkt Packet) []byte { - if l := len(pkt.IP); l != 0 && l != 4 && l != 16 { - // bad ip format - return nil - } - - var idbs = []byte(pkt.ID) - var l = 4 + 4 + len(idbs) + pad(len(idbs)) - if pkt.Magic == AnnouncementMagic { - l += 4 + 4 + len(pkt.IP) - } - - var buf = make([]byte, l) - var offset = 0 - - binary.BigEndian.PutUint32(buf[offset:], pkt.Magic) - offset += 4 - - if pkt.Magic == AnnouncementMagic { - binary.BigEndian.PutUint16(buf[offset:], uint16(pkt.Port)) - offset += 4 - } - - binary.BigEndian.PutUint32(buf[offset:], uint32(len(idbs))) - offset += 4 - copy(buf[offset:], idbs) - offset += len(idbs) + pad(len(idbs)) - - if pkt.Magic == AnnouncementMagic { - binary.BigEndian.PutUint32(buf[offset:], uint32(len(pkt.IP))) - offset += 4 - copy(buf[offset:], pkt.IP) - offset += len(pkt.IP) - } - - return buf -} - -func DecodePacket(buf []byte) (*Packet, error) { - var p Packet - var offset int - - if len(buf) < 4 { - // short packet - return nil, errFormat - } - p.Magic = binary.BigEndian.Uint32(buf[offset:]) - offset += 4 - - if p.Magic != AnnouncementMagic && p.Magic != QueryMagic { - return nil, errBadMagic - } - - if p.Magic == AnnouncementMagic { - // Port Number - - if len(buf) < offset+4 { - // short packet - return nil, errFormat - } - p.Port = binary.BigEndian.Uint16(buf[offset:]) - offset += 2 - reserved := binary.BigEndian.Uint16(buf[offset:]) - if reserved != 0 { - return nil, errFormat - } - offset += 2 - } - - // Node ID - - if len(buf) < offset+4 { - // short packet - return nil, errFormat - } - l := binary.BigEndian.Uint32(buf[offset:]) - offset += 4 - - if len(buf) < offset+int(l)+pad(int(l)) { - // short packet - return nil, errFormat - } - idbs := buf[offset : offset+int(l)] - p.ID = string(idbs) - offset += int(l) + pad(int(l)) - - if p.Magic == AnnouncementMagic { - // IP - - if len(buf) < offset+4 { - // short packet - return nil, errFormat - } - l = binary.BigEndian.Uint32(buf[offset:]) - offset += 4 - - if l != 0 && l != 4 && l != 16 { - // weird ip length - return nil, errFormat - } - if len(buf) < offset+int(l) { - // short packet - return nil, errFormat - } - if l > 0 { - p.IP = buf[offset : offset+int(l)] - offset += int(l) - } - } - - if len(buf[offset:]) > 0 { - // extra data - return nil, errFormat - } - - return &p, nil -} - -func pad(l int) int { - d := l % 4 - if d == 0 { - return 0 - } - return 4 - d -} - -func ipStr(ip []byte) string { - switch len(ip) { - case 4: - return fmt.Sprintf("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3]) - case 16: - return fmt.Sprintf("%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x", - ip[0], ip[1], ip[2], ip[3], - ip[4], ip[5], ip[6], ip[7], - ip[8], ip[9], ip[10], ip[11], - ip[12], ip[13], ip[14], ip[15]) - default: - return "" - } -} diff --git a/discover/encoding_test.go b/discover/encoding_test.go deleted file mode 100644 index bd212e777..000000000 --- a/discover/encoding_test.go +++ /dev/null @@ -1,138 +0,0 @@ -package discover - -import ( - "bytes" - "reflect" - "testing" -) - -var testdata = []struct { - data []byte - packet *Packet - err error -}{ - { - []byte{0x20, 0x12, 0x10, 0x25, - 0x12, 0x34, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x05, - 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00}, - &Packet{ - Magic: 0x20121025, - Port: 0x1234, - ID: "hello", - }, - nil, - }, - { - []byte{0x20, 0x12, 0x10, 0x25, - 0x34, 0x56, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x08, - 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x21, 0x21, 0x21, - 0x00, 0x00, 0x00, 0x04, - 0x01, 0x02, 0x03, 0x04}, - &Packet{ - Magic: 0x20121025, - Port: 0x3456, - ID: "hello!!!", - IP: []byte{1, 2, 3, 4}, - }, - nil, - }, - { - []byte{0x19, 0x76, 0x03, 0x09, - 0x00, 0x00, 0x00, 0x06, - 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x21, 0x00, 0x00}, - &Packet{ - Magic: 0x19760309, - ID: "hello!", - }, - nil, - }, - { - []byte{0x20, 0x12, 0x10, 0x25, - 0x12, 0x34, 0x12, 0x34, // reserved bits not set to zero - 0x00, 0x00, 0x00, 0x06, - 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x21, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00}, - nil, - errFormat, - }, - { - []byte{0x20, 0x12, 0x10, 0x25, - 0x12, 0x34, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x06, - 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x21, // missing padding - 0x00, 0x00, 0x00, 0x00}, - nil, - errFormat, - }, - { - []byte{0x19, 0x77, 0x03, 0x09, // incorrect Magic - 0x00, 0x00, 0x00, 0x06, - 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x21, 0x00, 0x00}, - nil, - errBadMagic, - }, - { - []byte{0x19, 0x76, 0x03, 0x09, - 0x6c, 0x6c, 0x6c, 0x6c, // length exceeds packet size - 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x21, 0x00, 0x00}, - nil, - errFormat, - }, - { - []byte{0x19, 0x76, 0x03, 0x09, - 0x00, 0x00, 0x00, 0x06, - 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x21, 0x00, 0x00, - 0x23}, // extra data at the end - nil, - errFormat, - }, -} - -func TestDecodePacket(t *testing.T) { - for i, test := range testdata { - p, err := DecodePacket(test.data) - if err != test.err { - t.Errorf("%d: unexpected error %v", i, err) - } else { - if !reflect.DeepEqual(p, test.packet) { - t.Errorf("%d: incorrect packet\n%v\n%v", i, test.packet, p) - } - } - } -} - -func TestEncodePacket(t *testing.T) { - for i, test := range testdata { - if test.err != nil { - continue - } - buf := EncodePacket(*test.packet) - if bytes.Compare(buf, test.data) != 0 { - t.Errorf("%d: incorrect encoded packet\n% x\n% 0x", i, test.data, buf) - } - } -} - -var ipstrTests = []struct { - d []byte - s string -}{ - {[]byte{192, 168, 34}, ""}, - {[]byte{192, 168, 0, 34}, "192.168.0.34"}, - {[]byte{0x20, 0x01, 0x12, 0x34, - 0x34, 0x56, 0x56, 0x78, - 0x78, 0x00, 0x00, 0xdc, - 0x00, 0x00, 0x43, 0x54}, "2001:1234:3456:5678:7800:00dc:0000:4354"}, -} - -func TestIPStr(t *testing.T) { - for _, tc := range ipstrTests { - s1 := ipStr(tc.d) - if s1 != tc.s { - t.Errorf("Incorrect ipstr %q != %q", tc.s, s1) - } - } -} diff --git a/discover/packets.go b/discover/packets.go new file mode 100644 index 000000000..abd254120 --- /dev/null +++ b/discover/packets.go @@ -0,0 +1,39 @@ +package discover + +const ( + AnnouncementMagicV1 = 0x20121025 + QueryMagicV1 = 0x19760309 +) + +type QueryV1 struct { + Magic uint32 + NodeID string // max:64 +} + +type AnnounceV1 struct { + Magic uint32 + Port uint16 + NodeID string // max:64 + IP []byte // max:16 +} + +const ( + AnnouncementMagicV2 = 0x029E4C77 + QueryMagicV2 = 0x23D63A9A +) + +type QueryV2 struct { + Magic uint32 + NodeID string // max:64 +} + +type AnnounceV2 struct { + Magic uint32 + NodeID string // max:64 + Addresses []Address // max:16 +} + +type Address struct { + IP []byte // max:16 + Port uint16 +} diff --git a/discover/packets_xdr.go b/discover/packets_xdr.go new file mode 100644 index 000000000..d8fcb66a6 --- /dev/null +++ b/discover/packets_xdr.go @@ -0,0 +1,220 @@ +package discover + +import ( + "bytes" + "io" + + "github.com/calmh/syncthing/xdr" +) + +func (o QueryV1) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o QueryV1) MarshalXDR() []byte { + var buf bytes.Buffer + var xw = xdr.NewWriter(&buf) + o.encodeXDR(xw) + return buf.Bytes() +} + +func (o QueryV1) encodeXDR(xw *xdr.Writer) (int, error) { + xw.WriteUint32(o.Magic) + if len(o.NodeID) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.NodeID) + return xw.Tot(), xw.Error() +} + +func (o *QueryV1) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *QueryV1) UnmarshalXDR(bs []byte) error { + var buf = bytes.NewBuffer(bs) + var xr = xdr.NewReader(buf) + return o.decodeXDR(xr) +} + +func (o *QueryV1) decodeXDR(xr *xdr.Reader) error { + o.Magic = xr.ReadUint32() + o.NodeID = xr.ReadStringMax(64) + return xr.Error() +} + +func (o AnnounceV1) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o AnnounceV1) MarshalXDR() []byte { + var buf bytes.Buffer + var xw = xdr.NewWriter(&buf) + o.encodeXDR(xw) + return buf.Bytes() +} + +func (o AnnounceV1) encodeXDR(xw *xdr.Writer) (int, error) { + xw.WriteUint32(o.Magic) + xw.WriteUint16(o.Port) + if len(o.NodeID) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.NodeID) + if len(o.IP) > 16 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteBytes(o.IP) + return xw.Tot(), xw.Error() +} + +func (o *AnnounceV1) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *AnnounceV1) UnmarshalXDR(bs []byte) error { + var buf = bytes.NewBuffer(bs) + var xr = xdr.NewReader(buf) + return o.decodeXDR(xr) +} + +func (o *AnnounceV1) decodeXDR(xr *xdr.Reader) error { + o.Magic = xr.ReadUint32() + o.Port = xr.ReadUint16() + o.NodeID = xr.ReadStringMax(64) + o.IP = xr.ReadBytesMax(16) + return xr.Error() +} + +func (o QueryV2) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o QueryV2) MarshalXDR() []byte { + var buf bytes.Buffer + var xw = xdr.NewWriter(&buf) + o.encodeXDR(xw) + return buf.Bytes() +} + +func (o QueryV2) encodeXDR(xw *xdr.Writer) (int, error) { + xw.WriteUint32(o.Magic) + if len(o.NodeID) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.NodeID) + return xw.Tot(), xw.Error() +} + +func (o *QueryV2) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *QueryV2) UnmarshalXDR(bs []byte) error { + var buf = bytes.NewBuffer(bs) + var xr = xdr.NewReader(buf) + return o.decodeXDR(xr) +} + +func (o *QueryV2) decodeXDR(xr *xdr.Reader) error { + o.Magic = xr.ReadUint32() + o.NodeID = xr.ReadStringMax(64) + return xr.Error() +} + +func (o AnnounceV2) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o AnnounceV2) MarshalXDR() []byte { + var buf bytes.Buffer + var xw = xdr.NewWriter(&buf) + o.encodeXDR(xw) + return buf.Bytes() +} + +func (o AnnounceV2) encodeXDR(xw *xdr.Writer) (int, error) { + xw.WriteUint32(o.Magic) + if len(o.NodeID) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.NodeID) + if len(o.Addresses) > 16 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteUint32(uint32(len(o.Addresses))) + for i := range o.Addresses { + o.Addresses[i].encodeXDR(xw) + } + return xw.Tot(), xw.Error() +} + +func (o *AnnounceV2) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *AnnounceV2) UnmarshalXDR(bs []byte) error { + var buf = bytes.NewBuffer(bs) + var xr = xdr.NewReader(buf) + return o.decodeXDR(xr) +} + +func (o *AnnounceV2) decodeXDR(xr *xdr.Reader) error { + o.Magic = xr.ReadUint32() + o.NodeID = xr.ReadStringMax(64) + _AddressesSize := int(xr.ReadUint32()) + if _AddressesSize > 16 { + return xdr.ErrElementSizeExceeded + } + o.Addresses = make([]Address, _AddressesSize) + for i := range o.Addresses { + (&o.Addresses[i]).decodeXDR(xr) + } + return xr.Error() +} + +func (o Address) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o Address) MarshalXDR() []byte { + var buf bytes.Buffer + var xw = xdr.NewWriter(&buf) + o.encodeXDR(xw) + return buf.Bytes() +} + +func (o Address) encodeXDR(xw *xdr.Writer) (int, error) { + if len(o.IP) > 16 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteBytes(o.IP) + xw.WriteUint16(o.Port) + return xw.Tot(), xw.Error() +} + +func (o *Address) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *Address) UnmarshalXDR(bs []byte) error { + var buf = bytes.NewBuffer(bs) + var xr = xdr.NewReader(buf) + return o.decodeXDR(xr) +} + +func (o *Address) decodeXDR(xr *xdr.Reader) error { + o.IP = xr.ReadBytesMax(16) + o.Port = xr.ReadUint16() + return xr.Error() +} diff --git a/gui.go b/gui.go index a584f886e..93bbbed14 100644 --- a/gui.go +++ b/gui.go @@ -112,11 +112,11 @@ type guiFile File func (f guiFile) MarshalJSON() ([]byte, error) { type t struct { Name string - Size int + Size int64 } return json.Marshal(t{ Name: f.Name, - Size: File(f).Size(), + Size: File(f).Size, }) } diff --git a/main.go b/main.go index 541810bc0..96c6c8f1e 100644 --- a/main.go +++ b/main.go @@ -343,7 +343,7 @@ func printStatsLoop(m *Model) { outbps := 8 * int(float64(stats.OutBytesTotal-lastStats[node].OutBytesTotal)/secs) if inbps+outbps > 0 { - infof("%s: %sb/s in, %sb/s out", node[0:5], MetricPrefix(inbps), MetricPrefix(outbps)) + infof("%s: %sb/s in, %sb/s out", node[0:5], MetricPrefix(int64(inbps)), MetricPrefix(int64(outbps))) } lastStats[node] = stats @@ -449,12 +449,12 @@ func connect(myID string, disc *discover.Discoverer, m *Model, tlsCfg *tls.Confi } for _, addr := range nodeCfg.Addresses { if addr == "dynamic" { - var ok bool if disc != nil { - addr, ok = disc.Lookup(nodeCfg.NodeID) - } - if !ok { - continue + t := disc.Lookup(nodeCfg.NodeID) + if len(t) == 0 { + continue + } + addr = t[0] //XXX: Handle all of them } } @@ -502,7 +502,7 @@ func saveIndex(m *Model) { gzw := gzip.NewWriter(idxf) - protocol.WriteIndex(gzw, "local", m.ProtocolIndex()) + protocol.IndexMessage{"local", m.ProtocolIndex()}.EncodeXDR(gzw) gzw.Close() idxf.Close() os.Rename(fullName+".tmp", fullName) @@ -522,11 +522,12 @@ func loadIndex(m *Model) { } defer gzr.Close() - repo, idx, err := protocol.ReadIndex(gzr) - if repo != "local" || err != nil { + var im protocol.IndexMessage + err = im.DecodeXDR(gzr) + if err != nil || im.Repository != "local" { return } - m.SeedLocal(idx) + m.SeedLocal(im.Files) } func ensureDir(dir string, mode int) { diff --git a/model.go b/model.go index 003acf57e..112bde4b9 100644 --- a/model.go +++ b/model.go @@ -56,7 +56,7 @@ type Model struct { type Connection interface { ID() string Index(string, []protocol.FileInfo) - Request(repo, name string, offset int64, size uint32, hash []byte) ([]byte, error) + Request(repo, name string, offset int64, size int) ([]byte, error) Statistics() protocol.Statistics Option(key string) string } @@ -171,9 +171,9 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo { m.pmut.RLock() m.rmut.RLock() - var tot int + var tot int64 for _, f := range m.global { - tot += f.Size() + tot += f.Size } var res = make(map[string]ConnectionInfo) @@ -187,14 +187,14 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo { ci.Address = nc.RemoteAddr().String() } - var have int + var have int64 for _, f := range m.remote[node] { if f.Equals(m.global[f.Name]) { - have += f.Size() + have += f.Size } } - ci.Completion = 100 * have / tot + ci.Completion = int(100 * have / tot) res[node] = ci } @@ -205,15 +205,15 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo { return res } -// LocalSize returns the number of files, deleted files and total bytes for all +// GlobalSize returns the number of files, deleted files and total bytes for all // files in the global model. -func (m *Model) GlobalSize() (files, deleted, bytes int) { +func (m *Model) GlobalSize() (files, deleted int, bytes int64) { m.gmut.RLock() for _, f := range m.global { if f.Flags&protocol.FlagDeleted == 0 { files++ - bytes += f.Size() + bytes += f.Size } else { deleted++ } @@ -225,13 +225,13 @@ func (m *Model) GlobalSize() (files, deleted, bytes int) { // LocalSize returns the number of files, deleted files and total bytes for all // files in the local repository. -func (m *Model) LocalSize() (files, deleted, bytes int) { +func (m *Model) LocalSize() (files, deleted int, bytes int64) { m.lmut.RLock() for _, f := range m.local { if f.Flags&protocol.FlagDeleted == 0 { files++ - bytes += f.Size() + bytes += f.Size } else { deleted++ } @@ -243,14 +243,14 @@ func (m *Model) LocalSize() (files, deleted, bytes int) { // InSyncSize returns the number and total byte size of the local files that // are in sync with the global model. -func (m *Model) InSyncSize() (files, bytes int) { +func (m *Model) InSyncSize() (files, bytes int64) { m.gmut.RLock() m.lmut.RLock() for n, f := range m.local { if gf, ok := m.global[n]; ok && f.Equals(gf) { files++ - bytes += f.Size() + bytes += f.Size } } @@ -260,7 +260,7 @@ func (m *Model) InSyncSize() (files, bytes int) { } // NeedFiles returns the list of currently needed files and the total size. -func (m *Model) NeedFiles() (files []File, bytes int) { +func (m *Model) NeedFiles() (files []File, bytes int64) { qf := m.fq.QueuedFiles() m.gmut.RLock() @@ -268,7 +268,7 @@ func (m *Model) NeedFiles() (files []File, bytes int) { for _, n := range qf { f := m.global[n] files = append(files, f) - bytes += f.Size() + bytes += f.Size } m.gmut.RUnlock() @@ -387,7 +387,7 @@ func (m *Model) Close(node string, err error) { // Request returns the specified data segment by reading it from local disk. // Implements the protocol.Model interface. -func (m *Model) Request(nodeID, repo, name string, offset int64, size uint32, hash []byte) ([]byte, error) { +func (m *Model) Request(nodeID, repo, name string, offset int64, size int) ([]byte, error) { // Verify that the requested file exists in the local and global model. m.lmut.RLock() lf, localOk := m.local[name] @@ -398,7 +398,7 @@ func (m *Model) Request(nodeID, repo, name string, offset int64, size uint32, ha m.gmut.RUnlock() if !localOk || !globalOk { - warnf("SECURITY (nonexistent file) REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash) + warnf("SECURITY (nonexistent file) REQ(in): %s: %q o=%d s=%d", nodeID, name, offset, size) return nil, ErrNoSuchFile } if lf.Flags&protocol.FlagInvalid != 0 { @@ -406,7 +406,7 @@ func (m *Model) Request(nodeID, repo, name string, offset int64, size uint32, ha } if m.trace["net"] && nodeID != "" { - debugf("NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash) + debugf("NET REQ(in): %s: %q o=%d s=%d", nodeID, name, offset, size) } fn := path.Join(m.dir, name) fd, err := os.Open(fn) // XXX: Inefficient, should cache fd? @@ -541,7 +541,7 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) { if m.trace["pull"] { debugln("PULL: Request", nodeID, i, qb.name, qb.block.Offset) } - data, _ := protoConn.Request("default", qb.name, qb.block.Offset, qb.block.Size, qb.block.Hash) + data, _ := protoConn.Request("default", qb.name, qb.block.Offset, int(qb.block.Size)) m.fq.Done(qb.name, qb.block.Offset, data) } else { time.Sleep(1 * time.Second) @@ -574,7 +574,7 @@ func (m *Model) ProtocolIndex() []protocol.FileInfo { return index } -func (m *Model) requestGlobal(nodeID, name string, offset int64, size uint32, hash []byte) ([]byte, error) { +func (m *Model) requestGlobal(nodeID, name string, offset int64, size int, hash []byte) ([]byte, error) { m.pmut.RLock() nc, ok := m.protoConn[nodeID] m.pmut.RUnlock() @@ -587,7 +587,7 @@ func (m *Model) requestGlobal(nodeID, name string, offset int64, size uint32, ha debugf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash) } - return nc.Request("default", name, offset, size, hash) + return nc.Request("default", name, offset, size) } func (m *Model) broadcastIndexLoop() { @@ -891,6 +891,7 @@ func fileFromFileInfo(f protocol.FileInfo) File { } return File{ Name: f.Name, + Size: offset, Flags: f.Flags, Modified: f.Modified, Version: f.Version, diff --git a/protocol/PROTOCOL.md b/protocol/PROTOCOL.md index 885f753f4..eb1383c2e 100644 --- a/protocol/PROTOCOL.md +++ b/protocol/PROTOCOL.md @@ -1,26 +1,29 @@ -Block Exchange Protocol v1.0 -============================ +Block Exchange Protocol v1 +========================== Introduction and Definitions ---------------------------- -The BEP is used between two or more _nodes_ thus forming a _cluster_. -Each node has a _repository_ of files described by the _local model_, -containing modifications times and block hashes. The local model is sent -to the other nodes in the cluster. The union of all files in the local -models, with files selected for most recent modification time, forms the -_global model_. Each node strives to get it's repository in sync with -the global model by requesting missing blocks from the other nodes. +BEP is used between two or more _nodes_ thus forming a _cluster_. Each +node has one or more _repositories_ of files described by the _local +model_, containing metadata and block hashes. The local model is sent to +the other nodes in the cluster. The union of all files in the local +models, with files selected for highest change version, forms the +_global model_. Each node strives to get it's repositories in sync with +the global model by requesting missing or outdated blocks from the other +nodes in the cluster. + +File data is described and transferred in units of _blocks_, each being +128 KiB (131072 bytes) in size. Transport and Authentication ---------------------------- -The BEP itself does not provide retransmissions, compression, encryption -nor authentication. It is expected that this is performed at lower -layers of the networking stack. A typical deployment stack should be -similar to the following: +BEP itself does not provide retransmissions, compression, encryption nor +authentication. It is expected that this is performed at lower layers of +the networking stack. The typical deployment stack is the following: - |-----------------------------| + +-----------------------------| | Block Exchange Protocol | |-----------------------------| | Compression (RFC 1951) | @@ -48,73 +51,127 @@ message boundary. Messages -------- -Every message starts with one 32 bit word indicating the message version -and type. For BEP v1.0 the Version field is set to zero. Future versions -with incompatible message formats will increment the Version field. The -reserved bits must be set to zero. +Every message starts with one 32 bit word indicating the message +version, type and ID. 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | Ver=0 | Message ID | Type | Reserved | + | Ver | Type | Message ID | Reply To | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -All data following the message header is in XDR (RFC 1014) encoding. -The actual data types in use by BEP, in XDR naming convention, are: +For BEP v1 the Version field is set to zero. Future versions with +incompatible message formats will increment the Version field. + +The Type field indicates the type of data following the message header +and is one of the integers defined below. + +The Message ID is set to a unique value for each transmitted message. In +request messages the Reply To is set to zero. In response messages it is +set to the message ID of the corresponding request. + +All data following the message header is in XDR (RFC 1014) encoding. All +fields smaller than 32 bits and all variable length data is padded to a +multiple of 32 bits. The actual data types in use by BEP, in XDR naming +convention, are: - (unsigned) int -- (unsigned) 32 bit integer - (unsigned) hyper -- (unsigned) 64 bit integer - opaque<> -- variable length opaque data - string<> -- variable length string -The encoding of opaque<> and string<> are identical, the distinction is -solely in interpretation. Opaque data should not be interpreted as such, -but can be compared bytewise to other opaque data. All strings use the -UTF-8 encoding. +The transmitted length of string and opaque data is the length of actual +data, excluding any added padding. The encoding of opaque<> and string<> +are identical, the distinction being solely in interpretation. Opaque +data should not be interpreted but can be compared bytewise to other +opaque data. All strings use the UTF-8 encoding. ### Index (Type = 1) -The Index message defines the contents of the senders repository. A Index -message is sent by each peer immediately upon connection and whenever the -local repository contents changes. However, if a peer has no data to -advertise (the repository is empty, or it is set to only import data) it -is allowed but not required to send an empty Index message (a file list of -zero length). If the repository contents change from non-empty to empty, -an empty Index message must be sent. There is no response to the Index -message. +The Index message defines the contents of the senders repository. An +Index message is sent by each peer immediately upon connection. A peer +with no data to advertise (the repository is empty, or it is set to only +import data) is allowed but not required to send an empty Index message +(a file list of zero length). If the repository contents change from +non-empty to empty, an empty Index message must be sent. There is no +response to the Index message. - struct IndexMessage { - string Repository<>; - FileInfo Files<>; - } +#### Graphical Representation - struct FileInfo { - string Name<>; - unsigned int Flags; - hyper Modified; - unsigned int Version; - BlockInfo Blocks<>; - } + IndexMessage Structure: - struct BlockInfo { - unsigned int Length; - opaque Hash<> - } + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Length of Repository | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ Repository (variable length) \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Number of Files | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ Zero or more FileInfo Structures \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + + FileInfo Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Length of Name | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ Name (variable length) \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Flags | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | | + + Modified (64 bits) + + | | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Version | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Number of Blocks | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ Zero or more BlockInfo Structures \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + + BlockInfo Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Size | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Length of Hash | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ Hash (variable length) \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +#### Fields The Repository field identifies the repository that the index message pertains to. For single repository implementations an empty repository -ID is acceptable. +ID is acceptable, or the word "default". The Name is the file name path +relative to the repository root. The combination of Repository and Name +uniquely identifies each file in a cluster. -The file name is the part relative to the repository root. The -modification time is expressed as the number of seconds since the Unix -Epoch. The version field is a counter that increments each time the file -changes but resets to zero each time the modification is updated. This -is used to signal changes to the file (or file metadata) while the -modification time remains unchanged. The hash algorithm is implied by -the hash length. Currently, the hash must be 32 bytes long and computed -by SHA256. +The Version field is a counter that is initially zero for each file. It +is incremented each time a change is detected. The combination of +Repository, Name and Version uniquely identifies the contents of a file +at a certain point in time. -The flags field is made up of the following single bit flags: +The Flags field is made up of the following single bit flags: 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 @@ -136,62 +193,128 @@ The flags field is made up of the following single bit flags: - Bit 0 through 17 are reserved for future use and shall be set to zero. +The hash algorithm is implied by the Hash length. Currently, the hash +must be 32 bytes long and computed by SHA256. + +The Modified time is expressed as the number of seconds since the Unix +Epoch. In the rare occasion that a file is simultaneously and +independently modified by two nodes in the same cluster and thus end up +on the same Version number after modification, the Modified field is +used as a tie breaker. + +The Size field is the size of the file, in bytes. + +The Blocks list contains the size and hash for each block in the file. +Each block represents a 128 KiB slice of the file, except for the last +block which may represent a smaller amount of data. + +#### XDR + + struct IndexMessage { + string Repository<>; + FileInfo Files<>; + } + + struct FileInfo { + string Name<>; + unsigned int Flags; + hyper Modified; + unsigned int Version; + BlockInfo Blocks<>; + } + + struct BlockInfo { + unsigned int Size; + opaque Hash<>; + } + ### Request (Type = 2) The Request message expresses the desire to receive a data block corresponding to a part of a certain file in the peer's repository. -The requested block must correspond exactly to one block seen in the -peer's Index message. The hash field must be set to the expected value by -the sender. The receiver may validate that this is actually the case -before transmitting data. Each Request message must be met with a Response +#### Graphical Representation + + RequestMessage Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Length of Repository | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ Repository (variable length) \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Length of Name | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ Name (variable length) \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | | + + Offset (64 bits) + + | | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Size | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +#### Fields + +The Repository and Name fields are as documented for the Index message. +The Offset and Size fields specify the region of the file to be +transferred. This should equate to exactly one block as seen in an Index message. +#### XDR + struct RequestMessage { string Repository<>; string Name<>; unsigned hyper Offset; - unsigned int Length; - opaque Hash<>; + unsigned int Size; } -The hash algorithm is implied by the hash length. Currently, the hash -must be 32 bytes long and computed by SHA256. - -The Message ID in the header must set to a unique value to be able to -correlate the request with the response message. - ### Response (Type = 3) -The Response message is sent in response to a Request message. In case the -requested data was not available (an outdated block was requested, or -the file has been deleted), the Data field is empty. +The Response message is sent in response to a Request message. + +#### Graphical Representation + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Length of Data | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ Data (variable length) \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +#### Fields + +The Data field contains either a full 128 KiB block, a shorter block in +the case of the last block in a file, or is empty (zero length) if the +requested block is not available. + +#### XDR struct ResponseMessage { opaque Data<> } -The Message ID in the header is used to correlate requests and -responses. - ### Ping (Type = 4) The Ping message is used to determine that a connection is alive, and to keep connections alive through state tracking network elements such as firewalls and NAT gateways. The Ping message has no contents. - struct PingMessage { - } - ### Pong (Type = 5) The Pong message is sent in response to a Ping. The Pong message has no contents, but copies the Message ID from the Ping. - struct PongMessage { - } - -### IndexUpdate (Type = 6) +### Index Update (Type = 6) This message has exactly the same structure as the Index message. However instead of replacing the contents of the repository in the @@ -206,26 +329,59 @@ configuration, version, etc. It is sent at connection initiation and, optionally, when any of the sent parameters have changed. The message is in the form of a list of (key, value) pairs, both of string type. +Key ID:s apart from the well known ones are implementation specific. An +implementation is expected to ignore unknown keys. An implementation may +impose limits on key and value size. + +Well known keys: + + - "clientId" -- The name of the implementation. Example: "syncthing". + + - "clientVersion" -- The version of the client. Example: "v1.0.33-47". The + Following the SemVer 2.0 specification for version strings is + encouraged but not enforced. + +#### Graphical Representation + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Number of Options | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ Zero or more KeyValue Structures \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + KeyValue Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Length of Key | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ Key (variable length) \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Length of Value | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ Value (variable length) \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +#### XDR + struct OptionsMessage { KeyValue Options<>; } struct KeyValue { - string Key; - string Value; + string Key<>; + string Value<>; } -Key ID:s apart from the well known ones are implementation -specific. An implementation is expected to ignore unknown keys. An -implementation may impose limits on key and value size. - -Well known keys: - - - "clientId" -- The name of the implementation. Example: "syncthing". - - "clientVersion" -- The version of the client. Example: "v1.0.33-47". The - Following the SemVer 2.0 specification for version strings is - encouraged but not enforced. - Example Exchange ---------------- @@ -239,7 +395,7 @@ Example Exchange 7. <-Response 8. <-Response 9. <-Response - 10. Index-> + 10. Index Update-> ... 11. Ping-> 12. <-Pong @@ -250,7 +406,7 @@ of the data in the cluster. In this example, peer A has four missing or outdated blocks. At 2 through 5 peer A sends requests for these blocks. The requests are received by peer B, who retrieves the data from the repository and transmits Response records (6 through 9). Node A updates -their repository contents and transmits an updated Index message (10). +their repository contents and transmits an Index Update message (10). Both peers enter idle state after 10. At some later time 11, peer A determines that it has not seen data from B for some time and sends a Ping request. A response is sent at 12. diff --git a/protocol/header.go b/protocol/header.go new file mode 100644 index 000000000..0a454151d --- /dev/null +++ b/protocol/header.go @@ -0,0 +1,34 @@ +package protocol + +import "github.com/calmh/syncthing/xdr" + +type header struct { + version int + msgID int + msgType int +} + +func (h header) encodeXDR(xw *xdr.Writer) (int, error) { + u := encodeHeader(h) + return xw.WriteUint32(u) +} + +func (h *header) decodeXDR(xr *xdr.Reader) error { + u := xr.ReadUint32() + *h = decodeHeader(u) + return xr.Error() +} + +func encodeHeader(h header) uint32 { + return uint32(h.version&0xf)<<28 + + uint32(h.msgID&0xfff)<<16 + + uint32(h.msgType&0xff)<<8 +} + +func decodeHeader(u uint32) header { + return header{ + version: int(u>>28) & 0xf, + msgID: int(u>>16) & 0xfff, + msgType: int(u>>8) & 0xff, + } +} diff --git a/protocol/message_types.go b/protocol/message_types.go new file mode 100644 index 000000000..56fc365de --- /dev/null +++ b/protocol/message_types.go @@ -0,0 +1,35 @@ +package protocol + +type IndexMessage struct { + Repository string // max:64 + Files []FileInfo // max:100000 +} + +type FileInfo struct { + Name string // max:1024 + Flags uint32 + Modified int64 + Version uint32 + Blocks []BlockInfo // max:100000 +} + +type BlockInfo struct { + Size uint32 + Hash []byte // max:64 +} + +type RequestMessage struct { + Repository string // max:64 + Name string // max:1024 + Offset uint64 + Size uint32 +} + +type OptionsMessage struct { + Options []Option // max:64 +} + +type Option struct { + Key string // max:64 + Value string // max:1024 +} diff --git a/protocol/message_xdr.go b/protocol/message_xdr.go new file mode 100644 index 000000000..4174d03be --- /dev/null +++ b/protocol/message_xdr.go @@ -0,0 +1,286 @@ +package protocol + +import ( + "bytes" + "io" + + "github.com/calmh/syncthing/xdr" +) + +func (o IndexMessage) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o IndexMessage) MarshalXDR() []byte { + var buf bytes.Buffer + var xw = xdr.NewWriter(&buf) + o.encodeXDR(xw) + return buf.Bytes() +} + +func (o IndexMessage) encodeXDR(xw *xdr.Writer) (int, error) { + if len(o.Repository) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.Repository) + if len(o.Files) > 100000 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteUint32(uint32(len(o.Files))) + for i := range o.Files { + o.Files[i].encodeXDR(xw) + } + return xw.Tot(), xw.Error() +} + +func (o *IndexMessage) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *IndexMessage) UnmarshalXDR(bs []byte) error { + var buf = bytes.NewBuffer(bs) + var xr = xdr.NewReader(buf) + return o.decodeXDR(xr) +} + +func (o *IndexMessage) decodeXDR(xr *xdr.Reader) error { + o.Repository = xr.ReadStringMax(64) + _FilesSize := int(xr.ReadUint32()) + if _FilesSize > 100000 { + return xdr.ErrElementSizeExceeded + } + o.Files = make([]FileInfo, _FilesSize) + for i := range o.Files { + (&o.Files[i]).decodeXDR(xr) + } + return xr.Error() +} + +func (o FileInfo) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o FileInfo) MarshalXDR() []byte { + var buf bytes.Buffer + var xw = xdr.NewWriter(&buf) + o.encodeXDR(xw) + return buf.Bytes() +} + +func (o FileInfo) encodeXDR(xw *xdr.Writer) (int, error) { + if len(o.Name) > 1024 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.Name) + xw.WriteUint32(o.Flags) + xw.WriteUint64(uint64(o.Modified)) + xw.WriteUint32(o.Version) + if len(o.Blocks) > 100000 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteUint32(uint32(len(o.Blocks))) + for i := range o.Blocks { + o.Blocks[i].encodeXDR(xw) + } + return xw.Tot(), xw.Error() +} + +func (o *FileInfo) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *FileInfo) UnmarshalXDR(bs []byte) error { + var buf = bytes.NewBuffer(bs) + var xr = xdr.NewReader(buf) + return o.decodeXDR(xr) +} + +func (o *FileInfo) decodeXDR(xr *xdr.Reader) error { + o.Name = xr.ReadStringMax(1024) + o.Flags = xr.ReadUint32() + o.Modified = int64(xr.ReadUint64()) + o.Version = xr.ReadUint32() + _BlocksSize := int(xr.ReadUint32()) + if _BlocksSize > 100000 { + return xdr.ErrElementSizeExceeded + } + o.Blocks = make([]BlockInfo, _BlocksSize) + for i := range o.Blocks { + (&o.Blocks[i]).decodeXDR(xr) + } + return xr.Error() +} + +func (o BlockInfo) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o BlockInfo) MarshalXDR() []byte { + var buf bytes.Buffer + var xw = xdr.NewWriter(&buf) + o.encodeXDR(xw) + return buf.Bytes() +} + +func (o BlockInfo) encodeXDR(xw *xdr.Writer) (int, error) { + xw.WriteUint32(o.Size) + if len(o.Hash) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteBytes(o.Hash) + return xw.Tot(), xw.Error() +} + +func (o *BlockInfo) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *BlockInfo) UnmarshalXDR(bs []byte) error { + var buf = bytes.NewBuffer(bs) + var xr = xdr.NewReader(buf) + return o.decodeXDR(xr) +} + +func (o *BlockInfo) decodeXDR(xr *xdr.Reader) error { + o.Size = xr.ReadUint32() + o.Hash = xr.ReadBytesMax(64) + return xr.Error() +} + +func (o RequestMessage) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o RequestMessage) MarshalXDR() []byte { + var buf bytes.Buffer + var xw = xdr.NewWriter(&buf) + o.encodeXDR(xw) + return buf.Bytes() +} + +func (o RequestMessage) encodeXDR(xw *xdr.Writer) (int, error) { + if len(o.Repository) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.Repository) + if len(o.Name) > 1024 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.Name) + xw.WriteUint64(o.Offset) + xw.WriteUint32(o.Size) + return xw.Tot(), xw.Error() +} + +func (o *RequestMessage) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *RequestMessage) UnmarshalXDR(bs []byte) error { + var buf = bytes.NewBuffer(bs) + var xr = xdr.NewReader(buf) + return o.decodeXDR(xr) +} + +func (o *RequestMessage) decodeXDR(xr *xdr.Reader) error { + o.Repository = xr.ReadStringMax(64) + o.Name = xr.ReadStringMax(1024) + o.Offset = xr.ReadUint64() + o.Size = xr.ReadUint32() + return xr.Error() +} + +func (o OptionsMessage) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o OptionsMessage) MarshalXDR() []byte { + var buf bytes.Buffer + var xw = xdr.NewWriter(&buf) + o.encodeXDR(xw) + return buf.Bytes() +} + +func (o OptionsMessage) encodeXDR(xw *xdr.Writer) (int, error) { + if len(o.Options) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteUint32(uint32(len(o.Options))) + for i := range o.Options { + o.Options[i].encodeXDR(xw) + } + return xw.Tot(), xw.Error() +} + +func (o *OptionsMessage) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *OptionsMessage) UnmarshalXDR(bs []byte) error { + var buf = bytes.NewBuffer(bs) + var xr = xdr.NewReader(buf) + return o.decodeXDR(xr) +} + +func (o *OptionsMessage) decodeXDR(xr *xdr.Reader) error { + _OptionsSize := int(xr.ReadUint32()) + if _OptionsSize > 64 { + return xdr.ErrElementSizeExceeded + } + o.Options = make([]Option, _OptionsSize) + for i := range o.Options { + (&o.Options[i]).decodeXDR(xr) + } + return xr.Error() +} + +func (o Option) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o Option) MarshalXDR() []byte { + var buf bytes.Buffer + var xw = xdr.NewWriter(&buf) + o.encodeXDR(xw) + return buf.Bytes() +} + +func (o Option) encodeXDR(xw *xdr.Writer) (int, error) { + if len(o.Key) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.Key) + if len(o.Value) > 1024 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.Value) + return xw.Tot(), xw.Error() +} + +func (o *Option) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *Option) UnmarshalXDR(bs []byte) error { + var buf = bytes.NewBuffer(bs) + var xr = xdr.NewReader(buf) + return o.decodeXDR(xr) +} + +func (o *Option) decodeXDR(xr *xdr.Reader) error { + o.Key = xr.ReadStringMax(64) + o.Value = xr.ReadStringMax(1024) + return xr.Error() +} diff --git a/protocol/messages.go b/protocol/messages.go deleted file mode 100644 index 39067b358..000000000 --- a/protocol/messages.go +++ /dev/null @@ -1,186 +0,0 @@ -package protocol - -import ( - "errors" - "io" - - "github.com/calmh/syncthing/buffers" - "github.com/calmh/syncthing/xdr" -) - -const ( - maxNumFiles = 100000 // More than 100000 files is a protocol error - maxNumBlocks = 100000 // 100000 * 128KB = 12.5 GB max acceptable file size -) - -var ( - ErrMaxFilesExceeded = errors.New("Protocol error: number of files per index exceeds limit") - ErrMaxBlocksExceeded = errors.New("Protocol error: number of blocks per file exceeds limit") -) - -type request struct { - repo string - name string - offset int64 - size uint32 - hash []byte -} - -type header struct { - version int - msgID int - msgType int -} - -func encodeHeader(h header) uint32 { - return uint32(h.version&0xf)<<28 + - uint32(h.msgID&0xfff)<<16 + - uint32(h.msgType&0xff)<<8 -} - -func decodeHeader(u uint32) header { - return header{ - version: int(u>>28) & 0xf, - msgID: int(u>>16) & 0xfff, - msgType: int(u>>8) & 0xff, - } -} - -func WriteIndex(w io.Writer, repo string, idx []FileInfo) (int, error) { - mw := newMarshalWriter(w) - mw.writeIndex(repo, idx) - return int(mw.Tot()), mw.Err() -} - -type marshalWriter struct { - *xdr.Writer -} - -func newMarshalWriter(w io.Writer) marshalWriter { - return marshalWriter{xdr.NewWriter(w)} -} - -func (w *marshalWriter) writeHeader(h header) { - w.WriteUint32(encodeHeader(h)) -} - -func (w *marshalWriter) writeIndex(repo string, idx []FileInfo) { - w.WriteString(repo) - w.WriteUint32(uint32(len(idx))) - for _, f := range idx { - w.WriteString(f.Name) - w.WriteUint32(f.Flags) - w.WriteUint64(uint64(f.Modified)) - w.WriteUint32(f.Version) - w.WriteUint32(uint32(len(f.Blocks))) - for _, b := range f.Blocks { - w.WriteUint32(b.Size) - w.WriteBytes(b.Hash) - } - } -} - -func (w *marshalWriter) writeRequest(r request) { - w.WriteString(r.repo) - w.WriteString(r.name) - w.WriteUint64(uint64(r.offset)) - w.WriteUint32(r.size) - w.WriteBytes(r.hash) -} - -func (w *marshalWriter) writeResponse(data []byte) { - w.WriteBytes(data) -} - -func (w *marshalWriter) writeOptions(opts map[string]string) { - w.WriteUint32(uint32(len(opts))) - for k, v := range opts { - w.WriteString(k) - w.WriteString(v) - } -} - -func ReadIndex(r io.Reader) (string, []FileInfo, error) { - mr := newMarshalReader(r) - repo, idx := mr.readIndex() - return repo, idx, mr.Err() -} - -type marshalReader struct { - *xdr.Reader - err error -} - -func newMarshalReader(r io.Reader) marshalReader { - return marshalReader{ - Reader: xdr.NewReader(r), - err: nil, - } -} - -func (r marshalReader) Err() error { - if r.err != nil { - return r.err - } - return r.Reader.Err() -} - -func (r marshalReader) readHeader() header { - return decodeHeader(r.ReadUint32()) -} - -func (r marshalReader) readIndex() (string, []FileInfo) { - var files []FileInfo - repo := r.ReadString() - nfiles := r.ReadUint32() - if nfiles > maxNumFiles { - r.err = ErrMaxFilesExceeded - return "", nil - } - if nfiles > 0 { - files = make([]FileInfo, nfiles) - for i := range files { - files[i].Name = r.ReadString() - files[i].Flags = r.ReadUint32() - files[i].Modified = int64(r.ReadUint64()) - files[i].Version = r.ReadUint32() - nblocks := r.ReadUint32() - if nblocks > maxNumBlocks { - r.err = ErrMaxBlocksExceeded - return "", nil - } - blocks := make([]BlockInfo, nblocks) - for j := range blocks { - blocks[j].Size = r.ReadUint32() - blocks[j].Hash = r.ReadBytes(buffers.Get(32)) - } - files[i].Blocks = blocks - } - } - return repo, files -} - -func (r marshalReader) readRequest() request { - var req request - req.repo = r.ReadString() - req.name = r.ReadString() - req.offset = int64(r.ReadUint64()) - req.size = r.ReadUint32() - req.hash = r.ReadBytes(buffers.Get(32)) - return req -} - -func (r marshalReader) readResponse() []byte { - return r.ReadBytes(buffers.Get(128 * 1024)) -} - -func (r marshalReader) readOptions() map[string]string { - n := r.ReadUint32() - opts := make(map[string]string, n) - for i := 0; i < int(n); i++ { - k := r.ReadString() - v := r.ReadString() - opts[k] = v - } - return opts -} diff --git a/protocol/messages_test.go b/protocol/messages_test.go deleted file mode 100644 index 968d1b292..000000000 --- a/protocol/messages_test.go +++ /dev/null @@ -1,143 +0,0 @@ -package protocol - -import ( - "bytes" - "io/ioutil" - "reflect" - "testing" - "testing/quick" -) - -func TestIndex(t *testing.T) { - idx := []FileInfo{ - { - "Foo", - FlagInvalid & FlagDeleted & 0755, - 1234567890, - 142, - []BlockInfo{ - {12345678, []byte("hash hash hash")}, - {23456781, []byte("ash hash hashh")}, - {34567812, []byte("sh hash hashha")}, - }, - }, { - "Quux/Quux", - 0644, - 2345678901, - 232323232, - []BlockInfo{ - {45678123, []byte("4321 hash hash hash")}, - {56781234, []byte("3214 ash hash hashh")}, - {67812345, []byte("2143 sh hash hashha")}, - }, - }, - } - - var buf = new(bytes.Buffer) - var wr = newMarshalWriter(buf) - wr.writeIndex("default", idx) - - var rd = newMarshalReader(buf) - var repo, idx2 = rd.readIndex() - - if repo != "default" { - t.Error("Incorrect repo", repo) - } - - if !reflect.DeepEqual(idx, idx2) { - t.Errorf("Index marshal error:\n%#v\n%#v\n", idx, idx2) - } -} - -func TestRequest(t *testing.T) { - f := func(repo, name string, offset int64, size uint32, hash []byte) bool { - var buf = new(bytes.Buffer) - var req = request{repo, name, offset, size, hash} - var wr = newMarshalWriter(buf) - wr.writeRequest(req) - var rd = newMarshalReader(buf) - var req2 = rd.readRequest() - return req.name == req2.name && - req.offset == req2.offset && - req.size == req2.size && - bytes.Compare(req.hash, req2.hash) == 0 - } - if err := quick.Check(f, nil); err != nil { - t.Error(err) - } -} - -func TestResponse(t *testing.T) { - f := func(data []byte) bool { - var buf = new(bytes.Buffer) - var wr = newMarshalWriter(buf) - wr.writeResponse(data) - var rd = newMarshalReader(buf) - var read = rd.readResponse() - return bytes.Compare(read, data) == 0 - } - if err := quick.Check(f, nil); err != nil { - t.Error(err) - } -} - -func BenchmarkWriteIndex(b *testing.B) { - idx := []FileInfo{ - { - "Foo", - 0777, - 1234567890, - 424242, - []BlockInfo{ - {12345678, []byte("hash hash hash")}, - {23456781, []byte("ash hash hashh")}, - {34567812, []byte("sh hash hashha")}, - }, - }, { - "Quux/Quux", - 0644, - 2345678901, - 323232, - []BlockInfo{ - {45678123, []byte("4321 hash hash hash")}, - {56781234, []byte("3214 ash hash hashh")}, - {67812345, []byte("2143 sh hash hashha")}, - }, - }, - } - - var wr = newMarshalWriter(ioutil.Discard) - - for i := 0; i < b.N; i++ { - wr.writeIndex("default", idx) - } -} - -func BenchmarkWriteRequest(b *testing.B) { - var req = request{"default", "blah blah", 1231323, 13123123, []byte("hash hash hash")} - var wr = newMarshalWriter(ioutil.Discard) - - for i := 0; i < b.N; i++ { - wr.writeRequest(req) - } -} - -func TestOptions(t *testing.T) { - opts := map[string]string{ - "foo": "bar", - "someKey": "otherValue", - "hello": "", - "": "42", - } - - var buf = new(bytes.Buffer) - var wr = newMarshalWriter(buf) - wr.writeOptions(opts) - - var rd = newMarshalReader(buf) - var ropts = rd.readOptions() - - if !reflect.DeepEqual(opts, ropts) { - t.Error("Incorrect options marshal/demarshal") - } -} diff --git a/protocol/protocol.go b/protocol/protocol.go index 206abdcc7..ffae03897 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -13,6 +13,8 @@ import ( "github.com/calmh/syncthing/xdr" ) +const BlockSize = 128 * 1024 + const ( messageTypeIndex = 1 messageTypeRequest = 2 @@ -32,26 +34,13 @@ var ( ErrClusterHash = fmt.Errorf("Configuration error: mismatched cluster hash") ) -type FileInfo struct { - Name string - Flags uint32 - Modified int64 - Version uint32 - Blocks []BlockInfo -} - -type BlockInfo struct { - Size uint32 - Hash []byte -} - type Model interface { // An index was received from the peer node Index(nodeID string, files []FileInfo) // An index update was received from the peer node IndexUpdate(nodeID string, files []FileInfo) // A request was made by the peer node - Request(nodeID, repo string, name string, offset int64, size uint32, hash []byte) ([]byte, error) + Request(nodeID, repo string, name string, offset int64, size int) ([]byte, error) // The peer node closed the connection Close(nodeID string, err error) } @@ -62,9 +51,9 @@ type Connection struct { id string receiver Model reader io.Reader - mreader marshalReader + xr *xdr.Reader writer io.Writer - mwriter marshalWriter + xw *xdr.Writer closed bool awaiting map[int]chan asyncResult nextId int @@ -102,9 +91,9 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M id: nodeID, receiver: receiver, reader: flrd, - mreader: marshalReader{Reader: xdr.NewReader(flrd)}, + xr: xdr.NewReader(flrd), writer: flwr, - mwriter: marshalWriter{Writer: xdr.NewWriter(flwr)}, + xw: xdr.NewWriter(flwr), awaiting: make(map[int]chan asyncResult), indexSent: make(map[string]map[string][2]int64), } @@ -116,9 +105,16 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M c.myOptions = options go func() { c.Lock() - c.mwriter.writeHeader(header{0, c.nextId, messageTypeOptions}) - c.mwriter.writeOptions(options) - err := c.flush() + header{0, c.nextId, messageTypeOptions}.encodeXDR(c.xw) + var om OptionsMessage + for k, v := range options { + om.Options = append(om.Options, Option{k, v}) + } + om.encodeXDR(c.xw) + err := c.xw.Error() + if err == nil { + err = c.flush() + } if err != nil { log.Println("Warning: Write error during initial handshake:", err) } @@ -159,9 +155,11 @@ func (c *Connection) Index(repo string, idx []FileInfo) { idx = diff } - c.mwriter.writeHeader(header{0, c.nextId, msgType}) - c.mwriter.writeIndex(repo, idx) - err := c.flush() + header{0, c.nextId, msgType}.encodeXDR(c.xw) + _, err := IndexMessage{repo, idx}.encodeXDR(c.xw) + if err == nil { + err = c.flush() + } c.nextId = (c.nextId + 1) & 0xfff c.hasSentIndex = true c.Unlock() @@ -169,14 +167,11 @@ func (c *Connection) Index(repo string, idx []FileInfo) { if err != nil { c.close(err) return - } else if c.mwriter.Err() != nil { - c.close(c.mwriter.Err()) - return } } // Request returns the bytes for the specified block after fetching them from the connected peer. -func (c *Connection) Request(repo string, name string, offset int64, size uint32, hash []byte) ([]byte, error) { +func (c *Connection) Request(repo string, name string, offset int64, size int) ([]byte, error) { c.Lock() if c.closed { c.Unlock() @@ -184,14 +179,11 @@ func (c *Connection) Request(repo string, name string, offset int64, size uint32 } rc := make(chan asyncResult) c.awaiting[c.nextId] = rc - c.mwriter.writeHeader(header{0, c.nextId, messageTypeRequest}) - c.mwriter.writeRequest(request{repo, name, offset, size, hash}) - if c.mwriter.Err() != nil { - c.Unlock() - c.close(c.mwriter.Err()) - return nil, c.mwriter.Err() + header{0, c.nextId, messageTypeRequest}.encodeXDR(c.xw) + _, err := RequestMessage{repo, name, uint64(offset), uint32(size)}.encodeXDR(c.xw) + if err == nil { + err = c.flush() } - err := c.flush() if err != nil { c.Unlock() c.close(err) @@ -215,15 +207,15 @@ func (c *Connection) ping() bool { } rc := make(chan asyncResult, 1) c.awaiting[c.nextId] = rc - c.mwriter.writeHeader(header{0, c.nextId, messageTypePing}) + header{0, c.nextId, messageTypePing}.encodeXDR(c.xw) err := c.flush() if err != nil { c.Unlock() c.close(err) return false - } else if c.mwriter.Err() != nil { + } else if c.xw.Error() != nil { c.Unlock() - c.close(c.mwriter.Err()) + c.close(c.xw.Error()) return false } c.nextId = (c.nextId + 1) & 0xfff @@ -269,9 +261,10 @@ func (c *Connection) isClosed() bool { func (c *Connection) readerLoop() { loop: for { - hdr := c.mreader.readHeader() - if c.mreader.Err() != nil { - c.close(c.mreader.Err()) + var hdr header + hdr.decodeXDR(c.xr) + if c.xr.Error() != nil { + c.close(c.xr.Error()) break loop } if hdr.version != 0 { @@ -281,64 +274,65 @@ loop: switch hdr.msgType { case messageTypeIndex: - repo, files := c.mreader.readIndex() - _ = repo - if c.mreader.Err() != nil { - c.close(c.mreader.Err()) + var im IndexMessage + im.decodeXDR(c.xr) + if c.xr.Error() != nil { + c.close(c.xr.Error()) break loop } else { - c.receiver.Index(c.id, files) + c.receiver.Index(c.id, im.Files) } c.Lock() c.hasRecvdIndex = true c.Unlock() case messageTypeIndexUpdate: - repo, files := c.mreader.readIndex() - _ = repo - if c.mreader.Err() != nil { - c.close(c.mreader.Err()) + var im IndexMessage + im.decodeXDR(c.xr) + if c.xr.Error() != nil { + c.close(c.xr.Error()) break loop } else { - c.receiver.IndexUpdate(c.id, files) + c.receiver.IndexUpdate(c.id, im.Files) } case messageTypeRequest: - req := c.mreader.readRequest() - if c.mreader.Err() != nil { - c.close(c.mreader.Err()) + var req RequestMessage + req.decodeXDR(c.xr) + if c.xr.Error() != nil { + c.close(c.xr.Error()) break loop } go c.processRequest(hdr.msgID, req) case messageTypeResponse: - data := c.mreader.readResponse() + data := c.xr.ReadBytes() - if c.mreader.Err() != nil { - c.close(c.mreader.Err()) + if c.xr.Error() != nil { + c.close(c.xr.Error()) break loop - } else { - c.Lock() - rc, ok := c.awaiting[hdr.msgID] - delete(c.awaiting, hdr.msgID) - c.Unlock() + } - if ok { - rc <- asyncResult{data, c.mreader.Err()} - close(rc) - } + c.Lock() + rc, ok := c.awaiting[hdr.msgID] + delete(c.awaiting, hdr.msgID) + c.Unlock() + + if ok { + rc <- asyncResult{data, c.xr.Error()} + close(rc) } case messageTypePing: c.Lock() - c.mwriter.WriteUint32(encodeHeader(header{0, hdr.msgID, messageTypePong})) + header{0, hdr.msgID, messageTypePong}.encodeXDR(c.xw) err := c.flush() c.Unlock() if err != nil { c.close(err) break loop - } else if c.mwriter.Err() != nil { - c.close(c.mwriter.Err()) + } else if c.xw.Error() != nil { + c.close(c.xw.Error()) break loop } @@ -357,8 +351,18 @@ loop: } case messageTypeOptions: + var om OptionsMessage + om.decodeXDR(c.xr) + if c.xr.Error() != nil { + c.close(c.xr.Error()) + break loop + } + c.optionsLock.Lock() - c.peerOptions = c.mreader.readOptions() + c.peerOptions = make(map[string]string, len(om.Options)) + for _, opt := range om.Options { + c.peerOptions[opt.Key] = opt.Value + } c.optionsLock.Unlock() if mh, rh := c.myOptions["clusterHash"], c.peerOptions["clusterHash"]; len(mh) > 0 && len(rh) > 0 && mh != rh { @@ -373,13 +377,12 @@ loop: } } -func (c *Connection) processRequest(msgID int, req request) { - data, _ := c.receiver.Request(c.id, req.repo, req.name, req.offset, req.size, req.hash) +func (c *Connection) processRequest(msgID int, req RequestMessage) { + data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size)) c.Lock() - c.mwriter.WriteUint32(encodeHeader(header{0, msgID, messageTypeResponse})) - c.mwriter.writeResponse(data) - err := c.mwriter.Err() + header{0, msgID, messageTypeResponse}.encodeXDR(c.xw) + _, err := c.xw.WriteBytes(data) if err == nil { err = c.flush() } @@ -428,8 +431,8 @@ func (c *Connection) Statistics() Statistics { stats := Statistics{ At: time.Now(), - InBytesTotal: int(c.mreader.Tot()), - OutBytesTotal: int(c.mwriter.Tot()), + InBytesTotal: int(c.xr.Tot()), + OutBytesTotal: int(c.xw.Tot()), } return stats diff --git a/protocol/protocol_test.go b/protocol/protocol_test.go index 0f49c01dc..595083474 100644 --- a/protocol/protocol_test.go +++ b/protocol/protocol_test.go @@ -80,7 +80,7 @@ func TestRequestResponseErr(t *testing.T) { NewConnection("c0", ar, ebw, m0, nil) c1 := NewConnection("c1", br, eaw, m1, nil) - d, err := c1.Request("default", "tn", 1234, 3456, []byte("hashbytes")) + d, err := c1.Request("default", "tn", 1234) if err == e || err == ErrClosed { t.Logf("Error at %d+%d bytes", i, j) if !m1.closed { @@ -104,15 +104,12 @@ func TestRequestResponseErr(t *testing.T) { if m0.name != "tn" { t.Error("Incorrect name %q", m0.name) } - if m0.offset != 1234 { + if m0.offset != 1234*BlockSize { t.Error("Incorrect offset %d", m0.offset) } - if m0.size != 3456 { + if m0.size != BlockSize { t.Error("Incorrect size %d", m0.size) } - if string(m0.hash) != "hashbytes" { - t.Error("Incorrect hash %q", m0.hash) - } t.Logf("Pass at %d+%d bytes", i, j) pass = true } @@ -132,11 +129,11 @@ func TestVersionErr(t *testing.T) { c0 := NewConnection("c0", ar, bw, m0, nil) NewConnection("c1", br, aw, m1, nil) - c0.mwriter.writeHeader(header{ + c0.xw.WriteUint32(encodeHeader(header{ version: 2, msgID: 0, msgType: 0, - }) + })) c0.flush() if !m1.closed { @@ -154,11 +151,11 @@ func TestTypeErr(t *testing.T) { c0 := NewConnection("c0", ar, bw, m0, nil) NewConnection("c1", br, aw, m1, nil) - c0.mwriter.writeHeader(header{ + c0.xw.WriteUint32(encodeHeader(header{ version: 0, msgID: 0, msgType: 42, - }) + })) c0.flush() if !m1.closed { @@ -193,7 +190,7 @@ func TestClose(t *testing.T) { c0.Index("default", nil) c0.Index("default", nil) - _, err := c0.Request("default", "foo", 0, 0, nil) + _, err := c0.Request("default", "foo", 0) if err == nil { t.Error("Request should return an error") } diff --git a/util.go b/util.go index 5da698e66..4e14adc6b 100644 --- a/util.go +++ b/util.go @@ -2,7 +2,7 @@ package main import "fmt" -func MetricPrefix(n int) string { +func MetricPrefix(n int64) string { if n > 1e9 { return fmt.Sprintf("%.02f G", float64(n)/1e9) } @@ -15,7 +15,7 @@ func MetricPrefix(n int) string { return fmt.Sprintf("%d ", n) } -func BinaryPrefix(n int) string { +func BinaryPrefix(n int64) string { if n > 1<<30 { return fmt.Sprintf("%.02f Gi", float64(n)/(1<<30)) } diff --git a/walk.go b/walk.go index 23cd985c0..719e16174 100644 --- a/walk.go +++ b/walk.go @@ -21,16 +21,10 @@ type File struct { Flags uint32 Modified int64 Version uint32 + Size int64 Blocks []Block } -func (f File) Size() (bytes int) { - for _, b := range f.Blocks { - bytes += int(b.Size) - } - return -} - func (f File) String() string { return fmt.Sprintf("File{Name:%q, Flags:0x%x, Modified:%d, Version:%d, NumBlocks:%d}", f.Name, f.Flags, f.Modified, f.Version, len(f.Blocks)) @@ -165,6 +159,7 @@ func (m *Model) walkAndHashFiles(res *[]File, ign map[string][]string) filepath. } f := File{ Name: rn, + Size: info.Size(), Flags: uint32(info.Mode()), Modified: modified, Blocks: blocks, diff --git a/xdr/cmd/coder/main.go b/xdr/cmd/coder/main.go new file mode 100644 index 000000000..f7ec34b80 --- /dev/null +++ b/xdr/cmd/coder/main.go @@ -0,0 +1,393 @@ +package main + +import ( + "bytes" + "flag" + "fmt" + "go/ast" + "go/format" + "go/parser" + "go/token" + "os" + "regexp" + "strconv" + "strings" + "text/template" +) + +var output string + +type field struct { + Name string + IsBasic bool + IsSlice bool + IsMap bool + FieldType string + KeyType string + Encoder string + Convert string + Max int +} + +var headerTpl = template.Must(template.New("header").Parse(`package {{.Package}} + +import ( + "bytes" + "io" + + "github.com/calmh/syncthing/xdr" +) +`)) + +var encodeTpl = template.Must(template.New("encoder").Parse(` +func (o {{.TypeName}}) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +}//+n + +func (o {{.TypeName}}) MarshalXDR() []byte { + var buf bytes.Buffer + var xw = xdr.NewWriter(&buf) + o.encodeXDR(xw) + return buf.Bytes() +}//+n + +func (o {{.TypeName}}) encodeXDR(xw *xdr.Writer) (int, error) { + {{range $field := .Fields}} + {{if not $field.IsSlice}} + {{if ne $field.Convert ""}} + xw.Write{{$field.Encoder}}({{$field.Convert}}(o.{{$field.Name}})) + {{else if $field.IsBasic}} + {{if ge $field.Max 1}} + if len(o.{{$field.Name}}) > {{$field.Max}} { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + {{end}} + xw.Write{{$field.Encoder}}(o.{{$field.Name}}) + {{else}} + o.{{$field.Name}}.encodeXDR(xw) + {{end}} + {{else}} + {{if ge $field.Max 1}} + if len(o.{{$field.Name}}) > {{$field.Max}} { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + {{end}} + xw.WriteUint32(uint32(len(o.{{$field.Name}}))) + for i := range o.{{$field.Name}} { + {{if ne $field.Convert ""}} + xw.Write{{$field.Encoder}}({{$field.Convert}}(o.{{$field.Name}}[i])) + {{else if $field.IsBasic}} + xw.Write{{$field.Encoder}}(o.{{$field.Name}}[i]) + {{else}} + o.{{$field.Name}}[i].encodeXDR(xw) + {{end}} + } + {{end}} + {{end}} + return xw.Tot(), xw.Error() +}//+n + +func (o *{{.TypeName}}) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +}//+n + +func (o *{{.TypeName}}) UnmarshalXDR(bs []byte) error { + var buf = bytes.NewBuffer(bs) + var xr = xdr.NewReader(buf) + return o.decodeXDR(xr) +}//+n + +func (o *{{.TypeName}}) decodeXDR(xr *xdr.Reader) error { + {{range $field := .Fields}} + {{if not $field.IsSlice}} + {{if ne $field.Convert ""}} + o.{{$field.Name}} = {{$field.FieldType}}(xr.Read{{$field.Encoder}}()) + {{else if $field.IsBasic}} + {{if ge $field.Max 1}} + o.{{$field.Name}} = xr.Read{{$field.Encoder}}Max({{$field.Max}}) + {{else}} + o.{{$field.Name}} = xr.Read{{$field.Encoder}}() + {{end}} + {{else}} + (&o.{{$field.Name}}).decodeXDR(xr) + {{end}} + {{else}} + _{{$field.Name}}Size := int(xr.ReadUint32()) + {{if ge $field.Max 1}} + if _{{$field.Name}}Size > {{$field.Max}} { + return xdr.ErrElementSizeExceeded + } + {{end}} + o.{{$field.Name}} = make([]{{$field.FieldType}}, _{{$field.Name}}Size) + for i := range o.{{$field.Name}} { + {{if ne $field.Convert ""}} + o.{{$field.Name}}[i] = {{$field.FieldType}}(xr.Read{{$field.Encoder}}()) + {{else if $field.IsBasic}} + o.{{$field.Name}}[i] = xr.Read{{$field.Encoder}}() + {{else}} + (&o.{{$field.Name}}[i]).decodeXDR(xr) + {{end}} + } + {{end}} + {{end}} + return xr.Error() +}`)) + +var maxRe = regexp.MustCompile(`\Wmax:(\d+)`) + +type typeSet struct { + Type string + Encoder string +} + +var xdrEncoders = map[string]typeSet{ + "int16": typeSet{"uint16", "Uint16"}, + "uint16": typeSet{"", "Uint16"}, + "int32": typeSet{"uint32", "Uint32"}, + "uint32": typeSet{"", "Uint32"}, + "int64": typeSet{"uint64", "Uint64"}, + "uint64": typeSet{"", "Uint64"}, + "int": typeSet{"uint64", "Uint64"}, + "string": typeSet{"", "String"}, + "[]byte": typeSet{"", "Bytes"}, + "bool": typeSet{"", "Bool"}, +} + +func handleStruct(name string, t *ast.StructType) { + var fs []field + for _, sf := range t.Fields.List { + if len(sf.Names) == 0 { + // We don't handle anonymous fields + continue + } + + fn := sf.Names[0].Name + var max = 0 + if sf.Comment != nil { + c := sf.Comment.List[0].Text + if m := maxRe.FindStringSubmatch(c); m != nil { + max, _ = strconv.Atoi(m[1]) + } + } + + var f field + switch ft := sf.Type.(type) { + case *ast.Ident: + tn := ft.Name + if enc, ok := xdrEncoders[tn]; ok { + f = field{ + Name: fn, + IsBasic: true, + FieldType: tn, + Encoder: enc.Encoder, + Convert: enc.Type, + Max: max, + } + } else { + f = field{ + Name: fn, + IsBasic: false, + FieldType: tn, + Max: max, + } + } + + case *ast.ArrayType: + if ft.Len != nil { + // We don't handle arrays + continue + } + + tn := ft.Elt.(*ast.Ident).Name + if enc, ok := xdrEncoders["[]"+tn]; ok { + f = field{ + Name: fn, + IsBasic: true, + FieldType: tn, + Encoder: enc.Encoder, + Convert: enc.Type, + Max: max, + } + } else if enc, ok := xdrEncoders[tn]; ok { + f = field{ + Name: fn, + IsBasic: true, + IsSlice: true, + FieldType: tn, + Encoder: enc.Encoder, + Convert: enc.Type, + Max: max, + } + } else { + f = field{ + Name: fn, + IsBasic: false, + IsSlice: true, + FieldType: tn, + Max: max, + } + } + } + + fs = append(fs, f) + } + + switch output { + case "code": + generateCode(name, fs) + case "diagram": + generateDiagram(name, fs) + case "xdr": + generateXdr(name, fs) + } +} + +func generateCode(name string, fs []field) { + var buf bytes.Buffer + err := encodeTpl.Execute(&buf, map[string]interface{}{"TypeName": name, "Fields": fs}) + if err != nil { + panic(err) + } + + bs := regexp.MustCompile(`(\s*\n)+`).ReplaceAll(buf.Bytes(), []byte("\n")) + bs = bytes.Replace(bs, []byte("//+n"), []byte("\n"), -1) + + bs, err = format.Source(bs) + if err != nil { + panic(err) + } + fmt.Println(string(bs)) +} + +func generateDiagram(sn string, fs []field) { + fmt.Println(sn + " Structure:") + fmt.Println() + fmt.Println(" 0 1 2 3") + fmt.Println(" 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1") + line := "+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+" + fmt.Println(line) + + for _, f := range fs { + tn := f.FieldType + sl := f.IsSlice + + if sl { + fmt.Printf("| %s |\n", center("Number of "+f.Name, 61)) + fmt.Println(line) + } + switch tn { + case "uint16": + fmt.Printf("| %s | %s |\n", center(f.Name, 29), center("0x0000", 29)) + fmt.Println(line) + case "uint32": + fmt.Printf("| %s |\n", center(f.Name, 61)) + fmt.Println(line) + case "int64", "uint64": + fmt.Printf("| %-61s |\n", "") + fmt.Printf("+ %s +\n", center(f.Name+" (64 bits)", 61)) + fmt.Printf("| %-61s |\n", "") + fmt.Println(line) + case "string", "byte": // XXX We assume slice of byte! + fmt.Printf("| %s |\n", center("Length of "+f.Name, 61)) + fmt.Println(line) + fmt.Printf("/ %61s /\n", "") + fmt.Printf("\\ %s \\\n", center(f.Name+" (variable length)", 61)) + fmt.Printf("/ %61s /\n", "") + fmt.Println(line) + default: + if sl { + tn = "Zero or more " + tn + " Structures" + fmt.Printf("/ %s /\n", center("", 61)) + fmt.Printf("\\ %s \\\n", center(tn, 61)) + fmt.Printf("/ %s /\n", center("", 61)) + } else { + fmt.Printf("| %s |\n", center(tn, 61)) + } + fmt.Println(line) + } + } + fmt.Println() + fmt.Println() +} + +func generateXdr(sn string, fs []field) { + fmt.Printf("struct %s {\n", sn) + + for _, f := range fs { + tn := f.FieldType + fn := f.Name + suf := "" + if f.IsSlice { + suf = "<>" + } + + switch tn { + case "uint16": + fmt.Printf("\tunsigned short %s%s;\n", fn, suf) + case "uint32": + fmt.Printf("\tunsigned int %s%s;\n", fn, suf) + case "int64": + fmt.Printf("\thyper %s%s;\n", fn, suf) + case "uint64": + fmt.Printf("\tunsigned hyper %s%s;\n", fn, suf) + case "string": + fmt.Printf("\tstring %s<>;\n", fn) + case "byte": + fmt.Printf("\topaque %s<>;\n", fn) + default: + fmt.Printf("\t%s %s%s;\n", tn, fn, suf) + } + } + fmt.Println("}") + fmt.Println() +} + +func center(s string, w int) string { + w -= len(s) + l := w / 2 + r := l + if l+r < w { + r++ + } + return strings.Repeat(" ", l) + s + strings.Repeat(" ", r) +} + +func inspector(fset *token.FileSet) func(ast.Node) bool { + return func(n ast.Node) bool { + switch n := n.(type) { + case *ast.TypeSpec: + switch t := n.Type.(type) { + case *ast.StructType: + name := n.Name.Name + handleStruct(name, t) + } + return false + default: + return true + } + } +} + +func main() { + flag.StringVar(&output, "output", "code", "code,xdr,diagram") + flag.Parse() + fname := flag.Arg(0) + + // Create the AST by parsing src. + fset := token.NewFileSet() // positions are relative to fset + f, err := parser.ParseFile(fset, fname, nil, parser.ParseComments) + if err != nil { + panic(err) + } + + //ast.Print(fset, f) + + if output == "code" { + headerTpl.Execute(os.Stdout, map[string]string{"Package": f.Name.Name}) + } + + i := inspector(fset) + ast.Inspect(f, i) +} diff --git a/xdr/reader.go b/xdr/reader.go index b5c39a6b2..64be77f58 100644 --- a/xdr/reader.go +++ b/xdr/reader.go @@ -1,10 +1,15 @@ package xdr -import "io" +import ( + "errors" + "io" +) + +var ErrElementSizeExceeded = errors.New("Element size exceeded") type Reader struct { r io.Reader - tot uint64 + tot int err error b [8]byte } @@ -16,10 +21,26 @@ func NewReader(r io.Reader) *Reader { } func (r *Reader) ReadString() string { - return string(r.ReadBytes(nil)) + return string(r.ReadBytes()) } -func (r *Reader) ReadBytes(dst []byte) []byte { +func (r *Reader) ReadStringMax(max int) string { + return string(r.ReadBytesMax(max)) +} + +func (r *Reader) ReadBytes() []byte { + return r.ReadBytesInto(nil) +} + +func (r *Reader) ReadBytesMax(max int) []byte { + return r.ReadBytesMaxInto(max, nil) +} + +func (r *Reader) ReadBytesInto(dst []byte) []byte { + return r.ReadBytesMaxInto(0, dst) +} + +func (r *Reader) ReadBytesMaxInto(max int, dst []byte) []byte { if r.err != nil { return nil } @@ -27,22 +48,35 @@ func (r *Reader) ReadBytes(dst []byte) []byte { if r.err != nil { return nil } + if max > 0 && l > max { + r.err = ErrElementSizeExceeded + return nil + } if l+pad(l) > len(dst) { dst = make([]byte, l+pad(l)) } else { dst = dst[:l+pad(l)] } _, r.err = io.ReadFull(r.r, dst) - r.tot += uint64(l + pad(l)) + r.tot += l + pad(l) return dst[:l] } +func (r *Reader) ReadUint16() uint16 { + if r.err != nil { + return 0 + } + _, r.err = io.ReadFull(r.r, r.b[:4]) + r.tot += 4 + return uint16(r.b[1]) | uint16(r.b[0])<<8 +} + func (r *Reader) ReadUint32() uint32 { if r.err != nil { return 0 } _, r.err = io.ReadFull(r.r, r.b[:4]) - r.tot += 8 + r.tot += 4 return uint32(r.b[3]) | uint32(r.b[2])<<8 | uint32(r.b[1])<<16 | uint32(r.b[0])<<24 } @@ -56,10 +90,10 @@ func (r *Reader) ReadUint64() uint64 { uint64(r.b[3])<<32 | uint64(r.b[2])<<40 | uint64(r.b[1])<<48 | uint64(r.b[0])<<56 } -func (r *Reader) Tot() uint64 { +func (r *Reader) Tot() int { return r.tot } -func (r *Reader) Err() error { +func (r *Reader) Error() error { return r.err } diff --git a/xdr/writer.go b/xdr/writer.go index 30c7c5639..cad292a6b 100644 --- a/xdr/writer.go +++ b/xdr/writer.go @@ -14,7 +14,7 @@ var padBytes = []byte{0, 0, 0} type Writer struct { w io.Writer - tot uint64 + tot int err error b [8]byte } @@ -48,7 +48,22 @@ func (w *Writer) WriteBytes(bs []byte) (int, error) { l += n } - w.tot += uint64(l) + w.tot += l + return l, w.err +} + +func (w *Writer) WriteUint16(v uint16) (int, error) { + if w.err != nil { + return 0, w.err + } + w.b[0] = byte(v >> 8) + w.b[1] = byte(v) + w.b[2] = 0 + w.b[3] = 0 + + var l int + l, w.err = w.w.Write(w.b[:4]) + w.tot += l return l, w.err } @@ -63,7 +78,7 @@ func (w *Writer) WriteUint32(v uint32) (int, error) { var l int l, w.err = w.w.Write(w.b[:4]) - w.tot += uint64(l) + w.tot += l return l, w.err } @@ -82,14 +97,14 @@ func (w *Writer) WriteUint64(v uint64) (int, error) { var l int l, w.err = w.w.Write(w.b[:8]) - w.tot += uint64(l) + w.tot += l return l, w.err } -func (w *Writer) Tot() uint64 { +func (w *Writer) Tot() int { return w.tot } -func (w *Writer) Err() error { +func (w *Writer) Error() error { return w.err } diff --git a/xdr/xdr_test.go b/xdr/xdr_test.go index 859958ef8..154038617 100644 --- a/xdr/xdr_test.go +++ b/xdr/xdr_test.go @@ -30,8 +30,8 @@ func TestBytesNil(t *testing.T) { var r = NewReader(b) w.WriteBytes(bs) w.WriteBytes(bs) - r.ReadBytes(nil) - res := r.ReadBytes(nil) + r.ReadBytes() + res := r.ReadBytes() return bytes.Compare(bs, res) == 0 } if err := quick.Check(fn, nil); err != nil { @@ -47,8 +47,8 @@ func TestBytesGiven(t *testing.T) { w.WriteBytes(bs) w.WriteBytes(bs) res := make([]byte, 12) - res = r.ReadBytes(res) - res = r.ReadBytes(res) + res = r.ReadBytesInto(res) + res = r.ReadBytesInto(res) return bytes.Compare(bs, res) == 0 } if err := quick.Check(fn, nil); err != nil {