Merge pull request #1215 from syncthing/new-proto-bc

Add some new protocol fields
This commit is contained in:
Audrius Butkevicius 2015-01-08 21:54:01 +00:00
commit 735d420d40
6 changed files with 151 additions and 14 deletions

2
Godeps/Godeps.json generated
View File

@ -23,7 +23,7 @@
},
{
"ImportPath": "github.com/calmh/xdr",
"Rev": "45c46b7db7ff83b8b9ee09bbd95f36ab50043ece"
"Rev": "214788d8fedfc310c18eca9ed12be408a5054cd5"
},
{
"ImportPath": "github.com/juju/ratelimit",

View File

@ -154,6 +154,10 @@ func (e XDRError) Error() string {
return "xdr " + e.op + ": " + e.err.Error()
}
func (e XDRError) IsEOF() bool {
return e.err == io.EOF
}
func (r *Reader) Error() error {
if r.err == nil {
return nil

View File

@ -21,8 +21,10 @@ package protocol
import "fmt"
type IndexMessage struct {
Folder string // max:64
Files []FileInfo
Folder string // max:64
Files []FileInfo
Flags uint32
Options []Option // max:64
}
type FileInfo struct {
@ -150,14 +152,18 @@ func (b BlockInfo) String() string {
}
type RequestMessage struct {
Folder string // max:64
Name string // max:8192
Offset uint64
Size uint32
Folder string // max:64
Name string // max:8192
Offset uint64
Size uint32
Hash []byte // max:64
Flags uint32
Options []Option // max:64
}
type ResponseMessage struct {
Data []byte
Data []byte
Error uint32
}
type ClusterConfigMessage struct {
@ -194,6 +200,7 @@ type Option struct {
type CloseMessage struct {
Reason string // max:1024
Code uint32
}
type EmptyMessage struct{}

View File

@ -30,11 +30,21 @@ IndexMessage Structure:
\ Zero or more FileInfo Structures \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Flags |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Number of Options |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Zero or more Option Structures \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
struct IndexMessage {
string Folder<64>;
FileInfo Files<>;
unsigned int Flags;
Option Options<64>;
}
*/
@ -75,6 +85,17 @@ func (o IndexMessage) encodeXDR(xw *xdr.Writer) (int, error) {
return xw.Tot(), err
}
}
xw.WriteUint32(o.Flags)
if l := len(o.Options); l > 64 {
return xw.Tot(), xdr.ElementSizeExceeded("Options", l, 64)
}
xw.WriteUint32(uint32(len(o.Options)))
for i := range o.Options {
_, err := o.Options[i].encodeXDR(xw)
if err != nil {
return xw.Tot(), err
}
}
return xw.Tot(), xw.Error()
}
@ -96,6 +117,15 @@ func (o *IndexMessage) decodeXDR(xr *xdr.Reader) error {
for i := range o.Files {
(&o.Files[i]).decodeXDR(xr)
}
o.Flags = xr.ReadUint32()
_OptionsSize := int(xr.ReadUint32())
if _OptionsSize > 64 {
return xdr.ElementSizeExceeded("Options", _OptionsSize, 64)
}
o.Options = make([]Option, _OptionsSize)
for i := range o.Options {
(&o.Options[i]).decodeXDR(xr)
}
return xr.Error()
}
@ -412,6 +442,20 @@ RequestMessage Structure:
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Size |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of Hash |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Hash (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Flags |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Number of Options |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Zero or more Option Structures \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
struct RequestMessage {
@ -419,6 +463,9 @@ struct RequestMessage {
string Name<8192>;
unsigned hyper Offset;
unsigned int Size;
opaque Hash<64>;
unsigned int Flags;
Option Options<64>;
}
*/
@ -458,6 +505,21 @@ func (o RequestMessage) encodeXDR(xw *xdr.Writer) (int, error) {
xw.WriteString(o.Name)
xw.WriteUint64(o.Offset)
xw.WriteUint32(o.Size)
if l := len(o.Hash); l > 64 {
return xw.Tot(), xdr.ElementSizeExceeded("Hash", l, 64)
}
xw.WriteBytes(o.Hash)
xw.WriteUint32(o.Flags)
if l := len(o.Options); l > 64 {
return xw.Tot(), xdr.ElementSizeExceeded("Options", l, 64)
}
xw.WriteUint32(uint32(len(o.Options)))
for i := range o.Options {
_, err := o.Options[i].encodeXDR(xw)
if err != nil {
return xw.Tot(), err
}
}
return xw.Tot(), xw.Error()
}
@ -477,6 +539,16 @@ func (o *RequestMessage) decodeXDR(xr *xdr.Reader) error {
o.Name = xr.ReadStringMax(8192)
o.Offset = xr.ReadUint64()
o.Size = xr.ReadUint32()
o.Hash = xr.ReadBytesMax(64)
o.Flags = xr.ReadUint32()
_OptionsSize := int(xr.ReadUint32())
if _OptionsSize > 64 {
return xdr.ElementSizeExceeded("Options", _OptionsSize, 64)
}
o.Options = make([]Option, _OptionsSize)
for i := range o.Options {
(&o.Options[i]).decodeXDR(xr)
}
return xr.Error()
}
@ -493,10 +565,13 @@ ResponseMessage Structure:
\ Data (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Error |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
struct ResponseMessage {
opaque Data<>;
unsigned int Error;
}
*/
@ -527,6 +602,7 @@ func (o ResponseMessage) AppendXDR(bs []byte) ([]byte, error) {
func (o ResponseMessage) encodeXDR(xw *xdr.Writer) (int, error) {
xw.WriteBytes(o.Data)
xw.WriteUint32(o.Error)
return xw.Tot(), xw.Error()
}
@ -543,6 +619,7 @@ func (o *ResponseMessage) UnmarshalXDR(bs []byte) error {
func (o *ResponseMessage) decodeXDR(xr *xdr.Reader) error {
o.Data = xr.ReadBytes()
o.Error = xr.ReadUint32()
return xr.Error()
}
@ -940,10 +1017,13 @@ CloseMessage Structure:
\ Reason (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Code |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
struct CloseMessage {
string Reason<1024>;
unsigned int Code;
}
*/
@ -977,6 +1057,7 @@ func (o CloseMessage) encodeXDR(xw *xdr.Writer) (int, error) {
return xw.Tot(), xdr.ElementSizeExceeded("Reason", l, 1024)
}
xw.WriteString(o.Reason)
xw.WriteUint32(o.Code)
return xw.Tot(), xw.Error()
}
@ -993,6 +1074,7 @@ func (o *CloseMessage) UnmarshalXDR(bs []byte) error {
func (o *CloseMessage) decodeXDR(xr *xdr.Reader) error {
o.Reason = xr.ReadStringMax(1024)
o.Code = xr.ReadUint32()
return xr.Error()
}

View File

@ -133,6 +133,10 @@ type encodable interface {
AppendXDR([]byte) ([]byte, error)
}
type isEofer interface {
IsEOF() bool
}
const (
pingTimeout = 30 * time.Second
pingIdleTime = 60 * time.Second
@ -183,7 +187,10 @@ func (c *rawConnection) Index(folder string, idx []FileInfo) error {
default:
}
c.idxMut.Lock()
c.send(-1, messageTypeIndex, IndexMessage{folder, idx})
c.send(-1, messageTypeIndex, IndexMessage{
Folder: folder,
Files: idx,
})
c.idxMut.Unlock()
return nil
}
@ -196,7 +203,10 @@ func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo) error {
default:
}
c.idxMut.Lock()
c.send(-1, messageTypeIndexUpdate, IndexMessage{folder, idx})
c.send(-1, messageTypeIndexUpdate, IndexMessage{
Folder: folder,
Files: idx,
})
c.idxMut.Unlock()
return nil
}
@ -218,7 +228,12 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i
c.awaiting[id] = rc
c.awaitingMut.Unlock()
ok := c.send(id, messageTypeRequest, RequestMessage{folder, name, uint64(offset), uint32(size)})
ok := c.send(id, messageTypeRequest, RequestMessage{
Folder: folder,
Name: name,
Offset: uint64(offset),
Size: uint32(size),
})
if !ok {
return nil, ErrClosed
}
@ -341,6 +356,11 @@ func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) {
l.Debugf("read header %v (msglen=%d)", hdr, msglen)
}
if hdr.version != 0 {
err = fmt.Errorf("unknown protocol version 0x%x", hdr.version)
return
}
if cap(c.rdbuf0) < msglen {
c.rdbuf0 = make([]byte, msglen)
} else {
@ -376,20 +396,36 @@ func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) {
}
}
// We check each returned error for the XDRError.IsEOF() method.
// IsEOF()==true here means that the message contained fewer fields than
// expected. It does not signify an EOF on the socket, because we've
// successfully read a size value and that many bytes already. New fields
// we expected but the other peer didn't send should be interpreted as
// zero/nil, and if that's not valid we'll verify it somewhere else.
switch hdr.msgType {
case messageTypeIndex, messageTypeIndexUpdate:
var idx IndexMessage
err = idx.UnmarshalXDR(msgBuf)
if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() {
err = nil
}
msg = idx
case messageTypeRequest:
var req RequestMessage
err = req.UnmarshalXDR(msgBuf)
if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() {
err = nil
}
msg = req
case messageTypeResponse:
var resp ResponseMessage
err = resp.UnmarshalXDR(msgBuf)
if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() {
err = nil
}
msg = resp
case messageTypePing, messageTypePong:
@ -398,11 +434,17 @@ func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) {
case messageTypeClusterConfig:
var cc ClusterConfigMessage
err = cc.UnmarshalXDR(msgBuf)
if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() {
err = nil
}
msg = cc
case messageTypeClose:
var cm CloseMessage
err = cm.UnmarshalXDR(msgBuf)
if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() {
err = nil
}
msg = cm
default:
@ -429,7 +471,9 @@ func (c *rawConnection) handleIndexUpdate(im IndexMessage) {
func (c *rawConnection) handleRequest(msgID int, req RequestMessage) {
data, _ := c.receiver.Request(c.id, req.Folder, req.Name, int64(req.Offset), int(req.Size))
c.send(msgID, messageTypeResponse, ResponseMessage{data})
c.send(msgID, messageTypeResponse, ResponseMessage{
Data: data,
})
}
func (c *rawConnection) handleResponse(msgID int, resp ResponseMessage) {

View File

@ -189,7 +189,7 @@ func TestVersionErr(t *testing.T) {
msgID: 0,
msgType: 0,
}))
w.WriteUint32(0)
w.WriteUint32(0) // Avoids reader closing due to EOF
if !m1.isClosed() {
t.Error("Connection should close due to unknown version")
@ -212,7 +212,7 @@ func TestTypeErr(t *testing.T) {
msgID: 0,
msgType: 42,
}))
w.WriteUint32(0)
w.WriteUint32(0) // Avoids reader closing due to EOF
if !m1.isClosed() {
t.Error("Connection should close due to unknown message type")