diff --git a/lib/protocol/AUTHORS b/lib/protocol/AUTHORS new file mode 100644 index 000000000..d84404ee2 --- /dev/null +++ b/lib/protocol/AUTHORS @@ -0,0 +1,4 @@ +# This is the official list of Protocol Authors for copyright purposes. + +Audrius Butkevicius +Jakob Borg diff --git a/lib/protocol/CONTRIBUTING.md b/lib/protocol/CONTRIBUTING.md new file mode 100644 index 000000000..67e6a9c70 --- /dev/null +++ b/lib/protocol/CONTRIBUTING.md @@ -0,0 +1,76 @@ +## Reporting Bugs + +Please file bugs in the [Github Issue +Tracker](https://github.com/syncthing/protocol/issues). + +## Contributing Code + +Every contribution is welcome. Following the points below will make this +a smoother process. + +Individuals making significant and valuable contributions are given +commit-access to the project. If you make a significant contribution and +are not considered for commit-access, please contact any of the +Syncthing core team members. + +All nontrivial contributions should go through the pull request +mechanism for internal review. Determining what is "nontrivial" is left +at the discretion of the contributor. + +### Authorship + +All code authors are listed in the AUTHORS file. Commits must be made +with the same name and email as listed in the AUTHORS file. To +accomplish this, ensure that your git configuration is set correctly +prior to making your first commit; + + $ git config --global user.name "Jane Doe" + $ git config --global user.email janedoe@example.com + +You must be reachable on the given email address. If you do not wish to +use your real name for whatever reason, using a nickname or pseudonym is +perfectly acceptable. + +## Coding Style + +- Follow the conventions laid out in [Effective Go](https://golang.org/doc/effective_go.html) + as much as makes sense. + +- All text files use Unix line endings. + +- Each commit should be `go fmt` clean. + +- The commit message subject should be a single short sentence + describing the change, starting with a capital letter. + +- Commits that resolve an existing issue must include the issue number + as `(fixes #123)` at the end of the commit message subject. + +- Imports are grouped per `goimports` standard; that is, standard + library first, then third party libraries after a blank line. + +- A contribution solving a single issue or introducing a single new + feature should probably be a single commit based on the current + `master` branch. You may be asked to "rebase" or "squash" your pull + request to make sure this is the case, especially if there have been + amendments during review. + +## Licensing + +All contributions are made under the same MIT license as the rest of the +project, except documentation, user interface text and translation +strings which are licensed under the Creative Commons Attribution 4.0 +International License. You retain the copyright to code you have +written. + +When accepting your first contribution, the maintainer of the project +will ensure that you are added to the AUTHORS file. You are welcome to +add yourself as a separate commit in your first pull request. + +## Tests + +Yes please! + +## License + +MIT diff --git a/lib/protocol/LICENSE b/lib/protocol/LICENSE new file mode 100644 index 000000000..6f6960a75 --- /dev/null +++ b/lib/protocol/LICENSE @@ -0,0 +1,19 @@ +Copyright (C) 2014-2015 The Protocol Authors + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is furnished to do +so, subject to the following conditions: + +- The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/lib/protocol/README.md b/lib/protocol/README.md new file mode 100644 index 000000000..bcba44b42 --- /dev/null +++ b/lib/protocol/README.md @@ -0,0 +1,13 @@ +The BEPv1 Protocol +================== + +[![Latest Build](http://img.shields.io/jenkins/s/http/build.syncthing.net/protocol.svg?style=flat-square)](http://build.syncthing.net/job/protocol/lastBuild/) +[![API Documentation](http://img.shields.io/badge/api-Godoc-blue.svg?style=flat-square)](http://godoc.org/github.com/syncthing/protocol) +[![MIT License](http://img.shields.io/badge/license-MIT-blue.svg?style=flat-square)](http://opensource.org/licenses/MIT) + +This is the protocol implementation used by Syncthing. + +License +======= + +MIT diff --git a/lib/protocol/common_test.go b/lib/protocol/common_test.go new file mode 100644 index 000000000..8f1028078 --- /dev/null +++ b/lib/protocol/common_test.go @@ -0,0 +1,81 @@ +// Copyright (C) 2014 The Protocol Authors. + +package protocol + +import ( + "io" + "time" +) + +type TestModel struct { + data []byte + folder string + name string + offset int64 + size int + hash []byte + flags uint32 + options []Option + closedCh chan bool +} + +func newTestModel() *TestModel { + return &TestModel{ + closedCh: make(chan bool), + } +} + +func (t *TestModel) Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) { +} + +func (t *TestModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) { +} + +func (t *TestModel) Request(deviceID DeviceID, folder, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error { + t.folder = folder + t.name = name + t.offset = offset + t.size = len(buf) + t.hash = hash + t.flags = flags + t.options = options + copy(buf, t.data) + return nil +} + +func (t *TestModel) Close(deviceID DeviceID, err error) { + close(t.closedCh) +} + +func (t *TestModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) { +} + +func (t *TestModel) isClosed() bool { + select { + case <-t.closedCh: + return true + case <-time.After(1 * time.Second): + return false // Timeout + } +} + +type ErrPipe struct { + io.PipeWriter + written int + max int + err error + closed bool +} + +func (e *ErrPipe) Write(data []byte) (int, error) { + if e.closed { + return 0, e.err + } + if e.written+len(data) > e.max { + n, _ := e.PipeWriter.Write(data[:e.max-e.written]) + e.PipeWriter.CloseWithError(e.err) + e.closed = true + return n, e.err + } + return e.PipeWriter.Write(data) +} diff --git a/lib/protocol/compression.go b/lib/protocol/compression.go new file mode 100644 index 000000000..9e17213b6 --- /dev/null +++ b/lib/protocol/compression.go @@ -0,0 +1,53 @@ +// Copyright (C) 2015 The Protocol Authors. + +package protocol + +import "fmt" + +type Compression int + +const ( + CompressMetadata Compression = iota // zero value is the default, default should be "metadata" + CompressNever + CompressAlways + + compressionThreshold = 128 // don't bother compressing messages smaller than this many bytes +) + +var compressionMarshal = map[Compression]string{ + CompressNever: "never", + CompressMetadata: "metadata", + CompressAlways: "always", +} + +var compressionUnmarshal = map[string]Compression{ + // Legacy + "false": CompressNever, + "true": CompressMetadata, + + // Current + "never": CompressNever, + "metadata": CompressMetadata, + "always": CompressAlways, +} + +func (c Compression) String() string { + s, ok := compressionMarshal[c] + if !ok { + return fmt.Sprintf("unknown:%d", c) + } + return s +} + +func (c Compression) GoString() string { + return fmt.Sprintf("%q", c.String()) +} + +func (c Compression) MarshalText() ([]byte, error) { + return []byte(compressionMarshal[c]), nil +} + +func (c *Compression) UnmarshalText(bs []byte) error { + *c = compressionUnmarshal[string(bs)] + return nil +} diff --git a/lib/protocol/compression_test.go b/lib/protocol/compression_test.go new file mode 100644 index 000000000..90312344c --- /dev/null +++ b/lib/protocol/compression_test.go @@ -0,0 +1,49 @@ +// Copyright (C) 2015 The Protocol Authors. + +package protocol + +import "testing" + +func TestCompressionMarshal(t *testing.T) { + uTestcases := []struct { + s string + c Compression + }{ + {"true", CompressMetadata}, + {"false", CompressNever}, + {"never", CompressNever}, + {"metadata", CompressMetadata}, + {"always", CompressAlways}, + {"whatever", CompressMetadata}, + } + + mTestcases := []struct { + s string + c Compression + }{ + {"never", CompressNever}, + {"metadata", CompressMetadata}, + {"always", CompressAlways}, + } + + var c Compression + for _, tc := range uTestcases { + err := c.UnmarshalText([]byte(tc.s)) + if err != nil { + t.Error(err) + } + if c != tc.c { + t.Errorf("%s unmarshalled to %d, not %d", tc.s, c, tc.c) + } + } + + for _, tc := range mTestcases { + bs, err := tc.c.MarshalText() + if err != nil { + t.Error(err) + } + if s := string(bs); s != tc.s { + t.Errorf("%d marshalled to %q, not %q", tc.c, s, tc.s) + } + } +} diff --git a/lib/protocol/conflict_test.go b/lib/protocol/conflict_test.go new file mode 100644 index 000000000..ef5c44d7e --- /dev/null +++ b/lib/protocol/conflict_test.go @@ -0,0 +1,23 @@ +// Copyright (C) 2015 The Protocol Authors. + +package protocol + +import "testing" + +func TestWinsConflict(t *testing.T) { + testcases := [][2]FileInfo{ + // The first should always win over the second + {{Modified: 42}, {Modified: 41}}, + {{Modified: 41}, {Modified: 42, Flags: FlagDeleted}}, + {{Modified: 41, Version: Vector{{42, 2}, {43, 1}}}, {Modified: 41, Version: Vector{{42, 1}, {43, 2}}}}, + } + + for _, tc := range testcases { + if !tc[0].WinsConflict(tc[1]) { + t.Errorf("%v should win over %v", tc[0], tc[1]) + } + if tc[1].WinsConflict(tc[0]) { + t.Errorf("%v should not win over %v", tc[1], tc[0]) + } + } +} diff --git a/lib/protocol/counting.go b/lib/protocol/counting.go new file mode 100644 index 000000000..d441ed311 --- /dev/null +++ b/lib/protocol/counting.go @@ -0,0 +1,62 @@ +// Copyright (C) 2014 The Protocol Authors. + +package protocol + +import ( + "io" + "sync/atomic" + "time" +) + +type countingReader struct { + io.Reader + tot int64 // bytes + last int64 // unix nanos +} + +var ( + totalIncoming int64 + totalOutgoing int64 +) + +func (c *countingReader) Read(bs []byte) (int, error) { + n, err := c.Reader.Read(bs) + atomic.AddInt64(&c.tot, int64(n)) + atomic.AddInt64(&totalIncoming, int64(n)) + atomic.StoreInt64(&c.last, time.Now().UnixNano()) + return n, err +} + +func (c *countingReader) Tot() int64 { + return atomic.LoadInt64(&c.tot) +} + +func (c *countingReader) Last() time.Time { + return time.Unix(0, atomic.LoadInt64(&c.last)) +} + +type countingWriter struct { + io.Writer + tot int64 // bytes + last int64 // unix nanos +} + +func (c *countingWriter) Write(bs []byte) (int, error) { + n, err := c.Writer.Write(bs) + atomic.AddInt64(&c.tot, int64(n)) + atomic.AddInt64(&totalOutgoing, int64(n)) + atomic.StoreInt64(&c.last, time.Now().UnixNano()) + return n, err +} + +func (c *countingWriter) Tot() int64 { + return atomic.LoadInt64(&c.tot) +} + +func (c *countingWriter) Last() time.Time { + return time.Unix(0, atomic.LoadInt64(&c.last)) +} + +func TotalInOut() (int64, int64) { + return atomic.LoadInt64(&totalIncoming), atomic.LoadInt64(&totalOutgoing) +} diff --git a/lib/protocol/debug.go b/lib/protocol/debug.go new file mode 100644 index 000000000..435d7f5d2 --- /dev/null +++ b/lib/protocol/debug.go @@ -0,0 +1,15 @@ +// Copyright (C) 2014 The Protocol Authors. + +package protocol + +import ( + "os" + "strings" + + "github.com/calmh/logger" +) + +var ( + debug = strings.Contains(os.Getenv("STTRACE"), "protocol") || os.Getenv("STTRACE") == "all" + l = logger.DefaultLogger +) diff --git a/lib/protocol/deviceid.go b/lib/protocol/deviceid.go new file mode 100644 index 000000000..2e0334a6a --- /dev/null +++ b/lib/protocol/deviceid.go @@ -0,0 +1,163 @@ +// Copyright (C) 2014 The Protocol Authors. + +package protocol + +import ( + "bytes" + "crypto/sha256" + "encoding/base32" + "encoding/binary" + "errors" + "fmt" + "regexp" + "strings" + + "github.com/calmh/luhn" +) + +type DeviceID [32]byte + +var LocalDeviceID = DeviceID{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} + +// NewDeviceID generates a new device ID from the raw bytes of a certificate +func NewDeviceID(rawCert []byte) DeviceID { + var n DeviceID + hf := sha256.New() + hf.Write(rawCert) + hf.Sum(n[:0]) + return n +} + +func DeviceIDFromString(s string) (DeviceID, error) { + var n DeviceID + err := n.UnmarshalText([]byte(s)) + return n, err +} + +func DeviceIDFromBytes(bs []byte) DeviceID { + var n DeviceID + if len(bs) != len(n) { + panic("incorrect length of byte slice representing device ID") + } + copy(n[:], bs) + return n +} + +// String returns the canonical string representation of the device ID +func (n DeviceID) String() string { + id := base32.StdEncoding.EncodeToString(n[:]) + id = strings.Trim(id, "=") + id, err := luhnify(id) + if err != nil { + // Should never happen + panic(err) + } + id = chunkify(id) + return id +} + +func (n DeviceID) GoString() string { + return n.String() +} + +func (n DeviceID) Compare(other DeviceID) int { + return bytes.Compare(n[:], other[:]) +} + +func (n DeviceID) Equals(other DeviceID) bool { + return bytes.Compare(n[:], other[:]) == 0 +} + +// Short returns an integer representing bits 0-63 of the device ID. +func (n DeviceID) Short() uint64 { + return binary.BigEndian.Uint64(n[:]) +} + +func (n *DeviceID) MarshalText() ([]byte, error) { + return []byte(n.String()), nil +} + +func (n *DeviceID) UnmarshalText(bs []byte) error { + id := string(bs) + id = strings.Trim(id, "=") + id = strings.ToUpper(id) + id = untypeoify(id) + id = unchunkify(id) + + var err error + switch len(id) { + case 56: + // New style, with check digits + id, err = unluhnify(id) + if err != nil { + return err + } + fallthrough + case 52: + // Old style, no check digits + dec, err := base32.StdEncoding.DecodeString(id + "====") + if err != nil { + return err + } + copy(n[:], dec) + return nil + default: + return errors.New("device ID invalid: incorrect length") + } +} + +func luhnify(s string) (string, error) { + if len(s) != 52 { + panic("unsupported string length") + } + + res := make([]string, 0, 4) + for i := 0; i < 4; i++ { + p := s[i*13 : (i+1)*13] + l, err := luhn.Base32.Generate(p) + if err != nil { + return "", err + } + res = append(res, fmt.Sprintf("%s%c", p, l)) + } + return res[0] + res[1] + res[2] + res[3], nil +} + +func unluhnify(s string) (string, error) { + if len(s) != 56 { + return "", fmt.Errorf("unsupported string length %d", len(s)) + } + + res := make([]string, 0, 4) + for i := 0; i < 4; i++ { + p := s[i*14 : (i+1)*14-1] + l, err := luhn.Base32.Generate(p) + if err != nil { + return "", err + } + if g := fmt.Sprintf("%s%c", p, l); g != s[i*14:(i+1)*14] { + return "", errors.New("check digit incorrect") + } + res = append(res, p) + } + return res[0] + res[1] + res[2] + res[3], nil +} + +func chunkify(s string) string { + s = regexp.MustCompile("(.{7})").ReplaceAllString(s, "$1-") + s = strings.Trim(s, "-") + return s +} + +func unchunkify(s string) string { + s = strings.Replace(s, "-", "", -1) + s = strings.Replace(s, " ", "", -1) + return s +} + +func untypeoify(s string) string { + s = strings.Replace(s, "0", "O", -1) + s = strings.Replace(s, "1", "I", -1) + s = strings.Replace(s, "8", "B", -1) + return s +} diff --git a/lib/protocol/deviceid_test.go b/lib/protocol/deviceid_test.go new file mode 100644 index 000000000..613557d32 --- /dev/null +++ b/lib/protocol/deviceid_test.go @@ -0,0 +1,76 @@ +// Copyright (C) 2014 The Protocol Authors. + +package protocol + +import "testing" + +var formatted = "P56IOI7-MZJNU2Y-IQGDREY-DM2MGTI-MGL3BXN-PQ6W5BM-TBBZ4TJ-XZWICQ2" +var formatCases = []string{ + "P56IOI-7MZJNU-2IQGDR-EYDM2M-GTMGL3-BXNPQ6-W5BTBB-Z4TJXZ-WICQ", + "P56IOI-7MZJNU2Y-IQGDR-EYDM2M-GTI-MGL3-BXNPQ6-W5BM-TBB-Z4TJXZ-WICQ2", + "P56IOI7 MZJNU2I QGDREYD M2MGTMGL 3BXNPQ6W 5BTB BZ4T JXZWICQ", + "P56IOI7 MZJNU2Y IQGDREY DM2MGTI MGL3BXN PQ6W5BM TBBZ4TJ XZWICQ2", + "P56IOI7MZJNU2IQGDREYDM2MGTMGL3BXNPQ6W5BTBBZ4TJXZWICQ", + "p56ioi7mzjnu2iqgdreydm2mgtmgl3bxnpq6w5btbbz4tjxzwicq", + "P56IOI7MZJNU2YIQGDREYDM2MGTIMGL3BXNPQ6W5BMTBBZ4TJXZWICQ2", + "P561017MZJNU2YIQGDREYDM2MGTIMGL3BXNPQ6W5BMT88Z4TJXZWICQ2", + "p56ioi7mzjnu2yiqgdreydm2mgtimgl3bxnpq6w5bmtbbz4tjxzwicq2", + "p561017mzjnu2yiqgdreydm2mgtimgl3bxnpq6w5bmt88z4tjxzwicq2", +} + +func TestFormatDeviceID(t *testing.T) { + for i, tc := range formatCases { + var id DeviceID + err := id.UnmarshalText([]byte(tc)) + if err != nil { + t.Errorf("#%d UnmarshalText(%q); %v", i, tc, err) + } else if f := id.String(); f != formatted { + t.Errorf("#%d FormatDeviceID(%q)\n\t%q !=\n\t%q", i, tc, f, formatted) + } + } +} + +var validateCases = []struct { + s string + ok bool +}{ + {"", false}, + {"P56IOI7-MZJNU2Y-IQGDREY-DM2MGTI-MGL3BXN-PQ6W5BM-TBBZ4TJ-XZWICQ2", true}, + {"P56IOI7-MZJNU2-IQGDREY-DM2MGT-MGL3BXN-PQ6W5B-TBBZ4TJ-XZWICQ", true}, + {"P56IOI7 MZJNU2I QGDREYD M2MGTMGL 3BXNPQ6W 5BTB BZ4T JXZWICQ", true}, + {"P56IOI7MZJNU2IQGDREYDM2MGTMGL3BXNPQ6W5BTBBZ4TJXZWICQ", true}, + {"P56IOI7MZJNU2IQGDREYDM2MGTMGL3BXNPQ6W5BTBBZ4TJXZWICQCCCC", false}, + {"p56ioi7mzjnu2iqgdreydm2mgtmgl3bxnpq6w5btbbz4tjxzwicq", true}, + {"p56ioi7mzjnu2iqgdreydm2mgtmgl3bxnpq6w5btbbz4tjxzwicqCCCC", false}, +} + +func TestValidateDeviceID(t *testing.T) { + for _, tc := range validateCases { + var id DeviceID + err := id.UnmarshalText([]byte(tc.s)) + if (err == nil && !tc.ok) || (err != nil && tc.ok) { + t.Errorf("ValidateDeviceID(%q); %v != %v", tc.s, err, tc.ok) + } + } +} + +func TestMarshallingDeviceID(t *testing.T) { + n0 := DeviceID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 10, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32} + n1 := DeviceID{} + n2 := DeviceID{} + + bs, _ := n0.MarshalText() + n1.UnmarshalText(bs) + bs, _ = n1.MarshalText() + n2.UnmarshalText(bs) + + if n2.String() != n0.String() { + t.Errorf("String marshalling error; %q != %q", n2.String(), n0.String()) + } + if !n2.Equals(n0) { + t.Error("Equals error") + } + if n2.Compare(n0) != 0 { + t.Error("Compare error") + } +} diff --git a/lib/protocol/doc.go b/lib/protocol/doc.go new file mode 100644 index 000000000..2c6ea8ef2 --- /dev/null +++ b/lib/protocol/doc.go @@ -0,0 +1,4 @@ +// Copyright (C) 2014 The Protocol Authors. + +// Package protocol implements the Block Exchange Protocol. +package protocol diff --git a/lib/protocol/errors.go b/lib/protocol/errors.go new file mode 100755 index 000000000..31d27af0d --- /dev/null +++ b/lib/protocol/errors.go @@ -0,0 +1,51 @@ +// Copyright (C) 2014 The Protocol Authors. + +package protocol + +import ( + "errors" +) + +const ( + ecNoError int32 = iota + ecGeneric + ecNoSuchFile + ecInvalid +) + +var ( + ErrNoError error = nil + ErrGeneric = errors.New("generic error") + ErrNoSuchFile = errors.New("no such file") + ErrInvalid = errors.New("file is invalid") +) + +var lookupError = map[int32]error{ + ecNoError: ErrNoError, + ecGeneric: ErrGeneric, + ecNoSuchFile: ErrNoSuchFile, + ecInvalid: ErrInvalid, +} + +var lookupCode = map[error]int32{ + ErrNoError: ecNoError, + ErrGeneric: ecGeneric, + ErrNoSuchFile: ecNoSuchFile, + ErrInvalid: ecInvalid, +} + +func codeToError(errcode int32) error { + err, ok := lookupError[errcode] + if !ok { + return ErrGeneric + } + return err +} + +func errorToCode(err error) int32 { + code, ok := lookupCode[err] + if !ok { + return ecGeneric + } + return code +} diff --git a/lib/protocol/fuzz.go b/lib/protocol/fuzz.go new file mode 100644 index 000000000..9b82abe7c --- /dev/null +++ b/lib/protocol/fuzz.go @@ -0,0 +1,70 @@ +// Copyright (C) 2015 The Protocol Authors. + +// +build gofuzz + +package protocol + +import ( + "bytes" + "encoding/binary" + "encoding/hex" + "fmt" + "reflect" + "sync" +) + +func Fuzz(data []byte) int { + // Regenerate the length, or we'll most commonly exit quickly due to an + // unexpected eof which is unintestering. + if len(data) > 8 { + binary.BigEndian.PutUint32(data[4:], uint32(len(data))-8) + } + + // Setup a rawConnection we'll use to parse the message. + c := rawConnection{ + cr: &countingReader{Reader: bytes.NewReader(data)}, + closed: make(chan struct{}), + pool: sync.Pool{ + New: func() interface{} { + return make([]byte, BlockSize) + }, + }, + } + + // Attempt to parse the message. + hdr, msg, err := c.readMessage() + if err != nil { + return 0 + } + + // If parsing worked, attempt to encode it again. + newBs, err := msg.AppendXDR(nil) + if err != nil { + panic("not encodable") + } + + // Create an appriate header for the re-encoding. + newMsg := make([]byte, 8) + binary.BigEndian.PutUint32(newMsg, encodeHeader(hdr)) + binary.BigEndian.PutUint32(newMsg[4:], uint32(len(newBs))) + newMsg = append(newMsg, newBs...) + + // Use the rawConnection to parse the re-encoding. + c.cr = &countingReader{Reader: bytes.NewReader(newMsg)} + hdr2, msg2, err := c.readMessage() + if err != nil { + fmt.Println("Initial:\n" + hex.Dump(data)) + fmt.Println("New:\n" + hex.Dump(newMsg)) + panic("not parseable after re-encode: " + err.Error()) + } + + // Make sure the data is the same as it was before. + if hdr != hdr2 { + panic("headers differ") + } + if !reflect.DeepEqual(msg, msg2) { + panic("contents differ") + } + + return 1 +} diff --git a/lib/protocol/fuzz_test.go b/lib/protocol/fuzz_test.go new file mode 100644 index 000000000..65c2d9010 --- /dev/null +++ b/lib/protocol/fuzz_test.go @@ -0,0 +1,89 @@ +// Copyright (C) 2015 The Protocol Authors. + +// +build gofuzz + +package protocol + +import ( + "encoding/binary" + "fmt" + "io/ioutil" + "os" + "strings" + "testing" + "testing/quick" +) + +// This can be used to generate a corpus of valid messages as a starting point +// for the fuzzer. +func TestGenerateCorpus(t *testing.T) { + t.Skip("Use to generate initial corpus only") + + n := 0 + check := func(idx IndexMessage) bool { + for i := range idx.Options { + if len(idx.Options[i].Key) > 64 { + idx.Options[i].Key = idx.Options[i].Key[:64] + } + } + hdr := header{ + version: 0, + msgID: 42, + msgType: messageTypeIndex, + compression: false, + } + + msgBs := idx.MustMarshalXDR() + + buf := make([]byte, 8) + binary.BigEndian.PutUint32(buf, encodeHeader(hdr)) + binary.BigEndian.PutUint32(buf[4:], uint32(len(msgBs))) + buf = append(buf, msgBs...) + + ioutil.WriteFile(fmt.Sprintf("testdata/corpus/test-%03d.xdr", n), buf, 0644) + n++ + return true + } + + if err := quick.Check(check, &quick.Config{MaxCount: 1000}); err != nil { + t.Fatal(err) + } +} + +// Tests any crashers found by the fuzzer, for closer investigation. +func TestCrashers(t *testing.T) { + testFiles(t, "testdata/crashers") +} + +// Tests the entire corpus, which should PASS before the fuzzer starts +// fuzzing. +func TestCorpus(t *testing.T) { + testFiles(t, "testdata/corpus") +} + +func testFiles(t *testing.T, dir string) { + fd, err := os.Open(dir) + if err != nil { + t.Fatal(err) + } + crashers, err := fd.Readdirnames(-1) + if err != nil { + t.Fatal(err) + } + for _, name := range crashers { + if strings.HasSuffix(name, ".output") { + continue + } + if strings.HasSuffix(name, ".quoted") { + continue + } + + t.Log(name) + crasher, err := ioutil.ReadFile(dir + "/" + name) + if err != nil { + t.Fatal(err) + } + + Fuzz(crasher) + } +} diff --git a/lib/protocol/header.go b/lib/protocol/header.go new file mode 100644 index 000000000..846ee48cd --- /dev/null +++ b/lib/protocol/header.go @@ -0,0 +1,43 @@ +// Copyright (C) 2014 The Protocol Authors. + +package protocol + +import "github.com/calmh/xdr" + +type header struct { + version int + msgID int + msgType int + compression bool +} + +func (h header) encodeXDR(xw *xdr.Writer) (int, error) { + u := encodeHeader(h) + return xw.WriteUint32(u) +} + +func (h *header) decodeXDR(xr *xdr.Reader) error { + u := xr.ReadUint32() + *h = decodeHeader(u) + return xr.Error() +} + +func encodeHeader(h header) uint32 { + var isComp uint32 + if h.compression { + isComp = 1 << 0 // the zeroth bit is the compression bit + } + return uint32(h.version&0xf)<<28 + + uint32(h.msgID&0xfff)<<16 + + uint32(h.msgType&0xff)<<8 + + isComp +} + +func decodeHeader(u uint32) header { + return header{ + version: int(u>>28) & 0xf, + msgID: int(u>>16) & 0xfff, + msgType: int(u>>8) & 0xff, + compression: u&1 == 1, + } +} diff --git a/lib/protocol/message.go b/lib/protocol/message.go new file mode 100644 index 000000000..2a37136b5 --- /dev/null +++ b/lib/protocol/message.go @@ -0,0 +1,152 @@ +// Copyright (C) 2014 The Protocol Authors. + +//go:generate -command genxdr go run ../syncthing/Godeps/_workspace/src/github.com/calmh/xdr/cmd/genxdr/main.go +//go:generate genxdr -o message_xdr.go message.go + +package protocol + +import "fmt" + +type IndexMessage struct { + Folder string + Files []FileInfo // max:1000000 + Flags uint32 + Options []Option // max:64 +} + +type FileInfo struct { + Name string // max:8192 + Flags uint32 + Modified int64 + Version Vector + LocalVersion int64 + CachedSize int64 // noencode (cache only) + Blocks []BlockInfo // max:1000000 +} + +func (f FileInfo) String() string { + return fmt.Sprintf("File{Name:%q, Flags:0%o, Modified:%d, Version:%v, Size:%d, Blocks:%v}", + f.Name, f.Flags, f.Modified, f.Version, f.Size(), f.Blocks) +} + +func (f FileInfo) Size() (bytes int64) { + if f.IsDeleted() || f.IsDirectory() { + return 128 + } + for _, b := range f.Blocks { + bytes += int64(b.Size) + } + return +} + +func (f FileInfo) IsDeleted() bool { + return f.Flags&FlagDeleted != 0 +} + +func (f FileInfo) IsInvalid() bool { + return f.Flags&FlagInvalid != 0 +} + +func (f FileInfo) IsDirectory() bool { + return f.Flags&FlagDirectory != 0 +} + +func (f FileInfo) IsSymlink() bool { + return f.Flags&FlagSymlink != 0 +} + +func (f FileInfo) HasPermissionBits() bool { + return f.Flags&FlagNoPermBits == 0 +} + +// WinsConflict returns true if "f" is the one to choose when it is in +// conflict with "other". +func (f FileInfo) WinsConflict(other FileInfo) bool { + // If a modification is in conflict with a delete, we pick the + // modification. + if !f.IsDeleted() && other.IsDeleted() { + return true + } + if f.IsDeleted() && !other.IsDeleted() { + return false + } + + // The one with the newer modification time wins. + if f.Modified > other.Modified { + return true + } + if f.Modified < other.Modified { + return false + } + + // The modification times were equal. Use the device ID in the version + // vector as tie breaker. + return f.Version.Compare(other.Version) == ConcurrentGreater +} + +type BlockInfo struct { + Offset int64 // noencode (cache only) + Size int32 + Hash []byte // max:64 +} + +func (b BlockInfo) String() string { + return fmt.Sprintf("Block{%d/%d/%x}", b.Offset, b.Size, b.Hash) +} + +type RequestMessage struct { + Folder string // max:64 + Name string // max:8192 + Offset int64 + Size int32 + Hash []byte // max:64 + Flags uint32 + Options []Option // max:64 +} + +type ResponseMessage struct { + Data []byte + Code int32 +} + +type ClusterConfigMessage struct { + ClientName string // max:64 + ClientVersion string // max:64 + Folders []Folder // max:1000000 + Options []Option // max:64 +} + +func (o *ClusterConfigMessage) GetOption(key string) string { + for _, option := range o.Options { + if option.Key == key { + return option.Value + } + } + return "" +} + +type Folder struct { + ID string // max:64 + Devices []Device // max:1000000 + Flags uint32 + Options []Option // max:64 +} + +type Device struct { + ID []byte // max:32 + MaxLocalVersion int64 + Flags uint32 + Options []Option // max:64 +} + +type Option struct { + Key string // max:64 + Value string // max:1024 +} + +type CloseMessage struct { + Reason string // max:1024 + Code int32 +} + +type EmptyMessage struct{} diff --git a/lib/protocol/message_xdr.go b/lib/protocol/message_xdr.go new file mode 100644 index 000000000..876fbb77c --- /dev/null +++ b/lib/protocol/message_xdr.go @@ -0,0 +1,1136 @@ +// ************************************************************ +// This file is automatically generated by genxdr. Do not edit. +// ************************************************************ + +package protocol + +import ( + "bytes" + "io" + + "github.com/calmh/xdr" +) + +/* + +IndexMessage Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Folder | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Folder (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Number of Files | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Zero or more FileInfo Structures \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Flags | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Number of Options | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Zero or more Option Structures \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct IndexMessage { + string Folder<>; + FileInfo Files<1000000>; + unsigned int Flags; + Option Options<64>; +} + +*/ + +func (o IndexMessage) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.EncodeXDRInto(xw) +} + +func (o IndexMessage) MarshalXDR() ([]byte, error) { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o IndexMessage) MustMarshalXDR() []byte { + bs, err := o.MarshalXDR() + if err != nil { + panic(err) + } + return bs +} + +func (o IndexMessage) AppendXDR(bs []byte) ([]byte, error) { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + _, err := o.EncodeXDRInto(xw) + return []byte(aw), err +} + +func (o IndexMessage) EncodeXDRInto(xw *xdr.Writer) (int, error) { + xw.WriteString(o.Folder) + if l := len(o.Files); l > 1000000 { + return xw.Tot(), xdr.ElementSizeExceeded("Files", l, 1000000) + } + xw.WriteUint32(uint32(len(o.Files))) + for i := range o.Files { + _, err := o.Files[i].EncodeXDRInto(xw) + if err != nil { + return xw.Tot(), err + } + } + xw.WriteUint32(o.Flags) + if l := len(o.Options); l > 64 { + return xw.Tot(), xdr.ElementSizeExceeded("Options", l, 64) + } + xw.WriteUint32(uint32(len(o.Options))) + for i := range o.Options { + _, err := o.Options[i].EncodeXDRInto(xw) + if err != nil { + return xw.Tot(), err + } + } + return xw.Tot(), xw.Error() +} + +func (o *IndexMessage) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.DecodeXDRFrom(xr) +} + +func (o *IndexMessage) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.DecodeXDRFrom(xr) +} + +func (o *IndexMessage) DecodeXDRFrom(xr *xdr.Reader) error { + o.Folder = xr.ReadString() + _FilesSize := int(xr.ReadUint32()) + if _FilesSize < 0 { + return xdr.ElementSizeExceeded("Files", _FilesSize, 1000000) + } + if _FilesSize > 1000000 { + return xdr.ElementSizeExceeded("Files", _FilesSize, 1000000) + } + o.Files = make([]FileInfo, _FilesSize) + for i := range o.Files { + (&o.Files[i]).DecodeXDRFrom(xr) + } + o.Flags = xr.ReadUint32() + _OptionsSize := int(xr.ReadUint32()) + if _OptionsSize < 0 { + return xdr.ElementSizeExceeded("Options", _OptionsSize, 64) + } + if _OptionsSize > 64 { + return xdr.ElementSizeExceeded("Options", _OptionsSize, 64) + } + o.Options = make([]Option, _OptionsSize) + for i := range o.Options { + (&o.Options[i]).DecodeXDRFrom(xr) + } + return xr.Error() +} + +/* + +FileInfo Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Name | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Name (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Flags | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | ++ Modified (64 bits) + +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Vector Structure \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | ++ Local Version (64 bits) + +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Number of Blocks | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Zero or more BlockInfo Structures \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct FileInfo { + string Name<8192>; + unsigned int Flags; + hyper Modified; + Vector Version; + hyper LocalVersion; + BlockInfo Blocks<1000000>; +} + +*/ + +func (o FileInfo) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.EncodeXDRInto(xw) +} + +func (o FileInfo) MarshalXDR() ([]byte, error) { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o FileInfo) MustMarshalXDR() []byte { + bs, err := o.MarshalXDR() + if err != nil { + panic(err) + } + return bs +} + +func (o FileInfo) AppendXDR(bs []byte) ([]byte, error) { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + _, err := o.EncodeXDRInto(xw) + return []byte(aw), err +} + +func (o FileInfo) EncodeXDRInto(xw *xdr.Writer) (int, error) { + if l := len(o.Name); l > 8192 { + return xw.Tot(), xdr.ElementSizeExceeded("Name", l, 8192) + } + xw.WriteString(o.Name) + xw.WriteUint32(o.Flags) + xw.WriteUint64(uint64(o.Modified)) + _, err := o.Version.EncodeXDRInto(xw) + if err != nil { + return xw.Tot(), err + } + xw.WriteUint64(uint64(o.LocalVersion)) + if l := len(o.Blocks); l > 1000000 { + return xw.Tot(), xdr.ElementSizeExceeded("Blocks", l, 1000000) + } + xw.WriteUint32(uint32(len(o.Blocks))) + for i := range o.Blocks { + _, err := o.Blocks[i].EncodeXDRInto(xw) + if err != nil { + return xw.Tot(), err + } + } + return xw.Tot(), xw.Error() +} + +func (o *FileInfo) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.DecodeXDRFrom(xr) +} + +func (o *FileInfo) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.DecodeXDRFrom(xr) +} + +func (o *FileInfo) DecodeXDRFrom(xr *xdr.Reader) error { + o.Name = xr.ReadStringMax(8192) + o.Flags = xr.ReadUint32() + o.Modified = int64(xr.ReadUint64()) + (&o.Version).DecodeXDRFrom(xr) + o.LocalVersion = int64(xr.ReadUint64()) + _BlocksSize := int(xr.ReadUint32()) + if _BlocksSize < 0 { + return xdr.ElementSizeExceeded("Blocks", _BlocksSize, 1000000) + } + if _BlocksSize > 1000000 { + return xdr.ElementSizeExceeded("Blocks", _BlocksSize, 1000000) + } + o.Blocks = make([]BlockInfo, _BlocksSize) + for i := range o.Blocks { + (&o.Blocks[i]).DecodeXDRFrom(xr) + } + return xr.Error() +} + +/* + +BlockInfo Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Size | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Hash | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Hash (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct BlockInfo { + int Size; + opaque Hash<64>; +} + +*/ + +func (o BlockInfo) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.EncodeXDRInto(xw) +} + +func (o BlockInfo) MarshalXDR() ([]byte, error) { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o BlockInfo) MustMarshalXDR() []byte { + bs, err := o.MarshalXDR() + if err != nil { + panic(err) + } + return bs +} + +func (o BlockInfo) AppendXDR(bs []byte) ([]byte, error) { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + _, err := o.EncodeXDRInto(xw) + return []byte(aw), err +} + +func (o BlockInfo) EncodeXDRInto(xw *xdr.Writer) (int, error) { + xw.WriteUint32(uint32(o.Size)) + if l := len(o.Hash); l > 64 { + return xw.Tot(), xdr.ElementSizeExceeded("Hash", l, 64) + } + xw.WriteBytes(o.Hash) + return xw.Tot(), xw.Error() +} + +func (o *BlockInfo) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.DecodeXDRFrom(xr) +} + +func (o *BlockInfo) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.DecodeXDRFrom(xr) +} + +func (o *BlockInfo) DecodeXDRFrom(xr *xdr.Reader) error { + o.Size = int32(xr.ReadUint32()) + o.Hash = xr.ReadBytesMax(64) + return xr.Error() +} + +/* + +RequestMessage Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Folder | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Folder (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Name | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Name (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | ++ Offset (64 bits) + +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Size | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Hash | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Hash (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Flags | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Number of Options | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Zero or more Option Structures \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct RequestMessage { + string Folder<64>; + string Name<8192>; + hyper Offset; + int Size; + opaque Hash<64>; + unsigned int Flags; + Option Options<64>; +} + +*/ + +func (o RequestMessage) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.EncodeXDRInto(xw) +} + +func (o RequestMessage) MarshalXDR() ([]byte, error) { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o RequestMessage) MustMarshalXDR() []byte { + bs, err := o.MarshalXDR() + if err != nil { + panic(err) + } + return bs +} + +func (o RequestMessage) AppendXDR(bs []byte) ([]byte, error) { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + _, err := o.EncodeXDRInto(xw) + return []byte(aw), err +} + +func (o RequestMessage) EncodeXDRInto(xw *xdr.Writer) (int, error) { + if l := len(o.Folder); l > 64 { + return xw.Tot(), xdr.ElementSizeExceeded("Folder", l, 64) + } + xw.WriteString(o.Folder) + if l := len(o.Name); l > 8192 { + return xw.Tot(), xdr.ElementSizeExceeded("Name", l, 8192) + } + xw.WriteString(o.Name) + xw.WriteUint64(uint64(o.Offset)) + xw.WriteUint32(uint32(o.Size)) + if l := len(o.Hash); l > 64 { + return xw.Tot(), xdr.ElementSizeExceeded("Hash", l, 64) + } + xw.WriteBytes(o.Hash) + xw.WriteUint32(o.Flags) + if l := len(o.Options); l > 64 { + return xw.Tot(), xdr.ElementSizeExceeded("Options", l, 64) + } + xw.WriteUint32(uint32(len(o.Options))) + for i := range o.Options { + _, err := o.Options[i].EncodeXDRInto(xw) + if err != nil { + return xw.Tot(), err + } + } + return xw.Tot(), xw.Error() +} + +func (o *RequestMessage) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.DecodeXDRFrom(xr) +} + +func (o *RequestMessage) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.DecodeXDRFrom(xr) +} + +func (o *RequestMessage) DecodeXDRFrom(xr *xdr.Reader) error { + o.Folder = xr.ReadStringMax(64) + o.Name = xr.ReadStringMax(8192) + o.Offset = int64(xr.ReadUint64()) + o.Size = int32(xr.ReadUint32()) + o.Hash = xr.ReadBytesMax(64) + o.Flags = xr.ReadUint32() + _OptionsSize := int(xr.ReadUint32()) + if _OptionsSize < 0 { + return xdr.ElementSizeExceeded("Options", _OptionsSize, 64) + } + if _OptionsSize > 64 { + return xdr.ElementSizeExceeded("Options", _OptionsSize, 64) + } + o.Options = make([]Option, _OptionsSize) + for i := range o.Options { + (&o.Options[i]).DecodeXDRFrom(xr) + } + return xr.Error() +} + +/* + +ResponseMessage Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Data | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Data (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Code | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct ResponseMessage { + opaque Data<>; + int Code; +} + +*/ + +func (o ResponseMessage) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.EncodeXDRInto(xw) +} + +func (o ResponseMessage) MarshalXDR() ([]byte, error) { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o ResponseMessage) MustMarshalXDR() []byte { + bs, err := o.MarshalXDR() + if err != nil { + panic(err) + } + return bs +} + +func (o ResponseMessage) AppendXDR(bs []byte) ([]byte, error) { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + _, err := o.EncodeXDRInto(xw) + return []byte(aw), err +} + +func (o ResponseMessage) EncodeXDRInto(xw *xdr.Writer) (int, error) { + xw.WriteBytes(o.Data) + xw.WriteUint32(uint32(o.Code)) + return xw.Tot(), xw.Error() +} + +func (o *ResponseMessage) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.DecodeXDRFrom(xr) +} + +func (o *ResponseMessage) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.DecodeXDRFrom(xr) +} + +func (o *ResponseMessage) DecodeXDRFrom(xr *xdr.Reader) error { + o.Data = xr.ReadBytes() + o.Code = int32(xr.ReadUint32()) + return xr.Error() +} + +/* + +ClusterConfigMessage Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Client Name | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Client Name (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Client Version | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Client Version (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Number of Folders | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Zero or more Folder Structures \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Number of Options | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Zero or more Option Structures \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct ClusterConfigMessage { + string ClientName<64>; + string ClientVersion<64>; + Folder Folders<1000000>; + Option Options<64>; +} + +*/ + +func (o ClusterConfigMessage) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.EncodeXDRInto(xw) +} + +func (o ClusterConfigMessage) MarshalXDR() ([]byte, error) { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o ClusterConfigMessage) MustMarshalXDR() []byte { + bs, err := o.MarshalXDR() + if err != nil { + panic(err) + } + return bs +} + +func (o ClusterConfigMessage) AppendXDR(bs []byte) ([]byte, error) { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + _, err := o.EncodeXDRInto(xw) + return []byte(aw), err +} + +func (o ClusterConfigMessage) EncodeXDRInto(xw *xdr.Writer) (int, error) { + if l := len(o.ClientName); l > 64 { + return xw.Tot(), xdr.ElementSizeExceeded("ClientName", l, 64) + } + xw.WriteString(o.ClientName) + if l := len(o.ClientVersion); l > 64 { + return xw.Tot(), xdr.ElementSizeExceeded("ClientVersion", l, 64) + } + xw.WriteString(o.ClientVersion) + if l := len(o.Folders); l > 1000000 { + return xw.Tot(), xdr.ElementSizeExceeded("Folders", l, 1000000) + } + xw.WriteUint32(uint32(len(o.Folders))) + for i := range o.Folders { + _, err := o.Folders[i].EncodeXDRInto(xw) + if err != nil { + return xw.Tot(), err + } + } + if l := len(o.Options); l > 64 { + return xw.Tot(), xdr.ElementSizeExceeded("Options", l, 64) + } + xw.WriteUint32(uint32(len(o.Options))) + for i := range o.Options { + _, err := o.Options[i].EncodeXDRInto(xw) + if err != nil { + return xw.Tot(), err + } + } + return xw.Tot(), xw.Error() +} + +func (o *ClusterConfigMessage) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.DecodeXDRFrom(xr) +} + +func (o *ClusterConfigMessage) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.DecodeXDRFrom(xr) +} + +func (o *ClusterConfigMessage) DecodeXDRFrom(xr *xdr.Reader) error { + o.ClientName = xr.ReadStringMax(64) + o.ClientVersion = xr.ReadStringMax(64) + _FoldersSize := int(xr.ReadUint32()) + if _FoldersSize < 0 { + return xdr.ElementSizeExceeded("Folders", _FoldersSize, 1000000) + } + if _FoldersSize > 1000000 { + return xdr.ElementSizeExceeded("Folders", _FoldersSize, 1000000) + } + o.Folders = make([]Folder, _FoldersSize) + for i := range o.Folders { + (&o.Folders[i]).DecodeXDRFrom(xr) + } + _OptionsSize := int(xr.ReadUint32()) + if _OptionsSize < 0 { + return xdr.ElementSizeExceeded("Options", _OptionsSize, 64) + } + if _OptionsSize > 64 { + return xdr.ElementSizeExceeded("Options", _OptionsSize, 64) + } + o.Options = make([]Option, _OptionsSize) + for i := range o.Options { + (&o.Options[i]).DecodeXDRFrom(xr) + } + return xr.Error() +} + +/* + +Folder Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of ID | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ ID (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Number of Devices | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Zero or more Device Structures \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Flags | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Number of Options | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Zero or more Option Structures \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct Folder { + string ID<64>; + Device Devices<1000000>; + unsigned int Flags; + Option Options<64>; +} + +*/ + +func (o Folder) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.EncodeXDRInto(xw) +} + +func (o Folder) MarshalXDR() ([]byte, error) { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o Folder) MustMarshalXDR() []byte { + bs, err := o.MarshalXDR() + if err != nil { + panic(err) + } + return bs +} + +func (o Folder) AppendXDR(bs []byte) ([]byte, error) { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + _, err := o.EncodeXDRInto(xw) + return []byte(aw), err +} + +func (o Folder) EncodeXDRInto(xw *xdr.Writer) (int, error) { + if l := len(o.ID); l > 64 { + return xw.Tot(), xdr.ElementSizeExceeded("ID", l, 64) + } + xw.WriteString(o.ID) + if l := len(o.Devices); l > 1000000 { + return xw.Tot(), xdr.ElementSizeExceeded("Devices", l, 1000000) + } + xw.WriteUint32(uint32(len(o.Devices))) + for i := range o.Devices { + _, err := o.Devices[i].EncodeXDRInto(xw) + if err != nil { + return xw.Tot(), err + } + } + xw.WriteUint32(o.Flags) + if l := len(o.Options); l > 64 { + return xw.Tot(), xdr.ElementSizeExceeded("Options", l, 64) + } + xw.WriteUint32(uint32(len(o.Options))) + for i := range o.Options { + _, err := o.Options[i].EncodeXDRInto(xw) + if err != nil { + return xw.Tot(), err + } + } + return xw.Tot(), xw.Error() +} + +func (o *Folder) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.DecodeXDRFrom(xr) +} + +func (o *Folder) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.DecodeXDRFrom(xr) +} + +func (o *Folder) DecodeXDRFrom(xr *xdr.Reader) error { + o.ID = xr.ReadStringMax(64) + _DevicesSize := int(xr.ReadUint32()) + if _DevicesSize < 0 { + return xdr.ElementSizeExceeded("Devices", _DevicesSize, 1000000) + } + if _DevicesSize > 1000000 { + return xdr.ElementSizeExceeded("Devices", _DevicesSize, 1000000) + } + o.Devices = make([]Device, _DevicesSize) + for i := range o.Devices { + (&o.Devices[i]).DecodeXDRFrom(xr) + } + o.Flags = xr.ReadUint32() + _OptionsSize := int(xr.ReadUint32()) + if _OptionsSize < 0 { + return xdr.ElementSizeExceeded("Options", _OptionsSize, 64) + } + if _OptionsSize > 64 { + return xdr.ElementSizeExceeded("Options", _OptionsSize, 64) + } + o.Options = make([]Option, _OptionsSize) + for i := range o.Options { + (&o.Options[i]).DecodeXDRFrom(xr) + } + return xr.Error() +} + +/* + +Device Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of ID | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ ID (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | ++ Max Local Version (64 bits) + +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Flags | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Number of Options | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Zero or more Option Structures \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct Device { + opaque ID<32>; + hyper MaxLocalVersion; + unsigned int Flags; + Option Options<64>; +} + +*/ + +func (o Device) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.EncodeXDRInto(xw) +} + +func (o Device) MarshalXDR() ([]byte, error) { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o Device) MustMarshalXDR() []byte { + bs, err := o.MarshalXDR() + if err != nil { + panic(err) + } + return bs +} + +func (o Device) AppendXDR(bs []byte) ([]byte, error) { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + _, err := o.EncodeXDRInto(xw) + return []byte(aw), err +} + +func (o Device) EncodeXDRInto(xw *xdr.Writer) (int, error) { + if l := len(o.ID); l > 32 { + return xw.Tot(), xdr.ElementSizeExceeded("ID", l, 32) + } + xw.WriteBytes(o.ID) + xw.WriteUint64(uint64(o.MaxLocalVersion)) + xw.WriteUint32(o.Flags) + if l := len(o.Options); l > 64 { + return xw.Tot(), xdr.ElementSizeExceeded("Options", l, 64) + } + xw.WriteUint32(uint32(len(o.Options))) + for i := range o.Options { + _, err := o.Options[i].EncodeXDRInto(xw) + if err != nil { + return xw.Tot(), err + } + } + return xw.Tot(), xw.Error() +} + +func (o *Device) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.DecodeXDRFrom(xr) +} + +func (o *Device) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.DecodeXDRFrom(xr) +} + +func (o *Device) DecodeXDRFrom(xr *xdr.Reader) error { + o.ID = xr.ReadBytesMax(32) + o.MaxLocalVersion = int64(xr.ReadUint64()) + o.Flags = xr.ReadUint32() + _OptionsSize := int(xr.ReadUint32()) + if _OptionsSize < 0 { + return xdr.ElementSizeExceeded("Options", _OptionsSize, 64) + } + if _OptionsSize > 64 { + return xdr.ElementSizeExceeded("Options", _OptionsSize, 64) + } + o.Options = make([]Option, _OptionsSize) + for i := range o.Options { + (&o.Options[i]).DecodeXDRFrom(xr) + } + return xr.Error() +} + +/* + +Option Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Key | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Key (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Value | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Value (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct Option { + string Key<64>; + string Value<1024>; +} + +*/ + +func (o Option) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.EncodeXDRInto(xw) +} + +func (o Option) MarshalXDR() ([]byte, error) { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o Option) MustMarshalXDR() []byte { + bs, err := o.MarshalXDR() + if err != nil { + panic(err) + } + return bs +} + +func (o Option) AppendXDR(bs []byte) ([]byte, error) { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + _, err := o.EncodeXDRInto(xw) + return []byte(aw), err +} + +func (o Option) EncodeXDRInto(xw *xdr.Writer) (int, error) { + if l := len(o.Key); l > 64 { + return xw.Tot(), xdr.ElementSizeExceeded("Key", l, 64) + } + xw.WriteString(o.Key) + if l := len(o.Value); l > 1024 { + return xw.Tot(), xdr.ElementSizeExceeded("Value", l, 1024) + } + xw.WriteString(o.Value) + return xw.Tot(), xw.Error() +} + +func (o *Option) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.DecodeXDRFrom(xr) +} + +func (o *Option) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.DecodeXDRFrom(xr) +} + +func (o *Option) DecodeXDRFrom(xr *xdr.Reader) error { + o.Key = xr.ReadStringMax(64) + o.Value = xr.ReadStringMax(1024) + return xr.Error() +} + +/* + +CloseMessage Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Reason | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Reason (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Code | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct CloseMessage { + string Reason<1024>; + int Code; +} + +*/ + +func (o CloseMessage) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.EncodeXDRInto(xw) +} + +func (o CloseMessage) MarshalXDR() ([]byte, error) { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o CloseMessage) MustMarshalXDR() []byte { + bs, err := o.MarshalXDR() + if err != nil { + panic(err) + } + return bs +} + +func (o CloseMessage) AppendXDR(bs []byte) ([]byte, error) { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + _, err := o.EncodeXDRInto(xw) + return []byte(aw), err +} + +func (o CloseMessage) EncodeXDRInto(xw *xdr.Writer) (int, error) { + if l := len(o.Reason); l > 1024 { + return xw.Tot(), xdr.ElementSizeExceeded("Reason", l, 1024) + } + xw.WriteString(o.Reason) + xw.WriteUint32(uint32(o.Code)) + return xw.Tot(), xw.Error() +} + +func (o *CloseMessage) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.DecodeXDRFrom(xr) +} + +func (o *CloseMessage) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.DecodeXDRFrom(xr) +} + +func (o *CloseMessage) DecodeXDRFrom(xr *xdr.Reader) error { + o.Reason = xr.ReadStringMax(1024) + o.Code = int32(xr.ReadUint32()) + return xr.Error() +} + +/* + +EmptyMessage Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct EmptyMessage { +} + +*/ + +func (o EmptyMessage) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.EncodeXDRInto(xw) +} + +func (o EmptyMessage) MarshalXDR() ([]byte, error) { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o EmptyMessage) MustMarshalXDR() []byte { + bs, err := o.MarshalXDR() + if err != nil { + panic(err) + } + return bs +} + +func (o EmptyMessage) AppendXDR(bs []byte) ([]byte, error) { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + _, err := o.EncodeXDRInto(xw) + return []byte(aw), err +} + +func (o EmptyMessage) EncodeXDRInto(xw *xdr.Writer) (int, error) { + return xw.Tot(), xw.Error() +} + +func (o *EmptyMessage) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.DecodeXDRFrom(xr) +} + +func (o *EmptyMessage) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.DecodeXDRFrom(xr) +} + +func (o *EmptyMessage) DecodeXDRFrom(xr *xdr.Reader) error { + return xr.Error() +} diff --git a/lib/protocol/nativemodel_darwin.go b/lib/protocol/nativemodel_darwin.go new file mode 100644 index 000000000..eb755a6e4 --- /dev/null +++ b/lib/protocol/nativemodel_darwin.go @@ -0,0 +1,40 @@ +// Copyright (C) 2014 The Protocol Authors. + +// +build darwin + +package protocol + +// Darwin uses NFD normalization + +import "golang.org/x/text/unicode/norm" + +type nativeModel struct { + next Model +} + +func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) { + for i := range files { + files[i].Name = norm.NFD.String(files[i].Name) + } + m.next.Index(deviceID, folder, files, flags, options) +} + +func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) { + for i := range files { + files[i].Name = norm.NFD.String(files[i].Name) + } + m.next.IndexUpdate(deviceID, folder, files, flags, options) +} + +func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error { + name = norm.NFD.String(name) + return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf) +} + +func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) { + m.next.ClusterConfig(deviceID, config) +} + +func (m nativeModel) Close(deviceID DeviceID, err error) { + m.next.Close(deviceID, err) +} diff --git a/lib/protocol/nativemodel_unix.go b/lib/protocol/nativemodel_unix.go new file mode 100644 index 000000000..0611865e1 --- /dev/null +++ b/lib/protocol/nativemodel_unix.go @@ -0,0 +1,31 @@ +// Copyright (C) 2014 The Protocol Authors. + +// +build !windows,!darwin + +package protocol + +// Normal Unixes uses NFC and slashes, which is the wire format. + +type nativeModel struct { + next Model +} + +func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) { + m.next.Index(deviceID, folder, files, flags, options) +} + +func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) { + m.next.IndexUpdate(deviceID, folder, files, flags, options) +} + +func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error { + return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf) +} + +func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) { + m.next.ClusterConfig(deviceID, config) +} + +func (m nativeModel) Close(deviceID DeviceID, err error) { + m.next.Close(deviceID, err) +} diff --git a/lib/protocol/nativemodel_windows.go b/lib/protocol/nativemodel_windows.go new file mode 100644 index 000000000..36a1d2749 --- /dev/null +++ b/lib/protocol/nativemodel_windows.go @@ -0,0 +1,63 @@ +// Copyright (C) 2014 The Protocol Authors. + +// +build windows + +package protocol + +// Windows uses backslashes as file separator and disallows a bunch of +// characters in the filename + +import ( + "path/filepath" + "strings" +) + +var disallowedCharacters = string([]rune{ + '<', '>', ':', '"', '|', '?', '*', + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, + 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, + 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, + 31, +}) + +type nativeModel struct { + next Model +} + +func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) { + fixupFiles(folder, files) + m.next.Index(deviceID, folder, files, flags, options) +} + +func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) { + fixupFiles(folder, files) + m.next.IndexUpdate(deviceID, folder, files, flags, options) +} + +func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error { + name = filepath.FromSlash(name) + return m.next.Request(deviceID, folder, name, offset, hash, flags, options, buf) +} + +func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) { + m.next.ClusterConfig(deviceID, config) +} + +func (m nativeModel) Close(deviceID DeviceID, err error) { + m.next.Close(deviceID, err) +} + +func fixupFiles(folder string, files []FileInfo) { + for i, f := range files { + if strings.ContainsAny(f.Name, disallowedCharacters) { + if f.IsDeleted() { + // Don't complain if the file is marked as deleted, since it + // can't possibly exist here anyway. + continue + } + files[i].Flags |= FlagInvalid + l.Warnf("File name %q (folder %q) contains invalid characters; marked as invalid.", f.Name, folder) + } + files[i].Name = filepath.FromSlash(files[i].Name) + } +} diff --git a/lib/protocol/protocol.go b/lib/protocol/protocol.go new file mode 100644 index 000000000..4c1364eaf --- /dev/null +++ b/lib/protocol/protocol.go @@ -0,0 +1,782 @@ +// Copyright (C) 2014 The Protocol Authors. + +package protocol + +import ( + "encoding/binary" + "encoding/hex" + "errors" + "fmt" + "io" + "sync" + "time" + + lz4 "github.com/bkaradzic/go-lz4" +) + +const ( + // Data block size (128 KiB) + BlockSize = 128 << 10 + + // We reject messages larger than this when encountered on the wire. (64 MiB) + MaxMessageLen = 64 << 20 +) + +const ( + messageTypeClusterConfig = 0 + messageTypeIndex = 1 + messageTypeRequest = 2 + messageTypeResponse = 3 + messageTypePing = 4 + messageTypeIndexUpdate = 6 + messageTypeClose = 7 +) + +const ( + stateInitial = iota + stateReady +) + +// FileInfo flags +const ( + FlagDeleted uint32 = 1 << 12 + FlagInvalid = 1 << 13 + FlagDirectory = 1 << 14 + FlagNoPermBits = 1 << 15 + FlagSymlink = 1 << 16 + FlagSymlinkMissingTarget = 1 << 17 + + FlagsAll = (1 << 18) - 1 + + SymlinkTypeMask = FlagDirectory | FlagSymlinkMissingTarget +) + +// IndexMessage message flags (for IndexUpdate) +const ( + FlagIndexTemporary uint32 = 1 << iota +) + +// Request message flags +const ( + FlagRequestTemporary uint32 = 1 << iota +) + +// ClusterConfigMessage.Folders.Devices flags +const ( + FlagShareTrusted uint32 = 1 << 0 + FlagShareReadOnly = 1 << 1 + FlagIntroducer = 1 << 2 + FlagShareBits = 0x000000ff +) + +var ( + ErrClosed = errors.New("connection closed") + ErrTimeout = errors.New("read timeout") +) + +// Specific variants of empty messages... +type pingMessage struct{ EmptyMessage } + +type Model interface { + // An index was received from the peer device + Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) + // An index update was received from the peer device + IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) + // A request was made by the peer device + Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error + // A cluster configuration message was received + ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) + // The peer device closed the connection + Close(deviceID DeviceID, err error) +} + +type Connection interface { + Start() + ID() DeviceID + Name() string + Index(folder string, files []FileInfo, flags uint32, options []Option) error + IndexUpdate(folder string, files []FileInfo, flags uint32, options []Option) error + Request(folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) + ClusterConfig(config ClusterConfigMessage) + Statistics() Statistics +} + +type rawConnection struct { + id DeviceID + name string + receiver Model + + cr *countingReader + cw *countingWriter + + awaiting [4096]chan asyncResult + awaitingMut sync.Mutex + + idxMut sync.Mutex // ensures serialization of Index calls + + nextID chan int + outbox chan hdrMsg + closed chan struct{} + once sync.Once + pool sync.Pool + compression Compression + + rdbuf0 []byte // used & reused by readMessage + rdbuf1 []byte // used & reused by readMessage +} + +type asyncResult struct { + val []byte + err error +} + +type hdrMsg struct { + hdr header + msg encodable + done chan struct{} +} + +type encodable interface { + AppendXDR([]byte) ([]byte, error) +} + +type isEofer interface { + IsEOF() bool +} + +const ( + // We make sure to send a message at least this often, by triggering pings. + PingSendInterval = 90 * time.Second + // If we haven't received a message from the other side for this long, close the connection. + ReceiveTimeout = 300 * time.Second +) + +func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiver Model, name string, compress Compression) Connection { + cr := &countingReader{Reader: reader} + cw := &countingWriter{Writer: writer} + + c := rawConnection{ + id: deviceID, + name: name, + receiver: nativeModel{receiver}, + cr: cr, + cw: cw, + outbox: make(chan hdrMsg), + nextID: make(chan int), + closed: make(chan struct{}), + pool: sync.Pool{ + New: func() interface{} { + return make([]byte, BlockSize) + }, + }, + compression: compress, + } + + return wireFormatConnection{&c} +} + +// Start creates the goroutines for sending and receiving of messages. It must +// be called exactly once after creating a connection. +func (c *rawConnection) Start() { + go c.readerLoop() + go c.writerLoop() + go c.pingSender() + go c.pingReceiver() + go c.idGenerator() +} + +func (c *rawConnection) ID() DeviceID { + return c.id +} + +func (c *rawConnection) Name() string { + return c.name +} + +// Index writes the list of file information to the connected peer device +func (c *rawConnection) Index(folder string, idx []FileInfo, flags uint32, options []Option) error { + select { + case <-c.closed: + return ErrClosed + default: + } + c.idxMut.Lock() + c.send(-1, messageTypeIndex, IndexMessage{ + Folder: folder, + Files: idx, + Flags: flags, + Options: options, + }, nil) + c.idxMut.Unlock() + return nil +} + +// IndexUpdate writes the list of file information to the connected peer device as an update +func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo, flags uint32, options []Option) error { + select { + case <-c.closed: + return ErrClosed + default: + } + c.idxMut.Lock() + c.send(-1, messageTypeIndexUpdate, IndexMessage{ + Folder: folder, + Files: idx, + Flags: flags, + Options: options, + }, nil) + c.idxMut.Unlock() + return nil +} + +// Request returns the bytes for the specified block after fetching them from the connected peer. +func (c *rawConnection) Request(folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) { + var id int + select { + case id = <-c.nextID: + case <-c.closed: + return nil, ErrClosed + } + + c.awaitingMut.Lock() + if ch := c.awaiting[id]; ch != nil { + panic("id taken") + } + rc := make(chan asyncResult, 1) + c.awaiting[id] = rc + c.awaitingMut.Unlock() + + ok := c.send(id, messageTypeRequest, RequestMessage{ + Folder: folder, + Name: name, + Offset: offset, + Size: int32(size), + Hash: hash, + Flags: flags, + Options: options, + }, nil) + if !ok { + return nil, ErrClosed + } + + res, ok := <-rc + if !ok { + return nil, ErrClosed + } + return res.val, res.err +} + +// ClusterConfig send the cluster configuration message to the peer and returns any error +func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) { + c.send(-1, messageTypeClusterConfig, config, nil) +} + +func (c *rawConnection) ping() bool { + var id int + select { + case id = <-c.nextID: + case <-c.closed: + return false + } + + return c.send(id, messageTypePing, nil, nil) +} + +func (c *rawConnection) readerLoop() (err error) { + defer func() { + c.close(err) + }() + + state := stateInitial + for { + select { + case <-c.closed: + return ErrClosed + default: + } + + hdr, msg, err := c.readMessage() + if err != nil { + return err + } + + switch msg := msg.(type) { + case ClusterConfigMessage: + if state != stateInitial { + return fmt.Errorf("protocol error: cluster config message in state %d", state) + } + go c.receiver.ClusterConfig(c.id, msg) + state = stateReady + + case IndexMessage: + switch hdr.msgType { + case messageTypeIndex: + if state != stateReady { + return fmt.Errorf("protocol error: index message in state %d", state) + } + c.handleIndex(msg) + state = stateReady + + case messageTypeIndexUpdate: + if state != stateReady { + return fmt.Errorf("protocol error: index update message in state %d", state) + } + c.handleIndexUpdate(msg) + state = stateReady + } + + case RequestMessage: + if state != stateReady { + return fmt.Errorf("protocol error: request message in state %d", state) + } + // Requests are handled asynchronously + go c.handleRequest(hdr.msgID, msg) + + case ResponseMessage: + if state != stateReady { + return fmt.Errorf("protocol error: response message in state %d", state) + } + c.handleResponse(hdr.msgID, msg) + + case pingMessage: + if state != stateReady { + return fmt.Errorf("protocol error: ping message in state %d", state) + } + // Nothing + + case CloseMessage: + return errors.New(msg.Reason) + + default: + return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType) + } + } +} + +func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) { + if cap(c.rdbuf0) < 8 { + c.rdbuf0 = make([]byte, 8) + } else { + c.rdbuf0 = c.rdbuf0[:8] + } + _, err = io.ReadFull(c.cr, c.rdbuf0) + if err != nil { + return + } + + hdr = decodeHeader(binary.BigEndian.Uint32(c.rdbuf0[0:4])) + msglen := int(binary.BigEndian.Uint32(c.rdbuf0[4:8])) + + if debug { + l.Debugf("read header %v (msglen=%d)", hdr, msglen) + } + + if msglen > MaxMessageLen { + err = fmt.Errorf("message length %d exceeds maximum %d", msglen, MaxMessageLen) + return + } + + if hdr.version != 0 { + err = fmt.Errorf("unknown protocol version 0x%x", hdr.version) + return + } + + if cap(c.rdbuf0) < msglen { + c.rdbuf0 = make([]byte, msglen) + } else { + c.rdbuf0 = c.rdbuf0[:msglen] + } + _, err = io.ReadFull(c.cr, c.rdbuf0) + if err != nil { + return + } + + if debug { + l.Debugf("read %d bytes", len(c.rdbuf0)) + } + + msgBuf := c.rdbuf0 + if hdr.compression && msglen > 0 { + c.rdbuf1 = c.rdbuf1[:cap(c.rdbuf1)] + c.rdbuf1, err = lz4.Decode(c.rdbuf1, c.rdbuf0) + if err != nil { + return + } + msgBuf = c.rdbuf1 + if debug { + l.Debugf("decompressed to %d bytes", len(msgBuf)) + } + } + + if debug { + if len(msgBuf) > 1024 { + l.Debugf("message data:\n%s", hex.Dump(msgBuf[:1024])) + } else { + l.Debugf("message data:\n%s", hex.Dump(msgBuf)) + } + } + + // We check each returned error for the XDRError.IsEOF() method. + // IsEOF()==true here means that the message contained fewer fields than + // expected. It does not signify an EOF on the socket, because we've + // successfully read a size value and that many bytes already. New fields + // we expected but the other peer didn't send should be interpreted as + // zero/nil, and if that's not valid we'll verify it somewhere else. + + switch hdr.msgType { + case messageTypeIndex, messageTypeIndexUpdate: + var idx IndexMessage + err = idx.UnmarshalXDR(msgBuf) + if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() { + err = nil + } + msg = idx + + case messageTypeRequest: + var req RequestMessage + err = req.UnmarshalXDR(msgBuf) + if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() { + err = nil + } + msg = req + + case messageTypeResponse: + var resp ResponseMessage + err = resp.UnmarshalXDR(msgBuf) + if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() { + err = nil + } + msg = resp + + case messageTypePing: + msg = pingMessage{} + + case messageTypeClusterConfig: + var cc ClusterConfigMessage + err = cc.UnmarshalXDR(msgBuf) + if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() { + err = nil + } + msg = cc + + case messageTypeClose: + var cm CloseMessage + err = cm.UnmarshalXDR(msgBuf) + if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() { + err = nil + } + msg = cm + + default: + err = fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType) + } + + return +} + +func (c *rawConnection) handleIndex(im IndexMessage) { + if debug { + l.Debugf("Index(%v, %v, %d file, flags %x, opts: %s)", c.id, im.Folder, len(im.Files), im.Flags, im.Options) + } + c.receiver.Index(c.id, im.Folder, filterIndexMessageFiles(im.Files), im.Flags, im.Options) +} + +func (c *rawConnection) handleIndexUpdate(im IndexMessage) { + if debug { + l.Debugf("queueing IndexUpdate(%v, %v, %d files, flags %x, opts: %s)", c.id, im.Folder, len(im.Files), im.Flags, im.Options) + } + c.receiver.IndexUpdate(c.id, im.Folder, filterIndexMessageFiles(im.Files), im.Flags, im.Options) +} + +func filterIndexMessageFiles(fs []FileInfo) []FileInfo { + var out []FileInfo + for i, f := range fs { + switch f.Name { + case "", ".", "..", "/": // A few obviously invalid filenames + l.Infof("Dropping invalid filename %q from incoming index", f.Name) + if out == nil { + // Most incoming updates won't contain anything invalid, so we + // delay the allocation and copy to output slice until we + // really need to do it, then copy all the so var valid files + // to it. + out = make([]FileInfo, i, len(fs)-1) + copy(out, fs) + } + default: + if out != nil { + out = append(out, f) + } + } + } + if out != nil { + return out + } + return fs +} + +func (c *rawConnection) handleRequest(msgID int, req RequestMessage) { + size := int(req.Size) + usePool := size <= BlockSize + + var buf []byte + var done chan struct{} + + if usePool { + buf = c.pool.Get().([]byte)[:size] + done = make(chan struct{}) + } else { + buf = make([]byte, size) + } + + err := c.receiver.Request(c.id, req.Folder, req.Name, int64(req.Offset), req.Hash, req.Flags, req.Options, buf) + if err != nil { + c.send(msgID, messageTypeResponse, ResponseMessage{ + Data: nil, + Code: errorToCode(err), + }, done) + } else { + c.send(msgID, messageTypeResponse, ResponseMessage{ + Data: buf, + Code: errorToCode(err), + }, done) + } + + if usePool { + <-done + c.pool.Put(buf) + } +} + +func (c *rawConnection) handleResponse(msgID int, resp ResponseMessage) { + c.awaitingMut.Lock() + if rc := c.awaiting[msgID]; rc != nil { + c.awaiting[msgID] = nil + rc <- asyncResult{resp.Data, codeToError(resp.Code)} + close(rc) + } + c.awaitingMut.Unlock() +} + +func (c *rawConnection) handlePong(msgID int) { + c.awaitingMut.Lock() + if rc := c.awaiting[msgID]; rc != nil { + c.awaiting[msgID] = nil + rc <- asyncResult{} + close(rc) + } + c.awaitingMut.Unlock() +} + +func (c *rawConnection) send(msgID int, msgType int, msg encodable, done chan struct{}) bool { + if msgID < 0 { + select { + case id := <-c.nextID: + msgID = id + case <-c.closed: + return false + } + } + + hdr := header{ + version: 0, + msgID: msgID, + msgType: msgType, + } + + select { + case c.outbox <- hdrMsg{hdr, msg, done}: + return true + case <-c.closed: + return false + } +} + +func (c *rawConnection) writerLoop() { + var msgBuf = make([]byte, 8) // buffer for wire format message, kept and reused + var uncBuf []byte // buffer for uncompressed message, kept and reused + for { + var tempBuf []byte + var err error + + select { + case hm := <-c.outbox: + if hm.msg != nil { + // Uncompressed message in uncBuf + uncBuf, err = hm.msg.AppendXDR(uncBuf[:0]) + if hm.done != nil { + close(hm.done) + } + if err != nil { + c.close(err) + return + } + + compress := false + switch c.compression { + case CompressAlways: + compress = true + case CompressMetadata: + compress = hm.hdr.msgType != messageTypeResponse + } + + if compress && len(uncBuf) >= compressionThreshold { + // Use compression for large messages + hm.hdr.compression = true + + // Make sure we have enough space for the compressed message plus header in msgBug + msgBuf = msgBuf[:cap(msgBuf)] + if maxLen := lz4.CompressBound(len(uncBuf)) + 8; maxLen > len(msgBuf) { + msgBuf = make([]byte, maxLen) + } + + // Compressed is written to msgBuf, we keep tb for the length only + tempBuf, err = lz4.Encode(msgBuf[8:], uncBuf) + binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(tempBuf))) + msgBuf = msgBuf[0 : len(tempBuf)+8] + + if debug { + l.Debugf("write compressed message; %v (len=%d)", hm.hdr, len(tempBuf)) + } + } else { + // No point in compressing very short messages + hm.hdr.compression = false + + msgBuf = msgBuf[:cap(msgBuf)] + if l := len(uncBuf) + 8; l > len(msgBuf) { + msgBuf = make([]byte, l) + } + + binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(uncBuf))) + msgBuf = msgBuf[0 : len(uncBuf)+8] + copy(msgBuf[8:], uncBuf) + + if debug { + l.Debugf("write uncompressed message; %v (len=%d)", hm.hdr, len(uncBuf)) + } + } + } else { + if debug { + l.Debugf("write empty message; %v", hm.hdr) + } + binary.BigEndian.PutUint32(msgBuf[4:8], 0) + msgBuf = msgBuf[:8] + } + + binary.BigEndian.PutUint32(msgBuf[0:4], encodeHeader(hm.hdr)) + + if err == nil { + var n int + n, err = c.cw.Write(msgBuf) + if debug { + l.Debugf("wrote %d bytes on the wire", n) + } + } + if err != nil { + c.close(err) + return + } + case <-c.closed: + return + } + } +} + +func (c *rawConnection) close(err error) { + c.once.Do(func() { + close(c.closed) + + c.awaitingMut.Lock() + for i, ch := range c.awaiting { + if ch != nil { + close(ch) + c.awaiting[i] = nil + } + } + c.awaitingMut.Unlock() + + go c.receiver.Close(c.id, err) + }) +} + +func (c *rawConnection) idGenerator() { + nextID := 0 + for { + nextID = (nextID + 1) & 0xfff + select { + case c.nextID <- nextID: + case <-c.closed: + return + } + } +} + +// The pingSender makes sure that we've sent a message within the last +// PingSendInterval. If we already have something sent in the last +// PingSendInterval/2, we do nothing. Otherwise we send a ping message. This +// results in an effecting ping interval of somewhere between +// PingSendInterval/2 and PingSendInterval. +func (c *rawConnection) pingSender() { + ticker := time.Tick(PingSendInterval / 2) + + for { + select { + case <-ticker: + d := time.Since(c.cw.Last()) + if d < PingSendInterval/2 { + if debug { + l.Debugln(c.id, "ping skipped after wr", d) + } + continue + } + + if debug { + l.Debugln(c.id, "ping -> after", d) + } + c.ping() + + case <-c.closed: + return + } + } +} + +// The pingReciever checks that we've received a message (any message will do, +// but we expect pings in the absence of other messages) within the last +// ReceiveTimeout. If not, we close the connection with an ErrTimeout. +func (c *rawConnection) pingReceiver() { + ticker := time.Tick(ReceiveTimeout / 2) + + for { + select { + case <-ticker: + d := time.Since(c.cr.Last()) + if d > ReceiveTimeout { + if debug { + l.Debugln(c.id, "ping timeout", d) + } + c.close(ErrTimeout) + } + + if debug { + l.Debugln(c.id, "last read within", d) + } + + case <-c.closed: + return + } + } +} + +type Statistics struct { + At time.Time + InBytesTotal int64 + OutBytesTotal int64 +} + +func (c *rawConnection) Statistics() Statistics { + return Statistics{ + At: time.Now(), + InBytesTotal: c.cr.Tot(), + OutBytesTotal: c.cw.Tot(), + } +} diff --git a/lib/protocol/protocol_test.go b/lib/protocol/protocol_test.go new file mode 100644 index 000000000..8a4708843 --- /dev/null +++ b/lib/protocol/protocol_test.go @@ -0,0 +1,316 @@ +// Copyright (C) 2014 The Protocol Authors. + +package protocol + +import ( + "bytes" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "os" + "reflect" + "strings" + "testing" + "testing/quick" + + "github.com/calmh/xdr" +) + +var ( + c0ID = NewDeviceID([]byte{1}) + c1ID = NewDeviceID([]byte{2}) +) + +func TestHeaderFunctions(t *testing.T) { + f := func(ver, id, typ int) bool { + ver = int(uint(ver) % 16) + id = int(uint(id) % 4096) + typ = int(uint(typ) % 256) + h0 := header{version: ver, msgID: id, msgType: typ} + h1 := decodeHeader(encodeHeader(h0)) + return h0 == h1 + } + if err := quick.Check(f, nil); err != nil { + t.Error(err) + } +} + +func TestHeaderLayout(t *testing.T) { + var e, a uint32 + + // Version are the first four bits + e = 0xf0000000 + a = encodeHeader(header{version: 0xf}) + if a != e { + t.Errorf("Header layout incorrect; %08x != %08x", a, e) + } + + // Message ID are the following 12 bits + e = 0x0fff0000 + a = encodeHeader(header{msgID: 0xfff}) + if a != e { + t.Errorf("Header layout incorrect; %08x != %08x", a, e) + } + + // Type are the last 8 bits before reserved + e = 0x0000ff00 + a = encodeHeader(header{msgType: 0xff}) + if a != e { + t.Errorf("Header layout incorrect; %08x != %08x", a, e) + } +} + +func TestPing(t *testing.T) { + ar, aw := io.Pipe() + br, bw := io.Pipe() + + c0 := NewConnection(c0ID, ar, bw, newTestModel(), "name", CompressAlways).(wireFormatConnection).next.(*rawConnection) + c0.Start() + c1 := NewConnection(c1ID, br, aw, newTestModel(), "name", CompressAlways).(wireFormatConnection).next.(*rawConnection) + c1.Start() + c0.ClusterConfig(ClusterConfigMessage{}) + c1.ClusterConfig(ClusterConfigMessage{}) + + if ok := c0.ping(); !ok { + t.Error("c0 ping failed") + } + if ok := c1.ping(); !ok { + t.Error("c1 ping failed") + } +} + +func TestVersionErr(t *testing.T) { + m0 := newTestModel() + m1 := newTestModel() + + ar, aw := io.Pipe() + br, bw := io.Pipe() + + c0 := NewConnection(c0ID, ar, bw, m0, "name", CompressAlways).(wireFormatConnection).next.(*rawConnection) + c0.Start() + c1 := NewConnection(c1ID, br, aw, m1, "name", CompressAlways) + c1.Start() + c0.ClusterConfig(ClusterConfigMessage{}) + c1.ClusterConfig(ClusterConfigMessage{}) + + w := xdr.NewWriter(c0.cw) + w.WriteUint32(encodeHeader(header{ + version: 2, + msgID: 0, + msgType: 0, + })) + w.WriteUint32(0) // Avoids reader closing due to EOF + + if !m1.isClosed() { + t.Error("Connection should close due to unknown version") + } +} + +func TestTypeErr(t *testing.T) { + m0 := newTestModel() + m1 := newTestModel() + + ar, aw := io.Pipe() + br, bw := io.Pipe() + + c0 := NewConnection(c0ID, ar, bw, m0, "name", CompressAlways).(wireFormatConnection).next.(*rawConnection) + c0.Start() + c1 := NewConnection(c1ID, br, aw, m1, "name", CompressAlways) + c1.Start() + c0.ClusterConfig(ClusterConfigMessage{}) + c1.ClusterConfig(ClusterConfigMessage{}) + + w := xdr.NewWriter(c0.cw) + w.WriteUint32(encodeHeader(header{ + version: 0, + msgID: 0, + msgType: 42, + })) + w.WriteUint32(0) // Avoids reader closing due to EOF + + if !m1.isClosed() { + t.Error("Connection should close due to unknown message type") + } +} + +func TestClose(t *testing.T) { + m0 := newTestModel() + m1 := newTestModel() + + ar, aw := io.Pipe() + br, bw := io.Pipe() + + c0 := NewConnection(c0ID, ar, bw, m0, "name", CompressAlways).(wireFormatConnection).next.(*rawConnection) + c0.Start() + c1 := NewConnection(c1ID, br, aw, m1, "name", CompressAlways) + c1.Start() + c0.ClusterConfig(ClusterConfigMessage{}) + c1.ClusterConfig(ClusterConfigMessage{}) + + c0.close(nil) + + <-c0.closed + if !m0.isClosed() { + t.Fatal("Connection should be closed") + } + + // None of these should panic, some should return an error + + if c0.ping() { + t.Error("Ping should not return true") + } + + c0.Index("default", nil, 0, nil) + c0.Index("default", nil, 0, nil) + + if _, err := c0.Request("default", "foo", 0, 0, nil, 0, nil); err == nil { + t.Error("Request should return an error") + } +} + +func TestElementSizeExceededNested(t *testing.T) { + m := ClusterConfigMessage{ + Folders: []Folder{ + {ID: "longstringlongstringlongstringinglongstringlongstringlonlongstringlongstringlon"}, + }, + } + _, err := m.EncodeXDR(ioutil.Discard) + if err == nil { + t.Errorf("ID length %d > max 64, but no error", len(m.Folders[0].ID)) + } +} + +func TestMarshalIndexMessage(t *testing.T) { + var quickCfg = &quick.Config{MaxCountScale: 10} + if testing.Short() { + quickCfg = nil + } + + f := func(m1 IndexMessage) bool { + for i, f := range m1.Files { + m1.Files[i].CachedSize = 0 + for j := range f.Blocks { + f.Blocks[j].Offset = 0 + if len(f.Blocks[j].Hash) == 0 { + f.Blocks[j].Hash = nil + } + } + } + + return testMarshal(t, "index", &m1, &IndexMessage{}) + } + + if err := quick.Check(f, quickCfg); err != nil { + t.Error(err) + } +} + +func TestMarshalRequestMessage(t *testing.T) { + var quickCfg = &quick.Config{MaxCountScale: 10} + if testing.Short() { + quickCfg = nil + } + + f := func(m1 RequestMessage) bool { + return testMarshal(t, "request", &m1, &RequestMessage{}) + } + + if err := quick.Check(f, quickCfg); err != nil { + t.Error(err) + } +} + +func TestMarshalResponseMessage(t *testing.T) { + var quickCfg = &quick.Config{MaxCountScale: 10} + if testing.Short() { + quickCfg = nil + } + + f := func(m1 ResponseMessage) bool { + if len(m1.Data) == 0 { + m1.Data = nil + } + return testMarshal(t, "response", &m1, &ResponseMessage{}) + } + + if err := quick.Check(f, quickCfg); err != nil { + t.Error(err) + } +} + +func TestMarshalClusterConfigMessage(t *testing.T) { + var quickCfg = &quick.Config{MaxCountScale: 10} + if testing.Short() { + quickCfg = nil + } + + f := func(m1 ClusterConfigMessage) bool { + return testMarshal(t, "clusterconfig", &m1, &ClusterConfigMessage{}) + } + + if err := quick.Check(f, quickCfg); err != nil { + t.Error(err) + } +} + +func TestMarshalCloseMessage(t *testing.T) { + var quickCfg = &quick.Config{MaxCountScale: 10} + if testing.Short() { + quickCfg = nil + } + + f := func(m1 CloseMessage) bool { + return testMarshal(t, "close", &m1, &CloseMessage{}) + } + + if err := quick.Check(f, quickCfg); err != nil { + t.Error(err) + } +} + +type message interface { + EncodeXDR(io.Writer) (int, error) + DecodeXDR(io.Reader) error +} + +func testMarshal(t *testing.T, prefix string, m1, m2 message) bool { + var buf bytes.Buffer + + failed := func(bc []byte) { + bs, _ := json.MarshalIndent(m1, "", " ") + ioutil.WriteFile(prefix+"-1.txt", bs, 0644) + bs, _ = json.MarshalIndent(m2, "", " ") + ioutil.WriteFile(prefix+"-2.txt", bs, 0644) + if len(bc) > 0 { + f, _ := os.Create(prefix + "-data.txt") + fmt.Fprint(f, hex.Dump(bc)) + f.Close() + } + } + + _, err := m1.EncodeXDR(&buf) + if err != nil && strings.Contains(err.Error(), "exceeds size") { + return true + } + if err != nil { + failed(nil) + t.Fatal(err) + } + + bc := make([]byte, len(buf.Bytes())) + copy(bc, buf.Bytes()) + + err = m2.DecodeXDR(&buf) + if err != nil { + failed(bc) + t.Fatal(err) + } + + ok := reflect.DeepEqual(m1, m2) + if !ok { + failed(bc) + } + return ok +} diff --git a/lib/protocol/vector.go b/lib/protocol/vector.go new file mode 100644 index 000000000..edd156143 --- /dev/null +++ b/lib/protocol/vector.go @@ -0,0 +1,115 @@ +// Copyright (C) 2015 The Protocol Authors. + +package protocol + +// The Vector type represents a version vector. The zero value is a usable +// version vector. The vector has slice semantics and some operations on it +// are "append-like" in that they may return the same vector modified, or a +// new allocated Vector with the modified contents. +type Vector []Counter + +// Counter represents a single counter in the version vector. +type Counter struct { + ID uint64 + Value uint64 +} + +// Update returns a Vector with the index for the specific ID incremented by +// one. If it is possible, the vector v is updated and returned. If it is not, +// a copy will be created, updated and returned. +func (v Vector) Update(ID uint64) Vector { + for i := range v { + if v[i].ID == ID { + // Update an existing index + v[i].Value++ + return v + } else if v[i].ID > ID { + // Insert a new index + nv := make(Vector, len(v)+1) + copy(nv, v[:i]) + nv[i].ID = ID + nv[i].Value = 1 + copy(nv[i+1:], v[i:]) + return nv + } + } + // Append a new new index + return append(v, Counter{ID, 1}) +} + +// Merge returns the vector containing the maximum indexes from a and b. If it +// is possible, the vector a is updated and returned. If it is not, a copy +// will be created, updated and returned. +func (a Vector) Merge(b Vector) Vector { + var ai, bi int + for bi < len(b) { + if ai == len(a) { + // We've reach the end of a, all that remains are appends + return append(a, b[bi:]...) + } + + if a[ai].ID > b[bi].ID { + // The index from b should be inserted here + n := make(Vector, len(a)+1) + copy(n, a[:ai]) + n[ai] = b[bi] + copy(n[ai+1:], a[ai:]) + a = n + } + + if a[ai].ID == b[bi].ID { + if v := b[bi].Value; v > a[ai].Value { + a[ai].Value = v + } + } + + if bi < len(b) && a[ai].ID == b[bi].ID { + bi++ + } + ai++ + } + + return a +} + +// Copy returns an identical vector that is not shared with v. +func (v Vector) Copy() Vector { + nv := make(Vector, len(v)) + copy(nv, v) + return nv +} + +// Equal returns true when the two vectors are equivalent. +func (a Vector) Equal(b Vector) bool { + return a.Compare(b) == Equal +} + +// LesserEqual returns true when the two vectors are equivalent or a is Lesser +// than b. +func (a Vector) LesserEqual(b Vector) bool { + comp := a.Compare(b) + return comp == Lesser || comp == Equal +} + +// LesserEqual returns true when the two vectors are equivalent or a is Greater +// than b. +func (a Vector) GreaterEqual(b Vector) bool { + comp := a.Compare(b) + return comp == Greater || comp == Equal +} + +// Concurrent returns true when the two vectors are concrurrent. +func (a Vector) Concurrent(b Vector) bool { + comp := a.Compare(b) + return comp == ConcurrentGreater || comp == ConcurrentLesser +} + +// Counter returns the current value of the given counter ID. +func (v Vector) Counter(id uint64) uint64 { + for _, c := range v { + if c.ID == id { + return c.Value + } + } + return 0 +} diff --git a/lib/protocol/vector_compare.go b/lib/protocol/vector_compare.go new file mode 100644 index 000000000..9735ec9d1 --- /dev/null +++ b/lib/protocol/vector_compare.go @@ -0,0 +1,89 @@ +// Copyright (C) 2015 The Protocol Authors. + +package protocol + +// Ordering represents the relationship between two Vectors. +type Ordering int + +const ( + Equal Ordering = iota + Greater + Lesser + ConcurrentLesser + ConcurrentGreater +) + +// There's really no such thing as "concurrent lesser" and "concurrent +// greater" in version vectors, just "concurrent". But it's useful to be able +// to get a strict ordering between versions for stable sorts and so on, so we +// return both variants. The convenience method Concurrent() can be used to +// check for either case. + +// Compare returns the Ordering that describes a's relation to b. +func (a Vector) Compare(b Vector) Ordering { + var ai, bi int // index into a and b + var av, bv Counter // value at current index + + result := Equal + + for ai < len(a) || bi < len(b) { + var aMissing, bMissing bool + + if ai < len(a) { + av = a[ai] + } else { + av = Counter{} + aMissing = true + } + + if bi < len(b) { + bv = b[bi] + } else { + bv = Counter{} + bMissing = true + } + + switch { + case av.ID == bv.ID: + // We have a counter value for each side + if av.Value > bv.Value { + if result == Lesser { + return ConcurrentLesser + } + result = Greater + } else if av.Value < bv.Value { + if result == Greater { + return ConcurrentGreater + } + result = Lesser + } + + case !aMissing && av.ID < bv.ID || bMissing: + // Value is missing on the b side + if av.Value > 0 { + if result == Lesser { + return ConcurrentLesser + } + result = Greater + } + + case !bMissing && bv.ID < av.ID || aMissing: + // Value is missing on the a side + if bv.Value > 0 { + if result == Greater { + return ConcurrentGreater + } + result = Lesser + } + } + + if ai < len(a) && (av.ID <= bv.ID || bMissing) { + ai++ + } + if bi < len(b) && (bv.ID <= av.ID || aMissing) { + bi++ + } + } + + return result +} diff --git a/lib/protocol/vector_compare_test.go b/lib/protocol/vector_compare_test.go new file mode 100644 index 000000000..78b6abe43 --- /dev/null +++ b/lib/protocol/vector_compare_test.go @@ -0,0 +1,249 @@ +// Copyright (C) 2015 The Protocol Authors. + +package protocol + +import ( + "math" + "testing" +) + +func TestCompare(t *testing.T) { + testcases := []struct { + a, b Vector + r Ordering + }{ + // Empty vectors are identical + {Vector{}, Vector{}, Equal}, + {Vector{}, nil, Equal}, + {nil, Vector{}, Equal}, + {nil, Vector{Counter{42, 0}}, Equal}, + {Vector{}, Vector{Counter{42, 0}}, Equal}, + {Vector{Counter{42, 0}}, nil, Equal}, + {Vector{Counter{42, 0}}, Vector{}, Equal}, + + // Zero is the implied value for a missing Counter + { + Vector{Counter{42, 0}}, + Vector{Counter{77, 0}}, + Equal, + }, + + // Equal vectors are equal + { + Vector{Counter{42, 33}}, + Vector{Counter{42, 33}}, + Equal, + }, + { + Vector{Counter{42, 33}, Counter{77, 24}}, + Vector{Counter{42, 33}, Counter{77, 24}}, + Equal, + }, + + // These a-vectors are all greater than the b-vector + { + Vector{Counter{42, 1}}, + nil, + Greater, + }, + { + Vector{Counter{42, 1}}, + Vector{}, + Greater, + }, + { + Vector{Counter{0, 1}}, + Vector{Counter{0, 0}}, + Greater, + }, + { + Vector{Counter{42, 1}}, + Vector{Counter{42, 0}}, + Greater, + }, + { + Vector{Counter{math.MaxUint64, 1}}, + Vector{Counter{math.MaxUint64, 0}}, + Greater, + }, + { + Vector{Counter{0, math.MaxUint64}}, + Vector{Counter{0, 0}}, + Greater, + }, + { + Vector{Counter{42, math.MaxUint64}}, + Vector{Counter{42, 0}}, + Greater, + }, + { + Vector{Counter{math.MaxUint64, math.MaxUint64}}, + Vector{Counter{math.MaxUint64, 0}}, + Greater, + }, + { + Vector{Counter{0, math.MaxUint64}}, + Vector{Counter{0, math.MaxUint64 - 1}}, + Greater, + }, + { + Vector{Counter{42, math.MaxUint64}}, + Vector{Counter{42, math.MaxUint64 - 1}}, + Greater, + }, + { + Vector{Counter{math.MaxUint64, math.MaxUint64}}, + Vector{Counter{math.MaxUint64, math.MaxUint64 - 1}}, + Greater, + }, + { + Vector{Counter{42, 2}}, + Vector{Counter{42, 1}}, + Greater, + }, + { + Vector{Counter{22, 22}, Counter{42, 2}}, + Vector{Counter{22, 22}, Counter{42, 1}}, + Greater, + }, + { + Vector{Counter{42, 2}, Counter{77, 3}}, + Vector{Counter{42, 1}, Counter{77, 3}}, + Greater, + }, + { + Vector{Counter{22, 22}, Counter{42, 2}, Counter{77, 3}}, + Vector{Counter{22, 22}, Counter{42, 1}, Counter{77, 3}}, + Greater, + }, + { + Vector{Counter{22, 23}, Counter{42, 2}, Counter{77, 4}}, + Vector{Counter{22, 22}, Counter{42, 1}, Counter{77, 3}}, + Greater, + }, + + // These a-vectors are all lesser than the b-vector + {nil, Vector{Counter{42, 1}}, Lesser}, + {Vector{}, Vector{Counter{42, 1}}, Lesser}, + { + Vector{Counter{42, 0}}, + Vector{Counter{42, 1}}, + Lesser, + }, + { + Vector{Counter{42, 1}}, + Vector{Counter{42, 2}}, + Lesser, + }, + { + Vector{Counter{22, 22}, Counter{42, 1}}, + Vector{Counter{22, 22}, Counter{42, 2}}, + Lesser, + }, + { + Vector{Counter{42, 1}, Counter{77, 3}}, + Vector{Counter{42, 2}, Counter{77, 3}}, + Lesser, + }, + { + Vector{Counter{22, 22}, Counter{42, 1}, Counter{77, 3}}, + Vector{Counter{22, 22}, Counter{42, 2}, Counter{77, 3}}, + Lesser, + }, + { + Vector{Counter{22, 22}, Counter{42, 1}, Counter{77, 3}}, + Vector{Counter{22, 23}, Counter{42, 2}, Counter{77, 4}}, + Lesser, + }, + + // These are all in conflict + { + Vector{Counter{42, 2}}, + Vector{Counter{43, 1}}, + ConcurrentGreater, + }, + { + Vector{Counter{43, 1}}, + Vector{Counter{42, 2}}, + ConcurrentLesser, + }, + { + Vector{Counter{22, 23}, Counter{42, 1}}, + Vector{Counter{22, 22}, Counter{42, 2}}, + ConcurrentGreater, + }, + { + Vector{Counter{22, 21}, Counter{42, 2}}, + Vector{Counter{22, 22}, Counter{42, 1}}, + ConcurrentLesser, + }, + { + Vector{Counter{22, 21}, Counter{42, 2}, Counter{43, 1}}, + Vector{Counter{20, 1}, Counter{22, 22}, Counter{42, 1}}, + ConcurrentLesser, + }, + } + + for i, tc := range testcases { + // Test real Compare + if r := tc.a.Compare(tc.b); r != tc.r { + t.Errorf("%d: %+v.Compare(%+v) == %v (expected %v)", i, tc.a, tc.b, r, tc.r) + } + + // Test convenience functions + switch tc.r { + case Greater: + if tc.a.Equal(tc.b) { + t.Errorf("%+v == %+v", tc.a, tc.b) + } + if tc.a.Concurrent(tc.b) { + t.Errorf("%+v concurrent %+v", tc.a, tc.b) + } + if !tc.a.GreaterEqual(tc.b) { + t.Errorf("%+v not >= %+v", tc.a, tc.b) + } + if tc.a.LesserEqual(tc.b) { + t.Errorf("%+v <= %+v", tc.a, tc.b) + } + case Lesser: + if tc.a.Concurrent(tc.b) { + t.Errorf("%+v concurrent %+v", tc.a, tc.b) + } + if tc.a.Equal(tc.b) { + t.Errorf("%+v == %+v", tc.a, tc.b) + } + if tc.a.GreaterEqual(tc.b) { + t.Errorf("%+v >= %+v", tc.a, tc.b) + } + if !tc.a.LesserEqual(tc.b) { + t.Errorf("%+v not <= %+v", tc.a, tc.b) + } + case Equal: + if tc.a.Concurrent(tc.b) { + t.Errorf("%+v concurrent %+v", tc.a, tc.b) + } + if !tc.a.Equal(tc.b) { + t.Errorf("%+v not == %+v", tc.a, tc.b) + } + if !tc.a.GreaterEqual(tc.b) { + t.Errorf("%+v not <= %+v", tc.a, tc.b) + } + if !tc.a.LesserEqual(tc.b) { + t.Errorf("%+v not <= %+v", tc.a, tc.b) + } + case ConcurrentLesser, ConcurrentGreater: + if !tc.a.Concurrent(tc.b) { + t.Errorf("%+v not concurrent %+v", tc.a, tc.b) + } + if tc.a.Equal(tc.b) { + t.Errorf("%+v == %+v", tc.a, tc.b) + } + if tc.a.GreaterEqual(tc.b) { + t.Errorf("%+v >= %+v", tc.a, tc.b) + } + if tc.a.LesserEqual(tc.b) { + t.Errorf("%+v <= %+v", tc.a, tc.b) + } + } + } +} diff --git a/lib/protocol/vector_test.go b/lib/protocol/vector_test.go new file mode 100644 index 000000000..c01255e7a --- /dev/null +++ b/lib/protocol/vector_test.go @@ -0,0 +1,134 @@ +// Copyright (C) 2015 The Protocol Authors. + +package protocol + +import "testing" + +func TestUpdate(t *testing.T) { + var v Vector + + // Append + + v = v.Update(42) + expected := Vector{Counter{42, 1}} + + if v.Compare(expected) != Equal { + t.Errorf("Update error, %+v != %+v", v, expected) + } + + // Insert at front + + v = v.Update(36) + expected = Vector{Counter{36, 1}, Counter{42, 1}} + + if v.Compare(expected) != Equal { + t.Errorf("Update error, %+v != %+v", v, expected) + } + + // Insert in moddle + + v = v.Update(37) + expected = Vector{Counter{36, 1}, Counter{37, 1}, Counter{42, 1}} + + if v.Compare(expected) != Equal { + t.Errorf("Update error, %+v != %+v", v, expected) + } + + // Update existing + + v = v.Update(37) + expected = Vector{Counter{36, 1}, Counter{37, 2}, Counter{42, 1}} + + if v.Compare(expected) != Equal { + t.Errorf("Update error, %+v != %+v", v, expected) + } +} + +func TestCopy(t *testing.T) { + v0 := Vector{Counter{42, 1}} + v1 := v0.Copy() + v1.Update(42) + if v0.Compare(v1) != Lesser { + t.Errorf("Copy error, %+v should be ancestor of %+v", v0, v1) + } +} + +func TestMerge(t *testing.T) { + testcases := []struct { + a, b, m Vector + }{ + // No-ops + { + Vector{}, + Vector{}, + Vector{}, + }, + { + Vector{Counter{22, 1}, Counter{42, 1}}, + Vector{Counter{22, 1}, Counter{42, 1}}, + Vector{Counter{22, 1}, Counter{42, 1}}, + }, + + // Appends + { + Vector{}, + Vector{Counter{22, 1}, Counter{42, 1}}, + Vector{Counter{22, 1}, Counter{42, 1}}, + }, + { + Vector{Counter{22, 1}}, + Vector{Counter{42, 1}}, + Vector{Counter{22, 1}, Counter{42, 1}}, + }, + { + Vector{Counter{22, 1}}, + Vector{Counter{22, 1}, Counter{42, 1}}, + Vector{Counter{22, 1}, Counter{42, 1}}, + }, + + // Insert + { + Vector{Counter{22, 1}, Counter{42, 1}}, + Vector{Counter{22, 1}, Counter{23, 2}, Counter{42, 1}}, + Vector{Counter{22, 1}, Counter{23, 2}, Counter{42, 1}}, + }, + { + Vector{Counter{42, 1}}, + Vector{Counter{22, 1}}, + Vector{Counter{22, 1}, Counter{42, 1}}, + }, + + // Update + { + Vector{Counter{22, 1}, Counter{42, 2}}, + Vector{Counter{22, 2}, Counter{42, 1}}, + Vector{Counter{22, 2}, Counter{42, 2}}, + }, + + // All of the above + { + Vector{Counter{10, 1}, Counter{20, 2}, Counter{30, 1}}, + Vector{Counter{5, 1}, Counter{10, 2}, Counter{15, 1}, Counter{20, 1}, Counter{25, 1}, Counter{35, 1}}, + Vector{Counter{5, 1}, Counter{10, 2}, Counter{15, 1}, Counter{20, 2}, Counter{25, 1}, Counter{30, 1}, Counter{35, 1}}, + }, + } + + for i, tc := range testcases { + if m := tc.a.Merge(tc.b); m.Compare(tc.m) != Equal { + t.Errorf("%d: %+v.Merge(%+v) == %+v (expected %+v)", i, tc.a, tc.b, m, tc.m) + } + } +} + +func TestCounterValue(t *testing.T) { + v0 := Vector{Counter{42, 1}, Counter{64, 5}} + if v0.Counter(42) != 1 { + t.Error("Counter error, %d != %d", v0.Counter(42), 1) + } + if v0.Counter(64) != 5 { + t.Error("Counter error, %d != %d", v0.Counter(64), 5) + } + if v0.Counter(72) != 0 { + t.Error("Counter error, %d != %d", v0.Counter(72), 0) + } +} diff --git a/lib/protocol/vector_xdr.go b/lib/protocol/vector_xdr.go new file mode 100644 index 000000000..01efa7e4e --- /dev/null +++ b/lib/protocol/vector_xdr.go @@ -0,0 +1,43 @@ +// Copyright (C) 2015 The Protocol Authors. + +package protocol + +import "github.com/calmh/xdr" + +// This stuff is hacked up manually because genxdr doesn't support 'type +// Vector []Counter' declarations and it was tricky when I tried to add it... + +type xdrWriter interface { + WriteUint32(uint32) (int, error) + WriteUint64(uint64) (int, error) +} +type xdrReader interface { + ReadUint32() uint32 + ReadUint64() uint64 +} + +// EncodeXDRInto encodes the vector as an XDR object into the given XDR +// encoder. +func (v Vector) EncodeXDRInto(w xdrWriter) (int, error) { + w.WriteUint32(uint32(len(v))) + for i := range v { + w.WriteUint64(v[i].ID) + w.WriteUint64(v[i].Value) + } + return 4 + 16*len(v), nil +} + +// DecodeXDRFrom decodes the XDR objects from the given reader into itself. +func (v *Vector) DecodeXDRFrom(r xdrReader) error { + l := int(r.ReadUint32()) + if l > 1e6 { + return xdr.ElementSizeExceeded("number of counters", l, 1e6) + } + n := make(Vector, l) + for i := range n { + n[i].ID = r.ReadUint64() + n[i].Value = r.ReadUint64() + } + *v = n + return nil +} diff --git a/lib/protocol/wireformat.go b/lib/protocol/wireformat.go new file mode 100644 index 000000000..66b02ed6f --- /dev/null +++ b/lib/protocol/wireformat.go @@ -0,0 +1,60 @@ +// Copyright (C) 2014 The Protocol Authors. + +package protocol + +import ( + "path/filepath" + + "golang.org/x/text/unicode/norm" +) + +type wireFormatConnection struct { + next Connection +} + +func (c wireFormatConnection) Start() { + c.next.Start() +} + +func (c wireFormatConnection) ID() DeviceID { + return c.next.ID() +} + +func (c wireFormatConnection) Name() string { + return c.next.Name() +} + +func (c wireFormatConnection) Index(folder string, fs []FileInfo, flags uint32, options []Option) error { + var myFs = make([]FileInfo, len(fs)) + copy(myFs, fs) + + for i := range fs { + myFs[i].Name = norm.NFC.String(filepath.ToSlash(myFs[i].Name)) + } + + return c.next.Index(folder, myFs, flags, options) +} + +func (c wireFormatConnection) IndexUpdate(folder string, fs []FileInfo, flags uint32, options []Option) error { + var myFs = make([]FileInfo, len(fs)) + copy(myFs, fs) + + for i := range fs { + myFs[i].Name = norm.NFC.String(filepath.ToSlash(myFs[i].Name)) + } + + return c.next.IndexUpdate(folder, myFs, flags, options) +} + +func (c wireFormatConnection) Request(folder, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) { + name = norm.NFC.String(filepath.ToSlash(name)) + return c.next.Request(folder, name, offset, size, hash, flags, options) +} + +func (c wireFormatConnection) ClusterConfig(config ClusterConfigMessage) { + c.next.ClusterConfig(config) +} + +func (c wireFormatConnection) Statistics() Statistics { + return c.next.Statistics() +}