all: Implement suture v4-api (#6947)

This commit is contained in:
Simon Frei 2020-11-17 13:19:04 +01:00 committed by GitHub
parent e8fc465ea8
commit 9524b51708
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
65 changed files with 617 additions and 693 deletions

View File

@ -7,6 +7,7 @@
package main
import (
"context"
"crypto/rand"
"encoding/binary"
"flag"
@ -47,14 +48,16 @@ func main() {
log.Println("My ID:", myID)
}
runbeacon(beacon.NewMulticast(mc), fake)
runbeacon(beacon.NewBroadcast(bc), fake)
ctx := context.Background()
runbeacon(ctx, beacon.NewMulticast(mc), fake)
runbeacon(ctx, beacon.NewBroadcast(bc), fake)
select {}
}
func runbeacon(bc beacon.Interface, fake bool) {
go bc.Serve()
func runbeacon(ctx context.Context, bc beacon.Interface, fake bool) {
go bc.Serve(ctx)
go recv(bc)
if fake {
go send(bc)

View File

@ -66,12 +66,12 @@ func newAPISrv(addr string, cert tls.Certificate, db database, repl replicator,
}
}
func (s *apiSrv) Serve() {
func (s *apiSrv) Serve(ctx context.Context) error {
if s.useHTTP {
listener, err := net.Listen("tcp", s.addr)
if err != nil {
log.Println("Listen:", err)
return
return err
}
s.listener = listener
} else {
@ -93,7 +93,7 @@ func (s *apiSrv) Serve() {
tlsListener, err := tls.Listen("tcp", s.addr, tlsCfg)
if err != nil {
log.Println("Listen:", err)
return
return err
}
s.listener = tlsListener
}
@ -107,9 +107,11 @@ func (s *apiSrv) Serve() {
MaxHeaderBytes: httpMaxHeaderBytes,
}
if err := srv.Serve(s.listener); err != nil {
err := srv.Serve(s.listener)
if err != nil {
log.Println("Serve:", err)
}
return err
}
var topCtx = context.Background()

View File

@ -10,6 +10,7 @@
package main
import (
"context"
"log"
"sort"
"time"
@ -37,7 +38,6 @@ type database interface {
type levelDBStore struct {
db *leveldb.DB
inbox chan func()
stop chan struct{}
clock clock
marshalBuf []byte
}
@ -50,7 +50,6 @@ func newLevelDBStore(dir string) (*levelDBStore, error) {
return &levelDBStore{
db: db,
inbox: make(chan func(), 16),
stop: make(chan struct{}),
clock: defaultClock{},
}, nil
}
@ -155,7 +154,7 @@ func (s *levelDBStore) get(key string) (DatabaseRecord, error) {
return rec, nil
}
func (s *levelDBStore) Serve() {
func (s *levelDBStore) Serve(ctx context.Context) error {
t := time.NewTimer(0)
defer t.Stop()
defer s.db.Close()
@ -183,7 +182,7 @@ loop:
// the next.
t.Reset(databaseStatisticsInterval)
case <-s.stop:
case <-ctx.Done():
// We're done.
close(statisticsTrigger)
break loop
@ -192,6 +191,8 @@ loop:
// Also wait for statisticsServe to return
<-statisticsDone
return nil
}
func (s *levelDBStore) statisticsServe(trigger <-chan struct{}, done chan<- struct{}) {
@ -255,10 +256,6 @@ func (s *levelDBStore) statisticsServe(trigger <-chan struct{}, done chan<- stru
}
}
func (s *levelDBStore) Stop() {
close(s.stop)
}
// merge returns the merged result of the two database records a and b. The
// result is the union of the two address sets, with the newer expiry time
// chosen for any duplicates.

View File

@ -7,6 +7,7 @@
package main
import (
"context"
"fmt"
"os"
"testing"
@ -20,8 +21,9 @@ func TestDatabaseGetSet(t *testing.T) {
if err != nil {
t.Fatal(err)
}
go db.Serve()
defer db.Stop()
ctx, cancel := context.WithCancel(context.Background())
go db.Serve(ctx)
defer cancel()
// Check missing record

View File

@ -7,6 +7,7 @@
package main
import (
"context"
"crypto/tls"
"flag"
"log"
@ -21,7 +22,7 @@ import (
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/tlsutil"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/thejerf/suture"
"github.com/thejerf/suture/v4"
)
const (
@ -183,5 +184,5 @@ func main() {
}
// Engage!
main.Serve()
main.Serve(context.Background())
}

View File

@ -7,6 +7,7 @@
package main
import (
"context"
"crypto/tls"
"encoding/binary"
"fmt"
@ -32,7 +33,6 @@ type replicationSender struct {
cert tls.Certificate // our certificate
allowedIDs []protocol.DeviceID
outbox chan ReplicationRecord
stop chan struct{}
}
func newReplicationSender(dst string, cert tls.Certificate, allowedIDs []protocol.DeviceID) *replicationSender {
@ -41,11 +41,10 @@ func newReplicationSender(dst string, cert tls.Certificate, allowedIDs []protoco
cert: cert,
allowedIDs: allowedIDs,
outbox: make(chan ReplicationRecord, replicationOutboxSize),
stop: make(chan struct{}),
}
}
func (s *replicationSender) Serve() {
func (s *replicationSender) Serve(ctx context.Context) error {
// Sleep a little at startup. Peers often restart at the same time, and
// this avoid the service failing and entering backoff state
// unnecessarily, while also reducing the reconnect rate to something
@ -62,7 +61,7 @@ func (s *replicationSender) Serve() {
conn, err := tls.Dial("tcp", s.dst, tlsCfg)
if err != nil {
log.Println("Replication connect:", err)
return
return err
}
defer func() {
conn.SetWriteDeadline(time.Now().Add(time.Second))
@ -73,13 +72,13 @@ func (s *replicationSender) Serve() {
remoteID, err := deviceID(conn)
if err != nil {
log.Println("Replication connect:", err)
return
return err
}
// Verify it's in the set of allowed device IDs.
if !deviceIDIn(remoteID, s.allowedIDs) {
log.Println("Replication connect: unexpected device ID:", remoteID)
return
return err
}
heartBeatTicker := time.NewTicker(replicationHeartbeatInterval)
@ -122,20 +121,16 @@ func (s *replicationSender) Serve() {
replicationSendsTotal.WithLabelValues("error").Inc()
log.Println("Replication write:", err)
// Yes, we are loosing the replication event here.
return
return err
}
replicationSendsTotal.WithLabelValues("success").Inc()
case <-s.stop:
return
case <-ctx.Done():
return nil
}
}
}
func (s *replicationSender) Stop() {
close(s.stop)
}
func (s *replicationSender) String() string {
return fmt.Sprintf("replicationSender(%q)", s.dst)
}
@ -172,7 +167,6 @@ type replicationListener struct {
cert tls.Certificate
allowedIDs []protocol.DeviceID
db database
stop chan struct{}
}
func newReplicationListener(addr string, cert tls.Certificate, allowedIDs []protocol.DeviceID, db database) *replicationListener {
@ -181,11 +175,10 @@ func newReplicationListener(addr string, cert tls.Certificate, allowedIDs []prot
cert: cert,
allowedIDs: allowedIDs,
db: db,
stop: make(chan struct{}),
}
}
func (l *replicationListener) Serve() {
func (l *replicationListener) Serve(ctx context.Context) error {
tlsCfg := &tls.Config{
Certificates: []tls.Certificate{l.cert},
ClientAuth: tls.RequestClientCert,
@ -196,14 +189,14 @@ func (l *replicationListener) Serve() {
lst, err := tls.Listen("tcp", l.addr, tlsCfg)
if err != nil {
log.Println("Replication listen:", err)
return
return err
}
defer lst.Close()
for {
select {
case <-l.stop:
return
case <-ctx.Done():
return nil
default:
}
@ -211,7 +204,7 @@ func (l *replicationListener) Serve() {
conn, err := lst.Accept()
if err != nil {
log.Println("Replication accept:", err)
return
return err
}
// Figure out the other side device ID
@ -231,19 +224,15 @@ func (l *replicationListener) Serve() {
continue
}
go l.handle(conn)
go l.handle(ctx, conn)
}
}
func (l *replicationListener) Stop() {
close(l.stop)
}
func (l *replicationListener) String() string {
return fmt.Sprintf("replicationListener(%q)", l.addr)
}
func (l *replicationListener) handle(conn net.Conn) {
func (l *replicationListener) handle(ctx context.Context, conn net.Conn) {
defer func() {
conn.SetWriteDeadline(time.Now().Add(time.Second))
conn.Close()
@ -253,7 +242,7 @@ func (l *replicationListener) handle(conn net.Conn) {
for {
select {
case <-l.stop:
case <-ctx.Done():
return
default:
}

View File

@ -3,6 +3,7 @@
package main
import (
"context"
"crypto/tls"
"flag"
"fmt"
@ -194,7 +195,9 @@ func main() {
mapping := mapping{natSvc.NewMapping(nat.TCP, addr.IP, addr.Port)}
if natEnabled {
go natSvc.Serve()
ctx, cancel := context.WithCancel(context.Background())
go natSvc.Serve(ctx)
defer cancel()
found := make(chan struct{})
mapping.OnChanged(func(_ *nat.Mapping, _, _ []nat.Address) {
select {

View File

@ -20,7 +20,8 @@ import (
)
func main() {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log.SetOutput(os.Stdout)
log.SetFlags(log.LstdFlags | log.Lshortfile)
@ -62,7 +63,7 @@ func main() {
}
log.Println("Created client")
go relay.Serve()
go relay.Serve(ctx)
recv := make(chan protocol.SessionInvitation)

View File

@ -8,6 +8,7 @@ package main
import (
"bytes"
"context"
"crypto/tls"
"flag"
"fmt"
@ -41,6 +42,7 @@ import (
"github.com/syncthing/syncthing/lib/syncthing"
"github.com/syncthing/syncthing/lib/tlsutil"
"github.com/syncthing/syncthing/lib/upgrade"
"github.com/syncthing/syncthing/lib/util"
"github.com/pkg/errors"
)
@ -321,7 +323,7 @@ func main() {
}
if err != nil {
l.Warnln("Command line options:", err)
os.Exit(syncthing.ExitError.AsInt())
os.Exit(util.ExitError.AsInt())
}
if options.logFile == "default" || options.logFile == "" {
@ -358,7 +360,7 @@ func main() {
)
if err != nil {
l.Warnln("Error reading device ID:", err)
os.Exit(syncthing.ExitError.AsInt())
os.Exit(util.ExitError.AsInt())
}
fmt.Println(protocol.NewDeviceID(cert.Certificate[0]))
@ -368,7 +370,7 @@ func main() {
if options.browserOnly {
if err := openGUI(protocol.EmptyDeviceID); err != nil {
l.Warnln("Failed to open web UI:", err)
os.Exit(syncthing.ExitError.AsInt())
os.Exit(util.ExitError.AsInt())
}
return
}
@ -376,7 +378,7 @@ func main() {
if options.generateDir != "" {
if err := generate(options.generateDir); err != nil {
l.Warnln("Failed to generate config and keys:", err)
os.Exit(syncthing.ExitError.AsInt())
os.Exit(util.ExitError.AsInt())
}
return
}
@ -384,14 +386,14 @@ func main() {
// Ensure that our home directory exists.
if err := ensureDir(locations.GetBaseDir(locations.ConfigBaseDir), 0700); err != nil {
l.Warnln("Failure on home directory:", err)
os.Exit(syncthing.ExitError.AsInt())
os.Exit(util.ExitError.AsInt())
}
if options.upgradeTo != "" {
err := upgrade.ToURL(options.upgradeTo)
if err != nil {
l.Warnln("Error while Upgrading:", err)
os.Exit(syncthing.ExitError.AsInt())
os.Exit(util.ExitError.AsInt())
}
l.Infoln("Upgraded from", options.upgradeTo)
return
@ -422,13 +424,13 @@ func main() {
os.Exit(exitCodeForUpgrade(err))
}
l.Infof("Upgraded to %q", release.Tag)
os.Exit(syncthing.ExitUpgrade.AsInt())
os.Exit(util.ExitUpgrade.AsInt())
}
if options.resetDatabase {
if err := resetDB(); err != nil {
l.Warnln("Resetting database:", err)
os.Exit(syncthing.ExitError.AsInt())
os.Exit(util.ExitError.AsInt())
}
l.Infoln("Successfully reset database - it will be rebuilt after next start.")
return
@ -601,13 +603,14 @@ func syncthingMain(runtimeOptions RuntimeOptions) {
}
evLogger := events.NewLogger()
go evLogger.Serve()
defer evLogger.Stop()
ctx, cancel := context.WithCancel(context.Background())
go evLogger.Serve(ctx)
defer cancel()
cfg, err := syncthing.LoadConfigAtStartup(locations.Get(locations.ConfigFile), cert, evLogger, runtimeOptions.allowNewerConfig, noDefaultFolder)
if err != nil {
l.Warnln("Failed to initialize config:", err)
os.Exit(syncthing.ExitError.AsInt())
os.Exit(util.ExitError.AsInt())
}
// Candidate builds should auto upgrade. Make sure the option is set,
@ -653,7 +656,7 @@ func syncthingMain(runtimeOptions RuntimeOptions) {
}
} else {
l.Infof("Upgraded to %q, exiting now.", release.Tag)
os.Exit(syncthing.ExitUpgrade.AsInt())
os.Exit(util.ExitUpgrade.AsInt())
}
}
@ -694,18 +697,18 @@ func syncthingMain(runtimeOptions RuntimeOptions) {
f, err := os.Create(fmt.Sprintf("cpu-%d.pprof", os.Getpid()))
if err != nil {
l.Warnln("Creating profile:", err)
os.Exit(syncthing.ExitError.AsInt())
os.Exit(util.ExitError.AsInt())
}
if err := pprof.StartCPUProfile(f); err != nil {
l.Warnln("Starting profile:", err)
os.Exit(syncthing.ExitError.AsInt())
os.Exit(util.ExitError.AsInt())
}
}
go standbyMonitor(app, cfg)
if err := app.Start(); err != nil {
os.Exit(syncthing.ExitError.AsInt())
os.Exit(util.ExitError.AsInt())
}
cleanConfigDirectory()
@ -718,6 +721,10 @@ func syncthingMain(runtimeOptions RuntimeOptions) {
status := app.Wait()
if status == util.ExitError {
l.Warnln("Syncthing stopped with error:", app.Error())
}
if runtimeOptions.cpuProfile {
pprof.StopCPUProfile()
}
@ -733,7 +740,7 @@ func setupSignalHandling(app *syncthing.App) {
signal.Notify(restartSign, sigHup)
go func() {
<-restartSign
app.Stop(syncthing.ExitRestart)
app.Stop(util.ExitRestart)
}()
// Exit with "success" code (no restart) on INT/TERM
@ -742,7 +749,7 @@ func setupSignalHandling(app *syncthing.App) {
signal.Notify(stopSign, os.Interrupt, sigTerm)
go func() {
<-stopSign
app.Stop(syncthing.ExitSuccess)
app.Stop(util.ExitSuccess)
}()
}
@ -779,7 +786,7 @@ func auditWriter(auditFile string) io.Writer {
fd, err = os.OpenFile(auditFile, auditFlags, 0600)
if err != nil {
l.Warnln("Audit:", err)
os.Exit(syncthing.ExitError.AsInt())
os.Exit(util.ExitError.AsInt())
}
auditDest = auditFile
}
@ -829,7 +836,7 @@ func standbyMonitor(app *syncthing.App, cfg config.Wrapper) {
// things a moment to stabilize.
time.Sleep(restartDelay)
app.Stop(syncthing.ExitRestart)
app.Stop(util.ExitRestart)
return
}
now = time.Now()
@ -899,7 +906,7 @@ func autoUpgrade(cfg config.Wrapper, app *syncthing.App, evLogger events.Logger)
sub.Unsubscribe()
l.Warnf("Automatically upgraded to version %q. Restarting in 1 minute.", rel.Tag)
time.Sleep(time.Minute)
app.Stop(syncthing.ExitUpgrade)
app.Stop(util.ExitUpgrade)
return
}
}
@ -987,13 +994,13 @@ func setPauseState(cfg config.Wrapper, paused bool) {
}
if _, err := cfg.Replace(raw); err != nil {
l.Warnln("Cannot adjust paused state:", err)
os.Exit(syncthing.ExitError.AsInt())
os.Exit(util.ExitError.AsInt())
}
}
func exitCodeForUpgrade(err error) int {
if _, ok := err.(*errNoUpgrade); ok {
return syncthing.ExitNoUpgradeAvailable.AsInt()
return util.ExitNoUpgradeAvailable.AsInt()
}
return syncthing.ExitError.AsInt()
return util.ExitError.AsInt()
}

View File

@ -26,7 +26,7 @@ import (
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/syncthing"
"github.com/syncthing/syncthing/lib/util"
)
var (
@ -99,7 +99,7 @@ func monitorMain(runtimeOptions RuntimeOptions) {
if t := time.Since(restarts[0]); t < loopThreshold {
l.Warnf("%d restarts in %v; not retrying further", countRestarts, t)
os.Exit(syncthing.ExitError.AsInt())
os.Exit(util.ExitError.AsInt())
}
copy(restarts[0:], restarts[1:])
@ -169,7 +169,7 @@ func monitorMain(runtimeOptions RuntimeOptions) {
if err == nil {
// Successful exit indicates an intentional shutdown
os.Exit(syncthing.ExitSuccess.AsInt())
os.Exit(util.ExitSuccess.AsInt())
}
if exiterr, ok := err.(*exec.ExitError); ok {
@ -177,7 +177,7 @@ func monitorMain(runtimeOptions RuntimeOptions) {
if stopped || runtimeOptions.noRestart {
os.Exit(exitCode)
}
if exitCode == syncthing.ExitUpgrade.AsInt() {
if exitCode == util.ExitUpgrade.AsInt() {
// Restart the monitor process to release the .old
// binary as part of the upgrade process.
l.Infoln("Restarting monitor...")
@ -189,7 +189,7 @@ func monitorMain(runtimeOptions RuntimeOptions) {
}
if runtimeOptions.noRestart {
os.Exit(syncthing.ExitError.AsInt())
os.Exit(util.ExitError.AsInt())
}
l.Infoln("Syncthing exited:", err)

2
go.mod
View File

@ -40,7 +40,7 @@ require (
github.com/shirou/gopsutil v3.20.10+incompatible
github.com/syncthing/notify v0.0.0-20201109091751-9a0e44181151
github.com/syndtr/goleveldb v1.0.1-0.20200815071216-d9e9293bd0f7
github.com/thejerf/suture v4.0.0+incompatible
github.com/thejerf/suture/v4 v4.0.0
github.com/urfave/cli v1.22.4
github.com/vitrun/qart v0.0.0-20160531060029-bf64b92db6b0
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897

27
go.sum
View File

@ -17,11 +17,9 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM=
github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
@ -85,7 +83,6 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do
github.com/d4l3k/messagediff v1.2.1 h1:ZcAIMYsUg0EAp9X+tt8/enBE/Q8Yd5kzPynLyKptt9U=
github.com/d4l3k/messagediff v1.2.1/go.mod h1:Oozbb1TVXFac9FtSIxHBMnBCq2qeH/2KkEQxENCrlLo=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dchest/siphash v1.2.2 h1:9DFz8tQwl9pTVt5iok/9zKyzA1Q6bRGiF3HPiEEVr9I=
github.com/dchest/siphash v1.2.2/go.mod h1:q+IRvb2gOSrUnYoPqHiyHXS0FOBBOdl6tONBlVnOnt4=
@ -114,7 +111,6 @@ github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiD
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/getsentry/raven-go v0.2.0 h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs=
github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
@ -131,7 +127,6 @@ github.com/go-ldap/ldap/v3 v3.2.4/go.mod h1:iYS1MdmrmceOJ1QOTnRXrIs7i3kloqtmGQjR
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
@ -153,7 +148,6 @@ github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200j
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc=
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@ -174,7 +168,6 @@ github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
@ -247,11 +240,9 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lib/pq v1.8.0 h1:9xohqzkUwzR4Ga4ivdTcawVS89YSDVxXMa3xJX3cGzg=
github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
@ -264,7 +255,6 @@ github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0Q
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/marten-seemann/qpack v0.2.0/go.mod h1:F7Gl5L1jIgN1D11ucXefiuJS9UMVP2opoCp2jDKb7wc=
github.com/marten-seemann/qtls v0.10.0 h1:ECsuYUKalRL240rRD4Ri33ISb7kAQ3qGDlrrl55b2pc=
github.com/marten-seemann/qtls v0.10.0/go.mod h1:UvMd1oaYDACI99/oZUYLzMCkBXQVT0aGm99sJhbT8hs=
github.com/marten-seemann/qtls-go1-15 v0.1.0 h1:i/YPXVxz8q9umso/5y474CNcHmTpA+5DH+mFPjx6PZg=
github.com/marten-seemann/qtls-go1-15 v0.1.0/go.mod h1:GyFwywLKkRt+6mfU99csTEY1joMZz5vmB1WNZH3P81I=
@ -309,7 +299,6 @@ github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxzi
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo=
github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs=
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
@ -317,11 +306,9 @@ github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:v
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA=
github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis=
@ -351,7 +338,6 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
@ -435,7 +421,6 @@ github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJ
github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:UdhH50NIW0fCiwBSr0co2m7BnFLdv4fQTgdqdJTHFeE=
github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
@ -453,17 +438,14 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/syncthing/notify v0.0.0-20201101120444-a28a0bd0f5ee h1:Q2dajND8VmNqXOi+N3IQQP77VkuXMA7tvPzXosDS1vA=
github.com/syncthing/notify v0.0.0-20201101120444-a28a0bd0f5ee/go.mod h1:Sn4ChoS7e4FxjCN1XHPVBT43AgnRLbuaB8pEc1Zcdjg=
github.com/syncthing/notify v0.0.0-20201109091751-9a0e44181151 h1:aKnLuEFWn/7u42UR82PxsPOMkoBAhq+06oRtUnK3Z1o=
github.com/syncthing/notify v0.0.0-20201109091751-9a0e44181151/go.mod h1:Sn4ChoS7e4FxjCN1XHPVBT43AgnRLbuaB8pEc1Zcdjg=
github.com/syndtr/goleveldb v1.0.1-0.20200815071216-d9e9293bd0f7 h1:udtnv1cokhJYqnUfCMCppJ71bFN9VKfG1BQ6UsYZnx8=
github.com/syndtr/goleveldb v1.0.1-0.20200815071216-d9e9293bd0f7/go.mod h1:u2MKkTVTVJWe5D1rCvame8WqhBd88EuIwODJZ1VHCPM=
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA=
github.com/thejerf/suture v4.0.0+incompatible h1:luAwgEo87y1X30wEYa64N4SKMrsAm9qXRwNxnLVuuwg=
github.com/thejerf/suture v4.0.0+incompatible/go.mod h1:ibKwrVj+Uzf3XZdAiNWUouPaAbSoemxOHLmJmwheEMc=
github.com/thejerf/suture/v4 v4.0.0 h1:GX3X+1Qaewtj9flL2wgoTBfLA5NcmrCY39TJRpPbUrI=
github.com/thejerf/suture/v4 v4.0.0/go.mod h1:g0e8vwskm9tI0jRjxrnA6lSr0q6OfPdWJVX7G5bVWRs=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
@ -617,7 +599,6 @@ golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapK
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
@ -657,7 +638,6 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
@ -665,7 +645,6 @@ gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMy
gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
@ -673,9 +652,7 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View File

@ -33,7 +33,7 @@ import (
"github.com/julienschmidt/httprouter"
metrics "github.com/rcrowley/go-metrics"
"github.com/thejerf/suture"
"github.com/thejerf/suture/v4"
"github.com/vitrun/qart/qr"
"github.com/syncthing/syncthing/lib/build"
@ -82,7 +82,6 @@ type service struct {
connectionsService connections.Service
fss model.FolderSummaryService
urService *ur.Service
contr Controller
noUpgrade bool
tlsDefaultCommonName string
configChanged chan struct{} // signals intentional listener close due to config change
@ -90,25 +89,20 @@ type service struct {
startedOnce chan struct{} // the service has started successfully at least once
startupErr error
listenerAddr net.Addr
exitChan chan *util.FatalErr
guiErrors logger.Recorder
systemLog logger.Recorder
}
type Controller interface {
ExitUpgrading()
Restart()
Shutdown()
}
type Service interface {
suture.Service
config.Committer
WaitForStart() error
}
func New(id protocol.DeviceID, cfg config.Wrapper, assetDir, tlsDefaultCommonName string, m model.Model, defaultSub, diskSub events.BufferedSubscription, evLogger events.Logger, discoverer discover.Manager, connectionsService connections.Service, urService *ur.Service, fss model.FolderSummaryService, errors, systemLog logger.Recorder, contr Controller, noUpgrade bool) Service {
s := &service{
func New(id protocol.DeviceID, cfg config.Wrapper, assetDir, tlsDefaultCommonName string, m model.Model, defaultSub, diskSub events.BufferedSubscription, evLogger events.Logger, discoverer discover.Manager, connectionsService connections.Service, urService *ur.Service, fss model.FolderSummaryService, errors, systemLog logger.Recorder, noUpgrade bool) Service {
return &service{
id: id,
cfg: cfg,
statics: newStaticsServer(cfg.GUI().Theme, assetDir, cfg.Options().FeatureFlag(featureFlagUntrusted)),
@ -125,14 +119,12 @@ func New(id protocol.DeviceID, cfg config.Wrapper, assetDir, tlsDefaultCommonNam
urService: urService,
guiErrors: errors,
systemLog: systemLog,
contr: contr,
noUpgrade: noUpgrade,
tlsDefaultCommonName: tlsDefaultCommonName,
configChanged: make(chan struct{}),
startedOnce: make(chan struct{}),
exitChan: make(chan *util.FatalErr, 1),
}
s.Service = util.AsService(s.serve, s.String())
return s
}
func (s *service) WaitForStart() error {
@ -211,7 +203,7 @@ func sendJSON(w http.ResponseWriter, jsonObject interface{}) {
fmt.Fprintf(w, "%s\n", bs)
}
func (s *service) serve(ctx context.Context) {
func (s *service) Serve(ctx context.Context) error {
listener, err := s.getListener(s.cfg.GUI())
if err != nil {
select {
@ -227,13 +219,13 @@ func (s *service) serve(ctx context.Context) {
s.startupErr = err
close(s.startedOnce)
}
return
return err
}
if listener == nil {
// Not much we can do here other than exit quickly. The supervisor
// will log an error at some point.
return
return nil
}
s.listenerAddr = listener.Addr()
@ -410,6 +402,7 @@ func (s *service) serve(ctx context.Context) {
// Wait for stop, restart or error signals
err = nil
select {
case <-ctx.Done():
// Shutting down permanently
@ -417,11 +410,14 @@ func (s *service) serve(ctx context.Context) {
case <-s.configChanged:
// Soft restart due to configuration change
l.Debugln("restarting (config changed)")
case <-serveError:
case err = <-s.exitChan:
case err = <-serveError:
// Restart due to listen/serve failure
l.Warnln("GUI/API:", err, "(restarting)")
}
srv.Close()
return err
}
// Complete implements suture.IsCompletable, which signifies to the supervisor
@ -470,6 +466,14 @@ func (s *service) CommitConfiguration(from, to config.Configuration) bool {
return true
}
func (s *service) fatal(err *util.FatalErr) {
// s.exitChan is 1-buffered and whoever is first gets handled.
select {
case s.exitChan <- err:
default:
}
}
func debugMiddleware(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t0 := time.Now()
@ -874,7 +878,11 @@ func (s *service) getDebugFile(w http.ResponseWriter, r *http.Request) {
func (s *service) postSystemRestart(w http.ResponseWriter, r *http.Request) {
s.flushResponse(`{"ok": "restarting"}`, w)
go s.contr.Restart()
s.fatal(&util.FatalErr{
Err: errors.New("restart initiated by rest API"),
Status: util.ExitRestart,
})
}
func (s *service) postSystemReset(w http.ResponseWriter, r *http.Request) {
@ -900,12 +908,18 @@ func (s *service) postSystemReset(w http.ResponseWriter, r *http.Request) {
s.flushResponse(`{"ok": "resetting folder `+folder+`"}`, w)
}
go s.contr.Restart()
s.fatal(&util.FatalErr{
Err: errors.New("restart after db reset initiated by rest API"),
Status: util.ExitRestart,
})
}
func (s *service) postSystemShutdown(w http.ResponseWriter, r *http.Request) {
s.flushResponse(`{"ok": "shutting down"}`, w)
go s.contr.Shutdown()
s.fatal(&util.FatalErr{
Err: errors.New("shutdown initiated by rest API"),
Status: util.ExitSuccess,
})
}
func (s *service) flushResponse(resp string, w http.ResponseWriter) {
@ -1340,7 +1354,10 @@ func (s *service) postSystemUpgrade(w http.ResponseWriter, r *http.Request) {
}
s.flushResponse(`{"ok": "restarting"}`, w)
s.contr.ExitUpgrading()
s.fatal(&util.FatalErr{
Err: errors.New("exit after upgrade initiated by rest API"),
Status: util.ExitUpgrade,
})
}
}

View File

@ -9,6 +9,7 @@ package api
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
@ -34,7 +35,8 @@ import (
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/tlsutil"
"github.com/syncthing/syncthing/lib/ur"
"github.com/thejerf/suture"
"github.com/syncthing/syncthing/lib/util"
"github.com/thejerf/suture/v4"
)
var (
@ -113,15 +115,14 @@ func TestStopAfterBrokenConfig(t *testing.T) {
}
w := config.Wrap("/dev/null", cfg, events.NoopLogger)
srv := New(protocol.LocalDeviceID, w, "", "syncthing", nil, nil, nil, events.NoopLogger, nil, nil, nil, nil, nil, nil, nil, false).(*service)
srv := New(protocol.LocalDeviceID, w, "", "syncthing", nil, nil, nil, events.NoopLogger, nil, nil, nil, nil, nil, nil, false).(*service)
defer os.Remove(token)
srv.started = make(chan string)
sup := suture.New("test", suture.Spec{
PassThroughPanics: true,
})
sup := suture.New("test", util.Spec())
sup.Add(srv)
sup.ServeBackground()
ctx, cancel := context.WithCancel(context.Background())
sup.ServeBackground(ctx)
<-srv.started
@ -139,9 +140,7 @@ func TestStopAfterBrokenConfig(t *testing.T) {
t.Fatal("Verify config should have failed")
}
// Nonetheless, it should be fine to Stop() it without panic.
sup.Stop()
cancel()
}
func TestAssetsDir(t *testing.T) {
@ -250,11 +249,11 @@ func TestAPIServiceRequests(t *testing.T) {
const testAPIKey = "foobarbaz"
cfg := new(mockedConfig)
cfg.gui.APIKey = testAPIKey
baseURL, sup, err := startHTTP(cfg)
baseURL, cancel, err := startHTTP(cfg)
if err != nil {
t.Fatal(err)
}
defer sup.Stop()
defer cancel()
cases := []httpTestCase{
// /rest/db
@ -519,11 +518,11 @@ func TestHTTPLogin(t *testing.T) {
cfg := new(mockedConfig)
cfg.gui.User = "üser"
cfg.gui.Password = "$2a$10$IdIZTxTg/dCNuNEGlmLynOjqg4B1FvDKuIV5e0BB3pnWVHNb8.GSq" // bcrypt of "räksmörgås" in UTF-8
baseURL, sup, err := startHTTP(cfg)
baseURL, cancel, err := startHTTP(cfg)
if err != nil {
t.Fatal(err)
}
defer sup.Stop()
defer cancel()
// Verify rejection when not using authorization
@ -581,7 +580,7 @@ func TestHTTPLogin(t *testing.T) {
}
}
func startHTTP(cfg config.Wrapper) (string, *suture.Supervisor, error) {
func startHTTP(cfg config.Wrapper) (string, context.CancelFunc, error) {
m := new(mockedModel)
assetDir := "../../gui"
eventSub := new(mockedEventSub)
@ -594,7 +593,7 @@ func startHTTP(cfg config.Wrapper) (string, *suture.Supervisor, error) {
// Instantiate the API service
urService := ur.New(cfg, m, connections, false)
svc := New(protocol.LocalDeviceID, cfg, assetDir, "syncthing", m, eventSub, diskEventSub, events.NoopLogger, discoverer, connections, urService, &mockedFolderSummaryService{}, errorLog, systemLog, nil, false).(*service)
svc := New(protocol.LocalDeviceID, cfg, assetDir, "syncthing", m, eventSub, diskEventSub, events.NoopLogger, discoverer, connections, urService, &mockedFolderSummaryService{}, errorLog, systemLog, false).(*service)
defer os.Remove(token)
svc.started = addrChan
@ -603,14 +602,15 @@ func startHTTP(cfg config.Wrapper) (string, *suture.Supervisor, error) {
PassThroughPanics: true,
})
supervisor.Add(svc)
supervisor.ServeBackground()
ctx, cancel := context.WithCancel(context.Background())
supervisor.ServeBackground(ctx)
// Make sure the API service is listening, and get the URL to use.
addr := <-addrChan
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
supervisor.Stop()
return "", nil, fmt.Errorf("weird address from API service: %w", err)
cancel()
return "", cancel, fmt.Errorf("weird address from API service: %w", err)
}
host, _, _ := net.SplitHostPort(cfg.GUI().RawAddress)
@ -619,7 +619,7 @@ func startHTTP(cfg config.Wrapper) (string, *suture.Supervisor, error) {
}
baseURL := fmt.Sprintf("http://%s", net.JoinHostPort(host, strconv.Itoa(tcpAddr.Port)))
return baseURL, supervisor, nil
return baseURL, cancel, nil
}
func TestCSRFRequired(t *testing.T) {
@ -628,11 +628,11 @@ func TestCSRFRequired(t *testing.T) {
const testAPIKey = "foobarbaz"
cfg := new(mockedConfig)
cfg.gui.APIKey = testAPIKey
baseURL, sup, err := startHTTP(cfg)
baseURL, cancel, err := startHTTP(cfg)
if err != nil {
t.Fatal("Unexpected error from getting base URL:", err)
}
defer sup.Stop()
defer cancel()
cli := &http.Client{
Timeout: time.Minute,
@ -704,11 +704,11 @@ func TestRandomString(t *testing.T) {
const testAPIKey = "foobarbaz"
cfg := new(mockedConfig)
cfg.gui.APIKey = testAPIKey
baseURL, sup, err := startHTTP(cfg)
baseURL, cancel, err := startHTTP(cfg)
if err != nil {
t.Fatal(err)
}
defer sup.Stop()
defer cancel()
cli := &http.Client{
Timeout: time.Second,
}
@ -797,11 +797,11 @@ func testConfigPost(data io.Reader) (*http.Response, error) {
const testAPIKey = "foobarbaz"
cfg := new(mockedConfig)
cfg.gui.APIKey = testAPIKey
baseURL, sup, err := startHTTP(cfg)
baseURL, cancel, err := startHTTP(cfg)
if err != nil {
return nil, err
}
defer sup.Stop()
defer cancel()
cli := &http.Client{
Timeout: time.Second,
}
@ -818,11 +818,11 @@ func TestHostCheck(t *testing.T) {
cfg := new(mockedConfig)
cfg.gui.RawAddress = "127.0.0.1:0"
baseURL, sup, err := startHTTP(cfg)
baseURL, cancel, err := startHTTP(cfg)
if err != nil {
t.Fatal(err)
}
defer sup.Stop()
defer cancel()
// A normal HTTP get to the localhost-bound service should succeed
@ -879,11 +879,11 @@ func TestHostCheck(t *testing.T) {
cfg = new(mockedConfig)
cfg.gui.RawAddress = "127.0.0.1:0"
cfg.gui.InsecureSkipHostCheck = true
baseURL, sup, err = startHTTP(cfg)
baseURL, cancel, err = startHTTP(cfg)
if err != nil {
t.Fatal(err)
}
defer sup.Stop()
defer cancel()
// A request with a suspicious Host header should be allowed
@ -903,11 +903,11 @@ func TestHostCheck(t *testing.T) {
cfg = new(mockedConfig)
cfg.gui.RawAddress = "0.0.0.0:0"
cfg.gui.InsecureSkipHostCheck = true
baseURL, sup, err = startHTTP(cfg)
baseURL, cancel, err = startHTTP(cfg)
if err != nil {
t.Fatal(err)
}
defer sup.Stop()
defer cancel()
// A request with a suspicious Host header should be allowed
@ -931,11 +931,11 @@ func TestHostCheck(t *testing.T) {
cfg = new(mockedConfig)
cfg.gui.RawAddress = "[::1]:0"
baseURL, sup, err = startHTTP(cfg)
baseURL, cancel, err = startHTTP(cfg)
if err != nil {
t.Fatal(err)
}
defer sup.Stop()
defer cancel()
// A normal HTTP get to the localhost-bound service should succeed
@ -1026,11 +1026,11 @@ func TestAccessControlAllowOriginHeader(t *testing.T) {
const testAPIKey = "foobarbaz"
cfg := new(mockedConfig)
cfg.gui.APIKey = testAPIKey
baseURL, sup, err := startHTTP(cfg)
baseURL, cancel, err := startHTTP(cfg)
if err != nil {
t.Fatal(err)
}
defer sup.Stop()
defer cancel()
cli := &http.Client{
Timeout: time.Second,
}
@ -1057,11 +1057,11 @@ func TestOptionsRequest(t *testing.T) {
const testAPIKey = "foobarbaz"
cfg := new(mockedConfig)
cfg.gui.APIKey = testAPIKey
baseURL, sup, err := startHTTP(cfg)
baseURL, cancel, err := startHTTP(cfg)
if err != nil {
t.Fatal(err)
}
defer sup.Stop()
defer cancel()
cli := &http.Client{
Timeout: time.Second,
}
@ -1093,7 +1093,7 @@ func TestEventMasks(t *testing.T) {
cfg := new(mockedConfig)
defSub := new(mockedEventSub)
diskSub := new(mockedEventSub)
svc := New(protocol.LocalDeviceID, cfg, "", "syncthing", nil, defSub, diskSub, events.NoopLogger, nil, nil, nil, nil, nil, nil, nil, false).(*service)
svc := New(protocol.LocalDeviceID, cfg, "", "syncthing", nil, defSub, diskSub, events.NoopLogger, nil, nil, nil, nil, nil, nil, false).(*service)
defer os.Remove(token)
if mask := svc.getEventMask(""); mask != DefaultEventMask {
@ -1253,11 +1253,11 @@ func TestConfigChanges(t *testing.T) {
defer os.Remove(tmpFile.Name())
w := config.Wrap(tmpFile.Name(), cfg, events.NoopLogger)
tmpFile.Close()
baseURL, sup, err := startHTTP(w)
baseURL, cancel, err := startHTTP(w)
if err != nil {
t.Fatal("Unexpected error from getting base URL:", err)
}
defer sup.Stop()
defer cancel()
cli := &http.Client{
Timeout: time.Second,

View File

@ -7,6 +7,8 @@
package api
import (
"context"
"github.com/syncthing/syncthing/lib/connections"
)
@ -24,9 +26,7 @@ func (m *mockedConnections) NATType() string {
return ""
}
func (m *mockedConnections) Serve() {}
func (m *mockedConnections) Stop() {}
func (m *mockedConnections) Serve(ctx context.Context) error { return nil }
func (m *mockedConnections) ExternalAddresses() []string { return nil }

View File

@ -17,13 +17,10 @@ type mockedCachingMux struct{}
// from suture.Service
func (m *mockedCachingMux) Serve() {
func (m *mockedCachingMux) Serve(ctx context.Context) error {
select {}
}
func (m *mockedCachingMux) Stop() {
}
// from events.Finder
func (m *mockedCachingMux) Lookup(ctx context.Context, deviceID protocol.DeviceID) (direct []string, err error) {

View File

@ -7,6 +7,7 @@
package api
import (
"context"
"net"
"time"
@ -124,8 +125,7 @@ func (m *mockedModel) WatchError(folder string) error {
return nil
}
func (m *mockedModel) Serve() {}
func (m *mockedModel) Stop() {}
func (m *mockedModel) Serve(ctx context.Context) error { return nil }
func (m *mockedModel) Index(deviceID protocol.DeviceID, folder string, files []protocol.FileInfo) error {
return nil
@ -167,9 +167,7 @@ func (m *mockedModel) DBSnapshot(_ string) (*db.Snapshot, error) {
type mockedFolderSummaryService struct{}
func (m *mockedFolderSummaryService) Serve() {}
func (m *mockedFolderSummaryService) Stop() {}
func (m *mockedFolderSummaryService) Serve(context.Context) error { return nil }
func (m *mockedFolderSummaryService) Summary(folder string) (map[string]interface{}, error) {
return map[string]interface{}{"mocked": true}, nil

View File

@ -12,7 +12,7 @@ import (
"net"
"time"
"github.com/thejerf/suture"
"github.com/thejerf/suture/v4"
"github.com/syncthing/syncthing/lib/util"
)
@ -44,24 +44,25 @@ type cast struct {
// caller needs to set reader and writer with the addReader and addWriter
// methods to get a functional implementation of Interface.
func newCast(name string) *cast {
return &cast{
Supervisor: suture.New(name, suture.Spec{
// Don't retry too frenetically: an error to open a socket or
// whatever is usually something that is either permanent or takes
// a while to get solved...
FailureThreshold: 2,
FailureBackoff: 60 * time.Second,
// Only log restarts in debug mode.
Log: func(line string) {
l.Debugln(line)
},
PassThroughPanics: true,
}),
name: name,
inbox: make(chan []byte),
outbox: make(chan recv, 16),
stopped: make(chan struct{}),
spec := util.Spec()
// Don't retry too frenetically: an error to open a socket or
// whatever is usually something that is either permanent or takes
// a while to get solved...
spec.FailureThreshold = 2
spec.FailureBackoff = 60 * time.Second
// Only log restarts in debug mode.
spec.EventHook = func(e suture.Event) {
l.Debugln(e)
}
c := &cast{
Supervisor: suture.New(name, spec),
name: name,
inbox: make(chan []byte),
outbox: make(chan recv, 16),
stopped: make(chan struct{}),
}
util.OnSupervisorDone(c.Supervisor, func() { close(c.stopped) })
return c
}
func (c *cast) addReader(svc func(context.Context) error) {
@ -75,17 +76,7 @@ func (c *cast) addWriter(svc func(ctx context.Context) error) {
}
func (c *cast) createService(svc func(context.Context) error, suffix string) util.ServiceWithError {
return util.AsServiceWithError(func(ctx context.Context) error {
l.Debugln("Starting", c.name, suffix)
err := svc(ctx)
l.Debugf("Stopped %v %v: %v", c.name, suffix, err)
return err
}, fmt.Sprintf("%s/%s", c, suffix))
}
func (c *cast) Stop() {
c.Supervisor.Stop()
close(c.stopped)
return util.AsService(svc, fmt.Sprintf("%s/%s", c, suffix))
}
func (c *cast) String() string {

View File

@ -41,7 +41,7 @@ func writeBroadcasts(ctx context.Context, inbox <-chan []byte, port int) error {
select {
case bs = <-inbox:
case <-doneCtx.Done():
return nil
return doneCtx.Err()
}
intfs, err := net.Interfaces()
@ -138,7 +138,7 @@ func readBroadcasts(ctx context.Context, outbox chan<- recv, port int) error {
select {
case outbox <- recv{c, addr}:
case <-doneCtx.Done():
return nil
return doneCtx.Err()
default:
l.Debugln("dropping message")
}

View File

@ -56,7 +56,7 @@ func writeMulticasts(ctx context.Context, inbox <-chan []byte, addr string) erro
select {
case bs = <-inbox:
case <-doneCtx.Done():
return nil
return doneCtx.Err()
}
intfs, err := net.Interfaces()
@ -87,7 +87,7 @@ func writeMulticasts(ctx context.Context, inbox <-chan []byte, addr string) erro
select {
case <-doneCtx.Done():
return nil
return doneCtx.Err()
default:
}
}
@ -144,7 +144,7 @@ func readMulticasts(ctx context.Context, outbox chan<- recv, addr string) error
for {
select {
case <-doneCtx.Done():
return nil
return doneCtx.Err()
default:
}
n, _, addr, err := pconn.ReadFrom(bs)

View File

@ -91,8 +91,7 @@ func (t *quicListener) serve(ctx context.Context) error {
svc, conn := stun.New(t.cfg, t, packetConn)
defer func() { _ = conn.Close() }()
go svc.Serve()
defer svc.Stop()
go svc.Serve(ctx)
registry.Register(t.uri.Scheme, conn)
defer registry.Unregister(t.uri.Scheme, conn)
@ -115,7 +114,7 @@ func (t *quicListener) serve(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
return ctx.Err()
default:
}
@ -206,7 +205,7 @@ func (f *quicListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls.
conns: conns,
factory: f,
}
l.ServiceWithError = util.AsServiceWithError(l.serve, l.String())
l.ServiceWithError = util.AsService(l.serve, l.String())
l.nat.Store(stun.NATUnknown)
return l
}

View File

@ -53,8 +53,7 @@ func (t *relayListener) serve(ctx context.Context) error {
t.mut.Lock()
t.client = clnt
go clnt.Serve()
defer clnt.Stop()
go clnt.Serve(ctx)
t.mut.Unlock()
// Start with nil, so that we send a addresses changed notification as soon as we connect somewhere.
@ -120,7 +119,7 @@ func (t *relayListener) serve(ctx context.Context) error {
}
case <-ctx.Done():
return nil
return ctx.Err()
}
}
}
@ -185,7 +184,7 @@ func (f *relayListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls
conns: conns,
factory: f,
}
t.ServiceWithError = util.AsServiceWithError(t.serve, t.String())
t.ServiceWithError = util.AsService(t.serve, t.String())
return t
}

View File

@ -31,7 +31,7 @@ import (
_ "github.com/syncthing/syncthing/lib/upnp"
"github.com/pkg/errors"
"github.com/thejerf/suture"
"github.com/thejerf/suture/v4"
"golang.org/x/time/rate"
)
@ -132,13 +132,12 @@ type service struct {
}
func NewService(cfg config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *tls.Config, discoverer discover.Finder, bepProtocolName string, tlsDefaultCommonName string, evLogger events.Logger) Service {
spec := util.Spec()
spec.EventHook = func(e suture.Event) {
l.Infoln(e)
}
service := &service{
Supervisor: suture.New("connections.Service", suture.Spec{
Log: func(line string) {
l.Infoln(line)
},
PassThroughPanics: true,
}),
Supervisor: suture.New("connections.Service", spec),
connectionStatusHandler: newConnectionStatusHandler(),
cfg: cfg,
@ -162,8 +161,8 @@ func NewService(cfg config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *t
// due to config are done by removing and adding services, so are
// not subject to these limitations.
listenerSupervisor: suture.New("c.S.listenerSupervisor", suture.Spec{
Log: func(line string) {
l.Infoln(line)
EventHook: func(e suture.Event) {
l.Infoln(e)
},
FailureThreshold: 2,
FailureBackoff: 600 * time.Second,
@ -189,21 +188,20 @@ func NewService(cfg config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *t
service.Add(service.listenerSupervisor)
service.Add(service.natService)
util.OnSupervisorDone(service.Supervisor, func() {
service.cfg.Unsubscribe(service.limiter)
service.cfg.Unsubscribe(service)
})
return service
}
func (s *service) Stop() {
s.cfg.Unsubscribe(s.limiter)
s.cfg.Unsubscribe(s)
s.Supervisor.Stop()
}
func (s *service) handle(ctx context.Context) {
func (s *service) handle(ctx context.Context) error {
var c internalConn
for {
select {
case <-ctx.Done():
return
return ctx.Err()
case c = <-s.conns:
}
@ -338,9 +336,10 @@ func (s *service) handle(ctx context.Context) {
s.model.AddConnection(modelConn, hello)
continue
}
return nil
}
func (s *service) connect(ctx context.Context) {
func (s *service) connect(ctx context.Context) error {
nextDial := make(map[string]time.Time)
// Used as delay for the first few connection attempts, increases
@ -371,7 +370,7 @@ func (s *service) connect(ctx context.Context) {
for _, deviceCfg := range cfg.Devices {
select {
case <-ctx.Done():
return
return ctx.Err()
default:
}
@ -503,9 +502,10 @@ func (s *service) connect(ctx context.Context) {
select {
case <-time.After(sleep):
case <-ctx.Done():
return
return ctx.Err()
}
}
return nil
}
func (s *service) isLANHost(host string) bool {

View File

@ -18,6 +18,8 @@ import (
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/nat"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/thejerf/suture/v4"
)
// Connection is what we expose to the outside. It is a protocol.Connection
@ -181,8 +183,7 @@ type ListenerAddresses struct {
}
type genericListener interface {
Serve()
Stop()
suture.Service
URI() *url.URL
// A given address can potentially be mutated by the listener.
// For example we bind to tcp://0.0.0.0, but that for example might return

View File

@ -207,7 +207,7 @@ func (f *tcpListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls.C
natService: natService,
factory: f,
}
l.ServiceWithError = util.AsServiceWithError(l.serve, l.String())
l.ServiceWithError = util.AsService(l.serve, l.String())
return l
}

View File

@ -25,7 +25,7 @@ import (
"github.com/syncthing/syncthing/lib/sha256"
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/util"
"github.com/thejerf/suture"
"github.com/thejerf/suture/v4"
)
const (
@ -68,14 +68,13 @@ type Lowlevel struct {
}
func NewLowlevel(backend backend.Backend, opts ...Option) *Lowlevel {
spec := util.Spec()
// Only log restarts in debug mode.
spec.EventHook = func(e suture.Event) {
l.Debugln(e)
}
db := &Lowlevel{
Supervisor: suture.New("db.Lowlevel", suture.Spec{
// Only log restarts in debug mode.
Log: func(line string) {
l.Debugln(line)
},
PassThroughPanics: true,
}),
Supervisor: suture.New("db.Lowlevel", spec),
Backend: backend,
folderIdx: newSmallIndex(backend, []byte{KeyTypeFolderIdx}),
deviceIdx: newSmallIndex(backend, []byte{KeyTypeDeviceIdx}),
@ -586,7 +585,7 @@ func (db *Lowlevel) dropPrefix(prefix []byte) error {
return t.Commit()
}
func (db *Lowlevel) gcRunner(ctx context.Context) {
func (db *Lowlevel) gcRunner(ctx context.Context) error {
// Calculate the time for the next GC run. Even if we should run GC
// directly, give the system a while to get up and running and do other
// stuff first. (We might have migrations and stuff which would be
@ -602,7 +601,7 @@ func (db *Lowlevel) gcRunner(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
return ctx.Err()
case <-t.C:
if err := db.gcIndirect(ctx); err != nil {
l.Warnln("Database indirection GC failed:", err)

View File

@ -10,7 +10,7 @@ import (
stdsync "sync"
"time"
"github.com/thejerf/suture"
"github.com/thejerf/suture/v4"
"github.com/syncthing/syncthing/lib/protocol"
)

View File

@ -18,6 +18,14 @@ import (
"github.com/syncthing/syncthing/lib/protocol"
)
func setupCache() *manager {
cfg := config.New(protocol.LocalDeviceID)
cfg.Options.LocalAnnEnabled = false
cfg.Options.GlobalAnnEnabled = false
return NewManager(protocol.LocalDeviceID, config.Wrap("", cfg, events.NoopLogger), tls.Certificate{}, events.NoopLogger, nil).(*manager)
}
func TestCacheUnique(t *testing.T) {
addresses0 := []string{"tcp://192.0.2.44:22000", "tcp://192.0.2.42:22000"}
addresses1 := []string{"tcp://192.0.2.43:22000", "tcp://192.0.2.42:22000"}
@ -33,13 +41,7 @@ func TestCacheUnique(t *testing.T) {
"tcp://192.0.2.44:22000",
}
cfg := config.New(protocol.LocalDeviceID)
cfg.Options.LocalAnnEnabled = false
cfg.Options.GlobalAnnEnabled = false
c := NewManager(protocol.LocalDeviceID, config.Wrap("", cfg, events.NoopLogger), tls.Certificate{}, events.NoopLogger, nil).(*manager)
c.ServeBackground()
defer c.Stop()
c := setupCache()
// Add a fake discovery service and verify we get its answers through the
// cache.
@ -93,13 +95,7 @@ func (f *fakeDiscovery) Cache() map[protocol.DeviceID]CacheEntry {
}
func TestCacheSlowLookup(t *testing.T) {
cfg := config.New(protocol.LocalDeviceID)
cfg.Options.LocalAnnEnabled = false
cfg.Options.GlobalAnnEnabled = false
c := NewManager(protocol.LocalDeviceID, config.Wrap("", cfg, events.NoopLogger), tls.Certificate{}, events.NoopLogger, nil).(*manager)
c.ServeBackground()
defer c.Stop()
c := setupCache()
// Add a slow discovery service.

View File

@ -11,7 +11,7 @@ import (
"time"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/thejerf/suture"
"github.com/thejerf/suture/v4"
)
// A Finder provides lookup services of some kind.

View File

@ -21,16 +21,12 @@ import (
stdsync "sync"
"time"
"github.com/thejerf/suture"
"github.com/syncthing/syncthing/lib/dialer"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/util"
)
type globalClient struct {
suture.Service
server string
addrList AddressLister
announceClient httpClient
@ -133,7 +129,6 @@ func NewGlobal(server string, cert tls.Certificate, addrList AddressLister, evLo
noLookup: opts.noLookup,
evLogger: evLogger,
}
cl.Service = util.AsService(cl.serve, cl.String())
if !opts.noAnnounce {
// If we are supposed to annonce, it's an error until we've done so.
cl.setError(errors.New("not announced"))
@ -193,12 +188,12 @@ func (c *globalClient) String() string {
return "global@" + c.server
}
func (c *globalClient) serve(ctx context.Context) {
func (c *globalClient) Serve(ctx context.Context) error {
if c.noAnnounce {
// We're configured to not do announcements, only lookups. To maintain
// the same interface, we just pause here if Serve() is run.
<-ctx.Done()
return
return ctx.Err()
}
timer := time.NewTimer(5 * time.Second)
@ -231,7 +226,7 @@ func (c *globalClient) serve(ctx context.Context) {
c.sendAnnouncement(ctx, timer)
case <-ctx.Done():
return
return ctx.Err()
}
}
}

View File

@ -200,8 +200,9 @@ func TestGlobalAnnounce(t *testing.T) {
t.Fatal(err)
}
go disco.Serve()
defer disco.Stop()
ctx, cancel := context.WithCancel(context.Background())
go disco.Serve(ctx)
defer cancel()
// The discovery thing should attempt an announcement immediately. We wait
// for it to succeed, a while.
@ -223,8 +224,9 @@ func testLookup(url string) ([]string, error) {
if err != nil {
return nil, err
}
go disco.Serve()
defer disco.Stop()
ctx, cancel := context.WithCancel(context.Background())
go disco.Serve(ctx)
defer cancel()
return disco.Lookup(context.Background(), protocol.LocalDeviceID)
}

View File

@ -25,7 +25,7 @@ import (
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/rand"
"github.com/syncthing/syncthing/lib/util"
"github.com/thejerf/suture"
"github.com/thejerf/suture/v4"
)
type localClient struct {
@ -52,9 +52,7 @@ const (
func NewLocal(id protocol.DeviceID, addr string, addrList AddressLister, evLogger events.Logger) (FinderService, error) {
c := &localClient{
Supervisor: suture.New("local", suture.Spec{
PassThroughPanics: true,
}),
Supervisor: suture.New("local", util.Spec()),
myID: id,
addrList: addrList,
evLogger: evLogger,
@ -137,7 +135,7 @@ func (c *localClient) announcementPkt(instanceID int64, msg []byte) ([]byte, boo
return msg, true
}
func (c *localClient) sendLocalAnnouncements(ctx context.Context) {
func (c *localClient) sendLocalAnnouncements(ctx context.Context) error {
var msg []byte
var ok bool
instanceID := rand.Int63()
@ -150,18 +148,18 @@ func (c *localClient) sendLocalAnnouncements(ctx context.Context) {
case <-c.localBcastTick:
case <-c.forcedBcastTick:
case <-ctx.Done():
return
return ctx.Err()
}
}
}
func (c *localClient) recvAnnouncements(ctx context.Context) {
func (c *localClient) recvAnnouncements(ctx context.Context) error {
b := c.beacon
warnedAbout := make(map[string]bool)
for {
select {
case <-ctx.Done():
return
return ctx.Err()
default:
}

View File

@ -8,6 +8,7 @@ package discover
import (
"bytes"
"context"
"net"
"testing"
@ -20,8 +21,9 @@ func TestLocalInstanceID(t *testing.T) {
if err != nil {
t.Fatal(err)
}
go c.Serve()
defer c.Stop()
ctx, cancel := context.WithCancel(context.Background())
go c.Serve(ctx)
defer cancel()
lc := c.(*localClient)

View File

@ -13,7 +13,7 @@ import (
"sort"
"time"
"github.com/thejerf/suture"
"github.com/thejerf/suture/v4"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events"
@ -46,10 +46,8 @@ type manager struct {
}
func NewManager(myID protocol.DeviceID, cfg config.Wrapper, cert tls.Certificate, evLogger events.Logger, lister AddressLister) Manager {
return &manager{
Supervisor: suture.New("discover.Manager", suture.Spec{
PassThroughPanics: true,
}),
m := &manager{
Supervisor: suture.New("discover.Manager", util.Spec()),
myID: myID,
cfg: cfg,
cert: cert,
@ -59,13 +57,16 @@ func NewManager(myID protocol.DeviceID, cfg config.Wrapper, cert tls.Certificate
finders: make(map[string]cachedFinder),
mut: sync.NewRWMutex(),
}
m.Add(util.AsService(m.serve, m.String()))
return m
}
func (m *manager) Serve() {
func (m *manager) serve(ctx context.Context) error {
m.cfg.Subscribe(m)
defer m.cfg.Unsubscribe(m)
m.CommitConfiguration(config.Configuration{}, m.cfg.RawCopy())
m.Supervisor.Serve()
<-ctx.Done()
m.cfg.Unsubscribe(m)
return nil
}
func (m *manager) addLocked(identity string, finder Finder, cacheTime, negCacheTime time.Duration) {

View File

@ -15,10 +15,9 @@ import (
"runtime"
"time"
"github.com/thejerf/suture"
"github.com/thejerf/suture/v4"
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/util"
)
type EventType int64
@ -219,7 +218,6 @@ type Logger interface {
}
type logger struct {
suture.Service
subs []*subscription
nextSubscriptionIDs []int
nextGlobalID int
@ -267,7 +265,6 @@ func NewLogger() Logger {
funcs: make(chan func(context.Context)),
toUnsubscribe: make(chan *subscription),
}
l.Service = util.AsService(l.serve, l.String())
// Make sure the timer is in the stopped state and hasn't fired anything
// into the channel.
if !l.timeout.Stop() {
@ -276,7 +273,7 @@ func NewLogger() Logger {
return l
}
func (l *logger) serve(ctx context.Context) {
func (l *logger) Serve(ctx context.Context) error {
loop:
for {
select {
@ -302,6 +299,8 @@ loop:
for _, s := range l.subs {
close(s.events)
}
return nil
}
func (l *logger) Log(t EventType, data interface{}) {
@ -535,7 +534,7 @@ type noopLogger struct{}
var NoopLogger Logger = &noopLogger{}
func (*noopLogger) Serve() {}
func (*noopLogger) Serve(ctx context.Context) error { return nil }
func (*noopLogger) Stop() {}

View File

@ -7,6 +7,7 @@
package events
import (
"context"
"encoding/json"
"fmt"
"sync"
@ -27,10 +28,16 @@ func TestNewLogger(t *testing.T) {
}
}
func TestSubscriber(t *testing.T) {
func setupLogger() (Logger, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())
l := NewLogger()
defer l.Stop()
go l.Serve()
go l.Serve(ctx)
return l, cancel
}
func TestSubscriber(t *testing.T) {
l, cancel := setupLogger()
defer cancel()
s := l.Subscribe(0)
defer s.Unsubscribe()
@ -40,9 +47,8 @@ func TestSubscriber(t *testing.T) {
}
func TestTimeout(t *testing.T) {
l := NewLogger()
defer l.Stop()
go l.Serve()
l, cancel := setupLogger()
defer cancel()
s := l.Subscribe(0)
defer s.Unsubscribe()
@ -53,9 +59,8 @@ func TestTimeout(t *testing.T) {
}
func TestEventBeforeSubscribe(t *testing.T) {
l := NewLogger()
defer l.Stop()
go l.Serve()
l, cancel := setupLogger()
defer cancel()
l.Log(DeviceConnected, "foo")
s := l.Subscribe(0)
@ -68,9 +73,8 @@ func TestEventBeforeSubscribe(t *testing.T) {
}
func TestEventAfterSubscribe(t *testing.T) {
l := NewLogger()
defer l.Stop()
go l.Serve()
l, cancel := setupLogger()
defer cancel()
s := l.Subscribe(AllEvents)
defer s.Unsubscribe()
@ -95,9 +99,8 @@ func TestEventAfterSubscribe(t *testing.T) {
}
func TestEventAfterSubscribeIgnoreMask(t *testing.T) {
l := NewLogger()
defer l.Stop()
go l.Serve()
l, cancel := setupLogger()
defer cancel()
s := l.Subscribe(DeviceDisconnected)
defer s.Unsubscribe()
@ -110,9 +113,8 @@ func TestEventAfterSubscribeIgnoreMask(t *testing.T) {
}
func TestBufferOverflow(t *testing.T) {
l := NewLogger()
defer l.Stop()
go l.Serve()
l, cancel := setupLogger()
defer cancel()
s := l.Subscribe(AllEvents)
defer s.Unsubscribe()
@ -135,9 +137,8 @@ func TestBufferOverflow(t *testing.T) {
}
func TestUnsubscribe(t *testing.T) {
l := NewLogger()
defer l.Stop()
go l.Serve()
l, cancel := setupLogger()
defer cancel()
s := l.Subscribe(AllEvents)
l.Log(DeviceConnected, "foo")
@ -157,9 +158,8 @@ func TestUnsubscribe(t *testing.T) {
}
func TestGlobalIDs(t *testing.T) {
l := NewLogger()
defer l.Stop()
go l.Serve()
l, cancel := setupLogger()
defer cancel()
s := l.Subscribe(AllEvents)
defer s.Unsubscribe()
@ -189,9 +189,8 @@ func TestGlobalIDs(t *testing.T) {
}
func TestSubscriptionIDs(t *testing.T) {
l := NewLogger()
defer l.Stop()
go l.Serve()
l, cancel := setupLogger()
defer cancel()
s := l.Subscribe(DeviceConnected)
defer s.Unsubscribe()
@ -231,9 +230,8 @@ func TestSubscriptionIDs(t *testing.T) {
}
func TestBufferedSub(t *testing.T) {
l := NewLogger()
defer l.Stop()
go l.Serve()
l, cancel := setupLogger()
defer cancel()
s := l.Subscribe(AllEvents)
defer s.Unsubscribe()
@ -262,9 +260,8 @@ func TestBufferedSub(t *testing.T) {
}
func BenchmarkBufferedSub(b *testing.B) {
l := NewLogger()
defer l.Stop()
go l.Serve()
l, cancel := setupLogger()
defer cancel()
s := l.Subscribe(AllEvents)
defer s.Unsubscribe()
@ -318,9 +315,8 @@ func BenchmarkBufferedSub(b *testing.B) {
}
func TestSinceUsesSubscriptionId(t *testing.T) {
l := NewLogger()
defer l.Stop()
go l.Serve()
l, cancel := setupLogger()
defer cancel()
s := l.Subscribe(DeviceConnected)
defer s.Unsubscribe()
@ -375,9 +371,8 @@ func TestUnsubscribeContention(t *testing.T) {
senders = 1000
)
l := NewLogger()
defer l.Stop()
go l.Serve()
l, cancel := setupLogger()
defer cancel()
// Start listeners. These will poll until the stop channel is closed,
// then exit and unsubscribe.
@ -444,9 +439,8 @@ func TestUnsubscribeContention(t *testing.T) {
}
func BenchmarkLogEvent(b *testing.B) {
l := NewLogger()
defer l.Stop()
go l.Serve()
l, cancel := setupLogger()
defer cancel()
s := l.Subscribe(AllEvents)
defer s.Unsubscribe()

View File

@ -31,7 +31,7 @@ type fakeConnection struct {
files []protocol.FileInfo
fileData map[string][]byte
folder string
model *model
model *testModel
indexFn func(context.Context, string, []protocol.FileInfo)
requestFn func(ctx context.Context, folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error)
closeFn func(error)
@ -201,7 +201,7 @@ func (f *fakeConnection) sendIndexUpdate() {
f.model.IndexUpdate(f.id, f.folder, f.files)
}
func addFakeConn(m *model, dev protocol.DeviceID) *fakeConnection {
func addFakeConn(m *testModel, dev protocol.DeviceID) *fakeConnection {
fc := &fakeConnection{id: dev, model: m}
m.AddConnection(fc, protocol.Hello{})

View File

@ -31,12 +31,9 @@ import (
"github.com/syncthing/syncthing/lib/util"
"github.com/syncthing/syncthing/lib/versioner"
"github.com/syncthing/syncthing/lib/watchaggregator"
"github.com/thejerf/suture"
)
type folder struct {
suture.Service
stateTracker
config.FolderConfiguration
*stats.FolderStatisticsReference
@ -135,7 +132,7 @@ func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg conf
return f
}
func (f *folder) serve(ctx context.Context) {
func (f *folder) Serve(ctx context.Context) error {
atomic.AddInt32(&f.model.foldersRunning, 1)
defer atomic.AddInt32(&f.model.foldersRunning, -1)
@ -168,7 +165,7 @@ func (f *folder) serve(ctx context.Context) {
select {
case <-f.ctx.Done():
close(f.done)
return
return nil
case <-f.pullScheduled:
f.pull()

View File

@ -441,7 +441,7 @@ func setupKnownFiles(t *testing.T, ffs fs.Filesystem, data []byte) []protocol.Fi
return knownFiles
}
func setupROFolder(t *testing.T) (*model, *receiveOnlyFolder) {
func setupROFolder(t *testing.T) (*testModel, *receiveOnlyFolder) {
t.Helper()
w := createTmpWrapper(defaultCfg)
@ -455,6 +455,7 @@ func setupROFolder(t *testing.T) (*model, *receiveOnlyFolder) {
m := newModel(w, myID, "syncthing", "dev", db.NewLowlevel(backend.OpenMemory()), nil)
m.ServeBackground()
<-m.started
must(t, m.ScanFolder("ro"))
m.fmut.RLock()

View File

@ -13,7 +13,6 @@ import (
"github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/ignore"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/util"
"github.com/syncthing/syncthing/lib/versioner"
)
@ -30,7 +29,6 @@ func newSendOnlyFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher,
folder: newFolder(model, fset, ignores, cfg, evLogger, ioLimiter, nil),
}
f.folder.puller = f
f.folder.Service = util.AsService(f.serve, f.String())
return f
}

View File

@ -28,7 +28,6 @@ import (
"github.com/syncthing/syncthing/lib/scanner"
"github.com/syncthing/syncthing/lib/sha256"
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/util"
"github.com/syncthing/syncthing/lib/versioner"
"github.com/syncthing/syncthing/lib/weakhash"
)
@ -140,7 +139,6 @@ func newSendReceiveFolder(model *model, fset *db.FileSet, ignores *ignore.Matche
writeLimiter: newByteSemaphore(cfg.MaxConcurrentWrites),
}
f.folder.puller = f
f.folder.Service = util.AsService(f.serve, f.String())
if f.Copiers == 0 {
f.Copiers = defaultCopiers

View File

@ -91,10 +91,12 @@ func createFile(t *testing.T, name string, fs fs.Filesystem) protocol.FileInfo {
}
// Sets up a folder and model, but makes sure the services aren't actually running.
func setupSendReceiveFolder(files ...protocol.FileInfo) (*model, *sendReceiveFolder) {
func setupSendReceiveFolder(files ...protocol.FileInfo) (*testModel, *sendReceiveFolder) {
w, fcfg := tmpDefaultWrapper()
// Initialise model and stop immediately.
model := setupModel(w)
model.Supervisor.Stop()
model.cancel()
<-model.stopped
f := model.folderRunners[fcfg.ID].(*sendReceiveFolder)
f.tempPullErrors = make(map[string]string)
f.ctx = context.Background()
@ -107,8 +109,7 @@ func setupSendReceiveFolder(files ...protocol.FileInfo) (*model, *sendReceiveFol
return model, f
}
func cleanupSRFolder(f *sendReceiveFolder, m *model) {
m.evLogger.Stop()
func cleanupSRFolder(f *sendReceiveFolder, m *testModel) {
os.Remove(m.cfg.ConfigPath())
os.RemoveAll(f.Filesystem().URI())
}

View File

@ -12,7 +12,7 @@ import (
"strings"
"time"
"github.com/thejerf/suture"
"github.com/thejerf/suture/v4"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/db"
@ -52,9 +52,7 @@ type folderSummaryService struct {
func NewFolderSummaryService(cfg config.Wrapper, m Model, id protocol.DeviceID, evLogger events.Logger) FolderSummaryService {
service := &folderSummaryService{
Supervisor: suture.New("folderSummaryService", suture.Spec{
PassThroughPanics: true,
}),
Supervisor: suture.New("folderSummaryService", util.Spec()),
cfg: cfg,
model: m,
id: id,
@ -169,7 +167,7 @@ func (c *folderSummaryService) OnEventRequest() {
// listenForUpdates subscribes to the event bus and makes note of folders that
// need their data recalculated.
func (c *folderSummaryService) listenForUpdates(ctx context.Context) {
func (c *folderSummaryService) listenForUpdates(ctx context.Context) error {
sub := c.evLogger.Subscribe(events.LocalIndexUpdated | events.RemoteIndexUpdated | events.StateChanged | events.RemoteDownloadProgress | events.DeviceConnected | events.FolderWatchStateChanged | events.DownloadProgress)
defer sub.Unsubscribe()
@ -180,7 +178,7 @@ func (c *folderSummaryService) listenForUpdates(ctx context.Context) {
case ev := <-sub.C():
c.processUpdate(ev)
case <-ctx.Done():
return
return ctx.Err()
}
}
}
@ -261,7 +259,7 @@ func (c *folderSummaryService) processUpdate(ev events.Event) {
// calculateSummaries periodically recalculates folder summaries and
// completion percentage, and sends the results on the event bus.
func (c *folderSummaryService) calculateSummaries(ctx context.Context) {
func (c *folderSummaryService) calculateSummaries(ctx context.Context) error {
const pumpInterval = 2 * time.Second
pump := time.NewTimer(pumpInterval)
@ -272,7 +270,7 @@ func (c *folderSummaryService) calculateSummaries(ctx context.Context) {
for _, folder := range c.foldersToHandle() {
select {
case <-ctx.Done():
return
return ctx.Err()
default:
}
c.sendSummary(ctx, folder)
@ -288,7 +286,7 @@ func (c *folderSummaryService) calculateSummaries(ctx context.Context) {
c.sendSummary(ctx, folder)
case <-ctx.Done():
return
return ctx.Err()
}
}
}

View File

@ -12,17 +12,15 @@ import (
"sync"
"time"
"github.com/thejerf/suture"
"github.com/thejerf/suture/v4"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/db"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/util"
)
type indexSender struct {
suture.Service
conn protocol.Connection
folder string
folderIsReceiveEncrypted bool
@ -36,7 +34,7 @@ type indexSender struct {
resumeChan chan *db.FileSet
}
func (s *indexSender) serve(ctx context.Context) {
func (s *indexSender) Serve(ctx context.Context) error {
var err error
l.Debugf("Starting indexSender for %s to %s at %s (slv=%d)", s.folder, s.conn.ID(), s.conn, s.prevSequence)
@ -59,9 +57,9 @@ func (s *indexSender) serve(ctx context.Context) {
for err == nil {
select {
case <-ctx.Done():
return
return ctx.Err()
case <-s.connClosed:
return
return nil
default:
}
@ -72,9 +70,9 @@ func (s *indexSender) serve(ctx context.Context) {
if s.fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence {
select {
case <-ctx.Done():
return
return ctx.Err()
case <-s.connClosed:
return
return nil
case <-evChan:
case <-ticker.C:
case <-s.pauseChan:
@ -95,6 +93,8 @@ func (s *indexSender) serve(ctx context.Context) {
// time to batch them up a little.
time.Sleep(250 * time.Millisecond)
}
return nil
}
// Complete implements the suture.IsCompletable interface. When Serve terminates
@ -333,7 +333,6 @@ func (r *indexSenderRegistry) startLocked(folderID string, fset *db.FileSet, sta
pauseChan: make(chan struct{}),
resumeChan: make(chan *db.FileSet),
}
is.Service = util.AsService(is.serve, is.String())
is.token = r.sup.Add(is)
r.indexSenders[folderID] = is
}

View File

@ -22,7 +22,7 @@ import (
"unicode"
"github.com/pkg/errors"
"github.com/thejerf/suture"
"github.com/thejerf/suture/v4"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/connections"
@ -36,6 +36,7 @@ import (
"github.com/syncthing/syncthing/lib/stats"
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/ur/contract"
"github.com/syncthing/syncthing/lib/util"
"github.com/syncthing/syncthing/lib/versioner"
)
@ -46,6 +47,7 @@ const (
)
type service interface {
suture.Service
BringToFront(string)
Override()
Revert()
@ -53,8 +55,6 @@ type service interface {
SchedulePull() // something relevant changed, we should try a pull
Jobs(page, perpage int) ([]string, []string, int) // In progress, Queued, skipped
Scan(subs []string) error
Serve()
Stop()
Errors() []FileError
WatchError() error
ScheduleForceRescan(path string)
@ -154,7 +154,9 @@ type model struct {
remotePausedFolders map[protocol.DeviceID]map[string]struct{} // deviceID -> folders
indexSenders map[protocol.DeviceID]*indexSenderRegistry
foldersRunning int32 // for testing only
// for testing only
foldersRunning int32
started chan struct{}
}
type folderFactory func(*model, *db.FileSet, *ignore.Matcher, config.FolderConfiguration, versioner.Versioner, fs.Filesystem, events.Logger, *byteSemaphore) service
@ -192,13 +194,12 @@ var (
// where it sends index information to connected peers and responds to requests
// for file data without altering the local folder in any way.
func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersion string, ldb *db.Lowlevel, protectedFiles []string, evLogger events.Logger) Model {
spec := util.Spec()
spec.EventHook = func(e suture.Event) {
l.Debugln(e)
}
m := &model{
Supervisor: suture.New("model", suture.Spec{
Log: func(line string) {
l.Debugln(line)
},
PassThroughPanics: true,
}),
Supervisor: suture.New("model", spec),
// constructor parameters
cfg: cfg,
@ -237,26 +238,20 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio
deviceDownloads: make(map[protocol.DeviceID]*deviceDownloadState),
remotePausedFolders: make(map[protocol.DeviceID]map[string]struct{}),
indexSenders: make(map[protocol.DeviceID]*indexSenderRegistry),
// for testing only
started: make(chan struct{}),
}
for devID := range cfg.Devices() {
m.deviceStatRefs[devID] = stats.NewDeviceStatisticsReference(m.db, devID.String())
}
m.Add(m.progressEmitter)
m.Add(util.AsService(m.serve, m.String()))
return m
}
func (m *model) Serve() {
m.onServe()
m.Supervisor.Serve()
}
func (m *model) ServeBackground() {
m.onServe()
m.Supervisor.ServeBackground()
}
func (m *model) onServe() {
func (m *model) serve(ctx context.Context) error {
// Add and start folders
cacheIgnoredFiles := m.cfg.Options().CacheIgnoredFiles
for _, folderCfg := range m.cfg.Folders() {
@ -267,11 +262,11 @@ func (m *model) onServe() {
m.newFolder(folderCfg, cacheIgnoredFiles)
}
m.cfg.Subscribe(m)
}
func (m *model) Stop() {
close(m.started)
<-ctx.Done()
m.cfg.Unsubscribe(m)
m.Supervisor.Stop()
m.pmut.RLock()
closed := make([]chan struct{}, 0, len(m.conn))
for id, conn := range m.conn {
@ -282,6 +277,7 @@ func (m *model) Stop() {
for _, c := range closed {
<-c
}
return nil
}
// StartDeadlockDetector starts a deadlock detector on the models locks which

View File

@ -120,7 +120,7 @@ func createTmpWrapper(cfg config.Configuration) config.Wrapper {
return wrapper
}
func newState(cfg config.Configuration) *model {
func newState(cfg config.Configuration) *testModel {
wcfg := createTmpWrapper(cfg)
m := setupModel(wcfg)
@ -1396,7 +1396,7 @@ func TestAutoAcceptEnc(t *testing.T) {
}
}
func changeIgnores(t *testing.T, m *model, expected []string) {
func changeIgnores(t *testing.T, m *testModel, expected []string) {
arrEqual := func(a, b []string) bool {
if len(a) != len(b) {
return false
@ -4205,7 +4205,7 @@ func TestNeedMetaAfterIndexReset(t *testing.T) {
func TestCcCheckEncryption(t *testing.T) {
w, fcfg := tmpDefaultWrapper()
m := setupModel(w)
m.Stop()
m.cancel()
defer cleanupModel(m)
pw := "foo"

View File

@ -11,18 +11,13 @@ import (
"fmt"
"time"
"github.com/thejerf/suture"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/util"
)
type ProgressEmitter struct {
suture.Service
cfg config.Wrapper
registry map[string]map[string]*sharedPullerState // folder: name: puller
interval time.Duration
@ -60,7 +55,6 @@ func NewProgressEmitter(cfg config.Wrapper, evLogger events.Logger) *ProgressEmi
evLogger: evLogger,
mut: sync.NewMutex(),
}
t.Service = util.AsService(t.serve, t.String())
t.CommitConfiguration(config.Configuration{}, cfg.RawCopy())
@ -69,7 +63,7 @@ func NewProgressEmitter(cfg config.Wrapper, evLogger events.Logger) *ProgressEmi
// serve starts the progress emitter which starts emitting DownloadProgress
// events as the progress happens.
func (t *ProgressEmitter) serve(ctx context.Context) {
func (t *ProgressEmitter) Serve(ctx context.Context) error {
t.cfg.Subscribe(t)
defer t.cfg.Unsubscribe(t)
@ -79,7 +73,7 @@ func (t *ProgressEmitter) serve(ctx context.Context) {
select {
case <-ctx.Done():
l.Debugln("progress emitter: stopping")
return
return nil
case <-t.timer.C:
t.mut.Lock()
l.Debugln("progress emitter: timer - looking after", len(t.registry))

View File

@ -53,9 +53,10 @@ func expectTimeout(w events.Subscription, t *testing.T) {
}
func TestProgressEmitter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
evLogger := events.NewLogger()
go evLogger.Serve()
defer evLogger.Stop()
go evLogger.Serve(ctx)
defer cancel()
w := evLogger.Subscribe(events.DownloadProgress)
@ -66,8 +67,7 @@ func TestProgressEmitter(t *testing.T) {
})
p := NewProgressEmitter(c, evLogger)
go p.Serve()
defer p.Stop()
go p.Serve(ctx)
p.interval = 0
expectTimeout(w, t)
@ -118,9 +118,10 @@ func TestSendDownloadProgressMessages(t *testing.T) {
fc := &fakeConnection{}
ctx, cancel := context.WithCancel(context.Background())
evLogger := events.NewLogger()
go evLogger.Serve()
defer evLogger.Stop()
go evLogger.Serve(ctx)
defer cancel()
p := NewProgressEmitter(c, evLogger)
p.temporaryIndexSubscribe(fc, []string{"folder", "folder2"})

View File

@ -7,6 +7,7 @@
package model
import (
"context"
"io/ioutil"
"os"
"testing"
@ -95,13 +96,13 @@ func testFolderConfigFake() config.FolderConfiguration {
return cfg
}
func setupModelWithConnection() (*model, *fakeConnection, config.FolderConfiguration) {
func setupModelWithConnection() (*testModel, *fakeConnection, config.FolderConfiguration) {
w, fcfg := tmpDefaultWrapper()
m, fc := setupModelWithConnectionFromWrapper(w)
return m, fc, fcfg
}
func setupModelWithConnectionFromWrapper(w config.Wrapper) (*model, *fakeConnection) {
func setupModelWithConnectionFromWrapper(w config.Wrapper) (*testModel, *fakeConnection) {
m := setupModel(w)
fc := addFakeConn(m, device1)
@ -112,31 +113,57 @@ func setupModelWithConnectionFromWrapper(w config.Wrapper) (*model, *fakeConnect
return m, fc
}
func setupModel(w config.Wrapper) *model {
func setupModel(w config.Wrapper) *testModel {
db := db.NewLowlevel(backend.OpenMemory())
m := newModel(w, myID, "syncthing", "dev", db, nil)
m.ServeBackground()
<-m.started
m.ScanFolders()
return m
}
func newModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersion string, ldb *db.Lowlevel, protectedFiles []string) *model {
evLogger := events.NewLogger()
m := NewModel(cfg, id, clientName, clientVersion, ldb, protectedFiles, evLogger).(*model)
go evLogger.Serve()
return m
type testModel struct {
*model
cancel context.CancelFunc
evCancel context.CancelFunc
stopped chan struct{}
}
func cleanupModel(m *model) {
m.Stop()
func newModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersion string, ldb *db.Lowlevel, protectedFiles []string) *testModel {
evLogger := events.NewLogger()
m := NewModel(cfg, id, clientName, clientVersion, ldb, protectedFiles, evLogger).(*model)
ctx, cancel := context.WithCancel(context.Background())
go evLogger.Serve(ctx)
return &testModel{
model: m,
evCancel: cancel,
stopped: make(chan struct{}),
}
}
func (m *testModel) ServeBackground() {
ctx, cancel := context.WithCancel(context.Background())
m.cancel = cancel
go func() {
m.model.Serve(ctx)
close(m.stopped)
}()
<-m.started
}
func cleanupModel(m *testModel) {
if m.cancel != nil {
m.cancel()
<-m.stopped
}
m.evCancel()
m.db.Close()
m.evLogger.Stop()
os.Remove(m.cfg.ConfigPath())
}
func cleanupModelAndRemoveDir(m *model, dir string) {
func cleanupModelAndRemoveDir(m *testModel, dir string) {
cleanupModel(m)
os.RemoveAll(dir)
}
@ -223,7 +250,7 @@ func dbSnapshot(t *testing.T, m Model, folder string) *db.Snapshot {
// reloads when asked to, instead of checking file mtimes. This is
// because we will be changing the files on disk often enough that the
// mtimes will be unreliable to determine change status.
func folderIgnoresAlwaysReload(m *model, fcfg config.FolderConfiguration) {
func folderIgnoresAlwaysReload(m *testModel, fcfg config.FolderConfiguration) {
m.removeFolder(fcfg)
fset := db.NewFileSet(fcfg.ID, fcfg.Filesystem(), m.db)
ignores := ignore.New(fcfg.Filesystem(), ignore.WithCache(true), ignore.WithChangeDetector(newAlwaysChanged()))
@ -250,7 +277,7 @@ func basicClusterConfig(local, remote protocol.DeviceID, folders ...string) prot
return cc
}
func localIndexUpdate(m *model, folder string, fs []protocol.FileInfo) {
func localIndexUpdate(m *testModel, folder string, fs []protocol.FileInfo) {
m.fmut.RLock()
fset := m.folderFiles[folder]
m.fmut.RUnlock()

View File

@ -15,19 +15,14 @@ import (
stdsync "sync"
"time"
"github.com/thejerf/suture"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/util"
)
// Service runs a loop for discovery of IGDs (Internet Gateway Devices) and
// setup/renewal of a port mapping.
type Service struct {
suture.Service
id protocol.DeviceID
cfg config.Wrapper
processScheduled chan struct{}
@ -45,8 +40,6 @@ func NewService(id protocol.DeviceID, cfg config.Wrapper) *Service {
mut: sync.NewRWMutex(),
}
s.Service = util.AsService(s.serve, s.String())
cfg.Subscribe(s)
cfgCopy := cfg.RawCopy()
s.CommitConfiguration(cfgCopy, cfgCopy)
return s
@ -70,12 +63,10 @@ func (s *Service) CommitConfiguration(from, to config.Configuration) bool {
return true
}
func (s *Service) Stop() {
s.cfg.Unsubscribe(s)
s.Service.Stop()
}
func (s *Service) Serve(ctx context.Context) error {
s.cfg.Subscribe(s)
defer s.cfg.Unsubscribe(s)
func (s *Service) serve(ctx context.Context) {
announce := stdsync.Once{}
timer := time.NewTimer(0)
@ -97,7 +88,7 @@ func (s *Service) serve(ctx context.Context) {
mapping.clearAddresses()
}
s.mut.RUnlock()
return
return ctx.Err()
}
s.mut.RLock()
enabled := s.enabled
@ -351,7 +342,7 @@ func (s *Service) tryNATDevice(ctx context.Context, natd Device, intPort, extPor
for i := 0; i < 10; i++ {
select {
case <-ctx.Done():
return Address{}, nil
return Address{}, ctx.Err()
default:
}

View File

@ -13,7 +13,7 @@ import (
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/util"
"github.com/thejerf/suture"
"github.com/thejerf/suture/v4"
)
type relayClientFactory func(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation, timeout time.Duration) RelayClient
@ -61,7 +61,7 @@ func newCommonClient(invitations chan protocol.SessionInvitation, serve func(con
defer c.cleanup()
return serve(ctx)
}
c.ServiceWithError = util.AsServiceWithError(newServe, creator)
c.ServiceWithError = util.AsService(newServe, creator)
if c.invitations == nil {
c.closeInvitationsOnFinish = true
c.invitations = make(chan protocol.SessionInvitation)

View File

@ -93,7 +93,7 @@ func (c *dynamicClient) serve(ctx context.Context) error {
c.client = client
c.mut.Unlock()
c.client.Serve()
c.client.Serve(ctx)
c.mut.Lock()
c.client = nil
@ -104,15 +104,6 @@ func (c *dynamicClient) serve(ctx context.Context) error {
return errors.New("could not find a connectable relay")
}
func (c *dynamicClient) Stop() {
c.mut.RLock()
if c.client != nil {
c.client.Stop()
}
c.mut.RUnlock()
c.commonClient.Stop()
}
func (c *dynamicClient) Error() error {
c.mut.RLock()
defer c.mut.RUnlock()

View File

@ -120,11 +120,12 @@ func TestRelay(ctx context.Context, uri *url.URL, certs []tls.Certificate, sleep
close(invs)
return fmt.Errorf("creating client: %w", err)
}
go c.Serve()
defer func() {
c.Stop()
ctx, cancel := context.WithCancel(context.Background())
go func() {
c.Serve(ctx)
close(invs)
}()
defer cancel()
for i := 0; i < times; i++ {
_, err = GetInvitationFromRelay(ctx, uri, id, certs, timeout)

View File

@ -69,7 +69,8 @@ func TestWalkSub(t *testing.T) {
t.Fatal(err)
}
cfg := testConfig()
cfg, cancel := testConfig()
defer cancel()
cfg.Subs = []string{"dir2"}
cfg.Matcher = ignores
fchan := Walk(context.TODO(), cfg)
@ -103,7 +104,8 @@ func TestWalk(t *testing.T) {
}
t.Log(ignores)
cfg := testConfig()
cfg, cancel := testConfig()
defer cancel()
cfg.Matcher = ignores
fchan := Walk(context.TODO(), cfg)
@ -487,7 +489,8 @@ func TestWalkReceiveOnly(t *testing.T) {
}
func walkDir(fs fs.Filesystem, dir string, cfiler CurrentFiler, matcher *ignore.Matcher, localFlags uint32) []protocol.FileInfo {
cfg := testConfig()
cfg, cancel := testConfig()
defer cancel()
cfg.Filesystem = fs
cfg.Subs = []string{dir}
cfg.AutoNormalize = true
@ -596,7 +599,8 @@ func TestStopWalk(t *testing.T) {
const numHashers = 4
ctx, cancel := context.WithCancel(context.Background())
cfg := testConfig()
cfg, cfgCancel := testConfig()
defer cfgCancel()
cfg.Filesystem = fs
cfg.Hashers = numHashers
cfg.ProgressTickIntervalS = -1 // Don't attempt to build the full list of files before starting to scan...
@ -725,7 +729,8 @@ func TestIssue4841(t *testing.T) {
}
fd.Close()
cfg := testConfig()
cfg, cancel := testConfig()
defer cancel()
cfg.Filesystem = fs
cfg.AutoNormalize = true
cfg.CurrentFiler = fakeCurrentFiler{"foo": {
@ -761,7 +766,8 @@ func TestNotExistingError(t *testing.T) {
t.Fatalf("Lstat returned error %v, while nothing should exist there.", err)
}
cfg := testConfig()
cfg, cancel := testConfig()
defer cancel()
cfg.Subs = []string{sub}
fchan := Walk(context.TODO(), cfg)
for f := range fchan {
@ -891,12 +897,13 @@ func (fcf fakeCurrentFiler) CurrentFile(name string) (protocol.FileInfo, bool) {
return f, ok
}
func testConfig() Config {
func testConfig() (Config, context.CancelFunc) {
evLogger := events.NewLogger()
go evLogger.Serve()
ctx, cancel := context.WithCancel(context.Background())
go evLogger.Serve(ctx)
return Config{
Filesystem: testFs,
Hashers: 2,
EventLogger: evLogger,
}
}, cancel
}

View File

@ -14,7 +14,6 @@ import (
"github.com/AudriusButkevicius/pfilter"
"github.com/ccding/go-stun/stun"
"github.com/thejerf/suture"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/util"
@ -60,8 +59,6 @@ type Subscriber interface {
}
type Service struct {
suture.Service
name string
cfg config.Wrapper
subscriber Subscriber
@ -105,28 +102,24 @@ func New(cfg config.Wrapper, subscriber Subscriber, conn net.PacketConn) (*Servi
natType: NATUnknown,
addr: nil,
}
s.Service = util.AsService(s.serve, s.String())
return s, otherDataConn
}
func (s *Service) Stop() {
_ = s.stunConn.Close()
s.Service.Stop()
}
func (s *Service) serve(ctx context.Context) {
func (s *Service) Serve(ctx context.Context) error {
defer func() {
s.setNATType(NATUnknown)
s.setExternalAddress(nil, "")
}()
util.OnDone(ctx, func() { _ = s.stunConn.Close() })
timer := time.NewTimer(time.Millisecond)
for {
disabled:
select {
case <-ctx.Done():
return
return ctx.Err()
case <-timer.C:
}
@ -146,7 +139,7 @@ func (s *Service) serve(ctx context.Context) {
// Have we been asked to stop?
select {
case <-ctx.Done():
return
return ctx.Err()
default:
}

View File

@ -12,49 +12,40 @@ import (
"fmt"
"io"
"github.com/thejerf/suture"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/util"
)
// The auditService subscribes to events and writes these in JSON format, one
// event per line, to the specified writer.
type auditService struct {
suture.Service
w io.Writer // audit destination
sub events.Subscription
w io.Writer // audit destination
evLogger events.Logger
}
func newAuditService(w io.Writer, evLogger events.Logger) *auditService {
s := &auditService{
w: w,
sub: evLogger.Subscribe(events.AllEvents),
return &auditService{
w: w,
evLogger: evLogger,
}
s.Service = util.AsService(s.serve, s.String())
return s
}
// serve runs the audit service.
func (s *auditService) serve(ctx context.Context) {
func (s *auditService) Serve(ctx context.Context) error {
sub := s.evLogger.Subscribe(events.AllEvents)
defer sub.Unsubscribe()
enc := json.NewEncoder(s.w)
for {
select {
case ev := <-s.sub.C():
case ev := <-sub.C():
enc.Encode(ev)
case <-ctx.Done():
return
return ctx.Err()
}
}
}
// Stop stops the audit service.
func (s *auditService) Stop() {
s.Service.Stop()
s.sub.Unsubscribe()
}
func (s *auditService) String() string {
return fmt.Sprintf("auditService@%p", s)
}

View File

@ -8,6 +8,7 @@ package syncthing
import (
"bytes"
"context"
"strings"
"testing"
"time"
@ -18,8 +19,9 @@ import (
func TestAuditService(t *testing.T) {
buf := new(bytes.Buffer)
evLogger := events.NewLogger()
go evLogger.Serve()
defer evLogger.Stop()
ctx, cancel := context.WithCancel(context.Background())
go evLogger.Serve(ctx)
defer cancel()
sub := evLogger.Subscribe(events.AllEvents)
defer sub.Unsubscribe()
@ -28,8 +30,16 @@ func TestAuditService(t *testing.T) {
// Make sure the event goes through before creating the service
<-sub.C()
auditCtx, auditCancel := context.WithCancel(context.Background())
service := newAuditService(buf, evLogger)
go service.Serve()
done := make(chan struct{})
go func() {
service.Serve(auditCtx)
close(done)
}()
// Subscription needs to happen in service.Serve
time.Sleep(10 * time.Millisecond)
// Event that should end up in the audit log
evLogger.Log(events.ConfigSaved, "the second event")
@ -37,7 +47,8 @@ func TestAuditService(t *testing.T) {
// We need to give the events time to arrive, since the channels are buffered etc.
time.Sleep(10 * time.Millisecond)
service.Stop()
auditCancel()
<-done
// This event should not be logged, since we have stopped.
evLogger.Log(events.ConfigSaved, "the third event")

View File

@ -9,6 +9,7 @@ package syncthing
import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net/http"
@ -19,7 +20,7 @@ import (
"sync"
"time"
"github.com/thejerf/suture"
"github.com/thejerf/suture/v4"
"github.com/syncthing/syncthing/lib/api"
"github.com/syncthing/syncthing/lib/build"
@ -39,6 +40,7 @@ import (
"github.com/syncthing/syncthing/lib/tlsutil"
"github.com/syncthing/syncthing/lib/upgrade"
"github.com/syncthing/syncthing/lib/ur"
"github.com/syncthing/syncthing/lib/util"
)
const (
@ -50,20 +52,6 @@ const (
deviceCertLifetimeDays = 20 * 365
)
type ExitStatus int
func (s ExitStatus) AsInt() int {
return int(s)
}
const (
ExitSuccess ExitStatus = 0
ExitError ExitStatus = 1
ExitNoUpgradeAvailable ExitStatus = 2
ExitRestart ExitStatus = 3
ExitUpgrade ExitStatus = 4
)
type Options struct {
AssetDir string
AuditWriter io.Writer
@ -78,18 +66,18 @@ type Options struct {
}
type App struct {
myID protocol.DeviceID
mainService *suture.Supervisor
cfg config.Wrapper
ll *db.Lowlevel
evLogger events.Logger
cert tls.Certificate
opts Options
exitStatus ExitStatus
err error
stopOnce sync.Once
stop chan struct{}
stopped chan struct{}
myID protocol.DeviceID
mainService *suture.Supervisor
cfg config.Wrapper
ll *db.Lowlevel
evLogger events.Logger
cert tls.Certificate
opts Options
exitStatus util.ExitStatus
err error
stopOnce sync.Once
mainServiceCancel context.CancelFunc
stopped chan struct{}
}
func New(cfg config.Wrapper, dbBackend backend.Backend, evLogger events.Logger, cert tls.Certificate, opts Options) *App {
@ -99,7 +87,6 @@ func New(cfg config.Wrapper, dbBackend backend.Backend, evLogger events.Logger,
evLogger: evLogger,
opts: opts,
cert: cert,
stop: make(chan struct{}),
stopped: make(chan struct{}),
}
close(a.stopped) // Hasn't been started, so shouldn't block on Wait.
@ -112,20 +99,20 @@ func New(cfg config.Wrapper, dbBackend backend.Backend, evLogger events.Logger,
func (a *App) Start() error {
// Create a main service manager. We'll add things to this as we go along.
// We want any logging it does to go through our log system.
a.mainService = suture.New("main", suture.Spec{
Log: func(line string) {
l.Debugln(line)
},
PassThroughPanics: true,
})
spec := util.Spec()
spec.EventHook = func(e suture.Event) {
l.Debugln(e)
}
a.mainService = suture.New("main", spec)
// Start the supervisor and wait for it to stop to handle cleanup.
a.stopped = make(chan struct{})
a.mainService.ServeBackground()
go a.run()
ctx, cancel := context.WithCancel(context.Background())
a.mainServiceCancel = cancel
go a.run(ctx)
if err := a.startup(); err != nil {
a.stopWithErr(ExitError, err)
a.stopWithErr(util.ExitError, err)
return err
}
@ -343,14 +330,9 @@ func (a *App) startup() error {
return nil
}
func (a *App) run() {
<-a.stop
if shouldDebug() {
l.Debugln("Services before stop:")
printServiceTree(os.Stdout, a.mainService, 0)
}
a.mainService.Stop()
func (a *App) run(ctx context.Context) {
err := a.mainService.Serve(ctx)
a.handleMainServiceError(err)
done := make(chan struct{})
go func() {
@ -368,9 +350,23 @@ func (a *App) run() {
close(a.stopped)
}
func (a *App) handleMainServiceError(err error) {
if err == nil || errors.Is(err, context.Canceled) {
return
}
var fatalErr *util.FatalErr
if errors.As(err, &fatalErr) {
a.exitStatus = fatalErr.Status
a.err = fatalErr.Err
return
}
a.err = err
a.exitStatus = util.ExitError
}
// Wait blocks until the app stops running. Also returns if the app hasn't been
// started yet.
func (a *App) Wait() ExitStatus {
func (a *App) Wait() util.ExitStatus {
<-a.stopped
return a.exitStatus
}
@ -379,7 +375,7 @@ func (a *App) Wait() ExitStatus {
// for the app to stop before returning.
func (a *App) Error() error {
select {
case <-a.stop:
case <-a.stopped:
return a.err
default:
}
@ -388,15 +384,19 @@ func (a *App) Error() error {
// Stop stops the app and sets its exit status to given reason, unless the app
// was already stopped before. In any case it returns the effective exit status.
func (a *App) Stop(stopReason ExitStatus) ExitStatus {
func (a *App) Stop(stopReason util.ExitStatus) util.ExitStatus {
return a.stopWithErr(stopReason, nil)
}
func (a *App) stopWithErr(stopReason ExitStatus, err error) ExitStatus {
func (a *App) stopWithErr(stopReason util.ExitStatus, err error) util.ExitStatus {
a.stopOnce.Do(func() {
a.exitStatus = stopReason
a.err = err
close(a.stop)
if shouldDebug() {
l.Debugln("Services before stop:")
printServiceTree(os.Stdout, a.mainService, 0)
}
a.mainServiceCancel()
})
<-a.stopped
return a.exitStatus
@ -416,7 +416,7 @@ func (a *App) setupGUI(m model.Model, defaultSub, diskSub events.BufferedSubscri
summaryService := model.NewFolderSummaryService(a.cfg, m, a.myID, a.evLogger)
a.mainService.Add(summaryService)
apiSvc := api.New(a.myID, a.cfg, a.opts.AssetDir, tlsDefaultCommonName, m, defaultSub, diskSub, a.evLogger, discoverer, connectionsService, urService, summaryService, errors, systemLog, &controller{a}, a.opts.NoUpgrade)
apiSvc := api.New(a.myID, a.cfg, a.opts.AssetDir, tlsDefaultCommonName, m, defaultSub, diskSub, a.evLogger, discoverer, connectionsService, urService, summaryService, errors, systemLog, a.opts.NoUpgrade)
a.mainService.Add(apiSvc)
if err := apiSvc.WaitForStart(); err != nil {
@ -440,21 +440,6 @@ func checkShortIDs(cfg config.Wrapper) error {
return nil
}
// Implements api.Controller
type controller struct{ *App }
func (e *controller) Restart() {
e.Stop(ExitRestart)
}
func (e *controller) Shutdown() {
e.Stop(ExitSuccess)
}
func (e *controller) ExitUpgrading() {
e.Stop(ExitUpgrade)
}
type supervisor interface{ Services() []suture.Service }
func printServiceTree(w io.Writer, sup supervisor, level int) {

View File

@ -18,6 +18,7 @@ import (
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/tlsutil"
"github.com/syncthing/syncthing/lib/util"
)
func tempCfgFilename(t *testing.T) string {
@ -86,7 +87,7 @@ func TestStartupFail(t *testing.T) {
}
done := make(chan struct{})
var waitE ExitStatus
var waitE util.ExitStatus
go func() {
waitE = app.Wait()
close(done)
@ -98,8 +99,8 @@ func TestStartupFail(t *testing.T) {
case <-done:
}
if waitE != ExitError {
t.Errorf("Got exit status %v, expected %v", waitE, ExitError)
if waitE != util.ExitError {
t.Errorf("Got exit status %v, expected %v", waitE, util.ExitError)
}
if err = app.Error(); err != startErr {

View File

@ -10,49 +10,38 @@ import (
"context"
"fmt"
"github.com/thejerf/suture"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/util"
)
// The verbose logging service subscribes to events and prints these in
// verbose format to the console using INFO level.
type verboseService struct {
suture.Service
sub events.Subscription
evLogger events.Logger
}
func newVerboseService(evLogger events.Logger) *verboseService {
s := &verboseService{
sub: evLogger.Subscribe(events.AllEvents),
return &verboseService{
evLogger: evLogger,
}
s.Service = util.AsService(s.serve, s.String())
return s
}
// serve runs the verbose logging service.
func (s *verboseService) serve(ctx context.Context) {
func (s *verboseService) Serve(ctx context.Context) error {
sub := s.evLogger.Subscribe(events.AllEvents)
defer sub.Unsubscribe()
for {
select {
case ev := <-s.sub.C():
case ev := <-sub.C():
formatted := s.formatEvent(ev)
if formatted != "" {
l.Verboseln(formatted)
}
case <-ctx.Done():
return
return ctx.Err()
}
}
}
// Stop stops the verbose logging service.
func (s *verboseService) Stop() {
s.Service.Stop()
s.sub.Unsubscribe()
}
func (s *verboseService) formatEvent(ev events.Event) string {
switch ev.Type {
case events.DownloadProgress, events.LocalIndexUpdated:

View File

@ -17,9 +17,8 @@ import (
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/dialer"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/util"
"github.com/thejerf/suture"
"github.com/thejerf/suture/v4"
)
var (
@ -45,18 +44,15 @@ type FailureHandler interface {
}
func NewFailureHandler(cfg config.Wrapper, evLogger events.Logger) FailureHandler {
h := &failureHandler{
return &failureHandler{
cfg: cfg,
evLogger: evLogger,
optsChan: make(chan config.OptionsConfiguration),
buf: make(map[string]*failureStat),
}
h.Service = util.AsServiceWithError(h.serve, h.String())
return h
}
type failureHandler struct {
suture.Service
cfg config.Wrapper
evLogger events.Logger
optsChan chan config.OptionsConfiguration
@ -68,7 +64,7 @@ type failureStat struct {
count int
}
func (h *failureHandler) serve(ctx context.Context) error {
func (h *failureHandler) Serve(ctx context.Context) error {
go func() {
select {
case h.optsChan <- h.cfg.Options():

View File

@ -29,9 +29,6 @@ import (
"github.com/syncthing/syncthing/lib/scanner"
"github.com/syncthing/syncthing/lib/upgrade"
"github.com/syncthing/syncthing/lib/ur/contract"
"github.com/syncthing/syncthing/lib/util"
"github.com/thejerf/suture"
)
// Current version number of the usage report, for acceptance purposes. If
@ -42,7 +39,6 @@ const Version = 3
var StartTime = time.Now()
type Service struct {
suture.Service
cfg config.Wrapper
model model.Model
connectionsService connections.Service
@ -51,15 +47,13 @@ type Service struct {
}
func New(cfg config.Wrapper, m model.Model, connectionsService connections.Service, noUpgrade bool) *Service {
svc := &Service{
return &Service{
cfg: cfg,
model: m,
connectionsService: connectionsService,
noUpgrade: noUpgrade,
forceRun: make(chan struct{}, 1), // Buffered to prevent locking
}
svc.Service = util.AsService(svc.serve, svc.String())
return svc
}
// ReportData returns the data to be sent in a usage report with the currently
@ -362,7 +356,7 @@ func (s *Service) sendUsageReport(ctx context.Context) error {
return nil
}
func (s *Service) serve(ctx context.Context) {
func (s *Service) Serve(ctx context.Context) error {
s.cfg.Subscribe(s)
defer s.cfg.Unsubscribe(s)
@ -370,7 +364,7 @@ func (s *Service) serve(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
return ctx.Err()
case <-s.forceRun:
t.Reset(0)
case <-t.C:

View File

@ -18,7 +18,7 @@ import (
"github.com/syncthing/syncthing/lib/sync"
"github.com/thejerf/suture"
"github.com/thejerf/suture/v4"
)
type defaultParser interface {
@ -229,13 +229,35 @@ func AddressUnspecifiedLess(a, b net.Addr) bool {
return aIsUnspecified
}
// AsService wraps the given function to implement suture.Service by calling
// that function on serve and closing the passed channel when Stop is called.
func AsService(fn func(ctx context.Context), creator string) suture.Service {
return asServiceWithError(func(ctx context.Context) error {
fn(ctx)
return nil
}, creator)
type FatalErr struct {
Err error
Status ExitStatus
}
func (e *FatalErr) Error() string {
return e.Err.Error()
}
func (e *FatalErr) Unwrap() error {
return e.Err
}
func (e *FatalErr) Is(target error) bool {
return target == suture.ErrTerminateSupervisorTree
}
type ExitStatus int
const (
ExitSuccess ExitStatus = 0
ExitError ExitStatus = 1
ExitNoUpgradeAvailable ExitStatus = 2
ExitRestart ExitStatus = 3
ExitUpgrade ExitStatus = 4
)
func (s ExitStatus) AsInt() int {
return int(s)
}
type ServiceWithError interface {
@ -245,76 +267,35 @@ type ServiceWithError interface {
SetError(error)
}
// AsServiceWithError does the same as AsService, except that it keeps track
// of an error returned by the given function.
func AsServiceWithError(fn func(ctx context.Context) error, creator string) ServiceWithError {
return asServiceWithError(fn, creator)
}
func asServiceWithError(fn func(ctx context.Context) error, creator string) ServiceWithError {
ctx, cancel := context.WithCancel(context.Background())
s := &service{
serve: fn,
ctx: ctx,
cancel: cancel,
stopped: make(chan struct{}),
// AsService wraps the given function to implement suture.Service. In addition
// it keeps track of the returned error and allows querying and setting that error.
func AsService(fn func(ctx context.Context) error, creator string) ServiceWithError {
return &service{
creator: creator,
serve: fn,
mut: sync.NewMutex(),
}
close(s.stopped) // not yet started, don't block on Stop()
return s
}
type service struct {
creator string
serve func(ctx context.Context) error
ctx context.Context
cancel context.CancelFunc
stopped chan struct{}
err error
mut sync.Mutex
}
func (s *service) Serve() {
func (s *service) Serve(ctx context.Context) error {
s.mut.Lock()
select {
case <-s.ctx.Done():
s.mut.Unlock()
return
default:
}
s.err = nil
s.stopped = make(chan struct{})
s.mut.Unlock()
var err error
defer func() {
if err == context.Canceled {
err = nil
}
s.mut.Lock()
s.err = err
close(s.stopped)
s.mut.Unlock()
}()
err = s.serve(s.ctx)
}
err := s.serve(ctx)
func (s *service) Stop() {
s.mut.Lock()
select {
case <-s.ctx.Done():
s.mut.Unlock()
panic(fmt.Sprintf("Stop called more than once on %v", s))
default:
s.cancel()
}
// Cache s.stopped in a variable while we hold the mutex
// to prevent a data race with Serve's resetting it.
stopped := s.stopped
s.err = err
s.mut.Unlock()
<-stopped
return err
}
func (s *service) Error() error {
@ -331,6 +312,37 @@ func (s *service) SetError(err error) {
func (s *service) String() string {
return fmt.Sprintf("Service@%p created by %v", s, s.creator)
}
// OnDone calls fn when ctx is cancelled.
func OnDone(ctx context.Context, fn func()) {
go func() {
<-ctx.Done()
fn()
}()
}
type doneService struct {
fn func()
}
func (s *doneService) Serve(ctx context.Context) error {
<-ctx.Done()
s.fn()
return nil
}
// OnSupervisorDone calls fn when sup is done.
func OnSupervisorDone(sup *suture.Supervisor, fn func()) {
sup.Add(&doneService{fn})
}
func Spec() suture.Spec {
return suture.Spec{
PassThroughPanics: true,
DontPropagateTermination: false,
}
}
func CallWithContext(ctx context.Context, fn func() error) error {

View File

@ -7,8 +7,6 @@
package util
import (
"context"
"strings"
"testing"
)
@ -271,23 +269,6 @@ func TestInspecifiedAddressLess(t *testing.T) {
}
}
func TestUtilStopTwicePanic(t *testing.T) {
name := "foo"
s := AsService(func(ctx context.Context) {
<-ctx.Done()
}, name)
go s.Serve()
s.Stop()
defer func() {
if r := recover(); r == nil || !strings.Contains(r.(string), name) {
t.Fatalf(`expected panic containing "%v", got "%v"`, name, r)
}
}()
s.Stop()
}
func TestFillNil(t *testing.T) {
type A struct {
Slice []int

View File

@ -152,8 +152,9 @@ func TestAggregate(t *testing.T) {
// TestInProgress checks that ignoring files currently edited by Syncthing works
func TestInProgress(t *testing.T) {
evLogger := events.NewLogger()
go evLogger.Serve()
defer evLogger.Stop()
ctx, cancel := context.WithCancel(context.Background())
go evLogger.Serve(ctx)
defer cancel()
testCase := func(c chan<- fs.Event) {
evLogger.Log(events.ItemStarted, map[string]string{
"item": "inprogress",