lib/api: Improve folder summary event, verbose service (#9370)

This makes a couple of small improvements to the folder summary
mechanism:

- The folder summary includes the local and remote sequence numbers in
clear text, rather than some odd sum that I'm not sure what it was
intended to represent.
- The folder summary event is generated when appropriate, regardless of
whether there is an event listener. We did this before because
generating it was expensive, and we wanted to avoid doing it
unnecessarily. Nowadays, however, it's mostly just reading out
pre-calculated metadata, and anyway, it's nice if it shows up reliably
when running with -verbose.

The point of all this is to make it easier to use these events to judge
when devices are, in fact, in sync. As-is, if I'm looking at two
devices, it's very difficult to reliably determine if they are in sync
or not. The reason is that while we can ask device A if it thinks it's
in sync, we can't see if the answer is "yes" because it has processed
all changes from B, or if it just doesn't know about the changes from B
yet. With proper sequence numbers in the event we can compare the two
and determine the truth. This makes testing a lot easier.
This commit is contained in:
Jakob Borg 2024-01-31 08:24:39 +01:00 committed by GitHub
parent bda4016109
commit f16817632f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 39 additions and 61 deletions

View File

@ -355,12 +355,7 @@ func (s *service) Serve(ctx context.Context) error {
// Handle Prometheus metrics
promHttpHandler := promhttp.Handler()
mux.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
// fetching metrics counts as an event, for the purpose of whether
// we should prepare folder summaries etc.
s.fss.OnEventRequest()
promHttpHandler.ServeHTTP(w, req)
})
mux.Handle("/metrics", promHttpHandler)
guiCfg := s.cfg.GUI()
@ -1395,11 +1390,7 @@ func (s *service) getDiskEvents(w http.ResponseWriter, r *http.Request) {
s.getEvents(w, r, sub)
}
func (s *service) getEvents(w http.ResponseWriter, r *http.Request, eventSub events.BufferedSubscription) {
if eventSub.Mask()&(events.FolderSummary|events.FolderCompletion) != 0 {
s.fss.OnEventRequest()
}
func (*service) getEvents(w http.ResponseWriter, r *http.Request, eventSub events.BufferedSubscription) {
qs := r.URL.Query()
sinceStr := qs.Get("since")
limitStr := qs.Get("limit")

View File

@ -338,17 +338,22 @@ func (s *Snapshot) Sequence(device protocol.DeviceID) int64 {
return s.meta.Counts(device, 0).Sequence
}
// RemoteSequence returns the change version for the given folder, as
// sent by remote peers. This is guaranteed to increment if the contents of
// the remote or global folder has changed.
func (s *Snapshot) RemoteSequence() int64 {
var ver int64
// RemoteSequences returns a map of the sequence numbers seen for each
// remote device sharing this folder.
func (s *Snapshot) RemoteSequences() map[protocol.DeviceID]int64 {
res := make(map[protocol.DeviceID]int64)
for _, device := range s.meta.devices() {
ver += s.Sequence(device)
switch device {
case protocol.EmptyDeviceID, protocol.LocalDeviceID, protocol.GlobalDeviceID:
continue
default:
if seq := s.Sequence(device); seq > 0 {
res[device] = seq
}
}
}
return ver
return res
}
func (s *Snapshot) LocalSize() Counts {

View File

@ -25,12 +25,9 @@ import (
"github.com/syncthing/syncthing/lib/sync"
)
const maxDurationSinceLastEventReq = time.Minute
type FolderSummaryService interface {
suture.Service
Summary(folder string) (*FolderSummary, error)
OnEventRequest()
}
// The folderSummaryService adds summary information events (FolderSummary and
@ -47,23 +44,18 @@ type folderSummaryService struct {
// For keeping track of folders to recalculate for
foldersMut sync.Mutex
folders map[string]struct{}
// For keeping track of when the last event request on the API was
lastEventReq time.Time
lastEventReqMut sync.Mutex
}
func NewFolderSummaryService(cfg config.Wrapper, m Model, id protocol.DeviceID, evLogger events.Logger) FolderSummaryService {
service := &folderSummaryService{
Supervisor: suture.New("folderSummaryService", svcutil.SpecWithDebugLogger(l)),
cfg: cfg,
model: m,
id: id,
evLogger: evLogger,
immediate: make(chan string),
folders: make(map[string]struct{}),
foldersMut: sync.NewMutex(),
lastEventReqMut: sync.NewMutex(),
Supervisor: suture.New("folderSummaryService", svcutil.SpecWithDebugLogger(l)),
cfg: cfg,
model: m,
id: id,
evLogger: evLogger,
immediate: make(chan string),
folders: make(map[string]struct{}),
foldersMut: sync.NewMutex(),
}
service.Add(svcutil.AsService(service.listenForUpdates, fmt.Sprintf("%s/listenForUpdates", service)))
@ -119,8 +111,9 @@ type FolderSummary struct {
StateChanged time.Time `json:"stateChanged"`
Error string `json:"error"`
Version int64 `json:"version"` // deprecated
Sequence int64 `json:"sequence"`
Version int64 `json:"version"` // deprecated
Sequence int64 `json:"sequence"`
RemoteSequence map[protocol.DeviceID]int64 `json:"remoteSequence"`
IgnorePatterns bool `json:"ignorePatterns"`
WatchError string `json:"watchError"`
@ -130,7 +123,8 @@ func (c *folderSummaryService) Summary(folder string) (*FolderSummary, error) {
res := new(FolderSummary)
var local, global, need, ro db.Counts
var ourSeq, remoteSeq int64
var ourSeq int64
var remoteSeq map[protocol.DeviceID]int64
errors, err := c.model.FolderErrors(folder)
if err == nil {
var snap *db.Snapshot
@ -140,7 +134,7 @@ func (c *folderSummaryService) Summary(folder string) (*FolderSummary, error) {
need = snap.NeedSize(protocol.LocalDeviceID)
ro = snap.ReceiveOnlyChangedSize()
ourSeq = snap.Sequence(protocol.LocalDeviceID)
remoteSeq = snap.Sequence(protocol.GlobalDeviceID)
remoteSeq = snap.RemoteSequences()
snap.Release()
}
}
@ -192,8 +186,9 @@ func (c *folderSummaryService) Summary(folder string) (*FolderSummary, error) {
res.Error = err.Error()
}
res.Version = ourSeq + remoteSeq // legacy
res.Sequence = ourSeq + remoteSeq // new name
res.Version = ourSeq // legacy
res.Sequence = ourSeq
res.RemoteSequence = remoteSeq
ignorePatterns, _, _ := c.model.CurrentIgnores(folder)
res.IgnorePatterns = false
@ -212,12 +207,6 @@ func (c *folderSummaryService) Summary(folder string) (*FolderSummary, error) {
return res, nil
}
func (c *folderSummaryService) OnEventRequest() {
c.lastEventReqMut.Lock()
c.lastEventReq = time.Now()
c.lastEventReqMut.Unlock()
}
// listenForUpdates subscribes to the event bus and makes note of folders that
// need their data recalculated.
func (c *folderSummaryService) listenForUpdates(ctx context.Context) error {
@ -357,17 +346,6 @@ func (c *folderSummaryService) calculateSummaries(ctx context.Context) error {
// foldersToHandle returns the list of folders needing a summary update, and
// clears the list.
func (c *folderSummaryService) foldersToHandle() []string {
// We only recalculate summaries if someone is listening to events
// (a request to /rest/events has been made within the last
// pingEventInterval).
c.lastEventReqMut.Lock()
last := c.lastEventReq
c.lastEventReqMut.Unlock()
if time.Since(last) > maxDurationSinceLastEventReq {
return nil
}
c.foldersMut.Lock()
res := make([]string, 0, len(c.folders))
for folder := range c.folders {

View File

@ -52,7 +52,7 @@ var folderSummaryRemoveDeprecatedRe = regexp.MustCompile(`(Invalid|IgnorePattern
func (*verboseService) formatEvent(ev events.Event) string {
switch ev.Type {
case events.DownloadProgress, events.LocalIndexUpdated:
case events.DownloadProgress:
// Skip
return ""
@ -86,9 +86,13 @@ func (*verboseService) formatEvent(ev events.Event) string {
data := ev.Data.(map[string]string)
return fmt.Sprintf("Remote change detected in folder %q: %s %s %s", data["folder"], data["action"], data["type"], data["path"])
case events.LocalIndexUpdated:
data := ev.Data.(map[string]interface{})
return fmt.Sprintf("Local index update for %q with %d items (seq: %d)", data["folder"], data["items"], data["sequence"])
case events.RemoteIndexUpdated:
data := ev.Data.(map[string]interface{})
return fmt.Sprintf("Device %v sent an index update for %q with %d items", data["device"], data["folder"], data["items"])
return fmt.Sprintf("Device %v sent an index update for %q with %d items (seq: %d)", data["device"], data["folder"], data["items"], data["sequence"])
case events.DeviceRejected:
data := ev.Data.(map[string]string)
@ -117,7 +121,7 @@ func (*verboseService) formatEvent(ev events.Event) string {
case events.FolderCompletion:
data := ev.Data.(map[string]interface{})
return fmt.Sprintf("Completion for folder %q on device %v is %v%% (state: %s)", data["folder"], data["device"], data["completion"], data["remoteState"])
return fmt.Sprintf("Completion for folder %q on device %v is %v%% (state: %s, seq: %d)", data["folder"], data["device"], data["completion"], data["remoteState"], data["sequence"])
case events.FolderSummary:
data := ev.Data.(model.FolderSummaryEventData)