all: Add Prometheus-style metrics to expose some internal performance counters (fixes #5175) (#9003)

This commit is contained in:
Jakob Borg 2023-08-04 19:57:30 +02:00 committed by GitHub
parent 58042b3129
commit b9c08d3814
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 945 additions and 53 deletions

View File

@ -32,6 +32,7 @@ import (
"github.com/calmh/incontainer" "github.com/calmh/incontainer"
"github.com/julienschmidt/httprouter" "github.com/julienschmidt/httprouter"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rcrowley/go-metrics" "github.com/rcrowley/go-metrics"
"github.com/thejerf/suture/v4" "github.com/thejerf/suture/v4"
"github.com/vitrun/qart/qr" "github.com/vitrun/qart/qr"
@ -351,6 +352,15 @@ func (s *service) Serve(ctx context.Context) error {
// Handle the special meta.js path // Handle the special meta.js path
mux.HandleFunc("/meta.js", s.getJSMetadata) mux.HandleFunc("/meta.js", s.getJSMetadata)
// Handle Prometheus metrics
promHttpHandler := promhttp.Handler()
mux.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
// fetching metrics counts as an event, for the purpose of whether
// we should prepare folder summaries etc.
s.fss.OnEventRequest()
promHttpHandler.ServeHTTP(w, req)
})
guiCfg := s.cfg.GUI() guiCfg := s.cfg.GUI()
// Wrap everything in CSRF protection. The /rest prefix should be // Wrap everything in CSRF protection. The /rest prefix should be
@ -1214,6 +1224,12 @@ func (s *service) getSupportBundle(w http.ResponseWriter, r *http.Request) {
} }
} }
// Metrics data as text
buf := bytes.NewBuffer(nil)
wr := bufferedResponseWriter{Writer: buf}
promhttp.Handler().ServeHTTP(wr, &http.Request{Method: http.MethodGet})
files = append(files, fileEntry{name: "metrics.txt", data: buf.Bytes()})
// Heap and CPU Proofs as a pprof extension // Heap and CPU Proofs as a pprof extension
var heapBuffer, cpuBuffer bytes.Buffer var heapBuffer, cpuBuffer bytes.Buffer
filename := fmt.Sprintf("syncthing-heap-%s-%s-%s-%s.pprof", runtime.GOOS, runtime.GOARCH, build.Version, time.Now().Format("150405")) // hhmmss filename := fmt.Sprintf("syncthing-heap-%s-%s-%s-%s.pprof", runtime.GOOS, runtime.GOARCH, build.Version, time.Now().Format("150405")) // hhmmss
@ -2043,3 +2059,12 @@ func httpError(w http.ResponseWriter, err error) {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
} }
} }
type bufferedResponseWriter struct {
io.Writer
}
func (w bufferedResponseWriter) WriteHeader(int) {}
func (w bufferedResponseWriter) Header() http.Header {
return http.Header{}
}

View File

@ -297,6 +297,7 @@ loop:
case e := <-l.events: case e := <-l.events:
// Incoming events get sent // Incoming events get sent
l.sendEvent(e) l.sendEvent(e)
metricEvents.WithLabelValues(e.Type.String(), metricEventStateCreated).Inc()
case fn := <-l.funcs: case fn := <-l.funcs:
// Subscriptions are handled here. // Subscriptions are handled here.
@ -345,9 +346,11 @@ func (l *logger) sendEvent(e Event) {
select { select {
case s.events <- e: case s.events <- e:
metricEvents.WithLabelValues(e.Type.String(), metricEventStateDelivered).Inc()
case <-l.timeout.C: case <-l.timeout.C:
// if s.events is not ready, drop the event // if s.events is not ready, drop the event
timedOut = true timedOut = true
metricEvents.WithLabelValues(e.Type.String(), metricEventStateDropped).Inc()
} }
// If stop returns false it already sent something to the // If stop returns false it already sent something to the

25
lib/events/metrics.go Normal file
View File

@ -0,0 +1,25 @@
// Copyright (C) 2023 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package events
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var metricEvents = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "syncthing",
Subsystem: "events",
Name: "total",
Help: "Total number of created/forwarded/dropped events",
}, []string{"event", "state"})
const (
metricEventStateCreated = "created"
metricEventStateDelivered = "delivered"
metricEventStateDropped = "dropped"
)

View File

@ -28,6 +28,7 @@ const (
filesystemWrapperTypeError filesystemWrapperTypeError
filesystemWrapperTypeWalk filesystemWrapperTypeWalk
filesystemWrapperTypeLog filesystemWrapperTypeLog
filesystemWrapperTypeMetrics
) )
type XattrFilter interface { type XattrFilter interface {
@ -275,6 +276,8 @@ func NewFilesystem(fsType FilesystemType, uri string, opts ...Option) Filesystem
fs = mtimeOpt.apply(fs) fs = mtimeOpt.apply(fs)
} }
fs = &metricsFS{next: fs}
if l.ShouldDebug("walkfs") { if l.ShouldDebug("walkfs") {
return NewWalkFilesystem(&logFilesystem{fs}) return NewWalkFilesystem(&logFilesystem{fs})
} }

View File

