From edc3a77b9824ffbba5ce7469748d5504522953bc Mon Sep 17 00:00:00 2001 From: Simon Frei Date: Tue, 5 Apr 2022 21:32:06 +0200 Subject: [PATCH] lib/fs, lib/model: Add warning about kqueue resource usage (fixes #7855) (#8249) --- go.mod | 2 +- go.sum | 2 + lib/api/api_test.go | 18 +++- lib/fs/basicfs_watch_eventtypes_kqueue.go | 3 + lib/fs/basicfs_watch_notkqueue.go | 13 +++ lib/model/folder.go | 27 ++++++ lib/model/folder_summary.go | 107 ++++++++++++++++------ lib/model/mocks/folderSummaryService.go | 20 ++-- lib/syncthing/verboseservice.go | 15 ++- 9 files changed, 157 insertions(+), 50 deletions(-) create mode 100644 lib/fs/basicfs_watch_notkqueue.go diff --git a/go.mod b/go.mod index a2a55bc07..c1ae4cd37 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,7 @@ require ( golang.org/x/sys v0.0.0-20211013075003-97ac67df715c golang.org/x/text v0.3.7 golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac - golang.org/x/tools v0.1.6 + golang.org/x/tools v0.1.7 google.golang.org/protobuf v1.27.1 ) diff --git a/go.sum b/go.sum index abc7c8004..9f4fcebee 100644 --- a/go.sum +++ b/go.sum @@ -654,6 +654,8 @@ golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.6 h1:SIasE1FVIQOWz2GEAHFOmoW7xchJcqlucjSULTL0Ag4= golang.org/x/tools v0.1.6/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo= +golang.org/x/tools v0.1.7 h1:6j8CgantCy3yc8JGBqkDLMKWqZ0RDU2g1HVgacojGWQ= +golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/lib/api/api_test.go b/lib/api/api_test.go index 1bfe5df08..944a01024 100644 --- a/lib/api/api_test.go +++ b/lib/api/api_test.go @@ -35,6 +35,7 @@ import ( "github.com/syncthing/syncthing/lib/locations" "github.com/syncthing/syncthing/lib/logger" loggermocks "github.com/syncthing/syncthing/lib/logger/mocks" + "github.com/syncthing/syncthing/lib/model" modelmocks "github.com/syncthing/syncthing/lib/model/mocks" "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/svcutil" @@ -260,7 +261,7 @@ func TestAPIServiceRequests(t *testing.T) { if err != nil { t.Fatal(err) } - defer cancel() + t.Cleanup(cancel) cases := []httpTestCase{ // /rest/db @@ -298,6 +299,12 @@ func TestAPIServiceRequests(t *testing.T) { Type: "application/json", Prefix: "null", }, + { + URL: "/rest/db/status?folder=default", + Code: 200, + Type: "application/json", + Prefix: "", + }, // /rest/stats { @@ -466,14 +473,17 @@ func TestAPIServiceRequests(t *testing.T) { } for _, tc := range cases { - t.Log("Testing", tc.URL, "...") - testHTTPRequest(t, baseURL, tc, testAPIKey) + t.Run(cases[0].URL, func(t *testing.T) { + testHTTPRequest(t, baseURL, tc, testAPIKey) + }) } } // testHTTPRequest tries the given test case, comparing the result code, // content type, and result prefix. func testHTTPRequest(t *testing.T, baseURL string, tc httpTestCase, apikey string) { + t.Parallel() + timeout := time.Second if tc.Timeout > 0 { timeout = tc.Timeout @@ -608,7 +618,7 @@ func startHTTP(cfg config.Wrapper) (string, context.CancelFunc, error) { } addrChan := make(chan string) mockedSummary := &modelmocks.FolderSummaryService{} - mockedSummary.SummaryReturns(map[string]interface{}{"mocked": true}, nil) + mockedSummary.SummaryReturns(new(model.FolderSummary), nil) // Instantiate the API service urService := ur.New(cfg, m, connections, false) diff --git a/lib/fs/basicfs_watch_eventtypes_kqueue.go b/lib/fs/basicfs_watch_eventtypes_kqueue.go index af6fd0186..c8e34dfd6 100644 --- a/lib/fs/basicfs_watch_eventtypes_kqueue.go +++ b/lib/fs/basicfs_watch_eventtypes_kqueue.go @@ -18,4 +18,7 @@ const ( subEventMask = notify.NoteDelete | notify.NoteWrite | notify.NoteRename | notify.Create | notify.NoteAttrib | notify.NoteExtend permEventMask = 0 rmEventMask = notify.NoteDelete | notify.NoteRename + + // WatchKqueue indicates if kqueue is used for filesystem watching + WatchKqueue = true ) diff --git a/lib/fs/basicfs_watch_notkqueue.go b/lib/fs/basicfs_watch_notkqueue.go new file mode 100644 index 000000000..7d529cc27 --- /dev/null +++ b/lib/fs/basicfs_watch_notkqueue.go @@ -0,0 +1,13 @@ +// Copyright (C) 2022 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at http://mozilla.org/MPL/2.0/. + +//go:build !dragonfly && !freebsd && !netbsd && !openbsd +// +build !dragonfly,!freebsd,!netbsd,!openbsd + +package fs + +// WatchKqueue indicates if kqueue is used for filesystem watching +const WatchKqueue = false diff --git a/lib/model/folder.go b/lib/model/folder.go index 30fddd2d9..a318a64ee 100644 --- a/lib/model/folder.go +++ b/lib/model/folder.go @@ -34,6 +34,9 @@ import ( "github.com/syncthing/syncthing/lib/watchaggregator" ) +// Arbitrary limit that triggers a warning on kqueue systems +const kqueueItemCountThreshold = 10000 + type folder struct { stateTracker config.FolderConfiguration @@ -81,6 +84,8 @@ type folder struct { puller puller versioner versioner.Versioner + + warnedKqueue bool } type syncRequest struct { @@ -980,6 +985,19 @@ func (f *folder) monitorWatch(ctx context.Context) { warnedOutside := false var lastWatch time.Time pause := time.Minute + // Subscribe to folder summaries only on kqueue systems, to warn about potential high resource usage + var summarySub events.Subscription + var summaryChan <-chan events.Event + if fs.WatchKqueue && !f.warnedKqueue { + summarySub = f.evLogger.Subscribe(events.FolderCompletion) + summaryChan = summarySub.C() + } + defer func() { + aggrCancel() // aggrCancel might e re-assigned -> call within closure + if summaryChan != nil { + summarySub.Unsubscribe() + } + }() for { select { case <-failTimer.C: @@ -1024,6 +1042,15 @@ func (f *folder) monitorWatch(ctx context.Context) { aggrCancel() errChan = nil aggrCtx, aggrCancel = context.WithCancel(ctx) + case ev := <-summaryChan: + if data, ok := ev.Data.(FolderSummaryEventData); !ok { + f.evLogger.Log(events.Failure, "Unexpected type of folder-summary event in folder.monitorWatch") + } else if data.Summary.LocalTotalItems > kqueueItemCountThreshold { + f.warnedKqueue = true + summarySub.Unsubscribe() + summaryChan = nil + l.Warnf("Filesystem watching (kqueue) is enabled on %v with a lot of files/directories, and that requires a lot of resources and might slow down your system significantly", f.Description()) + } case <-ctx.Done(): return } diff --git a/lib/model/folder_summary.go b/lib/model/folder_summary.go index 58427b318..405a8e54b 100644 --- a/lib/model/folder_summary.go +++ b/lib/model/folder_summary.go @@ -29,7 +29,7 @@ const maxDurationSinceLastEventReq = time.Minute type FolderSummaryService interface { suture.Service - Summary(folder string) (map[string]interface{}, error) + Summary(folder string) (*FolderSummary, error) OnEventRequest() } @@ -76,8 +76,58 @@ func (c *folderSummaryService) String() string { return fmt.Sprintf("FolderSummaryService@%p", c) } -func (c *folderSummaryService) Summary(folder string) (map[string]interface{}, error) { - var res = make(map[string]interface{}) +// FolderSummary replaces the previously used map[string]interface{}, and needs +// to keep the structure/naming for api backwards compatibility +type FolderSummary struct { + Errors int `json:"errors"` + PullErrors int `json:"pullErrors"` // deprecated + + Invalid string `json:"invalid"` // deprecated + + GlobalFiles int `json:"globalFiles"` + GlobalDirectories int `json:"globalDirectories"` + GlobalSymlinks int `json:"globalSymlinks"` + GlobalDeleted int `json:"globalDeleted"` + GlobalBytes int64 `json:"globalBytes"` + GlobalTotalItems int `json:"globalTotalItems"` + + LocalFiles int `json:"localFiles"` + LocalDirectories int `json:"localDirectories"` + LocalSymlinks int `json:"localSymlinks"` + LocalDeleted int `json:"localDeleted"` + LocalBytes int64 `json:"localBytes"` + LocalTotalItems int `json:"localTotalItems"` + + NeedFiles int `json:"needFiles"` + NeedDirectories int `json:"needDirectories"` + NeedSymlinks int `json:"needSymlinks"` + NeedDeletes int `json:"needDeletes"` + NeedBytes int64 `json:"needBytes"` + NeedTotalItems int `json:"needTotalItems"` + + ReceiveOnlyChangedFiles int `json:"receiveOnlyChangedFiles"` + ReceiveOnlyChangedDirectories int `json:"receiveOnlyChangedDirectories"` + ReceiveOnlyChangedSymlinks int `json:"receiveOnlyChangedSymlinks"` + ReceiveOnlyChangedDeletes int `json:"receiveOnlyChangedDeletes"` + ReceiveOnlyChangedBytes int64 `json:"receiveOnlyChangedBytes"` + ReceiveOnlyTotalItems int `json:"receiveOnlyTotalItems"` + + InSyncFiles int `json:"inSyncFiles"` + InSyncBytes int64 `json:"inSyncBytes"` + + State string `json:"state"` + StateChanged time.Time `json:"stateChanged"` + Error string `json:"error"` + + Version int64 `json:"version"` // deprecated + Sequence int64 `json:"sequence"` + + IgnorePatterns bool `json:"ignorePatterns"` + WatchError string `json:"watchError"` +} + +func (c *folderSummaryService) Summary(folder string) (*FolderSummary, error) { + res := new(FolderSummary) var local, global, need, ro db.Counts var ourSeq, remoteSeq int64 @@ -101,14 +151,14 @@ func (c *folderSummaryService) Summary(folder string) (map[string]interface{}, e return nil, err } - res["errors"] = len(errors) - res["pullErrors"] = len(errors) // deprecated + res.Errors = len(errors) + res.PullErrors = len(errors) // deprecated - res["invalid"] = "" // Deprecated, retains external API for now + res.Invalid = "" // Deprecated, retains external API for now - res["globalFiles"], res["globalDirectories"], res["globalSymlinks"], res["globalDeleted"], res["globalBytes"], res["globalTotalItems"] = global.Files, global.Directories, global.Symlinks, global.Deleted, global.Bytes, global.TotalItems() + res.GlobalFiles, res.GlobalDirectories, res.GlobalSymlinks, res.GlobalDeleted, res.GlobalBytes, res.GlobalTotalItems = global.Files, global.Directories, global.Symlinks, global.Deleted, global.Bytes, global.TotalItems() - res["localFiles"], res["localDirectories"], res["localSymlinks"], res["localDeleted"], res["localBytes"], res["localTotalItems"] = local.Files, local.Directories, local.Symlinks, local.Deleted, local.Bytes, local.TotalItems() + res.LocalFiles, res.LocalDirectories, res.LocalSymlinks, res.LocalDeleted, res.LocalBytes, res.LocalTotalItems = local.Files, local.Directories, local.Symlinks, local.Deleted, local.Bytes, local.TotalItems() fcfg, haveFcfg := c.cfg.Folder(folder) @@ -122,41 +172,41 @@ func (c *folderSummaryService) Summary(folder string) (map[string]interface{}, e if need.Bytes < 0 { need.Bytes = 0 } - res["needFiles"], res["needDirectories"], res["needSymlinks"], res["needDeletes"], res["needBytes"], res["needTotalItems"] = need.Files, need.Directories, need.Symlinks, need.Deleted, need.Bytes, need.TotalItems() + res.NeedFiles, res.NeedDirectories, res.NeedSymlinks, res.NeedDeletes, res.NeedBytes, res.NeedTotalItems = need.Files, need.Directories, need.Symlinks, need.Deleted, need.Bytes, need.TotalItems() if haveFcfg && (fcfg.Type == config.FolderTypeReceiveOnly || fcfg.Type == config.FolderTypeReceiveEncrypted) { // Add statistics for things that have changed locally in a receive // only or receive encrypted folder. - res["receiveOnlyChangedFiles"] = ro.Files - res["receiveOnlyChangedDirectories"] = ro.Directories - res["receiveOnlyChangedSymlinks"] = ro.Symlinks - res["receiveOnlyChangedDeletes"] = ro.Deleted - res["receiveOnlyChangedBytes"] = ro.Bytes - res["receiveOnlyTotalItems"] = ro.TotalItems() + res.ReceiveOnlyChangedFiles = ro.Files + res.ReceiveOnlyChangedDirectories = ro.Directories + res.ReceiveOnlyChangedSymlinks = ro.Symlinks + res.ReceiveOnlyChangedDeletes = ro.Deleted + res.ReceiveOnlyChangedBytes = ro.Bytes + res.ReceiveOnlyTotalItems = ro.TotalItems() } - res["inSyncFiles"], res["inSyncBytes"] = global.Files-need.Files, global.Bytes-need.Bytes + res.InSyncFiles, res.InSyncBytes = global.Files-need.Files, global.Bytes-need.Bytes - res["state"], res["stateChanged"], err = c.model.State(folder) + res.State, res.StateChanged, err = c.model.State(folder) if err != nil { - res["error"] = err.Error() + res.Error = err.Error() } - res["version"] = ourSeq + remoteSeq // legacy - res["sequence"] = ourSeq + remoteSeq // new name + res.Version = ourSeq + remoteSeq // legacy + res.Sequence = ourSeq + remoteSeq // new name ignorePatterns, _, _ := c.model.CurrentIgnores(folder) - res["ignorePatterns"] = false + res.IgnorePatterns = false for _, line := range ignorePatterns { if len(line) > 0 && !strings.HasPrefix(line, "//") { - res["ignorePatterns"] = true + res.IgnorePatterns = true break } } err = c.model.WatchError(folder) if err != nil { - res["watchError"] = err.Error() + res.WatchError = err.Error() } return res, nil @@ -322,6 +372,11 @@ func (c *folderSummaryService) foldersToHandle() []string { return res } +type FolderSummaryEventData struct { + Folder string `json:"folder"` + Summary *FolderSummary `json:"summary"` +} + // sendSummary send the summary events for a single folder func (c *folderSummaryService) sendSummary(ctx context.Context, folder string) { // The folder summary contains how many bytes, files etc @@ -330,9 +385,9 @@ func (c *folderSummaryService) sendSummary(ctx context.Context, folder string) { if err != nil { return } - c.evLogger.Log(events.FolderSummary, map[string]interface{}{ - "folder": folder, - "summary": data, + c.evLogger.Log(events.FolderSummary, FolderSummaryEventData{ + Folder: folder, + Summary: data, }) for _, devCfg := range c.cfg.Folders()[folder].Devices { diff --git a/lib/model/mocks/folderSummaryService.go b/lib/model/mocks/folderSummaryService.go index d4285d8c5..4dca3505f 100644 --- a/lib/model/mocks/folderSummaryService.go +++ b/lib/model/mocks/folderSummaryService.go @@ -24,17 +24,17 @@ type FolderSummaryService struct { serveReturnsOnCall map[int]struct { result1 error } - SummaryStub func(string) (map[string]interface{}, error) + SummaryStub func(string) (*model.FolderSummary, error) summaryMutex sync.RWMutex summaryArgsForCall []struct { arg1 string } summaryReturns struct { - result1 map[string]interface{} + result1 *model.FolderSummary result2 error } summaryReturnsOnCall map[int]struct { - result1 map[string]interface{} + result1 *model.FolderSummary result2 error } invocations map[string][][]interface{} @@ -126,7 +126,7 @@ func (fake *FolderSummaryService) ServeReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FolderSummaryService) Summary(arg1 string) (map[string]interface{}, error) { +func (fake *FolderSummaryService) Summary(arg1 string) (*model.FolderSummary, error) { fake.summaryMutex.Lock() ret, specificReturn := fake.summaryReturnsOnCall[len(fake.summaryArgsForCall)] fake.summaryArgsForCall = append(fake.summaryArgsForCall, struct { @@ -151,7 +151,7 @@ func (fake *FolderSummaryService) SummaryCallCount() int { return len(fake.summaryArgsForCall) } -func (fake *FolderSummaryService) SummaryCalls(stub func(string) (map[string]interface{}, error)) { +func (fake *FolderSummaryService) SummaryCalls(stub func(string) (*model.FolderSummary, error)) { fake.summaryMutex.Lock() defer fake.summaryMutex.Unlock() fake.SummaryStub = stub @@ -164,28 +164,28 @@ func (fake *FolderSummaryService) SummaryArgsForCall(i int) string { return argsForCall.arg1 } -func (fake *FolderSummaryService) SummaryReturns(result1 map[string]interface{}, result2 error) { +func (fake *FolderSummaryService) SummaryReturns(result1 *model.FolderSummary, result2 error) { fake.summaryMutex.Lock() defer fake.summaryMutex.Unlock() fake.SummaryStub = nil fake.summaryReturns = struct { - result1 map[string]interface{} + result1 *model.FolderSummary result2 error }{result1, result2} } -func (fake *FolderSummaryService) SummaryReturnsOnCall(i int, result1 map[string]interface{}, result2 error) { +func (fake *FolderSummaryService) SummaryReturnsOnCall(i int, result1 *model.FolderSummary, result2 error) { fake.summaryMutex.Lock() defer fake.summaryMutex.Unlock() fake.SummaryStub = nil if fake.summaryReturnsOnCall == nil { fake.summaryReturnsOnCall = make(map[int]struct { - result1 map[string]interface{} + result1 *model.FolderSummary result2 error }) } fake.summaryReturnsOnCall[i] = struct { - result1 map[string]interface{} + result1 *model.FolderSummary result2 error }{result1, result2} } diff --git a/lib/syncthing/verboseservice.go b/lib/syncthing/verboseservice.go index b23027df3..a78c2ff20 100644 --- a/lib/syncthing/verboseservice.go +++ b/lib/syncthing/verboseservice.go @@ -9,8 +9,10 @@ package syncthing import ( "context" "fmt" + "regexp" "github.com/syncthing/syncthing/lib/events" + "github.com/syncthing/syncthing/lib/model" ) // The verbose logging service subscribes to events and prints these in @@ -46,6 +48,8 @@ func (s *verboseService) Serve(ctx context.Context) error { } } +var folderSummaryRemoveDeprecatedRe = regexp.MustCompile(`(Invalid|IgnorePatterns|StateChanged):\S+\s?`) + func (s *verboseService) formatEvent(ev events.Event) string { switch ev.Type { case events.DownloadProgress, events.LocalIndexUpdated: @@ -116,15 +120,8 @@ func (s *verboseService) formatEvent(ev events.Event) string { return fmt.Sprintf("Completion for folder %q on device %v is %v%%", data["folder"], data["device"], data["completion"]) case events.FolderSummary: - data := ev.Data.(map[string]interface{}) - sum := make(map[string]interface{}) - for k, v := range data["summary"].(map[string]interface{}) { - if k == "invalid" || k == "ignorePatterns" || k == "stateChanged" { - continue - } - sum[k] = v - } - return fmt.Sprintf("Summary for folder %q is %v", data["folder"], sum) + data := ev.Data.(model.FolderSummaryEventData) + return folderSummaryRemoveDeprecatedRe.ReplaceAllString(fmt.Sprintf("Summary for folder %q is %+v", data.Folder, data.Summary), "") case events.FolderScanProgress: data := ev.Data.(map[string]interface{})