From 0e67c036bbd0c25bbeb2b624658d03f1e7de6710 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Sun, 12 Apr 2020 10:26:57 +0200 Subject: [PATCH] lib/db: Make database GC a service, stop on Stop() (#6518) This makes the GC runner a service that will stop fairly quickly when told to. As a bonus, STTRACE=app will print the service tree on the way out, including any errors they've flagged. --- lib/db/lowlevel.go | 40 ++++++++++++++++++++++++++----------- lib/syncthing/debug.go | 4 ++++ lib/syncthing/syncthing.go | 41 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 73 insertions(+), 12 deletions(-) diff --git a/lib/db/lowlevel.go b/lib/db/lowlevel.go index 67ead0596..7ee44f65c 100644 --- a/lib/db/lowlevel.go +++ b/lib/db/lowlevel.go @@ -8,12 +8,15 @@ package db import ( "bytes" + "context" "encoding/binary" "time" "github.com/syncthing/syncthing/lib/db/backend" "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/sync" + "github.com/syncthing/syncthing/lib/util" + "github.com/thejerf/suture" "github.com/willf/bloom" ) @@ -40,24 +43,30 @@ const ( // database can only be opened once, there should be only one Lowlevel for // any given backend. type Lowlevel struct { + *suture.Supervisor backend.Backend folderIdx *smallIndex deviceIdx *smallIndex keyer keyer gcMut sync.RWMutex gcKeyCount int - gcStop chan struct{} indirectGCInterval time.Duration recheckInterval time.Duration } func NewLowlevel(backend backend.Backend, opts ...Option) *Lowlevel { db := &Lowlevel{ + Supervisor: suture.New("db.Lowlevel", suture.Spec{ + // Only log restarts in debug mode. + Log: func(line string) { + l.Debugln(line) + }, + PassThroughPanics: true, + }), Backend: backend, folderIdx: newSmallIndex(backend, []byte{KeyTypeFolderIdx}), deviceIdx: newSmallIndex(backend, []byte{KeyTypeDeviceIdx}), gcMut: sync.NewRWMutex(), - gcStop: make(chan struct{}), indirectGCInterval: indirectGCDefaultInterval, recheckInterval: recheckDefaultInterval, } @@ -65,7 +74,7 @@ func NewLowlevel(backend backend.Backend, opts ...Option) *Lowlevel { opt(db) } db.keyer = newDefaultKeyer(db.folderIdx, db.deviceIdx) - go db.gcRunner() + db.Add(util.AsService(db.gcRunner, "db.Lowlevel/gcRunner")) return db } @@ -90,11 +99,6 @@ func WithIndirectGCInterval(dur time.Duration) Option { } } -func (db *Lowlevel) Close() error { - close(db.gcStop) - return db.Backend.Close() -} - // ListFolders returns the list of folders currently in the database func (db *Lowlevel) ListFolders() []string { return db.folderIdx.Values() @@ -515,7 +519,7 @@ func (db *Lowlevel) dropPrefix(prefix []byte) error { return t.Commit() } -func (db *Lowlevel) gcRunner() { +func (db *Lowlevel) gcRunner(ctx context.Context) { // Calculate the time for the next GC run. Even if we should run GC // directly, give the system a while to get up and running and do other // stuff first. (We might have migrations and stuff which would be @@ -530,10 +534,10 @@ func (db *Lowlevel) gcRunner() { for { select { - case <-db.gcStop: + case <-ctx.Done(): return case <-t.C: - if err := db.gcIndirect(); err != nil { + if err := db.gcIndirect(ctx); err != nil { l.Warnln("Database indirection GC failed:", err) } db.recordTime(indirectGCTimeKey) @@ -562,7 +566,7 @@ func (db *Lowlevel) timeUntil(key string, every time.Duration) time.Duration { return sleepTime } -func (db *Lowlevel) gcIndirect() error { +func (db *Lowlevel) gcIndirect(ctx context.Context) error { // The indirection GC uses bloom filters to track used block lists and // versions. This means iterating over all items, adding their hashes to // the filter, then iterating over the indirected items and removing @@ -602,6 +606,12 @@ func (db *Lowlevel) gcIndirect() error { } defer it.Release() for it.Next() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + var bl BlocksHashOnly if err := bl.Unmarshal(it.Value()); err != nil { return err @@ -625,6 +635,12 @@ func (db *Lowlevel) gcIndirect() error { defer it.Release() matchedBlocks := 0 for it.Next() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + key := blockListKey(it.Key()) if blockFilter.Test(key.BlocksHash()) { matchedBlocks++ diff --git a/lib/syncthing/debug.go b/lib/syncthing/debug.go index 203d07607..aa525e4a0 100644 --- a/lib/syncthing/debug.go +++ b/lib/syncthing/debug.go @@ -13,3 +13,7 @@ import ( var ( l = logger.DefaultLogger.NewFacility("app", "Main run facility") ) + +func shouldDebug() bool { + return l.ShouldDebug("app") +} diff --git a/lib/syncthing/syncthing.go b/lib/syncthing/syncthing.go index fe9a37eff..6b7615abc 100644 --- a/lib/syncthing/syncthing.go +++ b/lib/syncthing/syncthing.go @@ -12,7 +12,9 @@ import ( "fmt" "io" "net/http" + "os" "runtime" + "sort" "strings" "sync" "time" @@ -126,6 +128,7 @@ func (a *App) startup() error { }, PassThroughPanics: true, }) + a.mainService.Add(a.ll) a.mainService.ServeBackground() if a.opts.AuditWriter != nil { @@ -371,6 +374,10 @@ func (a *App) startup() error { func (a *App) run() { <-a.stop + if shouldDebug() { + l.Debugln("Services before stop:") + printServiceTree(os.Stdout, a.mainService, 0) + } a.mainService.Stop() done := make(chan struct{}) @@ -475,3 +482,37 @@ func (e *controller) Shutdown() { func (e *controller) ExitUpgrading() { e.Stop(ExitUpgrade) } + +type supervisor interface{ Services() []suture.Service } + +func printServiceTree(w io.Writer, sup supervisor, level int) { + printService(w, sup, level) + + svcs := sup.Services() + sort.Slice(svcs, func(a, b int) bool { + return fmt.Sprint(svcs[a]) < fmt.Sprint(svcs[b]) + }) + + for _, svc := range svcs { + if sub, ok := svc.(supervisor); ok { + printServiceTree(w, sub, level+1) + } else { + printService(w, svc, level+1) + } + } +} + +func printService(w io.Writer, svc interface{}, level int) { + type errorer interface{ Error() error } + + t := "-" + if _, ok := svc.(supervisor); ok { + t = "+" + } + fmt.Fprintln(w, strings.Repeat(" ", level), t, svc) + if es, ok := svc.(errorer); ok { + if err := es.Error(); err != nil { + fmt.Fprintln(w, strings.Repeat(" ", level), " ->", err) + } + } +}