diff --git a/lib/events/events.go b/lib/events/events.go index 1e364cb89..72ea6446f 100644 --- a/lib/events/events.go +++ b/lib/events/events.go @@ -220,7 +220,7 @@ type logger struct { nextGlobalID int timeout *time.Timer events chan Event - funcs chan func() + funcs chan func(context.Context) toUnsubscribe chan *subscription stop chan struct{} } @@ -246,6 +246,7 @@ type subscription struct { events chan Event toUnsubscribe chan *subscription timeout *time.Timer + ctx context.Context } var ( @@ -257,7 +258,7 @@ func NewLogger() Logger { l := &logger{ timeout: time.NewTimer(time.Second), events: make(chan Event, BufferSize), - funcs: make(chan func()), + funcs: make(chan func(context.Context)), toUnsubscribe: make(chan *subscription), } l.Service = util.AsService(l.serve, l.String()) @@ -279,7 +280,7 @@ loop: case fn := <-l.funcs: // Subscriptions are handled here. - fn() + fn(ctx) case s := <-l.toUnsubscribe: l.unsubscribe(s) @@ -339,7 +340,7 @@ func (l *logger) sendEvent(e Event) { func (l *logger) Subscribe(mask EventType) Subscription { res := make(chan Subscription) - l.funcs <- func() { + l.funcs <- func(ctx context.Context) { dl.Debugln("subscribe", mask) s := &subscription{ @@ -347,6 +348,7 @@ func (l *logger) Subscribe(mask EventType) Subscription { events: make(chan Event, BufferSize), toUnsubscribe: l.toUnsubscribe, timeout: time.NewTimer(0), + ctx: ctx, } // We need to create the timeout timer in the stopped, non-fired state so @@ -431,7 +433,10 @@ func (s *subscription) C() <-chan Event { } func (s *subscription) Unsubscribe() { - s.toUnsubscribe <- s + select { + case s.toUnsubscribe <- s: + case <-s.ctx.Done(): + } } type bufferedSubscription struct { diff --git a/lib/model/model_test.go b/lib/model/model_test.go index dae429bad..68cc7d5cd 100644 --- a/lib/model/model_test.go +++ b/lib/model/model_test.go @@ -1549,21 +1549,28 @@ func TestEmptyIgnores(t *testing.T) { } } -func waitForState(t *testing.T, m *model, folder, status string) { +func waitForState(t *testing.T, sub events.Subscription, folder, expected string) { t.Helper() - timeout := time.Now().Add(2 * time.Second) - var err error - for !time.Now().After(timeout) { - _, _, err = m.State(folder) - if err == nil && status == "" { - return + timeout := time.After(5 * time.Second) + var error string + for { + select { + case ev := <-sub.C(): + data := ev.Data.(map[string]interface{}) + if data["folder"].(string) == folder { + if data["error"] == nil { + error = "" + } else { + error = data["error"].(string) + } + if error == expected { + return + } + } + case <-timeout: + t.Fatalf("Timed out waiting for status: %s, current status: %v", expected, error) } - if err != nil && err.Error() == status { - return - } - time.Sleep(10 * time.Millisecond) } - t.Fatalf("Timed out waiting for status: %s, current status: %v", status, err) } func TestROScanRecovery(t *testing.T) { @@ -1594,27 +1601,29 @@ func TestROScanRecovery(t *testing.T) { testOs.RemoveAll(fcfg.Path) m := newModel(cfg, myID, "syncthing", "dev", ldb, nil) + sub := m.evLogger.Subscribe(events.StateChanged) + defer sub.Unsubscribe() m.ServeBackground() defer cleanupModel(m) - waitForState(t, m, "default", "folder path missing") + waitForState(t, sub, "default", "folder path missing") testOs.Mkdir(fcfg.Path, 0700) - waitForState(t, m, "default", "folder marker missing") + waitForState(t, sub, "default", "folder marker missing") fd := testOs.Create(filepath.Join(fcfg.Path, config.DefaultMarkerName)) fd.Close() - waitForState(t, m, "default", "") + waitForState(t, sub, "default", "") testOs.Remove(filepath.Join(fcfg.Path, config.DefaultMarkerName)) - waitForState(t, m, "default", "folder marker missing") + waitForState(t, sub, "default", "folder marker missing") testOs.Remove(fcfg.Path) - waitForState(t, m, "default", "folder path missing") + waitForState(t, sub, "default", "folder path missing") } func TestRWScanRecovery(t *testing.T) { @@ -1645,27 +1654,29 @@ func TestRWScanRecovery(t *testing.T) { testOs.RemoveAll(fcfg.Path) m := newModel(cfg, myID, "syncthing", "dev", ldb, nil) + sub := m.evLogger.Subscribe(events.StateChanged) + defer sub.Unsubscribe() m.ServeBackground() defer cleanupModel(m) - waitForState(t, m, "default", "folder path missing") + waitForState(t, sub, "default", "folder path missing") testOs.Mkdir(fcfg.Path, 0700) - waitForState(t, m, "default", "folder marker missing") + waitForState(t, sub, "default", "folder marker missing") fd := testOs.Create(filepath.Join(fcfg.Path, config.DefaultMarkerName)) fd.Close() - waitForState(t, m, "default", "") + waitForState(t, sub, "default", "") testOs.Remove(filepath.Join(fcfg.Path, config.DefaultMarkerName)) - waitForState(t, m, "default", "folder marker missing") + waitForState(t, sub, "default", "folder marker missing") testOs.Remove(fcfg.Path) - waitForState(t, m, "default", "folder path missing") + waitForState(t, sub, "default", "folder path missing") } func TestGlobalDirectoryTree(t *testing.T) { @@ -2739,16 +2750,18 @@ func TestCustomMarkerName(t *testing.T) { defer testOs.RemoveAll(fcfg.Path) m := newModel(cfg, myID, "syncthing", "dev", ldb, nil) + sub := m.evLogger.Subscribe(events.StateChanged) + defer sub.Unsubscribe() m.ServeBackground() defer cleanupModel(m) - waitForState(t, m, "default", "folder path missing") + waitForState(t, sub, "default", "folder path missing") testOs.Mkdir(fcfg.Path, 0700) fd := testOs.Create(filepath.Join(fcfg.Path, "myfile")) fd.Close() - waitForState(t, m, "default", "") + waitForState(t, sub, "default", "") } func TestRemoveDirWithContent(t *testing.T) {