lib/events, lib/model: Unflake test and prevent deadlock on event unsubscribing (#6261)

This commit is contained in:
Simon Frei 2020-01-11 08:14:05 +01:00 committed by Jakob Borg
parent 119d76d035
commit 71882765f2
2 changed files with 47 additions and 29 deletions

View File

@ -220,7 +220,7 @@ type logger struct {
nextGlobalID int nextGlobalID int
timeout *time.Timer timeout *time.Timer
events chan Event events chan Event
funcs chan func() funcs chan func(context.Context)
toUnsubscribe chan *subscription toUnsubscribe chan *subscription
stop chan struct{} stop chan struct{}
} }
@ -246,6 +246,7 @@ type subscription struct {
events chan Event events chan Event
toUnsubscribe chan *subscription toUnsubscribe chan *subscription
timeout *time.Timer timeout *time.Timer
ctx context.Context
} }
var ( var (
@ -257,7 +258,7 @@ func NewLogger() Logger {
l := &logger{ l := &logger{
timeout: time.NewTimer(time.Second), timeout: time.NewTimer(time.Second),
events: make(chan Event, BufferSize), events: make(chan Event, BufferSize),
funcs: make(chan func()), funcs: make(chan func(context.Context)),
toUnsubscribe: make(chan *subscription), toUnsubscribe: make(chan *subscription),
} }
l.Service = util.AsService(l.serve, l.String()) l.Service = util.AsService(l.serve, l.String())
@ -279,7 +280,7 @@ loop:
case fn := <-l.funcs: case fn := <-l.funcs:
// Subscriptions are handled here. // Subscriptions are handled here.
fn() fn(ctx)
case s := <-l.toUnsubscribe: case s := <-l.toUnsubscribe:
l.unsubscribe(s) l.unsubscribe(s)
@ -339,7 +340,7 @@ func (l *logger) sendEvent(e Event) {
func (l *logger) Subscribe(mask EventType) Subscription { func (l *logger) Subscribe(mask EventType) Subscription {
res := make(chan Subscription) res := make(chan Subscription)
l.funcs <- func() { l.funcs <- func(ctx context.Context) {
dl.Debugln("subscribe", mask) dl.Debugln("subscribe", mask)
s := &subscription{ s := &subscription{
@ -347,6 +348,7 @@ func (l *logger) Subscribe(mask EventType) Subscription {
events: make(chan Event, BufferSize), events: make(chan Event, BufferSize),
toUnsubscribe: l.toUnsubscribe, toUnsubscribe: l.toUnsubscribe,
timeout: time.NewTimer(0), timeout: time.NewTimer(0),
ctx: ctx,
} }
// We need to create the timeout timer in the stopped, non-fired state so // We need to create the timeout timer in the stopped, non-fired state so
@ -431,7 +433,10 @@ func (s *subscription) C() <-chan Event {
} }
func (s *subscription) Unsubscribe() { func (s *subscription) Unsubscribe() {
s.toUnsubscribe <- s select {
case s.toUnsubscribe <- s:
case <-s.ctx.Done():
}
} }
type bufferedSubscription struct { type bufferedSubscription struct {

View File

@ -1549,21 +1549,28 @@ func TestEmptyIgnores(t *testing.T) {
} }
} }
func waitForState(t *testing.T, m *model, folder, status string) { func waitForState(t *testing.T, sub events.Subscription, folder, expected string) {
t.Helper() t.Helper()
timeout := time.Now().Add(2 * time.Second) timeout := time.After(5 * time.Second)
var err error var error string
for !time.Now().After(timeout) { for {
_, _, err = m.State(folder) select {
if err == nil && status == "" { case ev := <-sub.C():
return data := ev.Data.(map[string]interface{})
if data["folder"].(string) == folder {
if data["error"] == nil {
error = ""
} else {
error = data["error"].(string)
}
if error == expected {
return
}
}
case <-timeout:
t.Fatalf("Timed out waiting for status: %s, current status: %v", expected, error)
} }
if err != nil && err.Error() == status {
return
}
time.Sleep(10 * time.Millisecond)
} }
t.Fatalf("Timed out waiting for status: %s, current status: %v", status, err)
} }
func TestROScanRecovery(t *testing.T) { func TestROScanRecovery(t *testing.T) {
@ -1594,27 +1601,29 @@ func TestROScanRecovery(t *testing.T) {
testOs.RemoveAll(fcfg.Path) testOs.RemoveAll(fcfg.Path)
m := newModel(cfg, myID, "syncthing", "dev", ldb, nil) m := newModel(cfg, myID, "syncthing", "dev", ldb, nil)
sub := m.evLogger.Subscribe(events.StateChanged)
defer sub.Unsubscribe()
m.ServeBackground() m.ServeBackground()
defer cleanupModel(m) defer cleanupModel(m)
waitForState(t, m, "default", "folder path missing") waitForState(t, sub, "default", "folder path missing")
testOs.Mkdir(fcfg.Path, 0700) testOs.Mkdir(fcfg.Path, 0700)
waitForState(t, m, "default", "folder marker missing") waitForState(t, sub, "default", "folder marker missing")
fd := testOs.Create(filepath.Join(fcfg.Path, config.DefaultMarkerName)) fd := testOs.Create(filepath.Join(fcfg.Path, config.DefaultMarkerName))
fd.Close() fd.Close()
waitForState(t, m, "default", "") waitForState(t, sub, "default", "")
testOs.Remove(filepath.Join(fcfg.Path, config.DefaultMarkerName)) testOs.Remove(filepath.Join(fcfg.Path, config.DefaultMarkerName))
waitForState(t, m, "default", "folder marker missing") waitForState(t, sub, "default", "folder marker missing")
testOs.Remove(fcfg.Path) testOs.Remove(fcfg.Path)
waitForState(t, m, "default", "folder path missing") waitForState(t, sub, "default", "folder path missing")
} }
func TestRWScanRecovery(t *testing.T) { func TestRWScanRecovery(t *testing.T) {
@ -1645,27 +1654,29 @@ func TestRWScanRecovery(t *testing.T) {
testOs.RemoveAll(fcfg.Path) testOs.RemoveAll(fcfg.Path)
m := newModel(cfg, myID, "syncthing", "dev", ldb, nil) m := newModel(cfg, myID, "syncthing", "dev", ldb, nil)
sub := m.evLogger.Subscribe(events.StateChanged)
defer sub.Unsubscribe()
m.ServeBackground() m.ServeBackground()
defer cleanupModel(m) defer cleanupModel(m)
waitForState(t, m, "default", "folder path missing") waitForState(t, sub, "default", "folder path missing")
testOs.Mkdir(fcfg.Path, 0700) testOs.Mkdir(fcfg.Path, 0700)
waitForState(t, m, "default", "folder marker missing") waitForState(t, sub, "default", "folder marker missing")
fd := testOs.Create(filepath.Join(fcfg.Path, config.DefaultMarkerName)) fd := testOs.Create(filepath.Join(fcfg.Path, config.DefaultMarkerName))
fd.Close() fd.Close()
waitForState(t, m, "default", "") waitForState(t, sub, "default", "")
testOs.Remove(filepath.Join(fcfg.Path, config.DefaultMarkerName)) testOs.Remove(filepath.Join(fcfg.Path, config.DefaultMarkerName))
waitForState(t, m, "default", "folder marker missing") waitForState(t, sub, "default", "folder marker missing")
testOs.Remove(fcfg.Path) testOs.Remove(fcfg.Path)
waitForState(t, m, "default", "folder path missing") waitForState(t, sub, "default", "folder path missing")
} }
func TestGlobalDirectoryTree(t *testing.T) { func TestGlobalDirectoryTree(t *testing.T) {
@ -2739,16 +2750,18 @@ func TestCustomMarkerName(t *testing.T) {
defer testOs.RemoveAll(fcfg.Path) defer testOs.RemoveAll(fcfg.Path)
m := newModel(cfg, myID, "syncthing", "dev", ldb, nil) m := newModel(cfg, myID, "syncthing", "dev", ldb, nil)
sub := m.evLogger.Subscribe(events.StateChanged)
defer sub.Unsubscribe()
m.ServeBackground() m.ServeBackground()
defer cleanupModel(m) defer cleanupModel(m)
waitForState(t, m, "default", "folder path missing") waitForState(t, sub, "default", "folder path missing")
testOs.Mkdir(fcfg.Path, 0700) testOs.Mkdir(fcfg.Path, 0700)
fd := testOs.Create(filepath.Join(fcfg.Path, "myfile")) fd := testOs.Create(filepath.Join(fcfg.Path, "myfile"))
fd.Close() fd.Close()
waitForState(t, m, "default", "") waitForState(t, sub, "default", "")
} }
func TestRemoveDirWithContent(t *testing.T) { func TestRemoveDirWithContent(t *testing.T) {