@ -320,7 +320,15 @@ func TestCopyRange(tttt *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if err := impl(src.(basicFile), dst.(basicFile), testCase.srcOffset, testCase.dstOffset, testCase.copySize); err != nil { srcBasic, ok := unwrap(src).(basicFile)
if !ok {
t.Fatal("src file is not a basic file")
}
dstBasic, ok := unwrap(dst).(basicFile)
if !ok {
t.Fatal("dst file is not a basic file")
}
if err := impl(srcBasic, dstBasic, testCase.srcOffset, testCase.dstOffset, testCase.copySize); err != nil {
if err == syscall.ENOTSUP { if err == syscall.ENOTSUP {
// Test runner can adjust directory in which to run the tests, that allow broader tests. // Test runner can adjust directory in which to run the tests, that allow broader tests.
t.Skip("Not supported on the current filesystem, set STFSTESTPATH env var.") t.Skip("Not supported on the current filesystem, set STFSTESTPATH env var.")

339
lib/fs/metrics.go Normal file
View File

@ -0,0 +1,339 @@
// Copyright (C) 2023 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package fs
import (
"context"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/syncthing/syncthing/lib/protocol"
)
var (
metricTotalOperationSeconds = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "syncthing",
Subsystem: "fs",
Name: "operation_seconds_total",
Help: "Total time spent in filesystem operations, per filesystem root and operation",
}, []string{"root", "operation"})
metricTotalOperationsCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "syncthing",
Subsystem: "fs",
Name: "operations_total",
Help: "Total number of filesystem operations, per filesystem root and operation",
}, []string{"root", "operation"})
metricTotalBytesCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "syncthing",
Subsystem: "fs",
Name: "operation_bytes_total",
Help: "Total number of filesystem bytes transferred, per filesystem root and operation",
}, []string{"root", "operation"})
)
const (
// fs operations
metricOpChmod = "chmod"
metricOpLchmod = "lchmod"
metricOpChtimes = "chtimes"
metricOpCreate = "create"
metricOpCreateSymlink = "createsymlink"
metricOpDirNames = "dirnames"
metricOpLstat = "lstat"
metricOpMkdir = "mdkir"
metricOpMkdirAll = "mkdirall"
metricOpOpen = "open"
metricOpOpenFile = "openfile"
metricOpReadSymlink = "readsymlink"
metricOpRemove = "remove"
metricOpRemoveAll = "removeall"
metricOpRename = "rename"
metricOpStat = "stat"
metricOpSymlinksSupported = "symlinkssupported"
metricOpWalk = "walk"
metricOpWatch = "watch"
metricOpHide = "hide"
metricOpUnhide = "unhide"
metricOpGlob = "glob"
metricOpRoots = "roots"
metricOpUsage = "usage"
metricOpType = "type"
metricOpURI = "uri"
metricOpOptions = "options"
metricOpSameFile = "samefile"
metricOpPlatformData = "platformdata"
metricOpGetXattr = "getxattr"
metricOpSetXattr = "setxattr"
// file operations
metricOpRead = "read"
metricOpReadAt = "readat"
metricOpWrite = "write"
metricOpWriteAt = "writeat"
metricOpTruncate = "truncate"
metricOpSeek = "seek"
metricOpSync = "sync"
metricOpClose = "close"
metricOpName = "name"
)
type metricsFS struct {
next Filesystem
}
var _ Filesystem = (*metricsFS)(nil)
func (m *metricsFS) account(op string) func(bytes int) {
t0 := time.Now()
root := m.next.URI()
return func(bytes int) {
metricTotalOperationSeconds.WithLabelValues(root, op).Add(time.Since(t0).Seconds())
metricTotalOperationsCount.WithLabelValues(root, op).Inc()
if bytes >= 0 {
metricTotalBytesCount.WithLabelValues(root, op).Add(float64(bytes))
}
}
}
func (m *metricsFS) Chmod(name string, mode FileMode) error {
defer m.account(metricOpChmod)(-1)
return m.next.Chmod(name, mode)
}
func (m *metricsFS) Lchown(name string, uid, gid string) error {
defer m.account(metricOpLchmod)(-1)
return m.next.Lchown(name, uid, gid)
}
func (m *metricsFS) Chtimes(name string, atime time.Time, mtime time.Time) error {
defer m.account(metricOpChtimes)(-1)
return m.next.Chtimes(name, atime, mtime)
}
func (m *metricsFS) Create(name string) (File, error) {
defer m.account(metricOpCreate)(-1)
f, err := m.next.Create(name)
if err != nil {
return nil, err
}
return &metricsFile{next: f, fs: m}, nil
}
func (m *metricsFS) CreateSymlink(target, name string) error {
defer m.account(metricOpCreateSymlink)(-1)
return m.next.CreateSymlink(target, name)
}
func (m *metricsFS) DirNames(name string) ([]string, error) {
defer m.account(metricOpDirNames)(-1)
return m.next.DirNames(name)
}
func (m *metricsFS) Lstat(name string) (FileInfo, error) {
defer m.account(metricOpLstat)(-1)
return m.next.Lstat(name)
}
func (m *metricsFS) Mkdir(name string, perm FileMode) error {
defer m.account(metricOpMkdir)(-1)
return m.next.Mkdir(name, perm)
}
func (m *metricsFS) MkdirAll(name string, perm FileMode) error {
defer m.account(metricOpMkdirAll)(-1)
return m.next.MkdirAll(name, perm)
}
func (m *metricsFS) Open(name string) (File, error) {
defer m.account(metricOpOpen)(-1)
f, err := m.next.Open(name)
if err != nil {
return nil, err
}
return &metricsFile{next: f, fs: m}, nil
}
func (m *metricsFS) OpenFile(name string, flags int, mode FileMode) (File, error) {
defer m.account(metricOpOpenFile)(-1)
f, err := m.next.OpenFile(name, flags, mode)
if err != nil {
return nil, err
}
return &metricsFile{next: f, fs: m}, nil
}
func (m *metricsFS) ReadSymlink(name string) (string, error) {
defer m.account(metricOpReadSymlink)(-1)
return m.next.ReadSymlink(name)
}
func (m *metricsFS) Remove(name string) error {
defer m.account(metricOpRemove)(-1)
return m.next.Remove(name)
}
func (m *metricsFS) RemoveAll(name string) error {
defer m.account(metricOpRemoveAll)(-1)
return m.next.RemoveAll(name)
}
func (m *metricsFS) Rename(oldname, newname string) error {
defer m.account(metricOpRename)(-1)
return m.next.Rename(oldname, newname)
}
func (m *metricsFS) Stat(name string) (FileInfo, error) {
defer m.account(metricOpStat)(-1)
return m.next.Stat(name)
}
func (m *metricsFS) SymlinksSupported() bool {
defer m.account(metricOpSymlinksSupported)(-1)
return m.next.SymlinksSupported()
}
func (m *metricsFS) Walk(name string, walkFn WalkFunc) error {
defer m.account(metricOpWalk)(-1)
return m.next.Walk(name, walkFn)
}
func (m *metricsFS) Watch(path string, ignore Matcher, ctx context.Context, ignorePerms bool) (<-chan Event, <-chan error, error) {
defer m.account(metricOpWatch)(-1)
return m.next.Watch(path, ignore, ctx, ignorePerms)
}
func (m *metricsFS) Hide(name string) error {
defer m.account(metricOpHide)(-1)
return m.next.Hide(name)
}
func (m *metricsFS) Unhide(name string) error {
defer m.account(metricOpUnhide)(-1)
return m.next.Unhide(name)
}
func (m *metricsFS) Glob(pattern string) ([]string, error) {
defer m.account(metricOpGlob)(-1)
return m.next.Glob(pattern)
}
func (m *metricsFS) Roots() ([]string, error) {
defer m.account(metricOpRoots)(-1)
return m.next.Roots()
}
func (m *metricsFS) Usage(name string) (Usage, error) {
defer m.account(metricOpUsage)(-1)
return m.next.Usage(name)
}
func (m *metricsFS) Type() FilesystemType {
defer m.account(metricOpType)(-1)
return m.next.Type()
}
func (m *metricsFS) URI() string {
defer m.account(metricOpURI)(-1)
return m.next.URI()
}
func (m *metricsFS) Options() []Option {
defer m.account(metricOpOptions)(-1)
return m.next.Options()
}
func (m *metricsFS) SameFile(fi1, fi2 FileInfo) bool {
defer m.account(metricOpSameFile)(-1)
return m.next.SameFile(fi1, fi2)
}
func (m *metricsFS) PlatformData(name string, withOwnership, withXattrs bool, xattrFilter XattrFilter) (protocol.PlatformData, error) {
defer m.account(metricOpPlatformData)(-1)
return m.next.PlatformData(name, withOwnership, withXattrs, xattrFilter)
}
func (m *metricsFS) GetXattr(name string, xattrFilter XattrFilter) ([]protocol.Xattr, error) {
defer m.account(metricOpGetXattr)(-1)
return m.next.GetXattr(name, xattrFilter)
}
func (m *metricsFS) SetXattr(path string, xattrs []protocol.Xattr, xattrFilter XattrFilter) error {
defer m.account(metricOpSetXattr)(-1)
return m.next.SetXattr(path, xattrs, xattrFilter)
}
func (m *metricsFS) underlying() (Filesystem, bool) {
return m.next, true
}
func (m *metricsFS) wrapperType() filesystemWrapperType {
return filesystemWrapperTypeMetrics
}
type metricsFile struct {
fs *metricsFS
next File
}
func (m *metricsFile) Read(p []byte) (n int, err error) {
acc := m.fs.account(metricOpRead)
defer func() { acc(n) }()
return m.next.Read(p)
}
func (m *metricsFile) ReadAt(p []byte, off int64) (n int, err error) {
acc := m.fs.account(metricOpReadAt)
defer func() { acc(n) }()
return m.next.ReadAt(p, off)
}
func (m *metricsFile) Seek(offset int64, whence int) (int64, error) {
defer m.fs.account(metricOpSeek)(-1)
return m.next.Seek(offset, whence)
}
func (m *metricsFile) Stat() (FileInfo, error) {
defer m.fs.account(metricOpStat)(-1)
return m.next.Stat()
}
func (m *metricsFile) Sync() error {
defer m.fs.account(metricOpSync)(-1)
return m.next.Sync()
}
func (m *metricsFile) Truncate(size int64) error {
defer m.fs.account(metricOpTruncate)(-1)
return m.next.Truncate(size)
}
func (m *metricsFile) Write(p []byte) (n int, err error) {
acc := m.fs.account(metricOpWrite)
defer func() { acc(n) }()
return m.next.Write(p)
}
func (m *metricsFile) WriteAt(p []byte, off int64) (n int, err error) {
acc := m.fs.account(metricOpWriteAt)
defer func() { acc(n) }()
return m.next.WriteAt(p, off)
}
func (m *metricsFile) Close() error {
defer m.fs.account(metricOpClose)(-1)
return m.next.Close()
}
func (m *metricsFile) Name() string {
defer m.fs.account(metricOpName)(-1)
return m.next.Name()
}
func (m *metricsFile) unwrap() File {
return m.next
}

