lib: Replace done channel with contexts in and add names to util services (#6166)

This commit is contained in:
Simon Frei 2019-11-21 08:41:15 +01:00 committed by GitHub
parent 552ea68672
commit 90d85fd0a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 240 additions and 218 deletions

View File

@ -8,6 +8,7 @@ package api
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
@ -136,7 +137,7 @@ func New(id protocol.DeviceID, cfg config.Wrapper, assetDir, tlsDefaultCommonNam
configChanged: make(chan struct{}),
startedOnce: make(chan struct{}),
}
s.Service = util.AsService(s.serve)
s.Service = util.AsService(s.serve, s.String())
return s
}
@ -207,7 +208,7 @@ func sendJSON(w http.ResponseWriter, jsonObject interface{}) {
fmt.Fprintf(w, "%s\n", bs)
}
func (s *service) serve(stop chan struct{}) {
func (s *service) serve(ctx context.Context) {
listener, err := s.getListener(s.cfg.GUI())
if err != nil {
select {
@ -381,7 +382,7 @@ func (s *service) serve(stop chan struct{}) {
// Wait for stop, restart or error signals
select {
case <-stop:
case <-ctx.Done():
// Shutting down permanently
l.Debugln("shutting down (stop)")
case <-s.configChanged:

View File

@ -7,6 +7,7 @@
package beacon
import (
"context"
"fmt"
"net"
"time"
@ -63,23 +64,23 @@ func newCast(name string) *cast {
}
}
func (c *cast) addReader(svc func(chan struct{}) error) {
func (c *cast) addReader(svc func(context.Context) error) {
c.reader = c.createService(svc, "reader")
c.Add(c.reader)
}
func (c *cast) addWriter(svc func(stop chan struct{}) error) {
func (c *cast) addWriter(svc func(ctx context.Context) error) {
c.writer = c.createService(svc, "writer")
c.Add(c.writer)
}
func (c *cast) createService(svc func(chan struct{}) error, suffix string) util.ServiceWithError {
return util.AsServiceWithError(func(stop chan struct{}) error {
func (c *cast) createService(svc func(context.Context) error, suffix string) util.ServiceWithError {
return util.AsServiceWithError(func(ctx context.Context) error {
l.Debugln("Starting", c.name, suffix)
err := svc(stop)
err := svc(ctx)
l.Debugf("Stopped %v %v: %v", c.name, suffix, err)
return err
})
}, fmt.Sprintf("%s/%s", c, suffix))
}
func (c *cast) Stop() {

View File

@ -7,34 +7,32 @@
package beacon
import (
"context"
"net"
"time"
)
func NewBroadcast(port int) Interface {
c := newCast("broadcastBeacon")
c.addReader(func(stop chan struct{}) error {
return readBroadcasts(c.outbox, port, stop)
c.addReader(func(ctx context.Context) error {
return readBroadcasts(ctx, c.outbox, port)
})
c.addWriter(func(stop chan struct{}) error {
return writeBroadcasts(c.inbox, port, stop)
c.addWriter(func(ctx context.Context) error {
return writeBroadcasts(ctx, c.inbox, port)
})
return c
}
func writeBroadcasts(inbox <-chan []byte, port int, stop chan struct{}) error {
func writeBroadcasts(ctx context.Context, inbox <-chan []byte, port int) error {
conn, err := net.ListenUDP("udp4", nil)
if err != nil {
l.Debugln(err)
return err
}
done := make(chan struct{})
defer close(done)
doneCtx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
select {
case <-stop:
case <-done:
}
<-doneCtx.Done()
conn.Close()
}()
@ -42,7 +40,7 @@ func writeBroadcasts(inbox <-chan []byte, port int, stop chan struct{}) error {
var bs []byte
select {
case bs = <-inbox:
case <-stop:
case <-doneCtx.Done():
return nil
}
@ -99,19 +97,17 @@ func writeBroadcasts(inbox <-chan []byte, port int, stop chan struct{}) error {
}
}
func readBroadcasts(outbox chan<- recv, port int, stop chan struct{}) error {
func readBroadcasts(ctx context.Context, outbox chan<- recv, port int) error {
conn, err := net.ListenUDP("udp4", &net.UDPAddr{Port: port})
if err != nil {
l.Debugln(err)
return err
}
done := make(chan struct{})
defer close(done)
doneCtx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
select {
case <-stop:
case <-done:
}
<-doneCtx.Done()
conn.Close()
}()
@ -129,7 +125,7 @@ func readBroadcasts(outbox chan<- recv, port int, stop chan struct{}) error {
copy(c, bs)
select {
case outbox <- recv{c, addr}:
case <-stop:
case <-doneCtx.Done():
return nil
default:
l.Debugln("dropping message")

View File

@ -7,6 +7,7 @@
package beacon
import (
"context"
"errors"
"net"
"time"
@ -16,16 +17,16 @@ import (
func NewMulticast(addr string) Interface {
c := newCast("multicastBeacon")
c.addReader(func(stop chan struct{}) error {
return readMulticasts(c.outbox, addr, stop)
c.addReader(func(ctx context.Context) error {
return readMulticasts(ctx, c.outbox, addr)
})
c.addWriter(func(stop chan struct{}) error {
return writeMulticasts(c.inbox, addr, stop)
c.addWriter(func(ctx context.Context) error {
return writeMulticasts(ctx, c.inbox, addr)
})
return c
}
func writeMulticasts(inbox <-chan []byte, addr string, stop chan struct{}) error {
func writeMulticasts(ctx context.Context, inbox <-chan []byte, addr string) error {
gaddr, err := net.ResolveUDPAddr("udp6", addr)
if err != nil {
l.Debugln(err)
@ -37,13 +38,10 @@ func writeMulticasts(inbox <-chan []byte, addr string, stop chan struct{}) error
l.Debugln(err)
return err
}
done := make(chan struct{})
defer close(done)
doneCtx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
select {
case <-stop:
case <-done:
}
<-doneCtx.Done()
conn.Close()
}()
@ -57,7 +55,7 @@ func writeMulticasts(inbox <-chan []byte, addr string, stop chan struct{}) error
var bs []byte
select {
case bs = <-inbox:
case <-stop:
case <-doneCtx.Done():
return nil
}
@ -84,7 +82,7 @@ func writeMulticasts(inbox <-chan []byte, addr string, stop chan struct{}) error
success++
select {
case <-stop:
case <-doneCtx.Done():
return nil
default:
}
@ -96,7 +94,7 @@ func writeMulticasts(inbox <-chan []byte, addr string, stop chan struct{}) error
}
}
func readMulticasts(outbox chan<- recv, addr string, stop chan struct{}) error {
func readMulticasts(ctx context.Context, outbox chan<- recv, addr string) error {
gaddr, err := net.ResolveUDPAddr("udp6", addr)
if err != nil {
l.Debugln(err)
@ -108,13 +106,10 @@ func readMulticasts(outbox chan<- recv, addr string, stop chan struct{}) error {
l.Debugln(err)
return err
}
done := make(chan struct{})
defer close(done)
doneCtx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
select {
case <-stop:
case <-done:
}
<-doneCtx.Done()
conn.Close()
}()
@ -144,7 +139,7 @@ func readMulticasts(outbox chan<- recv, addr string, stop chan struct{}) error {
bs := make([]byte, 65536)
for {
select {
case <-stop:
case <-doneCtx.Done():
return nil
default:
}

View File

@ -78,13 +78,9 @@ func (t *quicListener) OnExternalAddressChanged(address *stun.Host, via string)
}
}
func (t *quicListener) serve(stop chan struct{}) error {
func (t *quicListener) serve(ctx context.Context) error {
network := strings.Replace(t.uri.Scheme, "quic", "udp", -1)
// Convert the stop channel into a context
ctx, cancel := context.WithCancel(context.Background())
go func() { <-stop; cancel() }()
packetConn, err := net.ListenPacket(network, t.uri.Host)
if err != nil {
l.Infoln("Listen (BEP/quic):", err)
@ -205,7 +201,7 @@ func (f *quicListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls.
conns: conns,
factory: f,
}
l.ServiceWithError = util.AsServiceWithError(l.serve)
l.ServiceWithError = util.AsServiceWithError(l.serve, l.String())
l.nat.Store(stun.NATUnknown)
return l
}

View File

@ -7,6 +7,7 @@
package connections
import (
"context"
"crypto/tls"
"net/url"
"sync"
@ -40,7 +41,7 @@ type relayListener struct {
mut sync.RWMutex
}
func (t *relayListener) serve(stop chan struct{}) error {
func (t *relayListener) serve(ctx context.Context) error {
clnt, err := client.NewClient(t.uri, t.tlsCfg.Certificates, nil, 10*time.Second)
if err != nil {
l.Infoln("Listen (BEP/relay):", err)
@ -112,7 +113,7 @@ func (t *relayListener) serve(stop chan struct{}) error {
t.notifyAddressesChanged(t)
}
case <-stop:
case <-ctx.Done():
return nil
}
}
@ -178,7 +179,7 @@ func (f *relayListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls
conns: conns,
factory: f,
}
t.ServiceWithError = util.AsServiceWithError(t.serve)
t.ServiceWithError = util.AsServiceWithError(t.serve, t.String())
return t
}

View File

@ -7,6 +7,7 @@
package connections
import (
"context"
"crypto/tls"
"errors"
"fmt"
@ -185,18 +186,18 @@ func NewService(cfg config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *t
// the common handling regardless of whether the connection was
// incoming or outgoing.
service.Add(util.AsService(service.connect))
service.Add(util.AsService(service.handle))
service.Add(util.AsService(service.connect, fmt.Sprintf("%s/connect", service)))
service.Add(util.AsService(service.handle, fmt.Sprintf("%s/handle", service)))
service.Add(service.listenerSupervisor)
return service
}
func (s *service) handle(stop chan struct{}) {
func (s *service) handle(ctx context.Context) {
var c internalConn
for {
select {
case <-stop:
case <-ctx.Done():
return
case c = <-s.conns:
}
@ -324,7 +325,7 @@ func (s *service) handle(stop chan struct{}) {
}
}
func (s *service) connect(stop chan struct{}) {
func (s *service) connect(ctx context.Context) {
nextDial := make(map[string]time.Time)
// Used as delay for the first few connection attempts, increases
@ -480,7 +481,7 @@ func (s *service) connect(stop chan struct{}) {
select {
case <-time.After(sleep):
case <-stop:
case <-ctx.Done():
return
}
}

View File

@ -7,6 +7,7 @@
package connections
import (
"context"
"crypto/tls"
"net"
"net/url"
@ -42,7 +43,7 @@ type tcpListener struct {
mut sync.RWMutex
}
func (t *tcpListener) serve(stop chan struct{}) error {
func (t *tcpListener) serve(ctx context.Context) error {
tcaddr, err := net.ResolveTCPAddr(t.uri.Scheme, t.uri.Host)
if err != nil {
l.Infoln("Listen (BEP/tcp):", err)
@ -76,7 +77,7 @@ func (t *tcpListener) serve(stop chan struct{}) error {
listener.SetDeadline(time.Now().Add(time.Second))
conn, err := listener.Accept()
select {
case <-stop:
case <-ctx.Done():
if err == nil {
conn.Close()
}
@ -183,7 +184,7 @@ func (f *tcpListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls.C
natService: natService,
factory: f,
}
l.ServiceWithError = util.AsServiceWithError(l.serve)
l.ServiceWithError = util.AsServiceWithError(l.serve, l.String())
return l
}

View File

@ -8,6 +8,7 @@ package discover
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
@ -128,7 +129,7 @@ func NewGlobal(server string, cert tls.Certificate, addrList AddressLister, evLo
noLookup: opts.noLookup,
evLogger: evLogger,
}
cl.Service = util.AsService(cl.serve)
cl.Service = util.AsService(cl.serve, cl.String())
if !opts.noAnnounce {
// If we are supposed to annonce, it's an error until we've done so.
cl.setError(errors.New("not announced"))
@ -188,11 +189,11 @@ func (c *globalClient) String() string {
return "global@" + c.server
}
func (c *globalClient) serve(stop chan struct{}) {
func (c *globalClient) serve(ctx context.Context) {
if c.noAnnounce {
// We're configured to not do announcements, only lookups. To maintain
// the same interface, we just pause here if Serve() is run.
<-stop
<-ctx.Done()
return
}
@ -212,7 +213,7 @@ func (c *globalClient) serve(stop chan struct{}) {
case <-timer.C:
c.sendAnnouncement(timer)
case <-stop:
case <-ctx.Done():
return
}
}

View File

@ -10,8 +10,10 @@
package discover
import (
"context"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"net"
"net/url"
@ -81,9 +83,9 @@ func NewLocal(id protocol.DeviceID, addr string, addrList AddressLister, evLogge
c.beacon = beacon.NewMulticast(addr)
}
c.Add(c.beacon)
c.Add(util.AsService(c.recvAnnouncements))
c.Add(util.AsService(c.recvAnnouncements, fmt.Sprintf("%s/recv", c)))
c.Add(util.AsService(c.sendLocalAnnouncements))
c.Add(util.AsService(c.sendLocalAnnouncements, fmt.Sprintf("%s/sendLocal", c)))
return c, nil
}
@ -135,7 +137,7 @@ func (c *localClient) announcementPkt(instanceID int64, msg []byte) ([]byte, boo
return msg, true
}
func (c *localClient) sendLocalAnnouncements(stop chan struct{}) {
func (c *localClient) sendLocalAnnouncements(ctx context.Context) {
var msg []byte
var ok bool
instanceID := rand.Int63()
@ -147,18 +149,18 @@ func (c *localClient) sendLocalAnnouncements(stop chan struct{}) {
select {
case <-c.localBcastTick:
case <-c.forcedBcastTick:
case <-stop:
case <-ctx.Done():
return
}
}
}
func (c *localClient) recvAnnouncements(stop chan struct{}) {
func (c *localClient) recvAnnouncements(ctx context.Context) {
b := c.beacon
warnedAbout := make(map[string]bool)
for {
select {
case <-stop:
case <-ctx.Done():
return
default:
}

View File

@ -8,8 +8,10 @@
package events
import (
"context"
"encoding/json"
"errors"
"fmt"
"runtime"
"time"
@ -258,7 +260,7 @@ func NewLogger() Logger {
funcs: make(chan func()),
toUnsubscribe: make(chan *subscription),
}
l.Service = util.AsService(l.serve)
l.Service = util.AsService(l.serve, l.String())
// Make sure the timer is in the stopped state and hasn't fired anything
// into the channel.
if !l.timeout.Stop() {
@ -267,7 +269,7 @@ func NewLogger() Logger {
return l
}
func (l *logger) serve(stop chan struct{}) {
func (l *logger) serve(ctx context.Context) {
loop:
for {
select {
@ -282,7 +284,7 @@ loop:
case s := <-l.toUnsubscribe:
l.unsubscribe(s)
case <-stop:
case <-ctx.Done():
break loop
}
}
@ -388,6 +390,10 @@ func (l *logger) unsubscribe(s *subscription) {
close(s.events)
}
func (l *logger) String() string {
return fmt.Sprintf("events.Logger/@%p", l)
}
// Poll returns an event from the subscription or an error if the poll times
// out of the event channel is closed. Poll should not be called concurrently
// from multiple goroutines for a single subscription.

View File

@ -55,12 +55,12 @@ func (f *BasicFilesystem) Watch(name string, ignore Matcher, ctx context.Context
}
errChan := make(chan error)
go f.watchLoop(name, roots, backendChan, outChan, errChan, ignore, ctx)
go f.watchLoop(ctx, name, roots, backendChan, outChan, errChan, ignore)
return outChan, errChan, nil
}
func (f *BasicFilesystem) watchLoop(name string, roots []string, backendChan chan notify.EventInfo, outChan chan<- Event, errChan chan<- error, ignore Matcher, ctx context.Context) {
func (f *BasicFilesystem) watchLoop(ctx context.Context, name string, roots []string, backendChan chan notify.EventInfo, outChan chan<- Event, errChan chan<- error, ignore Matcher) {
for {
// Detect channel overflow
if len(backendChan) == backendBuffer {

View File

@ -178,7 +178,7 @@ func TestWatchWinRoot(t *testing.T) {
}
cancel()
}()
fs.watchLoop(".", roots, backendChan, outChan, errChan, fakeMatcher{}, ctx)
fs.watchLoop(ctx, ".", roots, backendChan, outChan, errChan, fakeMatcher{})
}()
// filepath.Dir as watch has a /... suffix
@ -219,7 +219,7 @@ func expectErrorForPath(t *testing.T, path string) {
// testFs is Filesystem, but we need BasicFilesystem here
fs := newBasicFilesystem(testDirAbs)
go fs.watchLoop(".", []string{testDirAbs}, backendChan, outChan, errChan, fakeMatcher{}, ctx)
go fs.watchLoop(ctx, ".", []string{testDirAbs}, backendChan, outChan, errChan, fakeMatcher{})
backendChan <- fakeEventInfo(path)
@ -244,7 +244,7 @@ func TestWatchSubpath(t *testing.T) {
fs := newBasicFilesystem(testDirAbs)
abs, _ := fs.rooted("sub")
go fs.watchLoop("sub", []string{testDirAbs}, backendChan, outChan, errChan, fakeMatcher{}, ctx)
go fs.watchLoop(ctx, "sub", []string{testDirAbs}, backendChan, outChan, errChan, fakeMatcher{})
backendChan <- fakeEventInfo(filepath.Join(abs, "file"))

View File

@ -49,7 +49,6 @@ type folder struct {
fset *db.FileSet
ignores *ignore.Matcher
ctx context.Context
cancel context.CancelFunc
scanInterval time.Duration
scanTimer *time.Timer
@ -80,8 +79,6 @@ type puller interface {
}
func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, evLogger events.Logger) folder {
ctx, cancel := context.WithCancel(context.Background())
return folder{
stateTracker: newStateTracker(cfg.ID, evLogger),
FolderConfiguration: cfg,
@ -91,8 +88,6 @@ func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg conf
shortID: model.shortID,
fset: fset,
ignores: ignores,
ctx: ctx,
cancel: cancel,
scanInterval: time.Duration(cfg.RescanIntervalS) * time.Second,
scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
@ -109,10 +104,12 @@ func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg conf
}
}
func (f *folder) serve(_ chan struct{}) {
func (f *folder) serve(ctx context.Context) {
atomic.AddInt32(&f.model.foldersRunning, 1)
defer atomic.AddInt32(&f.model.foldersRunning, -1)
f.ctx = ctx
l.Debugln(f, "starting")
defer l.Debugln(f, "exiting")
@ -256,11 +253,6 @@ func (f *folder) Delay(next time.Duration) {
f.scanDelay <- next
}
func (f *folder) Stop() {
f.cancel()
f.Service.Stop()
}
// CheckHealth checks the folder for common errors, updates the folder state
// and returns the current folder error, or nil if the folder is healthy.
func (f *folder) CheckHealth() error {
@ -643,7 +635,7 @@ func (f *folder) monitorWatch(ctx context.Context) {
failTimer.Reset(time.Minute)
continue
}
watchaggregator.Aggregate(eventChan, f.watchChan, f.FolderConfiguration, f.model.cfg, f.evLogger, aggrCtx)
watchaggregator.Aggregate(aggrCtx, eventChan, f.watchChan, f.FolderConfiguration, f.model.cfg, f.evLogger)
l.Debugln("Started filesystem watcher for folder", f.Description())
case err = <-errChan:
f.setWatchError(err)

View File

@ -30,7 +30,7 @@ func newSendOnlyFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher,
folder: newFolder(model, fset, ignores, cfg, evLogger),
}
f.folder.puller = f
f.folder.Service = util.AsService(f.serve)
f.folder.Service = util.AsService(f.serve, f.String())
return f
}

View File

@ -118,7 +118,7 @@ func newSendReceiveFolder(model *model, fset *db.FileSet, ignores *ignore.Matche
pullErrorsMut: sync.NewMutex(),
}
f.folder.puller = f
f.folder.Service = util.AsService(f.serve)
f.folder.Service = util.AsService(f.serve, f.String())
if f.Copiers == 0 {
f.Copiers = defaultCopiers

View File

@ -7,6 +7,7 @@
package model
import (
"context"
"fmt"
"strings"
"time"
@ -63,8 +64,8 @@ func NewFolderSummaryService(cfg config.Wrapper, m Model, id protocol.DeviceID,
lastEventReqMut: sync.NewMutex(),
}
service.Add(util.AsService(service.listenForUpdates))
service.Add(util.AsService(service.calculateSummaries))
service.Add(util.AsService(service.listenForUpdates, fmt.Sprintf("%s/listenForUpdates", service)))
service.Add(util.AsService(service.calculateSummaries, fmt.Sprintf("%s/calculateSummaries", service)))
return service
}
@ -145,7 +146,7 @@ func (c *folderSummaryService) OnEventRequest() {
// listenForUpdates subscribes to the event bus and makes note of folders that
// need their data recalculated.
func (c *folderSummaryService) listenForUpdates(stop chan struct{}) {
func (c *folderSummaryService) listenForUpdates(ctx context.Context) {
sub := c.evLogger.Subscribe(events.LocalIndexUpdated | events.RemoteIndexUpdated | events.StateChanged | events.RemoteDownloadProgress | events.DeviceConnected | events.FolderWatchStateChanged | events.DownloadProgress)
defer sub.Unsubscribe()
@ -155,7 +156,7 @@ func (c *folderSummaryService) listenForUpdates(stop chan struct{}) {
select {
case ev := <-sub.C():
c.processUpdate(ev)
case <-stop:
case <-ctx.Done():
return
}
}
@ -234,7 +235,7 @@ func (c *folderSummaryService) processUpdate(ev events.Event) {
// calculateSummaries periodically recalculates folder summaries and
// completion percentage, and sends the results on the event bus.
func (c *folderSummaryService) calculateSummaries(stop chan struct{}) {
func (c *folderSummaryService) calculateSummaries(ctx context.Context) {
const pumpInterval = 2 * time.Second
pump := time.NewTimer(pumpInterval)
@ -255,7 +256,7 @@ func (c *folderSummaryService) calculateSummaries(stop chan struct{}) {
case folder := <-c.immediate:
c.sendSummary(folder)
case <-stop:
case <-ctx.Done():
return
}
}

View File

@ -1227,7 +1227,7 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon
dropSymlinks: dropSymlinks,
evLogger: m.evLogger,
}
is.Service = util.AsService(is.serve)
is.Service = util.AsService(is.serve, is.String())
// The token isn't tracked as the service stops when the connection
// terminates and is automatically removed from supervisor (by
// implementing suture.IsCompletable).
@ -1970,7 +1970,7 @@ type indexSender struct {
connClosed chan struct{}
}
func (s *indexSender) serve(stop chan struct{}) {
func (s *indexSender) serve(ctx context.Context) {
var err error
l.Debugf("Starting indexSender for %s to %s at %s (slv=%d)", s.folder, s.dev, s.conn, s.prevSequence)
@ -1991,7 +1991,7 @@ func (s *indexSender) serve(stop chan struct{}) {
for err == nil {
select {
case <-stop:
case <-ctx.Done():
return
case <-s.connClosed:
return
@ -2004,7 +2004,7 @@ func (s *indexSender) serve(stop chan struct{}) {
// sending for.
if s.fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence {
select {
case <-stop:
case <-ctx.Done():
return
case <-s.connClosed:
return
@ -2037,7 +2037,7 @@ func (s *indexSender) sendIndexTo() error {
initial := s.prevSequence == 0
batch := newFileInfoBatch(nil)
batch.flushFn = func(fs []protocol.FileInfo) error {
l.Debugf("Sending indexes for %s to %s at %s: %d files (<%d bytes)", s.folder, s.dev, s.conn, len(batch.infos), batch.size)
l.Debugf("%v: Sending %d files (<%d bytes)", s, len(batch.infos), batch.size)
if initial {
initial = false
return s.conn.Index(s.folder, fs)
@ -2099,6 +2099,10 @@ func (s *indexSender) sendIndexTo() error {
return err
}
func (s *indexSender) String() string {
return fmt.Sprintf("indexSender@%p for %s to %s at %s", s, s.folder, s.dev, s.conn)
}
func (m *model) requestGlobal(ctx context.Context, deviceID protocol.DeviceID, folder, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
m.pmut.RLock()
nc, ok := m.conn[deviceID]

View File

@ -7,6 +7,7 @@
package model
import (
"context"
"fmt"
"time"
@ -47,7 +48,7 @@ func NewProgressEmitter(cfg config.Wrapper, evLogger events.Logger) *ProgressEmi
evLogger: evLogger,
mut: sync.NewMutex(),
}
t.Service = util.AsService(t.serve)
t.Service = util.AsService(t.serve, t.String())
t.CommitConfiguration(config.Configuration{}, cfg.RawCopy())
cfg.Subscribe(t)
@ -57,12 +58,12 @@ func NewProgressEmitter(cfg config.Wrapper, evLogger events.Logger) *ProgressEmi
// serve starts the progress emitter which starts emitting DownloadProgress
// events as the progress happens.
func (t *ProgressEmitter) serve(stop chan struct{}) {
func (t *ProgressEmitter) serve(ctx context.Context) {
var lastUpdate time.Time
var lastCount, newCount int
for {
select {
case <-stop:
case <-ctx.Done():
l.Debugln("progress emitter: stopping")
return
case <-t.timer.C:

View File

@ -7,6 +7,7 @@
package nat
import (
"context"
"sync"
"time"
)
@ -19,7 +20,7 @@ func Register(provider DiscoverFunc) {
providers = append(providers, provider)
}
func discoverAll(renewal, timeout time.Duration, stop chan struct{}) map[string]Device {
func discoverAll(ctx context.Context, renewal, timeout time.Duration) map[string]Device {
wg := &sync.WaitGroup{}
wg.Add(len(providers))
@ -32,7 +33,7 @@ func discoverAll(renewal, timeout time.Duration, stop chan struct{}) map[string]
for _, dev := range f(renewal, timeout) {
select {
case c <- dev:
case <-stop:
case <-ctx.Done():
return
}
}
@ -50,7 +51,7 @@ func discoverAll(renewal, timeout time.Duration, stop chan struct{}) map[string]
return
}
nats[dev.ID()] = dev
case <-stop:
case <-ctx.Done():
return
}
}

View File

@ -7,6 +7,7 @@
package nat
import (
"context"
"fmt"
"hash/fnv"
"math/rand"
@ -43,11 +44,11 @@ func NewService(id protocol.DeviceID, cfg config.Wrapper) *Service {
timer: time.NewTimer(0),
mut: sync.NewRWMutex(),
}
s.Service = util.AsService(s.serve)
s.Service = util.AsService(s.serve, s.String())
return s
}
func (s *Service) serve(stop chan struct{}) {
func (s *Service) serve(ctx context.Context) {
announce := stdsync.Once{}
s.mut.Lock()
@ -57,7 +58,7 @@ func (s *Service) serve(stop chan struct{}) {
for {
select {
case <-s.timer.C:
if found := s.process(stop); found != -1 {
if found := s.process(ctx); found != -1 {
announce.Do(func() {
suffix := "s"
if found == 1 {
@ -66,7 +67,7 @@ func (s *Service) serve(stop chan struct{}) {
l.Infoln("Detected", found, "NAT service"+suffix)
})
}
case <-stop:
case <-ctx.Done():
s.timer.Stop()
s.mut.RLock()
for _, mapping := range s.mappings {
@ -78,7 +79,7 @@ func (s *Service) serve(stop chan struct{}) {
}
}
func (s *Service) process(stop chan struct{}) int {
func (s *Service) process(ctx context.Context) int {
// toRenew are mappings which are due for renewal
// toUpdate are the remaining mappings, which will only be updated if one of
// the old IGDs has gone away, or a new IGD has appeared, but only if we
@ -120,14 +121,14 @@ func (s *Service) process(stop chan struct{}) int {
return -1
}
nats := discoverAll(time.Duration(s.cfg.Options().NATRenewalM)*time.Minute, time.Duration(s.cfg.Options().NATTimeoutS)*time.Second, stop)
nats := discoverAll(ctx, time.Duration(s.cfg.Options().NATRenewalM)*time.Minute, time.Duration(s.cfg.Options().NATTimeoutS)*time.Second)
for _, mapping := range toRenew {
s.updateMapping(mapping, nats, true, stop)
s.updateMapping(ctx, mapping, nats, true)
}
for _, mapping := range toUpdate {
s.updateMapping(mapping, nats, false, stop)
s.updateMapping(ctx, mapping, nats, false)
}
return len(nats)
@ -177,17 +178,17 @@ func (s *Service) RemoveMapping(mapping *Mapping) {
// acquire mappings for natds which the mapping was unaware of before.
// Optionally takes renew flag which indicates whether or not we should renew
// mappings with existing natds
func (s *Service) updateMapping(mapping *Mapping, nats map[string]Device, renew bool, stop chan struct{}) {
func (s *Service) updateMapping(ctx context.Context, mapping *Mapping, nats map[string]Device, renew bool) {
var added, removed []Address
renewalTime := time.Duration(s.cfg.Options().NATRenewalM) * time.Minute
mapping.expires = time.Now().Add(renewalTime)
newAdded, newRemoved := s.verifyExistingMappings(mapping, nats, renew, stop)
newAdded, newRemoved := s.verifyExistingMappings(ctx, mapping, nats, renew)
added = append(added, newAdded...)
removed = append(removed, newRemoved...)
newAdded, newRemoved = s.acquireNewMappings(mapping, nats, stop)
newAdded, newRemoved = s.acquireNewMappings(ctx, mapping, nats)
added = append(added, newAdded...)
removed = append(removed, newRemoved...)
@ -196,14 +197,14 @@ func (s *Service) updateMapping(mapping *Mapping, nats map[string]Device, renew
}
}
func (s *Service) verifyExistingMappings(mapping *Mapping, nats map[string]Device, renew bool, stop chan struct{}) ([]Address, []Address) {
func (s *Service) verifyExistingMappings(ctx context.Context, mapping *Mapping, nats map[string]Device, renew bool) ([]Address, []Address) {
var added, removed []Address
leaseTime := time.Duration(s.cfg.Options().NATLeaseM) * time.Minute
for id, address := range mapping.addressMap() {
select {
case <-stop:
case <-ctx.Done():
return nil, nil
default:
}
@ -225,7 +226,7 @@ func (s *Service) verifyExistingMappings(mapping *Mapping, nats map[string]Devic
l.Debugf("Renewing %s -> %s mapping on %s", mapping, address, id)
addr, err := s.tryNATDevice(nat, mapping.address.Port, address.Port, leaseTime, stop)
addr, err := s.tryNATDevice(ctx, nat, mapping.address.Port, address.Port, leaseTime)
if err != nil {
l.Debugf("Failed to renew %s -> mapping on %s", mapping, address, id)
mapping.removeAddress(id)
@ -247,7 +248,7 @@ func (s *Service) verifyExistingMappings(mapping *Mapping, nats map[string]Devic
return added, removed
}
func (s *Service) acquireNewMappings(mapping *Mapping, nats map[string]Device, stop chan struct{}) ([]Address, []Address) {
func (s *Service) acquireNewMappings(ctx context.Context, mapping *Mapping, nats map[string]Device) ([]Address, []Address) {
var added, removed []Address
leaseTime := time.Duration(s.cfg.Options().NATLeaseM) * time.Minute
@ -255,7 +256,7 @@ func (s *Service) acquireNewMappings(mapping *Mapping, nats map[string]Device, s
for id, nat := range nats {
select {
case <-stop:
case <-ctx.Done():
return nil, nil
default:
}
@ -274,7 +275,7 @@ func (s *Service) acquireNewMappings(mapping *Mapping, nats map[string]Device, s
l.Debugf("Acquiring %s mapping on %s", mapping, id)
addr, err := s.tryNATDevice(nat, mapping.address.Port, 0, leaseTime, stop)
addr, err := s.tryNATDevice(ctx, nat, mapping.address.Port, 0, leaseTime)
if err != nil {
l.Debugf("Failed to acquire %s mapping on %s", mapping, id)
continue
@ -291,7 +292,7 @@ func (s *Service) acquireNewMappings(mapping *Mapping, nats map[string]Device, s
// tryNATDevice tries to acquire a port mapping for the given internal address to
// the given external port. If external port is 0, picks a pseudo-random port.
func (s *Service) tryNATDevice(natd Device, intPort, extPort int, leaseTime time.Duration, stop chan struct{}) (Address, error) {
func (s *Service) tryNATDevice(ctx context.Context, natd Device, intPort, extPort int, leaseTime time.Duration) (Address, error) {
var err error
var port int
@ -313,7 +314,7 @@ func (s *Service) tryNATDevice(natd Device, intPort, extPort int, leaseTime time
for i := 0; i < 10; i++ {
select {
case <-stop:
case <-ctx.Done():
return Address{}, nil
default:
}
@ -343,6 +344,10 @@ findIP:
}, nil
}
func (s *Service) String() string {
return fmt.Sprintf("nat.Service@%p", s)
}
func hash(input string) int64 {
h := fnv.New64a()
h.Write([]byte(input))

View File

@ -3,6 +3,7 @@
package client
import (
"context"
"crypto/tls"
"fmt"
"net/url"
@ -51,16 +52,16 @@ type commonClient struct {
mut sync.RWMutex
}
func newCommonClient(invitations chan protocol.SessionInvitation, serve func(chan struct{}) error) commonClient {
func newCommonClient(invitations chan protocol.SessionInvitation, serve func(context.Context) error, creator string) commonClient {
c := commonClient{
invitations: invitations,
mut: sync.NewRWMutex(),
}
newServe := func(stop chan struct{}) error {
newServe := func(ctx context.Context) error {
defer c.cleanup()
return serve(stop)
return serve(ctx)
}
c.ServiceWithError = util.AsServiceWithError(newServe)
c.ServiceWithError = util.AsServiceWithError(newServe, creator)
if c.invitations == nil {
c.closeInvitationsOnFinish = true
c.invitations = make(chan protocol.SessionInvitation)

View File

@ -3,6 +3,7 @@
package client
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
@ -32,11 +33,11 @@ func newDynamicClient(uri *url.URL, certs []tls.Certificate, invitations chan pr
certs: certs,
timeout: timeout,
}
c.commonClient = newCommonClient(invitations, c.serve)
c.commonClient = newCommonClient(invitations, c.serve, c.String())
return c
}
func (c *dynamicClient) serve(stop chan struct{}) error {
func (c *dynamicClient) serve(ctx context.Context) error {
uri := *c.pooladdr
// Trim off the `dynamic+` prefix
@ -69,9 +70,9 @@ func (c *dynamicClient) serve(stop chan struct{}) error {
addrs = append(addrs, ruri.String())
}
for _, addr := range relayAddressesOrder(addrs, stop) {
for _, addr := range relayAddressesOrder(ctx, addrs) {
select {
case <-stop:
case <-ctx.Done():
l.Debugln(c, "stopping")
return nil
default:
@ -148,7 +149,7 @@ type dynamicAnnouncement struct {
// the closest 50ms, and puts them in buckets of 50ms latency ranges. Then
// shuffles each bucket, and returns all addresses starting with the ones from
// the lowest latency bucket, ending with the highest latency buceket.
func relayAddressesOrder(input []string, stop chan struct{}) []string {
func relayAddressesOrder(ctx context.Context, input []string) []string {
buckets := make(map[int][]string)
for _, relay := range input {
@ -162,7 +163,7 @@ func relayAddressesOrder(input []string, stop chan struct{}) []string {
buckets[id] = append(buckets[id], relay)
select {
case <-stop:
case <-ctx.Done():
return nil
default:
}

View File

@ -3,6 +3,7 @@
package client
import (
"context"
"crypto/tls"
"fmt"
"net"
@ -39,11 +40,11 @@ func newStaticClient(uri *url.URL, certs []tls.Certificate, invitations chan pro
messageTimeout: time.Minute * 2,
connectTimeout: timeout,
}
c.commonClient = newCommonClient(invitations, c.serve)
c.commonClient = newCommonClient(invitations, c.serve, c.String())
return c
}
func (c *staticClient) serve(stop chan struct{}) error {
func (c *staticClient) serve(ctx context.Context) error {
if err := c.connect(); err != nil {
l.Infof("Could not connect to relay %s: %s", c.uri, err)
return err
@ -72,7 +73,7 @@ func (c *staticClient) serve(stop chan struct{}) error {
messages := make(chan interface{})
errors := make(chan error, 1)
go messageReader(c.conn, messages, errors, stop)
go messageReader(ctx, c.conn, messages, errors)
timeout := time.NewTimer(c.messageTimeout)
@ -106,7 +107,7 @@ func (c *staticClient) serve(stop chan struct{}) error {
return fmt.Errorf("protocol error: unexpected message %v", msg)
}
case <-stop:
case <-ctx.Done():
l.Debugln(c, "stopping")
return nil
@ -241,7 +242,7 @@ func performHandshakeAndValidation(conn *tls.Conn, uri *url.URL) error {
return nil
}
func messageReader(conn net.Conn, messages chan<- interface{}, errors chan<- error, stop chan struct{}) {
func messageReader(ctx context.Context, conn net.Conn, messages chan<- interface{}, errors chan<- error) {
for {
msg, err := protocol.ReadMessage(conn)
if err != nil {
@ -250,7 +251,7 @@ func messageReader(conn net.Conn, messages chan<- interface{}, errors chan<- err
}
select {
case messages <- msg:
case <-stop:
case <-ctx.Done():
return
}
}

View File

@ -7,6 +7,7 @@
package stun
import (
"context"
"net"
"sync/atomic"
"time"
@ -104,7 +105,7 @@ func New(cfg config.Wrapper, subscriber Subscriber, conn net.PacketConn) (*Servi
natType: NATUnknown,
addr: nil,
}
s.Service = util.AsService(s.serve)
s.Service = util.AsService(s.serve, s.String())
return s, otherDataConn
}
@ -113,7 +114,7 @@ func (s *Service) Stop() {
s.Service.Stop()
}
func (s *Service) serve(stop chan struct{}) {
func (s *Service) serve(ctx context.Context) {
for {
disabled:
s.setNATType(NATUnknown)
@ -121,7 +122,7 @@ func (s *Service) serve(stop chan struct{}) {
if s.cfg.Options().IsStunDisabled() {
select {
case <-stop:
case <-ctx.Done():
return
case <-time.After(time.Second):
continue
@ -134,12 +135,12 @@ func (s *Service) serve(stop chan struct{}) {
// This blocks until we hit an exit condition or there are issues with the STUN server.
// This returns a boolean signifying if a different STUN server should be tried (oppose to the whole thing
// shutting down and this winding itself down.
if !s.runStunForServer(addr, stop) {
if !s.runStunForServer(ctx, addr) {
// Check exit conditions.
// Have we been asked to stop?
select {
case <-stop:
case <-ctx.Done():
return
default:
}
@ -165,13 +166,13 @@ func (s *Service) serve(stop chan struct{}) {
// Chillout for a while.
select {
case <-time.After(stunRetryInterval):
case <-stop:
case <-ctx.Done():
return
}
}
}
func (s *Service) runStunForServer(addr string, stop chan struct{}) (tryNext bool) {
func (s *Service) runStunForServer(ctx context.Context, addr string) (tryNext bool) {
l.Debugf("Running stun for %s via %s", s, addr)
// Resolve the address, so that in case the server advertises two
@ -209,10 +210,10 @@ func (s *Service) runStunForServer(addr string, stop chan struct{}) (tryNext boo
return false
}
return s.stunKeepAlive(addr, extAddr, stop)
return s.stunKeepAlive(ctx, addr, extAddr)
}
func (s *Service) stunKeepAlive(addr string, extAddr *Host, stop chan struct{}) (tryNext bool) {
func (s *Service) stunKeepAlive(ctx context.Context, addr string, extAddr *Host) (tryNext bool) {
var err error
nextSleep := time.Duration(s.cfg.Options().StunKeepaliveStartS) * time.Second
@ -255,7 +256,7 @@ func (s *Service) stunKeepAlive(addr string, extAddr *Host, stop chan struct{})
select {
case <-time.After(sleepFor):
case <-stop:
case <-ctx.Done():
l.Debugf("%s stopping, aborting stun", s)
return false
}

View File

@ -7,7 +7,9 @@
package syncthing
import (
"context"
"encoding/json"
"fmt"
"io"
"github.com/thejerf/suture"
@ -29,19 +31,19 @@ func newAuditService(w io.Writer, evLogger events.Logger) *auditService {
w: w,
sub: evLogger.Subscribe(events.AllEvents),
}
s.Service = util.AsService(s.serve)
s.Service = util.AsService(s.serve, s.String())
return s
}
// serve runs the audit service.
func (s *auditService) serve(stop chan struct{}) {
func (s *auditService) serve(ctx context.Context) {
enc := json.NewEncoder(s.w)
for {
select {
case ev := <-s.sub.C():
enc.Encode(ev)
case <-stop:
case <-ctx.Done():
return
}
}
@ -52,3 +54,7 @@ func (s *auditService) Stop() {
s.Service.Stop()
s.sub.Unsubscribe()
}
func (s *auditService) String() string {
return fmt.Sprintf("auditService@%p", s)
}

View File

@ -7,6 +7,7 @@
package syncthing
import (
"context"
"fmt"
"github.com/thejerf/suture"
@ -26,12 +27,12 @@ func newVerboseService(evLogger events.Logger) *verboseService {
s := &verboseService{
sub: evLogger.Subscribe(events.AllEvents),
}
s.Service = util.AsService(s.serve)
s.Service = util.AsService(s.serve, s.String())
return s
}
// serve runs the verbose logging service.
func (s *verboseService) serve(stop chan struct{}) {
func (s *verboseService) serve(ctx context.Context) {
for {
select {
case ev := <-s.sub.C():
@ -39,7 +40,7 @@ func (s *verboseService) serve(stop chan struct{}) {
if formatted != "" {
l.Verboseln(formatted)
}
case <-stop:
case <-ctx.Done():
return
}
}
@ -187,3 +188,7 @@ func (s *verboseService) formatEvent(ev events.Event) string {
return fmt.Sprintf("%s %#v", ev.Type, ev)
}
func (s *verboseService) String() string {
return fmt.Sprintf("verboseService@%p", s)
}

View File

@ -57,7 +57,7 @@ func New(cfg config.Wrapper, m model.Model, connectionsService connections.Servi
noUpgrade: noUpgrade,
forceRun: make(chan struct{}, 1), // Buffered to prevent locking
}
svc.Service = util.AsService(svc.serve)
svc.Service = util.AsService(svc.serve, svc.String())
cfg.Subscribe(svc)
return svc
}
@ -384,11 +384,11 @@ func (s *Service) sendUsageReport() error {
return err
}
func (s *Service) serve(stop chan struct{}) {
func (s *Service) serve(ctx context.Context) {
t := time.NewTimer(time.Duration(s.cfg.Options().URInitialDelayS) * time.Second)
for {
select {
case <-stop:
case <-ctx.Done():
return
case <-s.forceRun:
t.Reset(0)

View File

@ -7,10 +7,10 @@
package util
import (
"context"
"fmt"
"net/url"
"reflect"
"runtime"
"strconv"
"strings"
@ -178,11 +178,11 @@ func Address(network, host string) string {
// AsService wraps the given function to implement suture.Service by calling
// that function on serve and closing the passed channel when Stop is called.
func AsService(fn func(stop chan struct{})) suture.Service {
return asServiceWithError(func(stop chan struct{}) error {
fn(stop)
func AsService(fn func(ctx context.Context), creator string) suture.Service {
return asServiceWithError(func(ctx context.Context) error {
fn(ctx)
return nil
})
}, creator)
}
type ServiceWithError interface {
@ -194,25 +194,18 @@ type ServiceWithError interface {
// AsServiceWithError does the same as AsService, except that it keeps track
// of an error returned by the given function.
func AsServiceWithError(fn func(stop chan struct{}) error) ServiceWithError {
return asServiceWithError(fn)
func AsServiceWithError(fn func(ctx context.Context) error, creator string) ServiceWithError {
return asServiceWithError(fn, creator)
}
// caller retrieves information about the creator of the service, i.e. the stack
// two levels up from itself.
func caller() string {
pc := make([]uintptr, 1)
_ = runtime.Callers(4, pc)
f, _ := runtime.CallersFrames(pc).Next()
return f.Function
}
func asServiceWithError(fn func(stop chan struct{}) error) ServiceWithError {
func asServiceWithError(fn func(ctx context.Context) error, creator string) ServiceWithError {
ctx, cancel := context.WithCancel(context.Background())
s := &service{
caller: caller(),
serve: fn,
stop: make(chan struct{}),
ctx: ctx,
cancel: cancel,
stopped: make(chan struct{}),
creator: creator,
mut: sync.NewMutex(),
}
close(s.stopped) // not yet started, don't block on Stop()
@ -220,9 +213,10 @@ func asServiceWithError(fn func(stop chan struct{}) error) ServiceWithError {
}
type service struct {
caller string
serve func(stop chan struct{}) error
stop chan struct{}
creator string
serve func(ctx context.Context) error
ctx context.Context
cancel context.CancelFunc
stopped chan struct{}
err error
mut sync.Mutex
@ -231,7 +225,7 @@ type service struct {
func (s *service) Serve() {
s.mut.Lock()
select {
case <-s.stop:
case <-s.ctx.Done():
s.mut.Unlock()
return
default:
@ -247,16 +241,16 @@ func (s *service) Serve() {
close(s.stopped)
s.mut.Unlock()
}()
err = s.serve(s.stop)
err = s.serve(s.ctx)
}
func (s *service) Stop() {
s.mut.Lock()
select {
case <-s.stop:
case <-s.ctx.Done():
panic(fmt.Sprintf("Stop called more than once on %v", s))
default:
close(s.stop)
s.cancel()
}
s.mut.Unlock()
<-s.stopped
@ -275,5 +269,5 @@ func (s *service) SetError(err error) {
}
func (s *service) String() string {
return fmt.Sprintf("Service@%p created by %v", s, s.caller)
return fmt.Sprintf("Service@%p created by %v", s, s.creator)
}

View File

@ -7,6 +7,7 @@
package util
import (
"context"
"strings"
"testing"
)
@ -227,17 +228,17 @@ func TestCopyMatching(t *testing.T) {
}
func TestUtilStopTwicePanic(t *testing.T) {
s := AsService(func(stop chan struct{}) {
<-stop
})
name := "foo"
s := AsService(func(ctx context.Context) {
<-ctx.Done()
}, name)
go s.Serve()
s.Stop()
defer func() {
expected := "lib/util.TestUtilStopTwicePanic"
if r := recover(); r == nil || !strings.Contains(r.(string), expected) {
t.Fatalf(`expected panic containing "%v", got "%v"`, expected, r)
if r := recover(); r == nil || !strings.Contains(r.(string), name) {
t.Fatalf(`expected panic containing "%v", got "%v"`, name, r)
}
}()
s.Stop()

View File

@ -7,6 +7,8 @@
package versioner
import (
"context"
"fmt"
"sort"
"strconv"
"time"
@ -65,13 +67,13 @@ func NewStaggered(folderID string, folderFs fs.Filesystem, params map[string]str
},
mutex: sync.NewMutex(),
}
s.Service = util.AsService(s.serve)
s.Service = util.AsService(s.serve, s.String())
l.Debugf("instantiated %#v", s)
return s
}
func (v *Staggered) serve(stop chan struct{}) {
func (v *Staggered) serve(ctx context.Context) {
v.clean()
if v.testCleanDone != nil {
close(v.testCleanDone)
@ -83,7 +85,7 @@ func (v *Staggered) serve(stop chan struct{}) {
select {
case <-tck.C:
v.clean()
case <-stop:
case <-ctx.Done():
return
}
}
@ -230,3 +232,7 @@ func (v *Staggered) GetVersions() (map[string][]FileVersion, error) {
func (v *Staggered) Restore(filepath string, versionTime time.Time) error {
return restoreFile(v.versionsFs, v.folderFs, filepath, versionTime, TagFilename)
}
func (v *Staggered) String() string {
return fmt.Sprintf("Staggered/@%p", v)
}

View File

@ -7,6 +7,7 @@
package versioner
import (
"context"
"fmt"
"strconv"
"time"
@ -38,7 +39,7 @@ func NewTrashcan(folderID string, folderFs fs.Filesystem, params map[string]stri
versionsFs: fsFromParams(folderFs, params),
cleanoutDays: cleanoutDays,
}
s.Service = util.AsService(s.serve)
s.Service = util.AsService(s.serve, s.String())
l.Debugf("instantiated %#v", s)
return s
@ -52,7 +53,7 @@ func (t *Trashcan) Archive(filePath string) error {
})
}
func (t *Trashcan) serve(stop chan struct{}) {
func (t *Trashcan) serve(ctx context.Context) {
l.Debugln(t, "starting")
defer l.Debugln(t, "stopping")
@ -62,7 +63,7 @@ func (t *Trashcan) serve(stop chan struct{}) {
for {
select {
case <-stop:
case <-ctx.Done():
return
case <-timer.C:

View File

@ -109,7 +109,7 @@ type aggregator struct {
ctx context.Context
}
func newAggregator(folderCfg config.FolderConfiguration, ctx context.Context) *aggregator {
func newAggregator(ctx context.Context, folderCfg config.FolderConfiguration) *aggregator {
a := &aggregator{
folderID: folderCfg.ID,
folderCfgUpdate: make(chan config.FolderConfiguration),
@ -125,8 +125,8 @@ func newAggregator(folderCfg config.FolderConfiguration, ctx context.Context) *a
return a
}
func Aggregate(in <-chan fs.Event, out chan<- []string, folderCfg config.FolderConfiguration, cfg config.Wrapper, evLogger events.Logger, ctx context.Context) {
a := newAggregator(folderCfg, ctx)
func Aggregate(ctx context.Context, in <-chan fs.Event, out chan<- []string, folderCfg config.FolderConfiguration, cfg config.Wrapper, evLogger events.Logger) {
a := newAggregator(ctx, folderCfg)
// Necessary for unit tests where the backend is mocked
go a.mainLoop(in, out, cfg, evLogger)

View File

@ -67,7 +67,7 @@ func TestAggregate(t *testing.T) {
folderCfg.ID = "Aggregate"
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
a := newAggregator(folderCfg, ctx)
a := newAggregator(ctx, folderCfg)
// checks whether maxFilesPerDir events in one dir are kept as is
for i := 0; i < maxFilesPerDir; i++ {
@ -95,7 +95,7 @@ func TestAggregate(t *testing.T) {
compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"parent"})
// again test aggregation in "parent" but with event in subdirs
a = newAggregator(folderCfg, ctx)
a = newAggregator(ctx, folderCfg)
for i := 0; i < maxFilesPerDir; i++ {
a.newEvent(fs.Event{
Name: filepath.Join("parent", strconv.Itoa(i)),
@ -109,7 +109,7 @@ func TestAggregate(t *testing.T) {
compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"parent"})
// test aggregation in root
a = newAggregator(folderCfg, ctx)
a = newAggregator(ctx, folderCfg)
for i := 0; i < maxFiles; i++ {
a.newEvent(fs.Event{
Name: strconv.Itoa(i),
@ -132,7 +132,7 @@ func TestAggregate(t *testing.T) {
}, inProgress)
compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"."})
a = newAggregator(folderCfg, ctx)
a = newAggregator(ctx, folderCfg)
filesPerDir := maxFilesPerDir / 2
dirs := make([]string, maxFiles/filesPerDir+1)
for i := 0; i < maxFiles/filesPerDir+1; i++ {
@ -293,7 +293,7 @@ func testScenario(t *testing.T, name string, testCase func(c chan<- fs.Event), e
folderCfg := defaultFolderCfg.Copy()
folderCfg.ID = name
a := newAggregator(folderCfg, ctx)
a := newAggregator(ctx, folderCfg)
a.notifyTimeout = testNotifyTimeout
startTime := time.Now()