lib/versioner: Convert Staggered to a suture.Service (fixes #3820)

This commit is contained in:
Jakob Borg 2017-02-07 14:31:13 +01:00
parent c4ba580cbb
commit 73f9c7d174
2 changed files with 37 additions and 24 deletions

View File

@ -33,9 +33,10 @@ type Staggered struct {
folderPath string
interval [4]Interval
mutex sync.Mutex
}
var testCleanDone chan struct{}
stop chan struct{}
testCleanDone chan struct{}
}
func NewStaggered(folderID, folderPath string, params map[string]string) Versioner {
maxAge, err := strconv.ParseInt(params["maxAge"], 10, 0)
@ -57,7 +58,7 @@ func NewStaggered(folderID, folderPath string, params map[string]string) Version
versionsDir = params["versionsPath"]
}
s := Staggered{
s := &Staggered{
versionsPath: versionsDir,
cleanInterval: cleanInterval,
folderPath: folderPath,
@ -68,27 +69,36 @@ func NewStaggered(folderID, folderPath string, params map[string]string) Version
{604800, maxAge}, // next year -> 1 week between versions
},
mutex: sync.NewMutex(),
stop: make(chan struct{}),
}
l.Debugf("instantiated %#v", s)
go func() {
// TODO: This should be converted to a Serve() method.
s.clean()
if testCleanDone != nil {
close(testCleanDone)
}
tck := time.NewTicker(time.Duration(cleanInterval) * time.Second)
defer tck.Stop()
for range tck.C {
s.clean()
}
}()
return s
}
func (v Staggered) clean() {
func (v *Staggered) Serve() {
v.clean()
if v.testCleanDone != nil {
close(v.testCleanDone)
}
tck := time.NewTicker(time.Duration(v.cleanInterval) * time.Second)
defer tck.Stop()
for {
select {
case <-tck.C:
v.clean()
case <-v.stop:
return
}
}
}
func (v *Staggered) Stop() {
close(v.stop)
}
func (v *Staggered) clean() {
l.Debugln("Versioner clean: Waiting for lock on", v.versionsPath)
v.mutex.Lock()
defer v.mutex.Unlock()
@ -157,7 +167,7 @@ func (v Staggered) clean() {
l.Debugln("Cleaner: Finished cleaning", v.versionsPath)
}
func (v Staggered) expire(versions []string) {
func (v *Staggered) expire(versions []string) {
l.Debugln("Versioner: Expiring versions", versions)
for _, file := range v.toRemove(versions, time.Now()) {
if fi, err := osutil.Lstat(file); err != nil {
@ -174,7 +184,7 @@ func (v Staggered) expire(versions []string) {
}
}
func (v Staggered) toRemove(versions []string, now time.Time) []string {
func (v *Staggered) toRemove(versions []string, now time.Time) []string {
var prevAge int64
firstFile := true
var remove []string
@ -226,7 +236,7 @@ func (v Staggered) toRemove(versions []string, now time.Time) []string {
// Archive moves the named file away to a version archive. If this function
// returns nil, the named file does not exist any more (has been archived).
func (v Staggered) Archive(filePath string) error {
func (v *Staggered) Archive(filePath string) error {
l.Debugln("Waiting for lock on ", v.versionsPath)
v.mutex.Lock()
defer v.mutex.Unlock()

View File

@ -62,9 +62,12 @@ func TestStaggeredVersioningVersionCount(t *testing.T) {
os.MkdirAll("testdata/.stversions", 0755)
defer os.RemoveAll("testdata")
testCleanDone = make(chan struct{})
v := NewStaggered("", "testdata", map[string]string{"maxAge": strconv.Itoa(365 * 86400)}).(Staggered)
<-testCleanDone
v := NewStaggered("", "testdata", map[string]string{"maxAge": strconv.Itoa(365 * 86400)}).(*Staggered)
v.testCleanDone = make(chan struct{})
defer v.Stop()
go v.Serve()
<-v.testCleanDone
rem := v.toRemove(files, now)
if diff, equal := messagediff.PrettyDiff(delete, rem); !equal {