View File

@ -260,6 +260,8 @@ func newMtimeFS(path string, db database, options ...MtimeFSOption) *mtimeFS {
} }
func newMtimeFSWithWalk(path string, db database, options ...MtimeFSOption) (*mtimeFS, *walkFilesystem) { func newMtimeFSWithWalk(path string, db database, options ...MtimeFSOption) (*mtimeFS, *walkFilesystem) {
wfs := NewFilesystem(FilesystemTypeBasic, path, NewMtimeOption(db, options...)).(*walkFilesystem) fs := NewFilesystem(FilesystemTypeBasic, path, NewMtimeOption(db, options...))
return wfs.Filesystem.(*mtimeFS), wfs wfs, _ := unwrapFilesystem(fs, filesystemWrapperTypeWalk)
mfs, _ := unwrapFilesystem(fs, filesystemWrapperTypeMtime)
return mfs.(*mtimeFS), wfs.(*walkFilesystem)
} }

View File

@ -137,6 +137,9 @@ func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg conf
f.pullPause = f.pullBasePause() f.pullPause = f.pullBasePause()
f.pullFailTimer = time.NewTimer(0) f.pullFailTimer = time.NewTimer(0)
<-f.pullFailTimer.C <-f.pullFailTimer.C
registerFolderMetrics(f.ID)
return f return f
} }
@ -459,6 +462,11 @@ func (f *folder) scanSubdirs(subDirs []string) error {
} }
defer f.ioLimiter.Give(1) defer f.ioLimiter.Give(1)
metricFolderScans.WithLabelValues(f.ID).Inc()
ctx, cancel := context.WithCancel(f.ctx)
defer cancel()
go addTimeUntilCancelled(ctx, metricFolderScanSeconds.WithLabelValues(f.ID))
for i := range subDirs { for i := range subDirs {
sub := osutil.NativeFilename(subDirs[i]) sub := osutil.NativeFilename(subDirs[i])

View File

@ -8,6 +8,7 @@ package model
import ( import (
"bytes" "bytes"
"context"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -162,11 +163,12 @@ func (f *sendReceiveFolder) pull() (bool, error) {
scanChan := make(chan string) scanChan := make(chan string)
go f.pullScannerRoutine(scanChan) go f.pullScannerRoutine(scanChan)
defer close(scanChan)
defer func() { metricFolderPulls.WithLabelValues(f.ID).Inc()
close(scanChan) ctx, cancel := context.WithCancel(f.ctx)
f.setState(FolderIdle) defer cancel()
}() go addTimeUntilCancelled(ctx, metricFolderPullSeconds.WithLabelValues(f.ID))
changed := 0 changed := 0
@ -573,9 +575,9 @@ func (f *sendReceiveFolder) handleDir(file protocol.FileInfo, snap *db.Snapshot,
}) })
}() }()
mode := fs.FileMode(file.Permissions & 0777) mode := fs.FileMode(file.Permissions & 0o777)
if f.IgnorePerms || file.NoPermissions { if f.IgnorePerms || file.NoPermissions {
mode = 0777 mode = 0o777
} }
if shouldDebug() { if shouldDebug() {
@ -705,7 +707,7 @@ func (f *sendReceiveFolder) checkParent(file string, scanChan chan<- string) boo
return true return true
} }
l.Debugf("%v creating parent directory of %v", f, file) l.Debugf("%v creating parent directory of %v", f, file)
if err := f.mtimefs.MkdirAll(parent, 0755); err != nil { if err := f.mtimefs.MkdirAll(parent, 0o755); err != nil {
f.newPullError(file, fmt.Errorf("creating parent dir: %w", err)) f.newPullError(file, fmt.Errorf("creating parent dir: %w", err))
return false return false
} }
@ -1136,12 +1138,12 @@ func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, snap *db.Snapshot
func (f *sendReceiveFolder) reuseBlocks(blocks []protocol.BlockInfo, reused []int, file protocol.FileInfo, tempName string) ([]protocol.BlockInfo, []int) { func (f *sendReceiveFolder) reuseBlocks(blocks []protocol.BlockInfo, reused []int, file protocol.FileInfo, tempName string) ([]protocol.BlockInfo, []int) {
// Check for an old temporary file which might have some blocks we could // Check for an old temporary file which might have some blocks we could
// reuse. // reuse.
tempBlocks, err := scanner.HashFile(f.ctx, f.mtimefs, tempName, file.BlockSize(), nil, false) tempBlocks, err := scanner.HashFile(f.ctx, f.ID, f.mtimefs, tempName, file.BlockSize(), nil, false)
if err != nil { if err != nil {
var caseErr *fs.ErrCaseConflict var caseErr *fs.ErrCaseConflict
if errors.As(err, &caseErr) { if errors.As(err, &caseErr) {
if rerr := f.mtimefs.Rename(caseErr.Real, tempName); rerr == nil { if rerr := f.mtimefs.Rename(caseErr.Real, tempName); rerr == nil {
tempBlocks, err = scanner.HashFile(f.ctx, f.mtimefs, tempName, file.BlockSize(), nil, false) tempBlocks, err = scanner.HashFile(f.ctx, f.ID, f.mtimefs, tempName, file.BlockSize(), nil, false)
} }
} }
} }
@ -1235,7 +1237,7 @@ func (f *sendReceiveFolder) shortcutFile(file protocol.FileInfo, dbUpdateChan ch
f.queue.Done(file.Name) f.queue.Done(file.Name)
if !f.IgnorePerms && !file.NoPermissions { if !f.IgnorePerms && !file.NoPermissions {
if err = f.mtimefs.Chmod(file.Name, fs.FileMode(file.Permissions&0777)); err != nil { if err = f.mtimefs.Chmod(file.Name, fs.FileMode(file.Permissions&0o777)); err != nil {
f.newPullError(file.Name, fmt.Errorf("shortcut file (setting permissions): %w", err)) f.newPullError(file.Name, fmt.Errorf("shortcut file (setting permissions): %w", err))
return return
} }
@ -1249,7 +1251,7 @@ func (f *sendReceiveFolder) shortcutFile(file protocol.FileInfo, dbUpdateChan ch
// Still need to re-write the trailer with the new encrypted fileinfo. // Still need to re-write the trailer with the new encrypted fileinfo.
if f.Type == config.FolderTypeReceiveEncrypted { if f.Type == config.FolderTypeReceiveEncrypted {
err = inWritableDir(func(path string) error { err = inWritableDir(func(path string) error {
fd, err := f.mtimefs.OpenFile(path, fs.OptReadWrite, 0666) fd, err := f.mtimefs.OpenFile(path, fs.OptReadWrite, 0o666)
if err != nil { if err != nil {
return err return err
} }
@ -1329,7 +1331,7 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
// block of all zeroes, so then we should not skip it. // block of all zeroes, so then we should not skip it.
// Pretend we copied it. // Pretend we copied it.
state.copiedFromOrigin() state.skippedSparseBlock(block.Size)
state.copyDone(block) state.copyDone(block)
continue continue
} }
@ -1348,9 +1350,9 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
state.fail(fmt.Errorf("dst write: %w", err)) state.fail(fmt.Errorf("dst write: %w", err))
} }
if offset == block.Offset { if offset == block.Offset {
state.copiedFromOrigin() state.copiedFromOrigin(block.Size)
} else { } else {
state.copiedFromOriginShifted() state.copiedFromOriginShifted(block.Size)
} }
return false return false
@ -1398,7 +1400,9 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
state.fail(fmt.Errorf("dst write: %w", err)) state.fail(fmt.Errorf("dst write: %w", err))
} }
if path == state.file.Name { if path == state.file.Name {
state.copiedFromOrigin() state.copiedFromOrigin(block.Size)
} else {
state.copiedFromElsewhere(block.Size)
} }
return true return true
}) })
@ -1608,7 +1612,7 @@ loop:
func (f *sendReceiveFolder) performFinish(file, curFile protocol.FileInfo, hasCurFile bool, tempName string, snap *db.Snapshot, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) error { func (f *sendReceiveFolder) performFinish(file, curFile protocol.FileInfo, hasCurFile bool, tempName string, snap *db.Snapshot, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) error {
// Set the correct permission bits on the new file // Set the correct permission bits on the new file
if !f.IgnorePerms && !file.NoPermissions { if !f.IgnorePerms && !file.NoPermissions {
if err := f.mtimefs.Chmod(tempName, fs.FileMode(file.Permissions&0777)); err != nil { if err := f.mtimefs.Chmod(tempName, fs.FileMode(file.Permissions&0o777)); err != nil {
return fmt.Errorf("setting permissions: %w", err) return fmt.Errorf("setting permissions: %w", err)
} }
} }

View File

@ -299,7 +299,7 @@ func TestCopierFinder(t *testing.T) {
} }
// Verify that the fetched blocks have actually been written to the temp file // Verify that the fetched blocks have actually been written to the temp file
blks, err := scanner.HashFile(context.TODO(), f.Filesystem(nil), tempFile, protocol.MinBlockSize, nil, false) blks, err := scanner.HashFile(context.TODO(), f.ID, f.Filesystem(nil), tempFile, protocol.MinBlockSize, nil, false)
if err != nil { if err != nil {
t.Log(err) t.Log(err)
} }

View File

@ -396,6 +396,24 @@ func (c *folderSummaryService) sendSummary(ctx context.Context, folder string) {
Summary: data, Summary: data,
}) })
metricFolderSummary.WithLabelValues(folder, metricScopeGlobal, metricTypeFiles).Set(float64(data.GlobalFiles))
metricFolderSummary.WithLabelValues(folder, metricScopeGlobal, metricTypeDirectories).Set(float64(data.GlobalDirectories))
metricFolderSummary.WithLabelValues(folder, metricScopeGlobal, metricTypeSymlinks).Set(float64(data.GlobalSymlinks))
metricFolderSummary.WithLabelValues(folder, metricScopeGlobal, metricTypeDeleted).Set(float64(data.GlobalDeleted))
metricFolderSummary.WithLabelValues(folder, metricScopeGlobal, metricTypeBytes).Set(float64(data.GlobalBytes))
metricFolderSummary.WithLabelValues(folder, metricScopeLocal, metricTypeFiles).Set(float64(data.LocalFiles))
metricFolderSummary.WithLabelValues(folder, metricScopeLocal, metricTypeDirectories).Set(float64(data.LocalDirectories))
metricFolderSummary.WithLabelValues(folder, metricScopeLocal, metricTypeSymlinks).Set(float64(data.LocalSymlinks))
metricFolderSummary.WithLabelValues(folder, metricScopeLocal, metricTypeDeleted).Set(float64(data.LocalDeleted))
metricFolderSummary.WithLabelValues(folder, metricScopeLocal, metricTypeBytes).Set(float64(data.LocalBytes))
metricFolderSummary.WithLabelValues(folder, metricScopeNeed, metricTypeFiles).Set(float64(data.NeedFiles))
metricFolderSummary.WithLabelValues(folder, metricScopeNeed, metricTypeDirectories).Set(float64(data.NeedDirectories))
metricFolderSummary.WithLabelValues(folder, metricScopeNeed, metricTypeSymlinks).Set(float64(data.NeedSymlinks))
metricFolderSummary.WithLabelValues(folder, metricScopeNeed, metricTypeDeleted).Set(float64(data.NeedDeletes))
metricFolderSummary.WithLabelValues(folder, metricScopeNeed, metricTypeBytes).Set(float64(data.NeedBytes))
for _, devCfg := range c.cfg.Folders()[folder].Devices { for _, devCfg := range c.cfg.Folders()[folder].Devices {
select { select {
case <-ctx.Done(): case <-ctx.Done():

View File

@ -111,6 +111,10 @@ func (s *stateTracker) setState(newState folderState) {
return return
} }
defer func() {
metricFolderState.WithLabelValues(s.folderID).Set(float64(s.current))
}()
/* This should hold later... /* This should hold later...
if s.current != FolderIdle && (newState == FolderScanning || newState == FolderSyncing) { if s.current != FolderIdle && (newState == FolderScanning || newState == FolderSyncing) {
panic("illegal state transition " + s.current.String() + " -> " + newState.String()) panic("illegal state transition " + s.current.String() + " -> " + newState.String())
@ -148,6 +152,10 @@ func (s *stateTracker) setError(err error) {
s.mut.Lock() s.mut.Lock()
defer s.mut.Unlock() defer s.mut.Unlock()
defer func() {
metricFolderState.WithLabelValues(s.folderID).Set(float64(s.current))
}()
eventData := map[string]interface{}{ eventData := map[string]interface{}{
"folder": s.folderID, "folder": s.folderID,
"from": s.current.String(), "from": s.current.String(),

93
lib/model/metrics.go Normal file
View File

@ -0,0 +1,93 @@
// Copyright (C) 2023 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package model
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
metricFolderState = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "syncthing",
Subsystem: "model",
Name: "folder_state",
Help: "Current folder state",
}, []string{"folder"})
metricFolderSummary = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "syncthing",
Subsystem: "model",
Name: "folder_summary",
Help: "Current folder summary data (counts for global/local/need files/directories/symlinks/deleted/bytes)",
}, []string{"folder", "scope", "type"})
metricFolderPulls = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "syncthing",
Subsystem: "model",
Name: "folder_pulls_total",
Help: "Total number of folder pull iterations, per folder ID",
}, []string{"folder"})
metricFolderPullSeconds = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "syncthing",
Subsystem: "model",
Name: "folder_pull_seconds_total",
Help: "Total time spent in folder pull iterations, per folder ID",
}, []string{"folder"})
metricFolderScans = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "syncthing",
Subsystem: "model",
Name: "folder_scans_total",
Help: "Total number of folder scan iterations, per folder ID",
}, []string{"folder"})
metricFolderScanSeconds = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "syncthing",
Subsystem: "model",
Name: "folder_scan_seconds_total",
Help: "Total time spent in folder scan iterations, per folder ID",
}, []string{"folder"})
metricFolderProcessedBytesTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "syncthing",
Subsystem: "model",
Name: "folder_processed_bytes_total",
Help: "Total amount of data processed during folder syncing, per folder ID and data source (network/local_origin/local_other/local_shifted/skipped)",
}, []string{"folder", "source"})
)
const (
metricSourceNetwork = "network" // from the network
metricSourceLocalOrigin = "local_origin" // from the existing version of the local file
metricSourceLocalOther = "local_other" // from a different local file
metricSourceLocalShifted = "local_shifted" // from the existing version of the local file, rolling hash shifted
metricSourceSkipped = "skipped" // block of all zeroes, invented out of thin air
metricScopeGlobal = "global"
metricScopeLocal = "local"
metricScopeNeed = "need"
metricTypeFiles = "files"
metricTypeDirectories = "directories"
metricTypeSymlinks = "symlinks"
metricTypeDeleted = "deleted"
metricTypeBytes = "bytes"
)
func registerFolderMetrics(folderID string) {
// Register metrics for this folder, so that counters are present even
// when zero.
metricFolderState.WithLabelValues(folderID)
metricFolderPulls.WithLabelValues(folderID)
metricFolderPullSeconds.WithLabelValues(folderID)
metricFolderScans.WithLabelValues(folderID)
metricFolderScanSeconds.WithLabelValues(folderID)
metricFolderProcessedBytesTotal.WithLabelValues(folderID, metricSourceNetwork)
metricFolderProcessedBytesTotal.WithLabelValues(folderID, metricSourceLocalOrigin)
metricFolderProcessedBytesTotal.WithLabelValues(folderID, metricSourceLocalOther)
metricFolderProcessedBytesTotal.WithLabelValues(folderID, metricSourceLocalShifted)
metricFolderProcessedBytesTotal.WithLabelValues(folderID, metricSourceSkipped)
}

View File

@ -91,7 +91,7 @@ func TestProgressEmitter(t *testing.T) {
expectEvent(w, t, 1) expectEvent(w, t, 1)
expectTimeout(w, t) expectTimeout(w, t)
s.copiedFromOrigin() s.copiedFromOrigin(1)
expectEvent(w, t, 1) expectEvent(w, t, 1)
expectTimeout(w, t) expectTimeout(w, t)

View File

@ -152,11 +152,11 @@ func (s *sharedPullerState) tempFileInWritableDir(_ string) error {
// permissions will be set to the final value later, but in the meantime // permissions will be set to the final value later, but in the meantime
// we don't want to have a temporary file with looser permissions than // we don't want to have a temporary file with looser permissions than
// the final outcome. // the final outcome.
mode := fs.FileMode(s.file.Permissions) | 0600 mode := fs.FileMode(s.file.Permissions) | 0o600
if s.ignorePerms { if s.ignorePerms {
// When ignorePerms is set we use a very permissive mode and let the // When ignorePerms is set we use a very permissive mode and let the
// system umask filter it. // system umask filter it.
mode = 0666 mode = 0o666
} }
// Attempt to create the temp file // Attempt to create the temp file
@ -261,19 +261,34 @@ func (s *sharedPullerState) copyDone(block protocol.BlockInfo) {
s.mut.Unlock() s.mut.Unlock()
} }
func (s *sharedPullerState) copiedFromOrigin() { func (s *sharedPullerState) copiedFromOrigin(bytes int) {
s.mut.Lock() s.mut.Lock()
s.copyOrigin++ s.copyOrigin++
s.updated = time.Now() s.updated = time.Now()
s.mut.Unlock() s.mut.Unlock()
metricFolderProcessedBytesTotal.WithLabelValues(s.folder, metricSourceLocalOrigin).Add(float64(bytes))
} }
func (s *sharedPullerState) copiedFromOriginShifted() { func (s *sharedPullerState) copiedFromElsewhere(bytes int) {
metricFolderProcessedBytesTotal.WithLabelValues(s.folder, metricSourceLocalOther).Add(float64(bytes))
}
func (s *sharedPullerState) skippedSparseBlock(bytes int) {
// pretend we copied it, historical
s.mut.Lock()
s.copyOrigin++
s.updated = time.Now()
s.mut.Unlock()
metricFolderProcessedBytesTotal.WithLabelValues(s.folder, metricSourceSkipped).Add(float64(bytes))
}
func (s *sharedPullerState) copiedFromOriginShifted(bytes int) {
s.mut.Lock() s.mut.Lock()
s.copyOrigin++ s.copyOrigin++
s.copyOriginShifted++ s.copyOriginShifted++
s.updated = time.Now() s.updated = time.Now()
s.mut.Unlock() s.mut.Unlock()
metricFolderProcessedBytesTotal.WithLabelValues(s.folder, metricSourceLocalShifted).Add(float64(bytes))
} }
func (s *sharedPullerState) pullStarted() { func (s *sharedPullerState) pullStarted() {
@ -295,6 +310,7 @@ func (s *sharedPullerState) pullDone(block protocol.BlockInfo) {
s.availableUpdated = time.Now() s.availableUpdated = time.Now()
l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded) l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded)
s.mut.Unlock() s.mut.Unlock()
metricFolderProcessedBytesTotal.WithLabelValues(s.folder, metricSourceNetwork).Add(float64(block.Size))
} }
// finalClose atomically closes and returns closed status of a file. A true // finalClose atomically closes and returns closed status of a file. A true

View File

@ -7,6 +7,7 @@
package model package model
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"path/filepath" "path/filepath"
@ -14,6 +15,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/prometheus/client_golang/prometheus"
"github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/fs" "github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/ur" "github.com/syncthing/syncthing/lib/ur"
@ -117,11 +119,11 @@ func inWritableDir(fn func(string) error, targetFs fs.Filesystem, path string, i
const permBits = fs.ModePerm | fs.ModeSetuid | fs.ModeSetgid | fs.ModeSticky const permBits = fs.ModePerm | fs.ModeSetuid | fs.ModeSetgid | fs.ModeSticky
var parentErr error var parentErr error
if mode := info.Mode() & permBits; mode&0200 == 0 { if mode := info.Mode() & permBits; mode&0o200 == 0 {
// A non-writeable directory (for this user; we assume that's the // A non-writeable directory (for this user; we assume that's the
// relevant part). Temporarily change the mode so we can delete the // relevant part). Temporarily change the mode so we can delete the
// file or directory inside it. // file or directory inside it.
parentErr = targetFs.Chmod(dir, mode|0700) parentErr = targetFs.Chmod(dir, mode|0o700)
if parentErr != nil { if parentErr != nil {
l.Debugf("Failed to make parent directory writable: %v", parentErr) l.Debugf("Failed to make parent directory writable: %v", parentErr)
} else { } else {
@ -148,3 +150,27 @@ func inWritableDir(fn func(string) error, targetFs fs.Filesystem, path string, i
} }
return err return err
} }
// addTimeUntilCancelled adds time to the counter for the duration of the
// Context. We do this piecemeal so that polling the counter during a long
// operation shows a relevant value, instead of the counter just increasing
// by a large amount at the end of the operation.
func addTimeUntilCancelled(ctx context.Context, counter prometheus.Counter) {
t0 := time.Now()
defer func() {
counter.Add(time.Since(t0).Seconds())
}()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case t := <-ticker.C:
counter.Add(t.Sub(t0).Seconds())
t0 = t
case <-ctx.Done():
return
}
}
}

View File

@ -10,8 +10,9 @@ import (
type countingReader struct { type countingReader struct {
io.Reader io.Reader
tot atomic.Int64 // bytes idString string
last atomic.Int64 // unix nanos tot atomic.Int64 // bytes
last atomic.Int64 // unix nanos
} }
var ( var (
@ -24,6 +25,7 @@ func (c *countingReader) Read(bs []byte) (int, error) {
c.tot.Add(int64(n)) c.tot.Add(int64(n))
totalIncoming.Add(int64(n)) totalIncoming.Add(int64(n))
c.last.Store(time.Now().UnixNano()) c.last.Store(time.Now().UnixNano())
metricDeviceRecvBytes.WithLabelValues(c.idString).Add(float64(n))
return n, err return n, err
} }
@ -35,8 +37,9 @@ func (c *countingReader) Last() time.Time {
type countingWriter struct { type countingWriter struct {
io.Writer io.Writer
tot atomic.Int64 // bytes idString string
last atomic.Int64 // unix nanos tot atomic.Int64 // bytes
last atomic.Int64 // unix nanos
} }
func (c *countingWriter) Write(bs []byte) (int, error) { func (c *countingWriter) Write(bs []byte) (int, error) {
@ -44,6 +47,7 @@ func (c *countingWriter) Write(bs []byte) (int, error) {
c.tot.Add(int64(n)) c.tot.Add(int64(n))
totalOutgoing.Add(int64(n)) totalOutgoing.Add(int64(n))
c.last.Store(time.Now().UnixNano()) c.last.Store(time.Now().UnixNano())
metricDeviceSentBytes.WithLabelValues(c.idString).Add(float64(n))
return n, err return n, err
} }

62
lib/protocol/metrics.go Normal file
View File

@ -0,0 +1,62 @@
// Copyright (C) 2023 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package protocol
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
metricDeviceSentBytes = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "syncthing",
Subsystem: "protocol",
Name: "sent_bytes_total",
Help: "Total amount of data sent, per device",
}, []string{"device"})
metricDeviceSentUncompressedBytes = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "syncthing",
Subsystem: "protocol",
Name: "sent_uncompressed_bytes_total",
Help: "Total amount of data sent, before compression, per device",
}, []string{"device"})
metricDeviceSentMessages = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "syncthing",
Subsystem: "protocol",
Name: "sent_messages_total",
Help: "Total number of messages sent, per device",
}, []string{"device"})
metricDeviceRecvBytes = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "syncthing",
Subsystem: "protocol",
Name: "recv_bytes_total",
Help: "Total amount of data received, per device",
}, []string{"device"})
metricDeviceRecvDecompressedBytes = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "syncthing",
Subsystem: "protocol",
Name: "recv_decompressed_bytes_total",
Help: "Total amount of data received, after decompression, per device",
}, []string{"device"})
metricDeviceRecvMessages = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "syncthing",
Subsystem: "protocol",
Name: "recv_messages_total",
Help: "Total number of messages received, per device",
}, []string{"device"})
)
func registerDeviceMetrics(deviceID string) {
// Register metrics for this device, so that counters are present even
// when zero.
metricDeviceSentBytes.WithLabelValues(deviceID)
metricDeviceSentUncompressedBytes.WithLabelValues(deviceID)
metricDeviceSentMessages.WithLabelValues(deviceID)
metricDeviceRecvBytes.WithLabelValues(deviceID)
metricDeviceRecvMessages.WithLabelValues(deviceID)
}

View File

@ -183,6 +183,7 @@ type rawConnection struct {
ConnectionInfo ConnectionInfo
deviceID DeviceID deviceID DeviceID
idString string
model contextLessModel model contextLessModel
startTime time.Time startTime time.Time
@ -263,12 +264,15 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, closer
} }
func newRawConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, closer io.Closer, receiver contextLessModel, connInfo ConnectionInfo, compress Compression) *rawConnection { func newRawConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, closer io.Closer, receiver contextLessModel, connInfo ConnectionInfo, compress Compression) *rawConnection {
cr := &countingReader{Reader: reader} idString := deviceID.String()
cw := &countingWriter{Writer: writer} cr := &countingReader{Reader: reader, idString: idString}
cw := &countingWriter{Writer: writer, idString: idString}
registerDeviceMetrics(idString)
return &rawConnection{ return &rawConnection{
ConnectionInfo: connInfo, ConnectionInfo: connInfo,
deviceID: deviceID, deviceID: deviceID,
idString: deviceID.String(),
model: receiver, model: receiver,
cr: cr, cr: cr,
cw: cw, cw: cw,
@ -445,6 +449,8 @@ func (c *rawConnection) dispatcherLoop() (err error) {
return ErrClosed return ErrClosed
} }
metricDeviceRecvMessages.WithLabelValues(c.idString).Inc()
msgContext, err := messageContext(msg) msgContext, err := messageContext(msg)
if err != nil { if err != nil {
return fmt.Errorf("protocol error: %w", err) return fmt.Errorf("protocol error: %w", err)
@ -553,6 +559,8 @@ func (c *rawConnection) readMessageAfterHeader(hdr Header, fourByteBuf []byte) (
// ... and is then unmarshalled // ... and is then unmarshalled
metricDeviceRecvDecompressedBytes.WithLabelValues(c.idString).Add(float64(4 + len(buf)))
msg, err := newMessage(hdr.Type) msg, err := newMessage(hdr.Type)
if err != nil { if err != nil {
BufferPool.Put(buf) BufferPool.Put(buf)
@ -593,6 +601,8 @@ func (c *rawConnection) readHeader(fourByteBuf []byte) (Header, error) {
return Header{}, fmt.Errorf("unmarshalling header: %w", err) return Header{}, fmt.Errorf("unmarshalling header: %w", err)
} }
metricDeviceRecvDecompressedBytes.WithLabelValues(c.idString).Add(float64(2 + len(buf)))
return hdr, nil return hdr, nil
} }
@ -758,6 +768,10 @@ func (c *rawConnection) writeMessage(msg message) error {
msgContext, _ := messageContext(msg) msgContext, _ := messageContext(msg)
l.Debugf("Writing %v", msgContext) l.Debugf("Writing %v", msgContext)
defer func() {
metricDeviceSentMessages.WithLabelValues(c.idString).Inc()
}()
size := msg.ProtoSize() size := msg.ProtoSize()
hdr := Header{ hdr := Header{
Type: typeOf(msg), Type: typeOf(msg),
@ -784,6 +798,8 @@ func (c *rawConnection) writeMessage(msg message) error {
} }
} }
metricDeviceSentUncompressedBytes.WithLabelValues(c.idString).Add(float64(totSize))
// Header length // Header length
binary.BigEndian.PutUint16(buf, uint16(hdrSize)) binary.BigEndian.PutUint16(buf, uint16(hdrSize))
// Header // Header
@ -817,6 +833,9 @@ func (c *rawConnection) writeCompressedMessage(msg message, marshaled []byte) (o
} }
cOverhead := 2 + hdrSize + 4 cOverhead := 2 + hdrSize + 4
metricDeviceSentUncompressedBytes.WithLabelValues(c.idString).Add(float64(cOverhead + len(marshaled)))
// The compressed size may be at most n-n/32 = .96875*n bytes, // The compressed size may be at most n-n/32 = .96875*n bytes,
// I.e., if we can't save at least 3.125% bandwidth, we forgo compression. // I.e., if we can't save at least 3.125% bandwidth, we forgo compression.
// This number is arbitrary but cheap to compute. // This number is arbitrary but cheap to compute.

View File

@ -16,7 +16,7 @@ import (
) )
// HashFile hashes the files and returns a list of blocks representing the file. // HashFile hashes the files and returns a list of blocks representing the file.
func HashFile(ctx context.Context, fs fs.Filesystem, path string, blockSize int, counter Counter, useWeakHashes bool) ([]protocol.BlockInfo, error) { func HashFile(ctx context.Context, folderID string, fs fs.Filesystem, path string, blockSize int, counter Counter, useWeakHashes bool) ([]protocol.BlockInfo, error) {
fd, err := fs.Open(path) fd, err := fs.Open(path)
if err != nil { if err != nil {
l.Debugln("open:", err) l.Debugln("open:", err)
@ -42,6 +42,8 @@ func HashFile(ctx context.Context, fs fs.Filesystem, path string, blockSize int,
return nil, err return nil, err
} }
metricHashedBytes.WithLabelValues(folderID).Add(float64(size))
// Recheck the size and modtime again. If they differ, the file changed // Recheck the size and modtime again. If they differ, the file changed
// while we were reading it and our hash results are invalid. // while we were reading it and our hash results are invalid.
@ -62,22 +64,24 @@ func HashFile(ctx context.Context, fs fs.Filesystem, path string, blockSize int,
// workers are used in parallel. The outbox will become closed when the inbox // workers are used in parallel. The outbox will become closed when the inbox
// is closed and all items handled. // is closed and all items handled.
type parallelHasher struct { type parallelHasher struct {
fs fs.Filesystem folderID string
outbox chan<- ScanResult fs fs.Filesystem
inbox <-chan protocol.FileInfo outbox chan<- ScanResult
counter Counter inbox <-chan protocol.FileInfo
done chan<- struct{} counter Counter
wg sync.WaitGroup done chan<- struct{}
wg sync.WaitGroup
} }
func newParallelHasher(ctx context.Context, fs fs.Filesystem, workers int, outbox chan<- ScanResult, inbox <-chan protocol.FileInfo, counter Counter, done chan<- struct{}) { func newParallelHasher(ctx context.Context, folderID string, fs fs.Filesystem, workers int, outbox chan<- ScanResult, inbox <-chan protocol.FileInfo, counter Counter, done chan<- struct{}) {
ph := &parallelHasher{ ph := &parallelHasher{
fs: fs, folderID: folderID,
outbox: outbox, fs: fs,
inbox: inbox, outbox: outbox,
counter: counter, inbox: inbox,
done: done, counter: counter,
wg: sync.NewWaitGroup(), done: done,
wg: sync.NewWaitGroup(),
} }
ph.wg.Add(workers) ph.wg.Add(workers)
@ -104,7 +108,7 @@ func (ph *parallelHasher) hashFiles(ctx context.Context) {
panic("Bug. Asked to hash a directory or a deleted file.") panic("Bug. Asked to hash a directory or a deleted file.")
} }
blocks, err := HashFile(ctx, ph.fs, f.Name, f.BlockSize(), ph.counter, true) blocks, err := HashFile(ctx, ph.folderID, ph.fs, f.Name, f.BlockSize(), ph.counter, true)
if err != nil { if err != nil {
handleError(ctx, "hashing", f.Name, err, ph.outbox) handleError(ctx, "hashing", f.Name, err, ph.outbox)
continue continue

35
lib/scanner/metrics.go Normal file
View File

@ -0,0 +1,35 @@
// Copyright (C) 2023 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package scanner
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
metricHashedBytes = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "syncthing",
Subsystem: "scanner",
Name: "hashed_bytes_total",
Help: "Total amount of data hashed, per folder",
}, []string{"folder"})
metricScannedItems = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "syncthing",
Subsystem: "scanner",
Name: "scanned_items_total",
Help: "Total number of items (files/directories) inspected, per folder",
}, []string{"folder"})
)
func registerFolderMetrics(folderID string) {
// Register metrics for this folder, so that counters are present even
// when zero.
metricHashedBytes.WithLabelValues(folderID)
metricScannedItems.WithLabelValues(folderID)
}

View File

@ -115,7 +115,7 @@ type fakeInfo struct {
} }
func (f fakeInfo) Name() string { return f.name } func (f fakeInfo) Name() string { return f.name }
func (fakeInfo) Mode() fs.FileMode { return 0755 } func (fakeInfo) Mode() fs.FileMode { return 0o755 }
func (f fakeInfo) Size() int64 { return f.size } func (f fakeInfo) Size() int64 { return f.size }
func (fakeInfo) ModTime() time.Time { return time.Unix(1234567890, 0) } func (fakeInfo) ModTime() time.Time { return time.Unix(1234567890, 0) }
func (f fakeInfo) IsDir() bool { func (f fakeInfo) IsDir() bool {

View File

@ -104,6 +104,7 @@ func newWalker(cfg Config) *walker {
w.Matcher = ignore.New(w.Filesystem) w.Matcher = ignore.New(w.Filesystem)
} }
registerFolderMetrics(w.Folder)
return w return w
} }
@ -132,7 +133,7 @@ func (w *walker) walk(ctx context.Context) chan ScanResult {
// We're not required to emit scan progress events, just kick off hashers, // We're not required to emit scan progress events, just kick off hashers,
// and feed inputs directly from the walker. // and feed inputs directly from the walker.
if w.ProgressTickIntervalS < 0 { if w.ProgressTickIntervalS < 0 {
newParallelHasher(ctx, w.Filesystem, w.Hashers, finishedChan, toHashChan, nil, nil) newParallelHasher(ctx, w.Folder, w.Filesystem, w.Hashers, finishedChan, toHashChan, nil, nil)
return finishedChan return finishedChan
} }
@ -163,7 +164,7 @@ func (w *walker) walk(ctx context.Context) chan ScanResult {
done := make(chan struct{}) done := make(chan struct{})
progress := newByteCounter() progress := newByteCounter()
newParallelHasher(ctx, w.Filesystem, w.Hashers, finishedChan, realToHashChan, progress, done) newParallelHasher(ctx, w.Folder, w.Filesystem, w.Hashers, finishedChan, realToHashChan, progress, done)
// A routine which actually emits the FolderScanProgress events // A routine which actually emits the FolderScanProgress events
// every w.ProgressTicker ticks, until the hasher routines terminate. // every w.ProgressTicker ticks, until the hasher routines terminate.
@ -255,6 +256,8 @@ func (w *walker) walkAndHashFiles(ctx context.Context, toHashChan chan<- protoco
default: default:
} }
metricScannedItems.WithLabelValues(w.Folder).Inc()
// Return value used when we are returning early and don't want to // Return value used when we are returning early and don't want to
// process the item. For directories, this means do-not-descend. // process the item. For directories, this means do-not-descend.
var skip error // nil var skip error // nil
@ -599,7 +602,7 @@ func (w *walker) updateFileInfo(dst, src protocol.FileInfo) protocol.FileInfo {
if dst.Type == protocol.FileInfoTypeFile && build.IsWindows { if dst.Type == protocol.FileInfoTypeFile && build.IsWindows {
// If we have an existing index entry, copy the executable bits // If we have an existing index entry, copy the executable bits
// from there. // from there.
dst.Permissions |= (src.Permissions & 0111) dst.Permissions |= (src.Permissions & 0o111)
} }
dst.Version = src.Version.Update(w.ShortID) dst.Version = src.Version.Update(w.ShortID)
dst.ModifiedBy = w.ShortID dst.ModifiedBy = w.ShortID

View File

@ -635,7 +635,7 @@ func BenchmarkHashFile(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
if _, err := HashFile(context.TODO(), testFs, testdataName, protocol.MinBlockSize, nil, true); err != nil { if _, err := HashFile(context.TODO(), "", testFs, testdataName, protocol.MinBlockSize, nil, true); err != nil {
b.Fatal(err) b.Fatal(err)
} }
} }

187
script/find-metrics.go Normal file
View File

@ -0,0 +1,187 @@
// Copyright (C) 2023 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
// Usage: go run script/find-metrics.go > metrics.md
//
// This script finds all of the metrics in the Syncthing codebase and prints
// them in Markdown format. It's used to generate the metrics documentation
// for the Syncthing docs.
package main
import (
"fmt"
"go/ast"
"go/token"
"log"
"strconv"
"strings"
"golang.org/x/exp/slices"
"golang.org/x/tools/go/packages"
)
type metric struct {
subsystem string
name string
help string
kind string
}
func main() {
opts := &packages.Config{
Mode: packages.NeedSyntax | packages.NeedName | packages.NeedTypes | packages.NeedTypesInfo | packages.NeedImports | packages.NeedDeps,
}
pkgs, err := packages.Load(opts, "github.com/syncthing/syncthing/...")
if err != nil {
log.Fatalln(err)
}
var coll metricCollector
for _, pkg := range pkgs {
for _, file := range pkg.Syntax {
ast.Inspect(file, coll.Visit)
}
}
coll.print()
}
type metricCollector struct {
metrics []metric
}
func (c *metricCollector) Visit(n ast.Node) bool {
if gen, ok := n.(*ast.GenDecl); ok {
// We're only interested in var declarations (var metricWhatever =
// promauto.NewCounter(...) etc).
if gen.Tok != token.VAR {
return false
}
for _, spec := range gen.Specs {
// We want to look at the value given to a var (the NewCounter()
// etc call).
if vsp, ok := spec.(*ast.ValueSpec); ok {
// There should be only one value.
if len(vsp.Values) != 1 {
continue
}
// The value should be a function call.
call, ok := vsp.Values[0].(*ast.CallExpr)
if !ok {
continue
}
// The call should be a selector expression
// (package.Identifer).
sel, ok := call.Fun.(*ast.SelectorExpr)
if !ok {
continue
}
// The package selector should be `promauto`.
selID, ok := sel.X.(*ast.Ident)
if !ok || selID.Name != "promauto" {
continue
}
// The function should be one of the New* functions.
var kind string
switch sel.Sel.Name {
case "NewCounter":
kind = "counter"
case "NewGauge":
kind = "gauge"
case "NewCounterVec":
kind = "counter vector"
case "NewGaugeVec":
kind = "gauge vector"
default:
continue
}
// The arguments to the function should be a single
// composite (struct literal). Grab all of the fields in the
// declaration into a map so we can easily access them.
args := make(map[string]string)
for _, el := range call.Args[0].(*ast.CompositeLit).Elts {
kv := el.(*ast.KeyValueExpr)
key := kv.Key.(*ast.Ident).Name // e.g., "Name"
val := kv.Value.(*ast.BasicLit).Value // e.g., `"foo"`
args[key], _ = strconv.Unquote(val)
}
// Build the full name of the metric from the namespace +
// subsystem + name, like Prometheus does.
var parts []string
if v := args["Namespace"]; v != "" {
parts = append(parts, v)
}
if v := args["Subsystem"]; v != "" {
parts = append(parts, v)
}
if v := args["Name"]; v != "" {
parts = append(parts, v)
}
fullName := strings.Join(parts, "_")
// Add the metric to the list.
c.metrics = append(c.metrics, metric{
subsystem: args["Subsystem"],
name: fullName,
help: args["Help"],
kind: kind,
})
}
}
}
return true
}
func (c *metricCollector) print() {
slices.SortFunc(c.metrics, func(a, b metric) bool {
if a.subsystem != b.subsystem {
return a.subsystem < b.subsystem
}
return a.name < b.name
})
var prevSubsystem string
for _, m := range c.metrics {
if m.subsystem != prevSubsystem {
fmt.Printf("## Package `%s`\n\n", m.subsystem)
prevSubsystem = m.subsystem
}
fmt.Printf("### `%v` (%s)\n\n%s\n\n", m.name, m.kind, wordwrap(sentenceize(m.help), 72))
}
}
func sentenceize(s string) string {
if s == "" {
return ""
}
if !strings.HasSuffix(s, ".") {
return s + "."
}
return s
}
func wordwrap(s string, width int) string {
var lines []string
for _, line := range strings.Split(s, "\n") {
for len(line) > width {
i := strings.LastIndex(line[:width], " ")
if i == -1 {
i = width
}
lines = append(lines, line[:i])
line = line[i+1:]
}
lines = append(lines, line)
}
return strings.Join(lines, "\n")
}