diff --git a/lib/db/util.go b/lib/db/util.go index c67ca5508..5e9adb267 100644 --- a/lib/db/util.go +++ b/lib/db/util.go @@ -22,11 +22,10 @@ type FileInfoBatch struct { flushFn func([]protocol.FileInfo) error } +// NewFileInfoBatch returns a new FileInfoBatch that calls fn when it's time +// to flush. func NewFileInfoBatch(fn func([]protocol.FileInfo) error) *FileInfoBatch { - return &FileInfoBatch{ - infos: make([]protocol.FileInfo, 0, MaxBatchSizeFiles), - flushFn: fn, - } + return &FileInfoBatch{flushFn: fn} } func (b *FileInfoBatch) SetFlushFunc(fn func([]protocol.FileInfo) error) { @@ -34,6 +33,9 @@ func (b *FileInfoBatch) SetFlushFunc(fn func([]protocol.FileInfo) error) { } func (b *FileInfoBatch) Append(f protocol.FileInfo) { + if b.infos == nil { + b.infos = make([]protocol.FileInfo, 0, MaxBatchSizeFiles) + } b.infos = append(b.infos, f) b.size += f.ProtoSize() } @@ -61,7 +63,7 @@ func (b *FileInfoBatch) Flush() error { } func (b *FileInfoBatch) Reset() { - b.infos = b.infos[:0] + b.infos = nil b.size = 0 } diff --git a/lib/model/indexhandler_test.go b/lib/model/indexhandler_test.go new file mode 100644 index 000000000..ba571028d --- /dev/null +++ b/lib/model/indexhandler_test.go @@ -0,0 +1,84 @@ +// Copyright (C) 2024 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at https://mozilla.org/MPL/2.0/. + +package model_test + +import ( + "context" + "fmt" + "io" + "testing" + + "github.com/syncthing/syncthing/lib/db" + "github.com/syncthing/syncthing/lib/model/mocks" + "github.com/syncthing/syncthing/lib/protocol" + protomock "github.com/syncthing/syncthing/lib/protocol/mocks" + "github.com/syncthing/syncthing/lib/testutil" +) + +func TestIndexhandlerConcurrency(t *testing.T) { + // Verify that sending a lot of index update messages using the + // FileInfoBatch works and doesn't trigger the race detector. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ar, aw := io.Pipe() + br, bw := io.Pipe() + ci := &protomock.ConnectionInfo{} + + m1 := &mocks.Model{} + c1 := protocol.NewConnection(protocol.EmptyDeviceID, ar, bw, testutil.NoopCloser{}, m1, ci, protocol.CompressionNever, nil, nil) + c1.Start() + defer c1.Close(io.EOF) + + m2 := &mocks.Model{} + c2 := protocol.NewConnection(protocol.EmptyDeviceID, br, aw, testutil.NoopCloser{}, m2, ci, protocol.CompressionNever, nil, nil) + c2.Start() + defer c2.Close(io.EOF) + + c1.ClusterConfig(protocol.ClusterConfig{}) + c2.ClusterConfig(protocol.ClusterConfig{}) + c1.Index(ctx, "foo", nil) + c2.Index(ctx, "foo", nil) + + const msgs = 5e2 + const files = 1e3 + + recvd := 0 + m2.IndexUpdateCalls(func(_ protocol.Connection, idxUp *protocol.IndexUpdate) error { + for j := 0; j < files; j++ { + if n := idxUp.Files[j].Name; n != fmt.Sprintf("f%d-%d", recvd, j) { + t.Error("wrong filename", n) + } + } + recvd++ + return nil + }) + + b1 := db.NewFileInfoBatch(func(fs []protocol.FileInfo) error { + return c1.IndexUpdate(ctx, "foo", fs) + }) + for i := 0; i < msgs; i++ { + for j := 0; j < files; j++ { + b1.Append(protocol.FileInfo{ + Name: fmt.Sprintf("f%d-%d", i, j), + Blocks: []protocol.BlockInfo{{Hash: make([]byte, 32)}}, + }) + } + if err := b1.Flush(); err != nil { + t.Fatal(err) + } + } + + c1.Close(io.EOF) + c2.Close(io.EOF) + <-c1.Closed() + <-c2.Closed() + + if recvd != msgs-1 { + t.Error("didn't receive all expected messages") + } +} diff --git a/lib/protocol/protocol.go b/lib/protocol/protocol.go index ec408b6dc..fe5de5d7a 100644 --- a/lib/protocol/protocol.go +++ b/lib/protocol/protocol.go @@ -154,15 +154,35 @@ type RequestResponse interface { } type Connection interface { + // Send an index message. The connection will read and marshal the + // parameters asynchronously, so they should not be modified after + // calling Index(). + Index(ctx context.Context, folder string, files []FileInfo) error + + // Send an index update message. The connection will read and marshal + // the parameters asynchronously, so they should not be modified after + // calling IndexUpdate(). + IndexUpdate(ctx context.Context, folder string, files []FileInfo) error + + // Send a request message. The connection will read and marshal the + // parameters asynchronously, so they should not be modified after + // calling Request(). + Request(ctx context.Context, folder string, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) + + // Send a cluster configuration message. The connection will read and + // marshal the message asynchronously, so it should not be modified + // after calling ClusterConfig(). + ClusterConfig(config ClusterConfig) + + // Send a download progress message. The connection will read and + // marshal the parameters asynchronously, so they should not be modified + // after calling DownloadProgress(). + DownloadProgress(ctx context.Context, folder string, updates []FileDownloadProgressUpdate) + Start() SetFolderPasswords(passwords map[string]string) Close(err error) DeviceID() DeviceID - Index(ctx context.Context, folder string, files []FileInfo) error - IndexUpdate(ctx context.Context, folder string, files []FileInfo) error - Request(ctx context.Context, folder string, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) - ClusterConfig(config ClusterConfig) - DownloadProgress(ctx context.Context, folder string, updates []FileDownloadProgressUpdate) Statistics() Statistics Closed() <-chan struct{} ConnectionInfo diff --git a/lib/protocol/wireformat.go b/lib/protocol/wireformat.go index cca53cab1..f97a1ed08 100644 --- a/lib/protocol/wireformat.go +++ b/lib/protocol/wireformat.go @@ -14,25 +14,19 @@ type wireFormatConnection struct { } func (c wireFormatConnection) Index(ctx context.Context, folder string, fs []FileInfo) error { - var myFs = make([]FileInfo, len(fs)) - copy(myFs, fs) - for i := range fs { - myFs[i].Name = norm.NFC.String(filepath.ToSlash(myFs[i].Name)) + fs[i].Name = norm.NFC.String(filepath.ToSlash(fs[i].Name)) } - return c.Connection.Index(ctx, folder, myFs) + return c.Connection.Index(ctx, folder, fs) } func (c wireFormatConnection) IndexUpdate(ctx context.Context, folder string, fs []FileInfo) error { - var myFs = make([]FileInfo, len(fs)) - copy(myFs, fs) - for i := range fs { - myFs[i].Name = norm.NFC.String(filepath.ToSlash(myFs[i].Name)) + fs[i].Name = norm.NFC.String(filepath.ToSlash(fs[i].Name)) } - return c.Connection.IndexUpdate(ctx, folder, myFs) + return c.Connection.IndexUpdate(ctx, folder, fs) } func (c wireFormatConnection) Request(ctx context.Context, folder string, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {